You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/07/08 14:18:27 UTC

[5/5] git commit: [FLINK-994] Added test cases for AbstractPagedInputView, fixed problem with hbase.

[FLINK-994] Added test cases for AbstractPagedInputView, fixed problem with hbase.

This closes #53


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/7b6295d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/7b6295d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/7b6295d9

Branch: refs/heads/master
Commit: 7b6295d9829af1e94bea106dc37a169a39895d92
Parents: 98e659e
Author: Till Rohrmann <ti...@gmail.com>
Authored: Mon Jul 7 16:20:07 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 8 12:37:41 2014 +0200

----------------------------------------------------------------------
 .../addons/hbase/TableInputSplit.java           |   8 +-
 .../addons/hbase/common/HBaseKey.java           |   8 +-
 .../addons/hbase/common/HBaseResult.java        |   8 +-
 .../memorymanager/AbstractPagedInputView.java   |   5 +-
 .../io/serialization/PagedViewsTest.java        | 259 ++++++++++++++++++-
 5 files changed, 273 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7b6295d9/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/TableInputSplit.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/TableInputSplit.java b/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/TableInputSplit.java
index 382befa..bcf5d00 100644
--- a/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/TableInputSplit.java
+++ b/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/TableInputSplit.java
@@ -13,10 +13,10 @@
 
 package eu.stratosphere.addons.hbase;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
