You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2014/11/19 21:19:42 UTC
[1/2] incubator-flink git commit: [FLINK-933] Add primitive input
format to read a sequence of primitives
Repository: incubator-flink
Updated Branches:
refs/heads/master ae505adb8 -> 3ac4df81e
[FLINK-933] Add primitive input format to read a sequence of primitives
This closes #47
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d640b6c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d640b6c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d640b6c5
Branch: refs/heads/master
Commit: d640b6c5a49b3f3cae7c7a62a2a5e87bc4c83fe8
Parents: ae505ad
Author: mingliang <qm...@gmail.com>
Authored: Wed Jun 25 18:54:36 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 19 18:01:06 2014 +0100
----------------------------------------------------------------------
.../flink/api/java/ExecutionEnvironment.java | 36 +++-
.../flink/api/java/io/PrimitiveInputFormat.java | 79 +++++++++
.../api/java/io/PrimitiveInputFormatTest.java | 167 +++++++++++++++++++
.../flink/api/scala/ExecutionEnvironment.scala | 22 +++
4 files changed, 303 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d640b6c5/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 2f9f661..5c85408 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -41,6 +41,7 @@ import org.apache.flink.api.java.io.CollectionInputFormat;
import org.apache.flink.api.java.io.CsvReader;
import org.apache.flink.api.java.io.IteratorInputFormat;
import org.apache.flink.api.java.io.ParallelIteratorInputFormat;
+import org.apache.flink.api.java.io.PrimitiveInputFormat;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.io.TextValueInputFormat;
import org.apache.flink.api.java.operators.DataSink;
@@ -267,7 +268,40 @@ public abstract class ExecutionEnvironment {
format.setSkipInvalidLines(skipInvalidLines);
return new DataSource<StringValue>(this, format, new ValueTypeInfo<StringValue>(StringValue.class), Utils.getCallLocationName());
}
-
+
+ // ----------------------------------- Primitive Input Format ---------------------------------------
+
+ /**
+ * Creates a DataSet of the given primitive type by reading the given file line-wise.
+ * This method is similar to {@link #readCsvFile(String)} with a single field, but the resulting DataSet contains
+ * the values directly rather than {@link org.apache.flink.api.java.tuple.Tuple1} instances.
+ *
+ * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
+ * @param typeClass The primitive type class to be read.
+ * @return A DataSet that represents the data read from the given file as primitive type.
+ */
+ public <X> DataSource<X> readFileOfPrimitives(String filePath, Class<X> typeClass) {
+ Validate.notNull(filePath, "The file path may not be null.");
+
+ return new DataSource<X>(this, new PrimitiveInputFormat<X>(new Path(filePath), typeClass), TypeExtractor.getForClass(typeClass));
+ }
+
+ /**
+ * Creates a DataSet of the given primitive type by reading the given file in a delimited way.
+ * This method is similar to {@link #readCsvFile(String)} with a single field, but the resulting DataSet contains
+ * the values directly rather than {@link org.apache.flink.api.java.tuple.Tuple1} instances.
+ *
+ * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
+ * @param delimiter The delimiter of the given file.
+ * @param typeClass The primitive type class to be read.
+ * @return A DataSet that represents the data read from the given file as primitive type.
+ */
+ public <X> DataSource<X> readFileOfPrimitives(String filePath, String delimiter, Class<X> typeClass) {
+ Validate.notNull(filePath, "The file path may not be null.");
+
+ return new DataSource<X>(this, new PrimitiveInputFormat<X>(new Path(filePath), delimiter, typeClass), TypeExtractor.getForClass(typeClass));
+ }
+
// ----------------------------------- CSV Input Format ---------------------------------------
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d640b6c5/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
new file mode 100644
index 0000000..b3648e9
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.api.common.io.DelimitedInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+
+/**
+ * An input format that reads single-field primitive data from a given file. Unlike
+ * {@link org.apache.flink.api.java.io.CsvInputFormat}, it returns the parsed values directly instead of wrapping them in a {@link org.apache.flink.api.java.tuple.Tuple1}.
+ */
+public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> {
+
+ private static final long serialVersionUID = 1L;
+
+ private Class<OT> primitiveClass;
+
+ private static final byte CARRIAGE_RETURN = (byte) '\r';
+
+ private static final byte NEW_LINE = (byte) '\n';
+
+ private transient FieldParser<OT> parser;
+
+
+ public PrimitiveInputFormat(Path filePath, Class<OT> primitiveClass) {
+ super(filePath);
+ this.primitiveClass = primitiveClass;
+ }
+
+ public PrimitiveInputFormat(Path filePath, String delimiter, Class<OT> primitiveClass) {
+ super(filePath);
+ this.primitiveClass = primitiveClass;
+ this.setDelimiter(delimiter);
+ }
+
+ @Override
+ public void open(FileInputSplit split) throws IOException {
+ super.open(split);
+ Class<? extends FieldParser<OT>> parserType = FieldParser.getParserForType(primitiveClass);
+ if (parserType == null) {
+ throw new IllegalArgumentException("The type '" + primitiveClass.getName() + "' is not supported for the primitive input format.");
+ }
+ parser = InstantiationUtil.instantiate(parserType, FieldParser.class);
+ }
+
+ @Override
+ public OT readRecord(OT reuse, byte[] bytes, int offset, int numBytes) {
+ //Check if \n is used as delimiter and the end of this line is a \r, then remove \r from the line
+ if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == NEW_LINE
+ && offset+numBytes >= 1 && bytes[offset+numBytes-1] == CARRIAGE_RETURN){
+ numBytes -= 1;
+ }
+
+ //Null character as delimiter is used because there's only 1 field to be parsed
+ parser.parseField(bytes, offset, numBytes + offset, '\0', reuse);
+ return parser.getLastResult();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d640b6c5/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
new file mode 100644
index 0000000..c7b8627
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+
+public class PrimitiveInputFormatTest {
+
+ private static final Path PATH = new Path("an/ignored/file/");
+
+
+ @Test
+ public void testStringInput() {
+ try {
+ final String fileContent = "abc||def||||";
+ final FileInputSplit split = createInputSplit(fileContent);
+
+ final PrimitiveInputFormat<String> format = new PrimitiveInputFormat<String>(PATH, "||", String.class);
+
+ final Configuration parameters = new Configuration();
+ format.configure(parameters);
+ format.open(split);
+
+ String result = null;
+
+ result = format.nextRecord(result);
+ assertEquals("abc", result);
+
+ result = format.nextRecord(result);
+ assertEquals("def", result);
+
+ result = format.nextRecord(result);
+ assertEquals("", result);
+
+ result = format.nextRecord(result);
+ assertNull(result);
+ assertTrue(format.reachedEnd());
+ }
+ catch (Exception ex) {
+ ex.printStackTrace();
+ fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
+ }
+ }
+
+
+
+ @Test
+ public void testIntegerInput() throws IOException {
+ try {
+ final String fileContent = "111|222|";
+ final FileInputSplit split = createInputSplit(fileContent);
+
+ final PrimitiveInputFormat<Integer> format = new PrimitiveInputFormat<Integer>(PATH,"|", Integer.class);
+
+ format.configure(new Configuration());
+ format.open(split);
+
+ Integer result = null;
+ result = format.nextRecord(result);
+ assertEquals(Integer.valueOf(111), result);
+
+ result = format.nextRecord(result);
+ assertEquals(Integer.valueOf(222), result);
+
+ result = format.nextRecord(result);
+ assertNull(result);
+ assertTrue(format.reachedEnd());
+ }
+ catch (Exception ex) {
+ fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
+ }
+ }
+
+ @Test
+ public void testDoubleInputLinewise() throws IOException {
+ try {
+ final String fileContent = "1.21\n2.23\n";
+ final FileInputSplit split = createInputSplit(fileContent);
+
+ final PrimitiveInputFormat<Double> format = new PrimitiveInputFormat<Double>(PATH, Double.class);
+
+ format.configure(new Configuration());
+ format.open(split);
+
+ Double result = null;
+ result = format.nextRecord(result);
+ assertEquals(Double.valueOf(1.21), result);
+
+ result = format.nextRecord(result);
+ assertEquals(Double.valueOf(2.23), result);
+
+ result = format.nextRecord(result);
+ assertNull(result);
+ assertTrue(format.reachedEnd());
+ }
+ catch (Exception ex) {
+ fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
+ }
+ }
+
+ @Test
+ public void testRemovingTrailingCR() {
+ try {
+ String first = "First line";
+ String second = "Second line";
+ String fileContent = first + "\r\n" + second + "\r\n";
+ final FileInputSplit split = createInputSplit(fileContent);
+
+ final PrimitiveInputFormat<String> format = new PrimitiveInputFormat<String>(PATH ,String.class);
+
+ format.configure(new Configuration());
+ format.open(split);
+
+ String result = null;
+
+ result = format.nextRecord(result);
+ assertEquals(first, result);
+
+ result = format.nextRecord(result);
+ assertEquals(second, result);
+ }
+ catch (Exception ex) {
+ fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
+ }
+ }
+
+ private FileInputSplit createInputSplit(String content) throws IOException {
+ File tempFile = File.createTempFile("test_contents", "tmp");
+ tempFile.deleteOnExit();
+
+ FileWriter wrt = new FileWriter(tempFile);
+ wrt.write(content);
+ wrt.close();
+
+ return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"});
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d640b6c5/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index ee92fd6..cf556c6 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -191,6 +191,28 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
}
/**
+ * Creates a DataSet that represents the primitive type produced by reading the
+ * given file in a delimited way. This method is similar to [[readCsvFile]] with
+ * a single field, but it produces a DataSet of the values directly rather than of tuples.
+ * The type parameter must be used to specify the primitive type.
+ *
+ * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or
+ * "hdfs://host:port/file/path").
+ * @param delimiter The string that separates the primitives; defaults to newline.
+ */
+ def readFileOfPrimitives[T : ClassTag : TypeInformation](
+ filePath : String,
+ delimiter : String = "\n") : DataSet[T] = {
+ Validate.notNull(filePath, "File path must not be null.")
+ val typeInfo = implicitly[TypeInformation[T]]
+ val datasource = new DataSource[T](
+ javaEnv,
+ new PrimitiveInputFormat(new Path(filePath), delimiter, typeInfo.getTypeClass),
+ typeInfo)
+ wrap(datasource)
+ }
+
+ /**
* Creates a new DataSource by reading the specified file using the custom
* [[org.apache.flink.api.common.io.FileInputFormat]].
*/
[2/2] incubator-flink git commit: [FLINK-993] Primitive input format
fails for invalid input. Adapted documentation.
Posted by fh...@apache.org.
[FLINK-993] Primitive input format fails for invalid input. Adapted documentation.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/3ac4df81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/3ac4df81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/3ac4df81
Branch: refs/heads/master
Commit: 3ac4df81e388f38432390fe5761a0ca69918db67
Parents: d640b6c
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed Nov 12 18:02:59 2014 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 19 18:01:23 2014 +0100
----------------------------------------------------------------------
docs/programming_guide.md | 1 +
.../api/common/io/DelimitedInputFormat.java | 8 +++----
.../flink/api/java/ExecutionEnvironment.java | 4 ++--
.../flink/api/java/io/PrimitiveInputFormat.java | 14 ++++++++-----
.../api/java/io/PrimitiveInputFormatTest.java | 22 ++++++++++++++++++++
.../flink/api/scala/ExecutionEnvironment.scala | 3 ++-
6 files changed, 40 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3ac4df81/docs/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/programming_guide.md b/docs/programming_guide.md
index 937292c..3c909c8 100644
--- a/docs/programming_guide.md
+++ b/docs/programming_guide.md
@@ -1580,6 +1580,7 @@ File-based:
- `readCsvFile(path)` / `CsvInputFormat` - Parses files of comma (or another char) delimited fields.
Returns a DataSet of tuples. Supports the basic java types and their Value counterparts as field
types.
+- `readFileOfPrimitives(path, Class)` / `PrimitiveInputFormat` - Parses files of new-line (or another char sequence) delimited primitive data types such as `String` or `Integer`.
Collection-based:
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3ac4df81/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index 0337d30..52edfa7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -252,16 +252,16 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
// --------------------------------------------------------------------------------------------
/**
- * This function parses the given byte array which represents a serialized records.
- * The parsed content is then returned by setting the pair variables. If the
- * byte array contains invalid content the record can be skipped by returning <tt>false</tt>.
+ * This function parses the given byte array which represents a serialized record.
+ * The function returns a valid record or throws an IOException.
*
* @param reuse An optionally reusable object.
* @param bytes Binary data of serialized records.
* @param offset The offset where to start to read the record data.
* @param numBytes The number of bytes that can be read starting at the offset position.
*
- * @return returns whether the record was successfully deserialized or not.
+ * @return Returns the read record if it was successfully deserialized.
+ * @throws IOException if the record could not be read.
*/
public abstract OT readRecord(OT reuse, byte[] bytes, int offset, int numBytes) throws IOException;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3ac4df81/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 5c85408..cae3157 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -283,7 +283,7 @@ public abstract class ExecutionEnvironment {
public <X> DataSource<X> readFileOfPrimitives(String filePath, Class<X> typeClass) {
Validate.notNull(filePath, "The file path may not be null.");
- return new DataSource<X>(this, new PrimitiveInputFormat<X>(new Path(filePath), typeClass), TypeExtractor.getForClass(typeClass));
+ return new DataSource<X>(this, new PrimitiveInputFormat<X>(new Path(filePath), typeClass), TypeExtractor.getForClass(typeClass), Utils.getCallLocationName());
}
/**
@@ -299,7 +299,7 @@ public abstract class ExecutionEnvironment {
public <X> DataSource<X> readFileOfPrimitives(String filePath, String delimiter, Class<X> typeClass) {
Validate.notNull(filePath, "The file path may not be null.");
- return new DataSource<X>(this, new PrimitiveInputFormat<X>(new Path(filePath), delimiter, typeClass), TypeExtractor.getForClass(typeClass));
+ return new DataSource<X>(this, new PrimitiveInputFormat<X>(new Path(filePath), delimiter, typeClass), TypeExtractor.getForClass(typeClass), Utils.getCallLocationName());
}
// ----------------------------------- CSV Input Format ---------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3ac4df81/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
index b3648e9..e2f9f35 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
@@ -65,15 +65,19 @@ public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> {
}
@Override
- public OT readRecord(OT reuse, byte[] bytes, int offset, int numBytes) {
- //Check if \n is used as delimiter and the end of this line is a \r, then remove \r from the line
+ public OT readRecord(OT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
+ // Check if \n is used as delimiter and the end of this line is a \r, then remove \r from the line
if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == NEW_LINE
&& offset+numBytes >= 1 && bytes[offset+numBytes-1] == CARRIAGE_RETURN){
numBytes -= 1;
}
- //Null character as delimiter is used because there's only 1 field to be parsed
- parser.parseField(bytes, offset, numBytes + offset, '\0', reuse);
- return parser.getLastResult();
+ // Null character as delimiter is used because there's only 1 field to be parsed
+ if (parser.parseField(bytes, offset, numBytes + offset, '\0', reuse) >= 0) {
+ return parser.getLastResult();
+ } else {
+ String s = new String(bytes, offset, numBytes);
+ throw new IOException("Could not parse value: \""+s+"\" as type "+primitiveClass.getSimpleName());
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3ac4df81/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
index c7b8627..4a92702 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
@@ -99,6 +99,7 @@ public class PrimitiveInputFormatTest {
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
+
@Test
public void testDoubleInputLinewise() throws IOException {
@@ -152,6 +153,27 @@ public class PrimitiveInputFormatTest {
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
+
+ @Test(expected = IOException.class)
+ public void testFailingInput() throws IOException {
+
+ final String fileContent = "111|222|asdf|17";
+ final FileInputSplit split = createInputSplit(fileContent);
+
+ final PrimitiveInputFormat<Integer> format = new PrimitiveInputFormat<Integer>(PATH,"|", Integer.class);
+
+ format.configure(new Configuration());
+ format.open(split);
+
+ Integer result = null;
+ result = format.nextRecord(result);
+ assertEquals(Integer.valueOf(111), result);
+
+ result = format.nextRecord(result);
+ assertEquals(Integer.valueOf(222), result);
+
+ result = format.nextRecord(result);
+ }
private FileInputSplit createInputSplit(String content) throws IOException {
File tempFile = File.createTempFile("test_contents", "tmp");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3ac4df81/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index cf556c6..4792e58 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -208,7 +208,8 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
val datasource = new DataSource[T](
javaEnv,
new PrimitiveInputFormat(new Path(filePath), delimiter, typeInfo.getTypeClass),
- typeInfo)
+ typeInfo,
+ getCallLocationName())
wrap(datasource)
}