You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/07/08 14:18:27 UTC
[5/5] git commit: [FLINK-994] Added test cases for AbstractPagedInputView, fixed problem with hbase.
[FLINK-994] Added test cases for AbstractPagedInputView, fixed problem with hbase.
This closes #53
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/7b6295d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/7b6295d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/7b6295d9
Branch: refs/heads/master
Commit: 7b6295d9829af1e94bea106dc37a169a39895d92
Parents: 98e659e
Author: Till Rohrmann <ti...@gmail.com>
Authored: Mon Jul 7 16:20:07 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 8 12:37:41 2014 +0200
----------------------------------------------------------------------
.../addons/hbase/TableInputSplit.java | 8 +-
.../addons/hbase/common/HBaseKey.java | 8 +-
.../addons/hbase/common/HBaseResult.java | 8 +-
.../memorymanager/AbstractPagedInputView.java | 5 +-
.../io/serialization/PagedViewsTest.java | 259 ++++++++++++++++++-
5 files changed, 273 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7b6295d9/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/TableInputSplit.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/TableInputSplit.java b/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/TableInputSplit.java
index 382befa..bcf5d00 100644
--- a/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/TableInputSplit.java
+++ b/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/TableInputSplit.java
@@ -13,10 +13,10 @@
package eu.stratosphere.addons.hbase;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
+import eu.stratosphere.core.memory.DataOutputView;
+import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.io.LocatableInputSplit;
/**
@@ -103,7 +103,7 @@ public class TableInputSplit extends LocatableInputSplit {
@Override
- public void write(final DataOutput out) throws IOException {
+ public void write(final DataOutputView out) throws IOException {
super.write(out);
@@ -134,7 +134,7 @@ public class TableInputSplit extends LocatableInputSplit {
@Override
- public void read(final DataInput in) throws IOException {
+ public void read(final DataInputView in) throws IOException {
super.read(in);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7b6295d9/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/common/HBaseKey.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/common/HBaseKey.java b/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/common/HBaseKey.java
index 700db05..eba09cd 100644
--- a/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/common/HBaseKey.java
+++ b/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/common/HBaseKey.java
@@ -13,12 +13,12 @@
package eu.stratosphere.addons.hbase.common;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.types.Key;
/**
@@ -52,12 +52,12 @@ public class HBaseKey implements Key<HBaseKey> {
// --------------------------------------------------------------------------------------------
@Override
- public void write(DataOutput out) throws IOException {
+ public void write(DataOutputView out) throws IOException {
this.writable.write(out);
}
@Override
- public void read(DataInput in) throws IOException {
+ public void read(DataInputView in) throws IOException {
this.writable.readFields(in);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7b6295d9/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/common/HBaseResult.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/common/HBaseResult.java b/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/common/HBaseResult.java
index a8a2df9..a827551 100644
--- a/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/common/HBaseResult.java
+++ b/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/common/HBaseResult.java
@@ -13,12 +13,12 @@
package eu.stratosphere.addons.hbase.common;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.types.Value;
public class HBaseResult implements Value {
@@ -53,12 +53,12 @@ public class HBaseResult implements Value {
}
@Override
- public void read(DataInput in) throws IOException {
+ public void read(DataInputView in) throws IOException {
this.result.readFields(in);
}
@Override
- public void write(DataOutput out) throws IOException {
+ public void write(DataOutputView out) throws IOException {
this.result.write(out);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7b6295d9/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/AbstractPagedInputView.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/AbstractPagedInputView.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/AbstractPagedInputView.java
index c6ca7b9..35badef 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/AbstractPagedInputView.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/AbstractPagedInputView.java
@@ -222,6 +222,7 @@ public abstract class AbstractPagedInputView implements DataInputView {
try {
advance();
}catch(EOFException eof){
+ this.positionInSegment += toRead;
return bytesRead;
}
remaining = this.limitInSegment - this.positionInSegment;
@@ -244,8 +245,8 @@ public abstract class AbstractPagedInputView implements DataInputView {
public void readFully(byte[] b, int off, int len) throws IOException {
int bytesRead = read(b,off,len);
- if(bytesRead == -1){
- throw new EOFException("There is no more data left in the DataInputView.");
+ if(bytesRead < len){
+ throw new EOFException("There is no enough data left in the DataInputView.");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7b6295d9/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/PagedViewsTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/PagedViewsTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/PagedViewsTest.java
index 817c0e6..e128344 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/PagedViewsTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/PagedViewsTest.java
@@ -21,10 +21,13 @@ import eu.stratosphere.runtime.io.serialization.types.SerializationTestTypeFacto
import eu.stratosphere.runtime.io.serialization.types.Util;
import org.junit.Test;
+import java.io.EOFException;
import java.io.IOException;
import java.util.*;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class PagedViewsTest {
@@ -69,6 +72,260 @@ public class PagedViewsTest {
}
}
+ @Test
+ public void testReadFully() {
+ int bufferSize = 100;
+ byte[] expected = new byte[bufferSize];
+ new Random().nextBytes(expected);
+
+ TestOutputView outputView = new TestOutputView(bufferSize);
+
+ try {
+ outputView.write(expected);
+ }catch(Exception e){
+ e.printStackTrace();
+ fail("Unexpected exception: Could not write to TestOutputView.");
+ }
+
+ outputView.close();
+
+ TestInputView inputView = new TestInputView(outputView.segments);
+ byte[] buffer = new byte[bufferSize];
+
+ try {
+ inputView.readFully(buffer);
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail("Unexpected exception: Could not read TestInputView.");
+ }
+
+ assertEquals(inputView.getCurrentPositionInSegment(), bufferSize);
+ assertArrayEquals(expected, buffer);
+ }
+
+ @Test
+ public void testReadFullyAcrossSegments() {
+ int bufferSize = 100;
+ int segmentSize = 30;
+ byte[] expected = new byte[bufferSize];
+ new Random().nextBytes(expected);
+
+ TestOutputView outputView = new TestOutputView(segmentSize);
+
+ try {
+ outputView.write(expected);
+ }catch(Exception e){
+ e.printStackTrace();
+ fail("Unexpected exception: Could not write to TestOutputView.");
+ }
+
+ outputView.close();
+
+ TestInputView inputView = new TestInputView(outputView.segments);
+ byte[] buffer = new byte[bufferSize];
+
+ try {
+ inputView.readFully(buffer);
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail("Unexpected exception: Could not read TestInputView.");
+ }
+
+ assertEquals(inputView.getCurrentPositionInSegment(), bufferSize % segmentSize);
+ assertArrayEquals(expected, buffer);
+ }
+
+ @Test
+ public void testReadAcrossSegments() {
+ int bufferSize = 100;
+ int bytes2Write = 75;
+ int segmentSize = 30;
+ byte[] expected = new byte[bytes2Write];
+ new Random().nextBytes(expected);
+
+ TestOutputView outputView = new TestOutputView(segmentSize);
+
+ try {
+ outputView.write(expected);
+ }catch(Exception e){
+ e.printStackTrace();
+ fail("Unexpected exception: Could not write to TestOutputView.");
+ }
+
+ outputView.close();
+
+ TestInputView inputView = new TestInputView(outputView.segments);
+ byte[] buffer = new byte[bufferSize];
+ int bytesRead = 0;
+
+ try {
+ bytesRead = inputView.read(buffer);
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail("Unexpected exception: Could not read TestInputView.");
+ }
+
+ assertEquals(bytes2Write, bytesRead);
+ assertEquals(inputView.getCurrentPositionInSegment(), bytes2Write % segmentSize);
+
+ byte[] tempBuffer = new byte[bytesRead];
+ System.arraycopy(buffer,0,tempBuffer,0,bytesRead);
+ assertArrayEquals(expected, tempBuffer);
+ }
+
+ @Test
+ public void testEmptyingInputView() {
+ int bufferSize = 100;
+ int bytes2Write = 75;
+ int segmentSize = 30;
+ byte[] expected = new byte[bytes2Write];
+ new Random().nextBytes(expected);
+
+ TestOutputView outputView = new TestOutputView(segmentSize);
+
+ try {
+ outputView.write(expected);
+ }catch(Exception e){
+ e.printStackTrace();
+ fail("Unexpected exception: Could not write to TestOutputView.");
+ }
+
+ outputView.close();
+
+ TestInputView inputView = new TestInputView(outputView.segments);
+ byte[] buffer = new byte[bufferSize];
+ int bytesRead = 0;
+
+ try {
+ bytesRead = inputView.read(buffer);
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail("Unexpected exception: Could not read TestInputView.");
+ }
+
+ assertEquals(bytes2Write, bytesRead);
+
+ byte[] tempBuffer = new byte[bytesRead];
+ System.arraycopy(buffer,0,tempBuffer,0,bytesRead);
+ assertArrayEquals(expected, tempBuffer);
+
+ try{
+ bytesRead = inputView.read(buffer);
+ }catch(IOException e){
+ e.printStackTrace();
+ fail("Unexpected exception: Input view should be empty and thus return -1.");
+ }
+
+ assertEquals(-1, bytesRead);
+ assertEquals(inputView.getCurrentPositionInSegment(), bytes2Write % segmentSize);
+ }
+
+ @Test
+ public void testReadFullyWithNotEnoughData() {
+ int bufferSize = 100;
+ int bytes2Write = 99;
+ int segmentSize = 30;
+ byte[] expected = new byte[bytes2Write];
+ new Random().nextBytes(expected);
+
+ TestOutputView outputView = new TestOutputView(segmentSize);
+
+ try {
+ outputView.write(expected);
+ }catch(Exception e){
+ e.printStackTrace();
+ fail("Unexpected exception: Could not write to TestOutputView.");
+ }
+
+ outputView.close();
+
+ TestInputView inputView = new TestInputView(outputView.segments);
+ byte[] buffer = new byte[bufferSize];
+ boolean eofException = false;
+
+ try {
+ inputView.readFully(buffer);
+ }catch(EOFException e){
+ //Expected exception
+ eofException = true;
+ }
+ catch (IOException e) {
+ e.printStackTrace();
+ fail("Unexpected exception: Could not read TestInputView.");
+ }
+
+ assertTrue("EOFException should have occurred.", eofException);
+
+ int bytesRead = 0;
+
+ try{
+ bytesRead =inputView.read(buffer);
+ }catch(Exception e){
+ e.printStackTrace();
+ fail("Unexpected exception: Could not read TestInputView.");
+ }
+
+ assertEquals(-1, bytesRead);
+ }
+
+ @Test
+ public void testReadFullyWithOffset(){
+ int bufferSize = 100;
+ int segmentSize = 30;
+ byte[] expected = new byte[bufferSize];
+ new Random().nextBytes(expected);
+
+ TestOutputView outputView = new TestOutputView(segmentSize);
+
+ try {
+ outputView.write(expected);
+ }catch(Exception e){
+ e.printStackTrace();
+ fail("Unexpected exception: Could not write to TestOutputView.");
+ }
+
+ outputView.close();
+
+ TestInputView inputView = new TestInputView(outputView.segments);
+ byte[] buffer = new byte[2*bufferSize];
+
+ try {
+ inputView.readFully(buffer, bufferSize, bufferSize);
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail("Unexpected exception: Could not read TestInputView.");
+ }
+
+ assertEquals(inputView.getCurrentPositionInSegment(), bufferSize % segmentSize);
+ byte[] tempBuffer = new byte[bufferSize];
+ System.arraycopy(buffer, bufferSize, tempBuffer,0, bufferSize);
+ assertArrayEquals(expected, tempBuffer);
+ }
+
+ @Test
+ public void testReadFullyEmptyView(){
+ int segmentSize = 30;
+ TestOutputView outputView = new TestOutputView(segmentSize);
+ outputView.close();
+
+ TestInputView inputView = new TestInputView(outputView.segments);
+ byte[] buffer = new byte[segmentSize];
+ boolean eofException = false;
+
+ try{
+ inputView.readFully(buffer);
+ }catch(EOFException e){
+ //expected Exception
+ eofException = true;
+ }catch(Exception e){
+ e.printStackTrace();
+ fail("Unexpected exception: Could not read TestInputView.");
+ }
+
+ assertTrue("EOFException expected.", eofException);
+ }
+
+
private static void testSequenceOfTypes(Iterable<SerializationTestType> sequence, int segmentSize) throws Exception {
List<SerializationTestType> elements = new ArrayList<SerializationTestType>(512);
@@ -148,7 +405,7 @@ public class PagedViewsTest {
if (num < segments.size()) {
return segments.get(num).segment;
} else {
- return null;
+ throw new EOFException();
}
}