+import eu.stratosphere.core.memory.DataOutputView;
+import eu.stratosphere.core.memory.DataInputView;
 import eu.stratosphere.core.io.LocatableInputSplit;
 
 /**
@@ -103,7 +103,7 @@ public class TableInputSplit extends LocatableInputSplit {
 
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 
 		super.write(out);
 
@@ -134,7 +134,7 @@ public class TableInputSplit extends LocatableInputSplit {
 
 
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 
 		super.read(in);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7b6295d9/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/common/HBaseKey.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/common/HBaseKey.java b/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/common/HBaseKey.java
index 700db05..eba09cd 100644
--- a/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/common/HBaseKey.java
+++ b/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/common/HBaseKey.java
@@ -13,12 +13,12 @@
 
 package eu.stratosphere.addons.hbase.common;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 import eu.stratosphere.types.Key;
 
 /**
@@ -52,12 +52,12 @@ public class HBaseKey implements Key<HBaseKey> {
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
-	public void write(DataOutput out) throws IOException {
+	public void write(DataOutputView out) throws IOException {
 		this.writable.write(out);
 	}
 
 	@Override
-	public void read(DataInput in) throws IOException {
+	public void read(DataInputView in) throws IOException {
 		this.writable.readFields(in);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7b6295d9/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/common/HBaseResult.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/common/HBaseResult.java b/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/common/HBaseResult.java
index a8a2df9..a827551 100644
--- a/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/common/HBaseResult.java
+++ b/stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/common/HBaseResult.java
@@ -13,12 +13,12 @@
 
 package eu.stratosphere.addons.hbase.common;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.client.Result;
 
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 import eu.stratosphere.types.Value;
 
 public class HBaseResult implements Value {
@@ -53,12 +53,12 @@ public class HBaseResult implements Value {
 	}
 	
 	@Override
-	public void read(DataInput in) throws IOException {
+	public void read(DataInputView in) throws IOException {
 		this.result.readFields(in);
 	}
 	
 	@Override
-	public void write(DataOutput out) throws IOException {
+	public void write(DataOutputView out) throws IOException {
 		this.result.write(out);	
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7b6295d9/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/AbstractPagedInputView.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/AbstractPagedInputView.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/AbstractPagedInputView.java
index c6ca7b9..35badef 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/AbstractPagedInputView.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/AbstractPagedInputView.java
@@ -222,6 +222,7 @@ public abstract class AbstractPagedInputView implements DataInputView {
 					try {
 						advance();
 					}catch(EOFException eof){
+						this.positionInSegment += toRead;
 						return bytesRead;
 					}
 					remaining = this.limitInSegment - this.positionInSegment;
@@ -244,8 +245,8 @@ public abstract class AbstractPagedInputView implements DataInputView {
 	public void readFully(byte[] b, int off, int len) throws IOException {
 		int bytesRead = read(b,off,len);
 
-		if(bytesRead == -1){
-			throw new EOFException("There is no more data left in the DataInputView.");
+		if(bytesRead < len){
+			throw new EOFException("There is no enough data left in the DataInputView.");
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7b6295d9/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/PagedViewsTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/PagedViewsTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/PagedViewsTest.java
index 817c0e6..e128344 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/PagedViewsTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/PagedViewsTest.java
@@ -21,10 +21,13 @@ import eu.stratosphere.runtime.io.serialization.types.SerializationTestTypeFacto
 import eu.stratosphere.runtime.io.serialization.types.Util;
 import org.junit.Test;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.*;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class PagedViewsTest {
@@ -69,6 +72,260 @@ public class PagedViewsTest {
 		}
 	}
 
+	@Test
+	public void testReadFully() {
+		int bufferSize = 100;
+		byte[] expected = new byte[bufferSize];
+		new Random().nextBytes(expected);
+
+		TestOutputView outputView = new TestOutputView(bufferSize);
+
+		try {
+			outputView.write(expected);
+		}catch(Exception e){
+			e.printStackTrace();
+			fail("Unexpected exception: Could not write to TestOutputView.");
+		}
+
+		outputView.close();
+
+		TestInputView inputView = new TestInputView(outputView.segments);
+		byte[] buffer = new byte[bufferSize];
+
+		try {
+			inputView.readFully(buffer);
+		} catch (IOException e) {
+			e.printStackTrace();
+			fail("Unexpected exception: Could not read TestInputView.");
+		}
+
+		assertEquals(inputView.getCurrentPositionInSegment(), bufferSize);
+		assertArrayEquals(expected, buffer);
+	}
+
+	@Test
+	public void testReadFullyAcrossSegments() {
+		int bufferSize = 100;
+		int segmentSize = 30;
+		byte[] expected = new byte[bufferSize];
+		new Random().nextBytes(expected);
+
+		TestOutputView outputView = new TestOutputView(segmentSize);
+
+		try {
+			outputView.write(expected);
+		}catch(Exception e){
+			e.printStackTrace();
+			fail("Unexpected exception: Could not write to TestOutputView.");
+		}
+
+		outputView.close();
+
+		TestInputView inputView = new TestInputView(outputView.segments);
+		byte[] buffer = new byte[bufferSize];
+
+		try {
+			inputView.readFully(buffer);
+		} catch (IOException e) {
+			e.printStackTrace();
+			fail("Unexpected exception: Could not read TestInputView.");
+		}
+
+		assertEquals(inputView.getCurrentPositionInSegment(), bufferSize % segmentSize);
+		assertArrayEquals(expected, buffer);
+	}
+
+	@Test
+	public void testReadAcrossSegments() {
+		int bufferSize = 100;
+		int bytes2Write = 75;
+		int segmentSize = 30;
+		byte[] expected = new byte[bytes2Write];
+		new Random().nextBytes(expected);
+
+		TestOutputView outputView = new TestOutputView(segmentSize);
+
+		try {
+			outputView.write(expected);
+		}catch(Exception e){
+			e.printStackTrace();
+			fail("Unexpected exception: Could not write to TestOutputView.");
+		}
+
+		outputView.close();
+
+		TestInputView inputView = new TestInputView(outputView.segments);
+		byte[] buffer = new byte[bufferSize];
+		int bytesRead = 0;
+
+		try {
+			bytesRead = inputView.read(buffer);
+		} catch (IOException e) {
+			e.printStackTrace();
+			fail("Unexpected exception: Could not read TestInputView.");
+		}
+
+		assertEquals(bytes2Write, bytesRead);
+		assertEquals(inputView.getCurrentPositionInSegment(), bytes2Write % segmentSize);
+
+		byte[] tempBuffer = new byte[bytesRead];
+		System.arraycopy(buffer,0,tempBuffer,0,bytesRead);
+		assertArrayEquals(expected, tempBuffer);
+	}
+
+	@Test
+	public void testEmptyingInputView() {
+		int bufferSize = 100;
+		int bytes2Write = 75;
+		int segmentSize = 30;
+		byte[] expected = new byte[bytes2Write];
+		new Random().nextBytes(expected);
+
+		TestOutputView outputView = new TestOutputView(segmentSize);
+
+		try {
+			outputView.write(expected);
+		}catch(Exception e){
+			e.printStackTrace();
+			fail("Unexpected exception: Could not write to TestOutputView.");
+		}
+
+		outputView.close();
+
+		TestInputView inputView = new TestInputView(outputView.segments);
+		byte[] buffer = new byte[bufferSize];
+		int bytesRead = 0;
+
+		try {
+			bytesRead = inputView.read(buffer);
+		} catch (IOException e) {
+			e.printStackTrace();
+			fail("Unexpected exception: Could not read TestInputView.");
+		}
+
+		assertEquals(bytes2Write, bytesRead);
+
+		byte[] tempBuffer = new byte[bytesRead];
+		System.arraycopy(buffer,0,tempBuffer,0,bytesRead);
+		assertArrayEquals(expected, tempBuffer);
+
+		try{
+			bytesRead = inputView.read(buffer);
+		}catch(IOException e){
+			e.printStackTrace();
+			fail("Unexpected exception: Input view should be empty and thus return -1.");
+		}
+
+		assertEquals(-1, bytesRead);
+		assertEquals(inputView.getCurrentPositionInSegment(), bytes2Write % segmentSize);
+	}
+
+	@Test
+	public void testReadFullyWithNotEnoughData() {
+		int bufferSize = 100;
+		int bytes2Write = 99;
+		int segmentSize = 30;
+		byte[] expected = new byte[bytes2Write];
+		new Random().nextBytes(expected);
+
+		TestOutputView outputView = new TestOutputView(segmentSize);
+
+		try {
+			outputView.write(expected);
+		}catch(Exception e){
+			e.printStackTrace();
+			fail("Unexpected exception: Could not write to TestOutputView.");
+		}
+
+		outputView.close();
+
+		TestInputView inputView = new TestInputView(outputView.segments);
+		byte[] buffer = new byte[bufferSize];
+		boolean eofException = false;
+
+		try {
+			inputView.readFully(buffer);
+		}catch(EOFException e){
+			//Expected exception
+			eofException = true;
+		}
+		catch (IOException e) {
+			e.printStackTrace();
+			fail("Unexpected exception: Could not read TestInputView.");
+		}
+
+		assertTrue("EOFException should have occurred.", eofException);
+
+		int bytesRead = 0;
+
+		try{
+			bytesRead =inputView.read(buffer);
+		}catch(Exception e){
+			e.printStackTrace();
+			fail("Unexpected exception: Could not read TestInputView.");
+		}
+
+		assertEquals(-1, bytesRead);
+	}
+
+	@Test
+	public void testReadFullyWithOffset(){
+		int bufferSize = 100;
+		int segmentSize = 30;
+		byte[] expected = new byte[bufferSize];
+		new Random().nextBytes(expected);
+
+		TestOutputView outputView = new TestOutputView(segmentSize);
+
+		try {
+			outputView.write(expected);
+		}catch(Exception e){
+			e.printStackTrace();
+			fail("Unexpected exception: Could not write to TestOutputView.");
+		}
+
+		outputView.close();
+
+		TestInputView inputView = new TestInputView(outputView.segments);
+		byte[] buffer = new byte[2*bufferSize];
+
+		try {
+			inputView.readFully(buffer, bufferSize, bufferSize);
+		} catch (IOException e) {
+			e.printStackTrace();
+			fail("Unexpected exception: Could not read TestInputView.");
+		}
+
+		assertEquals(inputView.getCurrentPositionInSegment(), bufferSize % segmentSize);
+		byte[] tempBuffer = new byte[bufferSize];
+		System.arraycopy(buffer, bufferSize, tempBuffer,0, bufferSize);
+		assertArrayEquals(expected, tempBuffer);
+	}
+
+	@Test
+	public void testReadFullyEmptyView(){
+		int segmentSize = 30;
+		TestOutputView outputView = new TestOutputView(segmentSize);
+		outputView.close();
+
+		TestInputView inputView = new TestInputView(outputView.segments);
+		byte[] buffer = new byte[segmentSize];
+		boolean eofException = false;
+
+		try{
+			inputView.readFully(buffer);
+		}catch(EOFException e){
+			//expected Exception
+			eofException = true;
+		}catch(Exception e){
+			e.printStackTrace();
+			fail("Unexpected exception: Could not read TestInputView.");
+		}
+
+		assertTrue("EOFException expected.", eofException);
+	}
+
+
 	private static void testSequenceOfTypes(Iterable<SerializationTestType> sequence, int segmentSize) throws Exception {
 
 		List<SerializationTestType> elements = new ArrayList<SerializationTestType>(512);
@@ -148,7 +405,7 @@ public class PagedViewsTest {
 			if (num < segments.size()) {
 				return segments.get(num).segment;
 			} else {
-				return null;
+				throw new EOFException();
 			}
 		}