You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original archive page and is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by fh...@apache.org on 2014/11/19 21:19:43 UTC

[2/2] incubator-flink git commit: [FLINK-993] Primitive input format fails for invalid input. Adapted documentation.

[FLINK-993] Primitive input format fails for invalid input. Adapted documentation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/3ac4df81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/3ac4df81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/3ac4df81

Branch: refs/heads/master
Commit: 3ac4df81e388f38432390fe5761a0ca69918db67
Parents: d640b6c
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed Nov 12 18:02:59 2014 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 19 18:01:23 2014 +0100

----------------------------------------------------------------------
 docs/programming_guide.md                       |  1 +
 .../api/common/io/DelimitedInputFormat.java     |  8 +++----
 .../flink/api/java/ExecutionEnvironment.java    |  4 ++--
 .../flink/api/java/io/PrimitiveInputFormat.java | 14 ++++++++-----
 .../api/java/io/PrimitiveInputFormatTest.java   | 22 ++++++++++++++++++++
 .../flink/api/scala/ExecutionEnvironment.scala  |  3 ++-
 6 files changed, 40 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3ac4df81/docs/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/programming_guide.md b/docs/programming_guide.md
index 937292c..3c909c8 100644
--- a/docs/programming_guide.md
+++ b/docs/programming_guide.md
@@ -1580,6 +1580,7 @@ File-based:
 - `readCsvFile(path)` / `CsvInputFormat` - Parses files of comma (or another char) delimited fields.
   Returns a DataSet of tuples. Supports the basic java types and their Value counterparts as field
   types.
+- `readFileOfPrimitives(path, Class)` / `PrimitiveInputFormat` - Parses files of new-line (or another char sequence) delimited primitive data types such as `String` or `Integer`. 
 
 Collection-based:
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3ac4df81/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index 0337d30..52edfa7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -252,16 +252,16 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * This function parses the given byte array which represents a serialized records.
-	 * The parsed content is then returned by setting the pair variables. If the
-	 * byte array contains invalid content the record can be skipped by returning <tt>false</tt>.
+	 * This function parses the given byte array which represents a serialized record.
+	 * The function returns a valid record or throws an IOException.
 	 * 
 	 * @param reuse An optionally reusable object.
 	 * @param bytes Binary data of serialized records.
 	 * @param offset The offset where to start to read the record data. 
 	 * @param numBytes The number of bytes that can be read starting at the offset position.
 	 * 
-	 * @return returns whether the record was successfully deserialized or not.
+	 * @return Returns the read record if it was successfully deserialized.
+	 * @throws IOException if the record could not be read.
 	 */
 	public abstract OT readRecord(OT reuse, byte[] bytes, int offset, int numBytes) throws IOException;
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3ac4df81/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 5c85408..cae3157 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -283,7 +283,7 @@ public abstract class ExecutionEnvironment {
 	public <X> DataSource<X> readFileOfPrimitives(String filePath, Class<X> typeClass) {
 		Validate.notNull(filePath, "The file path may not be null.");
 
-		return new DataSource<X>(this, new PrimitiveInputFormat<X>(new Path(filePath), typeClass), TypeExtractor.getForClass(typeClass));
+		return new DataSource<X>(this, new PrimitiveInputFormat<X>(new Path(filePath), typeClass), TypeExtractor.getForClass(typeClass), Utils.getCallLocationName());
 	}
 
 	/**
@@ -299,7 +299,7 @@ public abstract class ExecutionEnvironment {
 	public <X> DataSource<X> readFileOfPrimitives(String filePath, String delimiter, Class<X> typeClass) {
 		Validate.notNull(filePath, "The file path may not be null.");
 
-		return new DataSource<X>(this, new PrimitiveInputFormat<X>(new Path(filePath), delimiter, typeClass), TypeExtractor.getForClass(typeClass));
+		return new DataSource<X>(this, new PrimitiveInputFormat<X>(new Path(filePath), delimiter, typeClass), TypeExtractor.getForClass(typeClass), Utils.getCallLocationName());
 	}
 
 	// ----------------------------------- CSV Input Format ---------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3ac4df81/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
index b3648e9..e2f9f35 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
@@ -65,15 +65,19 @@ public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> {
 	}
 
 	@Override
-	public OT readRecord(OT reuse, byte[] bytes, int offset, int numBytes) {
-		//Check if \n is used as delimiter and the end of this line is a \r, then remove \r from the line
+	public OT readRecord(OT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
+		// Check if \n is used as delimiter and the end of this line is a \r, then remove \r from the line
 		if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == NEW_LINE
 			&& offset+numBytes >= 1 && bytes[offset+numBytes-1] == CARRIAGE_RETURN){
 			numBytes -= 1;
 		}
 
-		//Null character as delimiter is used because there's only 1 field to be parsed
-		parser.parseField(bytes, offset, numBytes + offset, '\0', reuse);
-		return parser.getLastResult();
+		// Null character as delimiter is used because there's only 1 field to be parsed
+		if (parser.parseField(bytes, offset, numBytes + offset, '\0', reuse) >= 0) {
+			return parser.getLastResult();
+		} else {
+			String s = new String(bytes, offset, numBytes);
+			throw new IOException("Could not parse value: \""+s+"\" as type "+primitiveClass.getSimpleName());
+		}
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3ac4df81/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
index c7b8627..4a92702 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
@@ -99,6 +99,7 @@ public class PrimitiveInputFormatTest {
 			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
 		}
 	}
+	
 
 	@Test
 	public void testDoubleInputLinewise() throws IOException {
@@ -152,6 +153,27 @@ public class PrimitiveInputFormatTest {
 			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
 		}
 	}
+	
+	@Test(expected = IOException.class)
+	public void testFailingInput() throws IOException {
+		
+		final String fileContent = "111|222|asdf|17";
+		final FileInputSplit split = createInputSplit(fileContent);
+
+		final PrimitiveInputFormat<Integer> format = new PrimitiveInputFormat<Integer>(PATH,"|", Integer.class);
+
+		format.configure(new Configuration());
+		format.open(split);
+
+		Integer result = null;
+		result = format.nextRecord(result);
+		assertEquals(Integer.valueOf(111), result);
+
+		result = format.nextRecord(result);
+		assertEquals(Integer.valueOf(222), result);
+
+		result = format.nextRecord(result);
+	}
 
 	private FileInputSplit createInputSplit(String content) throws IOException {
 		File tempFile = File.createTempFile("test_contents", "tmp");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3ac4df81/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index cf556c6..4792e58 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -208,7 +208,8 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
     val datasource = new DataSource[T](
       javaEnv,
       new PrimitiveInputFormat(new Path(filePath), delimiter, typeInfo.getTypeClass),
-      typeInfo)
+      typeInfo,
+      getCallLocationName())
     wrap(datasource)
   }