You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/09/28 12:23:08 UTC
flink git commit: [FLINK-2690] [api-breaking] [scala api] [java api]
Adds functionality to the CsvInputFormat to find fields defined in a super
class of a Pojo. Refactors CsvInputFormat to share code between this format
and ScalaCsvInputFormat.
Repository: flink
Updated Branches:
refs/heads/master 0b3ca57b4 -> 501a9b085
[FLINK-2690] [api-breaking] [scala api] [java api] Adds functionality to the CsvInputFormat to find fields defined in a super class of a Pojo. Refactors CsvInputFormat to share code between this format and ScalaCsvInputFormat.
This closes #1141.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/501a9b08
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/501a9b08
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/501a9b08
Branch: refs/heads/master
Commit: 501a9b0854995b3de1c293b3391eeb3048daeef0
Parents: 0b3ca57
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Sep 17 15:28:25 2015 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Sep 28 12:22:27 2015 +0200
----------------------------------------------------------------------
.../wordcount/BoltTokenizerWordCountPojo.java | 4 +-
.../BoltTokenizerWordCountWithNames.java | 4 +-
.../api/common/io/GenericCsvInputFormat.java | 1 -
.../flink/api/java/io/CommonCsvInputFormat.java | 258 +++++++++++++++++++
.../flink/api/java/io/CsvInputFormat.java | 211 +--------------
.../flink/api/java/io/CsvInputFormatTest.java | 114 +++++++-
.../scala/operators/ScalaCsvInputFormat.java | 204 ++-------------
.../scala/operators/ScalaCsvOutputFormat.java | 6 +-
.../flink/api/scala/ExecutionEnvironment.scala | 21 +-
.../api/java/common/PlanBinder.java | 11 +-
.../flink/api/scala/io/CsvInputFormatTest.scala | 90 +++++--
11 files changed, 497 insertions(+), 427 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/501a9b08/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java
index befb18f..20e69db 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java
@@ -19,9 +19,9 @@ package org.apache.flink.stormcompatibility.wordcount;
import backtype.storm.topology.IRichBolt;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.CsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.Path;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
@@ -121,7 +121,7 @@ public class BoltTokenizerWordCountPojo {
private static DataStream<Sentence> getTextDataStream(final StreamExecutionEnvironment env) {
if (fileOutput) {
// read the text file from given input path
- TypeInformation<Sentence> sourceType = TypeExtractor
+ PojoTypeInfo<Sentence> sourceType = (PojoTypeInfo)TypeExtractor
.getForObject(new Sentence(""));
return env.createInput(new CsvInputFormat<Sentence>(new Path(
textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER,
http://git-wip-us.apache.org/repos/asf/flink/blob/501a9b08/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java
index 8483f48..e233da1 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java
@@ -20,11 +20,11 @@ package org.apache.flink.stormcompatibility.wordcount;
import backtype.storm.topology.IRichBolt;
import backtype.storm.tuple.Fields;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.CsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.Path;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
@@ -124,7 +124,7 @@ public class BoltTokenizerWordCountWithNames {
private static DataStream<Tuple1<String>> getTextDataStream(final StreamExecutionEnvironment env) {
if (fileOutput) {
// read the text file from given input path
- TypeInformation<Tuple1<String>> sourceType = TypeExtractor
+ TupleTypeInfo<Tuple1<String>> sourceType = (TupleTypeInfo<Tuple1<String>>)TypeExtractor
.getForObject(new Tuple1<String>(""));
return env.createInput(new CsvInputFormat<Tuple1<String>>(new Path(
textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER,
http://git-wip-us.apache.org/repos/asf/flink/blob/501a9b08/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index 8d979bb..e68d271 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -40,7 +40,6 @@ import java.util.ArrayList;
import java.util.Map;
import java.util.TreeMap;
-
public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> {
private static final Logger LOG = LoggerFactory.getLogger(GenericCsvInputFormat.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/501a9b08/flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java
new file mode 100644
index 0000000..444d151
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class CommonCsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+ public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+ protected transient Object[] parsedValues;
+
+ private final Class<OUT> pojoTypeClass;
+
+ private String[] pojoFieldNames;
+
+ private transient PojoTypeInfo<OUT> pojoTypeInfo;
+ private transient Field[] pojoFields;
+
+ /**
+  * Creates a CSV input format that uses {@link #DEFAULT_LINE_DELIMITER} and
+  * {@link #DEFAULT_FIELD_DELIMITER}.
+  *
+  * @param filePath Path passed on to the four-argument constructor
+  * @param typeInformation Composite type passed on to the four-argument constructor
+  */
+ public CommonCsvInputFormat(Path filePath, CompositeType<OUT> typeInformation) {
+ this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, typeInformation);
+ }
+
+ public CommonCsvInputFormat(
+ Path filePath,
+ String lineDelimiter,
+ String fieldDelimiter,
+ CompositeType<OUT> compositeTypeInfo) {
+ super(filePath);
+
+ setDelimiter(lineDelimiter);
+ setFieldDelimiter(fieldDelimiter);
+
+ Class<?>[] classes = new Class<?>[compositeTypeInfo.getArity()];
+
+ for (int i = 0; i < compositeTypeInfo.getArity(); i++) {
+ classes[i] = compositeTypeInfo.getTypeAt(i).getTypeClass();
+ }
+
+ setFieldTypes(classes);
+
+ if (compositeTypeInfo instanceof PojoTypeInfo) {
+ pojoTypeInfo = (PojoTypeInfo<OUT>) compositeTypeInfo;
+
+ pojoTypeClass = compositeTypeInfo.getTypeClass();
+ setOrderOfPOJOFields(compositeTypeInfo.getFieldNames());
+ } else {
+ pojoTypeClass = null;
+ pojoFieldNames = null;
+ }
+ }
+
+ /**
+  * Sets the order in which the parsed CSV fields are assigned to the fields of the POJO.
+  * The given array is copied.
+  *
+  * @param fieldNames Names of the POJO fields, one for every included CSV field
+  */
+ public void setOrderOfPOJOFields(String[] fieldNames) {
+     Preconditions.checkNotNull(pojoTypeClass, "Field order can only be specified if output type is a POJO.");
+     Preconditions.checkNotNull(fieldNames);
+
+     // number of CSV fields that are selected for reading
+     int selectedCsvFields = 0;
+     for (boolean selected : fieldIncluded) {
+         if (selected) {
+             selectedCsvFields++;
+         }
+     }
+
+     final String countMismatch = selectedCsvFields +
+         " CSV fields and " + fieldNames.length + " POJO fields selected. The number of selected CSV and POJO fields must be equal.";
+     Preconditions.checkArgument(selectedCsvFields == fieldNames.length, countMismatch);
+
+     // every name has to denote a field that is known to the POJO type information
+     for (String name : fieldNames) {
+         Preconditions.checkNotNull(name, "The field name cannot be null.");
+
+         final boolean isPojoMember = pojoTypeInfo.getFieldIndex(name) != -1;
+         Preconditions.checkArgument(isPojoMember,
+             "Field \""+ name + "\" is not a member of POJO class " + pojoTypeClass.getName());
+     }
+
+     pojoFieldNames = Arrays.copyOf(fieldNames, fieldNames.length);
+ }
+
+ /**
+  * Sets the types of the fields to parse.
+  *
+  * @param fieldTypes Field types; at least one type is required
+  * @throws IllegalArgumentException if {@code fieldTypes} is null or empty
+  */
+ public void setFieldTypes(Class<?>... fieldTypes) {
+     final int typeCount = (fieldTypes == null) ? 0 : fieldTypes.length;
+     if (typeCount == 0) {
+         throw new IllegalArgumentException("Field types must not be null or empty.");
+     }
+
+     setFieldTypesGeneric(fieldTypes);
+ }
+
+ /**
+  * Configures the fields to read from index/type arrays; both are passed on to
+  * {@code setFieldsGeneric}.
+  *
+  * @param sourceFieldIndices Indices selecting the source fields; must not be null
+  * @param fieldTypes Types of the selected fields; must not be null
+  */
+ public void setFields(int[] sourceFieldIndices, Class<?>[] fieldTypes) {
+ Preconditions.checkNotNull(sourceFieldIndices);
+ Preconditions.checkNotNull(fieldTypes);
+
+ // NOTE(review): presumably rejects indices that are not in ascending order - confirm in the super class
+ checkForMonotonousOrder(sourceFieldIndices, fieldTypes);
+
+ setFieldsGeneric(sourceFieldIndices, fieldTypes);
+ }
+
+ /**
+  * Configures the fields to read from a mask and a type array; both are passed on to
+  * {@code setFieldsGeneric}.
+  *
+  * @param sourceFieldMask Mask over the source fields; must not be null
+  * @param fieldTypes Types of the selected fields; must not be null
+  */
+ public void setFields(boolean[] sourceFieldMask, Class<?>[] fieldTypes) {
+ Preconditions.checkNotNull(sourceFieldMask);
+ Preconditions.checkNotNull(fieldTypes);
+
+ setFieldsGeneric(sourceFieldMask, fieldTypes);
+ }
+
+ /**
+  * Returns the field types as reported by {@code getGenericFieldTypes()} of the super class.
+  *
+  * @return Array of field type classes
+  */
+ public Class<?>[] getFieldTypes() {
+ return super.getGenericFieldTypes();
+ }
+
+ /**
+  * Opens the given split and prepares the state that is needed for parsing: the value
+  * holders for the parsed fields, the line break marker and, for POJO types, the reflective
+  * field handles. The comment and invalid line counters are reset.
+  *
+  * @param split Input split that is passed on to the super class
+  * @throws IOException if no field parsers are available
+  * @throws RuntimeException if a configured POJO field name cannot be found in the POJO class
+  *         or one of its super classes
+  */
+ @Override
+ public void open(FileInputSplit split) throws IOException {
+ super.open(split);
+
+ @SuppressWarnings("unchecked")
+ FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) getFieldParsers();
+
+ // fail early if no field parsers are available
+ if (fieldParsers.length == 0) {
+ throw new IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to parse input");
+ }
+
+ // create the value holders, one per field parser
+ this.parsedValues = new Object[fieldParsers.length];
+ for (int i = 0; i < fieldParsers.length; i++) {
+ this.parsedValues[i] = fieldParsers[i].createValue();
+ }
+
+ // the length check is evaluated first, which makes the access to [0] safe
+ // this marker speeds up readRecord, which then does not have to check on every call whether the line ending is set to the default
+ if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
+ this.lineDelimiterIsLinebreak = true;
+ }
+
+ // for POJO types: resolve the configured field names to reflective fields
+ if (pojoTypeClass != null) {
+ pojoFields = new Field[pojoFieldNames.length];
+
+ Map<String, Field> allFields = new HashMap<String, Field>();
+
+ // collects the fields declared in the POJO class and in all of its super classes
+ findAllFields(pojoTypeClass, allFields);
+
+ for (int i = 0; i < pojoFieldNames.length; i++) {
+ pojoFields[i] = allFields.get(pojoFieldNames[i]);
+
+ if (pojoFields[i] != null) {
+ // allows readRecord to set fields that are not public
+ pojoFields[i].setAccessible(true);
+ } else {
+ throw new RuntimeException("There is no field called \"" + pojoFieldNames[i] + "\" in " + pojoTypeClass.getName());
+ }
+ }
+ }
+
+ // reset the comment and invalid line counters
+ this.commentCount = 0;
+ this.invalidLineCount = 0;
+ }
+
+ /**
+  * Finds all declared fields in a class and all its super classes.
+  *
+  * <p>The hierarchy is walked from {@code clazz} upwards. If a super class declares a field
+  * whose name was already found further down the hierarchy, the existing entry is kept. The
+  * field of the most specific class therefore wins, which matches Java's field hiding.
+  *
+  * @param clazz Class for which all declared fields are found
+  * @param allFields Map containing all found fields so far; existing entries are not replaced
+  */
+ private void findAllFields(Class<?> clazz, Map<String, Field> allFields) {
+     for (Class<?> current = clazz; current != null; current = current.getSuperclass()) {
+         for (Field field : current.getDeclaredFields()) {
+             // a field of a sub class must not be overwritten by a hidden super class field
+             if (!allFields.containsKey(field.getName())) {
+                 allFields.put(field.getName(), field);
+             }
+         }
+     }
+ }
+
+ /**
+  * Reads the next record and skips lines for which the super class returns {@code null}
+  * as long as the end of the input is not reached.
+  *
+  * @param record Record object that is handed to the super class
+  * @return The next record, or {@code null} if the end was reached without another record
+  * @throws IOException if the super class fails to read
+  */
+ @Override
+ public OUT nextRecord(OUT record) throws IOException {
+     while (true) {
+         final OUT parsed = super.nextRecord(record);
+
+         // stop on the first record or once nothing is left to read
+         if (parsed != null || reachedEnd()) {
+             return parsed;
+         }
+     }
+ }
+
+ /**
+  * Parses a single line into a record.
+  *
+  * @param reuse Record object that receives the parsed values
+  * @param bytes Buffer that contains the line
+  * @param offset Position of the first byte of the line in the buffer
+  * @param numBytes Length of the line in bytes
+  * @return The filled record, or {@code null} if the line is a comment or could not be parsed
+  * @throws IOException declared by the overridden method
+  */
+ @Override
+ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
+     // Support for windows line endings with the default delimiter \n: a carriage return
+     // right before the line break is dropped so that it is not taken as data
+     if (this.lineDelimiterIsLinebreak && numBytes > 0 && bytes[offset + numBytes - 1] == '\r') {
+         numBytes--;
+     }
+
+     // a line that starts with the comment prefix is counted and skipped
+     if (commentPrefix != null && commentPrefix.length <= numBytes) {
+         int matched = 0;
+         while (matched < commentPrefix.length && commentPrefix[matched] == bytes[offset + matched]) {
+             matched++;
+         }
+         if (matched == commentPrefix.length) {
+             this.commentCount++;
+             return null;
+         }
+     }
+
+     if (!parseRecord(parsedValues, bytes, offset, numBytes)) {
+         this.invalidLineCount++;
+         return null;
+     }
+
+     if (pojoTypeClass == null) {
+         // result type is tuple
+         return createTuple(reuse);
+     }
+
+     // result type is POJO
+     for (int i = 0; i < parsedValues.length; i++) {
+         try {
+             pojoFields[i].set(reuse, parsedValues[i]);
+         } catch (IllegalAccessException e) {
+             throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldNames[i] + "\"", e);
+         }
+     }
+     return reuse;
+ }
+
+ /**
+  * Creates the output record for result types that are not POJOs. It is called by
+  * {@code readRecord} after a line has been parsed successfully into {@link #parsedValues}.
+  *
+  * @param reuse Record object that was handed to {@code readRecord}
+  * @return Record for the parsed line
+  */
+ protected abstract OUT createTuple(OUT reuse);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/501a9b08/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
index ee33484..7d86f39 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
@@ -19,224 +19,33 @@
package org.apache.flink.api.java.io;
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.io.GenericCsvInputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.parser.FieldParser;
import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Arrays;
-
-
-public class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
+public class CsvInputFormat<OUT> extends CommonCsvInputFormat<OUT> {
private static final long serialVersionUID = 1L;
- /**
- * The log.
- */
- private static final Logger LOG = LoggerFactory.getLogger(CsvInputFormat.class);
-
- public static final String DEFAULT_LINE_DELIMITER = "\n";
-
- public static final String DEFAULT_FIELD_DELIMITER = ",";
-
- private transient Object[] parsedValues;
-
- private Class<OUT> pojoTypeClass = null;
- private String[] pojoFieldsName = null;
- private transient Field[] pojoFields = null;
- private transient PojoTypeInfo<OUT> pojoTypeInfo = null;
-
- public CsvInputFormat(Path filePath, TypeInformation<OUT> typeInformation) {
- this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, typeInformation);
+ public CsvInputFormat(Path filePath, CompositeType<OUT> typeInformation) {
+ super(filePath, typeInformation);
}
- public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TypeInformation<OUT> typeInformation) {
- super(filePath);
-
- Preconditions.checkArgument(typeInformation instanceof CompositeType);
- CompositeType<OUT> compositeType = (CompositeType<OUT>) typeInformation;
-
- setDelimiter(lineDelimiter);
- setFieldDelimiter(fieldDelimiter);
-
- Class<?>[] classes = new Class<?>[typeInformation.getArity()];
- for (int i = 0, arity = typeInformation.getArity(); i < arity; i++) {
- classes[i] = compositeType.getTypeAt(i).getTypeClass();
- }
- setFieldTypes(classes);
-
- if (typeInformation instanceof PojoTypeInfo) {
- pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
- pojoTypeClass = typeInformation.getTypeClass();
- pojoFieldsName = compositeType.getFieldNames();
- setOrderOfPOJOFields(pojoFieldsName);
- }
- }
-
- public void setOrderOfPOJOFields(String[] fieldsOrder) {
- Preconditions.checkNotNull(pojoTypeClass, "Field order can only be specified if output type is a POJO.");
- Preconditions.checkNotNull(fieldsOrder);
-
- int includedCount = 0;
- for (boolean isIncluded : fieldIncluded) {
- if (isIncluded) {
- includedCount++;
- }
- }
-
- Preconditions.checkArgument(includedCount == fieldsOrder.length, includedCount +
- " CSV fields and " + fieldsOrder.length + " POJO fields selected. The number of selected CSV and POJO fields must be equal.");
-
- for (String field : fieldsOrder) {
- Preconditions.checkNotNull(field, "The field name cannot be null.");
- Preconditions.checkArgument(pojoTypeInfo.getFieldIndex(field) != -1,
- "Field \""+ field + "\" is not a member of POJO class " + pojoTypeClass.getName());
- }
-
- pojoFieldsName = Arrays.copyOfRange(fieldsOrder, 0, fieldsOrder.length);
- }
-
- public void setFieldTypes(Class<?>... fieldTypes) {
- if (fieldTypes == null || fieldTypes.length == 0) {
- throw new IllegalArgumentException("Field types must not be null or empty.");
- }
-
- setFieldTypesGeneric(fieldTypes);
- }
-
- public void setFields(int[] sourceFieldIndices, Class<?>[] fieldTypes) {
- Preconditions.checkNotNull(sourceFieldIndices);
- Preconditions.checkNotNull(fieldTypes);
-
- checkForMonotonousOrder(sourceFieldIndices, fieldTypes);
-
- setFieldsGeneric(sourceFieldIndices, fieldTypes);
- }
-
- public void setFields(boolean[] sourceFieldMask, Class<?>[] fieldTypes) {
- Preconditions.checkNotNull(sourceFieldMask);
- Preconditions.checkNotNull(fieldTypes);
-
- setFieldsGeneric(sourceFieldMask, fieldTypes);
+ public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, CompositeType<OUT> typeInformation) {
+ super(filePath, lineDelimiter, fieldDelimiter, typeInformation);
}
- public Class<?>[] getFieldTypes() {
- return super.getGenericFieldTypes();
- }
-
@Override
- public void open(FileInputSplit split) throws IOException {
- super.open(split);
-
- @SuppressWarnings("unchecked")
- FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) getFieldParsers();
-
- //throw exception if no field parsers are available
- if (fieldParsers.length == 0) {
- throw new IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to parse input");
- }
-
- // create the value holders
- this.parsedValues = new Object[fieldParsers.length];
- for (int i = 0; i < fieldParsers.length; i++) {
- this.parsedValues[i] = fieldParsers[i].createValue();
- }
-
- // left to right evaluation makes access [0] okay
- // this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default
- if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
- this.lineDelimiterIsLinebreak = true;
+ protected OUT createTuple(OUT reuse) {
+ Tuple result = (Tuple) reuse;
+ for (int i = 0; i < parsedValues.length; i++) {
+ result.setField(parsedValues[i], i);
}
- // for POJO type
- if (pojoTypeClass != null) {
- pojoFields = new Field[pojoFieldsName.length];
- for (int i = 0; i < pojoFieldsName.length; i++) {
- try {
- pojoFields[i] = pojoTypeClass.getDeclaredField(pojoFieldsName[i]);
- pojoFields[i].setAccessible(true);
- } catch (NoSuchFieldException e) {
- throw new RuntimeException("There is no field called \"" + pojoFieldsName[i] + "\" in " + pojoTypeClass.getName(), e);
- }
- }
- }
-
- this.commentCount = 0;
- this.invalidLineCount = 0;
+ return reuse;
}
-
- @Override
- public OUT nextRecord(OUT record) throws IOException {
- OUT returnRecord = null;
- do {
- returnRecord = super.nextRecord(record);
- } while (returnRecord == null && !reachedEnd());
- return returnRecord;
- }
-
- @Override
- public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
- /*
- * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
- */
- //Find windows end line, so find carriage return before the newline
- if (this.lineDelimiterIsLinebreak == true && numBytes > 0 && bytes[offset + numBytes -1] == '\r' ) {
- //reduce the number of bytes so that the Carriage return is not taken as data
- numBytes--;
- }
-
- if (commentPrefix != null && commentPrefix.length <= numBytes) {
- //check record for comments
- boolean isComment = true;
- for (int i = 0; i < commentPrefix.length; i++) {
- if (commentPrefix[i] != bytes[offset + i]) {
- isComment = false;
- break;
- }
- }
- if (isComment) {
- this.commentCount++;
- return null;
- }
- }
-
- if (parseRecord(parsedValues, bytes, offset, numBytes)) {
- if (pojoTypeClass == null) {
- // result type is tuple
- Tuple result = (Tuple) reuse;
- for (int i = 0; i < parsedValues.length; i++) {
- result.setField(parsedValues[i], i);
- }
- } else {
- // result type is POJO
- for (int i = 0; i < parsedValues.length; i++) {
- try {
- pojoFields[i].set(reuse, parsedValues[i]);
- } catch (IllegalAccessException e) {
- throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldsName[i] + "\"", e);
- }
- }
- }
- return reuse;
- } else {
- this.invalidLineCount++;
- return null;
- }
- }
-
-
@Override
public String toString() {
return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(getFieldDelimiter())) + ") " + getFilePath();
http://git-wip-us.apache.org/repos/asf/flink/blob/501a9b08/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index 4f1c339..2c2ffa5 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -22,8 +22,9 @@ package org.apache.flink.api.java.io;
import com.google.common.base.Charsets;
import org.apache.flink.api.common.io.ParseException;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.tuple.*;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
@@ -37,6 +38,8 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.List;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
@@ -548,7 +551,7 @@ public class CsvInputFormatTest {
format.setFieldDelimiter("|");
- format.setFields(new int[] {0, 3, 7}, new Class<?>[] {Integer.class, Integer.class, Integer.class});
+ format.setFields(new int[]{0, 3, 7}, new Class<?>[]{Integer.class, Integer.class, Integer.class});
format.configure(new Configuration());
@@ -823,7 +826,7 @@ public class CsvInputFormatTest {
wrt.close();
@SuppressWarnings("unchecked")
- TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+ PojoTypeInfo<PojoItem> typeInfo = (PojoTypeInfo<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
inputFormat.configure(new Configuration());
@@ -846,7 +849,7 @@ public class CsvInputFormatTest {
wrt.close();
@SuppressWarnings("unchecked")
- TypeInformation<PrivatePojoItem> typeInfo = (TypeInformation<PrivatePojoItem>) TypeExtractor.createTypeInfo(PrivatePojoItem.class);
+ PojoTypeInfo<PrivatePojoItem> typeInfo = (PojoTypeInfo<PrivatePojoItem>) TypeExtractor.createTypeInfo(PrivatePojoItem.class);
CsvInputFormat<PrivatePojoItem> inputFormat = new CsvInputFormat<PrivatePojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
inputFormat.configure(new Configuration());
@@ -882,7 +885,7 @@ public class CsvInputFormatTest {
wrt.close();
@SuppressWarnings("unchecked")
- TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+ PojoTypeInfo<PojoItem> typeInfo = (PojoTypeInfo<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
inputFormat.setFields(new boolean[]{true, true, true, true}, new Class<?>[]{Integer.class, Double.class, String.class, String.class});
inputFormat.setOrderOfPOJOFields(new String[]{"field1", "field3", "field2", "field4"});
@@ -907,7 +910,7 @@ public class CsvInputFormatTest {
wrt.close();
@SuppressWarnings("unchecked")
- TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+ PojoTypeInfo<PojoItem> typeInfo = (PojoTypeInfo<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
inputFormat.setFields(new boolean[]{true, false, true, false, true, true}, new Class[]{Integer.class, String
.class, Double.class, String.class});
@@ -932,7 +935,7 @@ public class CsvInputFormatTest {
wrt.close();
@SuppressWarnings("unchecked")
- TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+ PojoTypeInfo<PojoItem> typeInfo = (PojoTypeInfo<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
inputFormat.setFields(new boolean[]{true, false, false, true}, new Class[]{Integer.class, String.class});
inputFormat.setOrderOfPOJOFields(new String[]{"field1", "field4"});
@@ -956,7 +959,7 @@ public class CsvInputFormatTest {
tempFile.setWritable(true);
@SuppressWarnings("unchecked")
- TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+ PojoTypeInfo<PojoItem> typeInfo = (PojoTypeInfo<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
try {
@@ -994,7 +997,7 @@ public class CsvInputFormatTest {
writer.write(fileContent);
writer.close();
- TypeInformation<Tuple2<String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
+ CompositeType<Tuple2<String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
CsvInputFormat<Tuple2<String, String>> inputFormat = new CsvInputFormat<Tuple2<String, String>>(new Path(tempFile.toURI().toString()), typeInfo);
inputFormat.enableQuotedStringParsing('"');
@@ -1025,7 +1028,7 @@ public class CsvInputFormatTest {
writer.write(fileContent);
writer.close();
- TypeInformation<Tuple2<String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
+ TupleTypeInfo<Tuple2<String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
CsvInputFormat<Tuple2<String, String>> inputFormat = new CsvInputFormat<>(new Path(tempFile.toURI().toString()), typeInfo);
inputFormat.enableQuotedStringParsing('"');
@@ -1043,6 +1046,49 @@ public class CsvInputFormatTest {
assertEquals("We are\\\" young", record.f1);
}
+ /**
+  * Tests that the CSV input format can deal with POJOs which are subclasses, i.e. that fields
+  * declared in a super class of the POJO are filled as well.
+  *
+  * @throws Exception if the temporary file cannot be written or the input cannot be read
+  */
+ @Test
+ public void testPojoSubclassType() throws Exception {
+     final String fileContent = "t1,foobar,tweet2\nt2,barfoo,tweet2";
+
+     final File tempFile = File.createTempFile("CsvReaderPOJOSubclass", "tmp");
+     tempFile.deleteOnExit();
+
+     // try-with-resources closes the writer even if writing fails
+     try (OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(tempFile))) {
+         writer.write(fileContent);
+     }
+
+     @SuppressWarnings("unchecked")
+     PojoTypeInfo<TwitterPOJO> typeInfo = (PojoTypeInfo<TwitterPOJO>)TypeExtractor.createTypeInfo(TwitterPOJO.class);
+     CsvInputFormat<TwitterPOJO> inputFormat = new CsvInputFormat<>(new Path(tempFile.toURI().toString()), typeInfo);
+
+     inputFormat.configure(new Configuration());
+     FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+     inputFormat.open(splits[0]);
+
+     // build the expected records directly from the file content
+     List<TwitterPOJO> expected = new ArrayList<>();
+
+     for (String line: fileContent.split("\n")) {
+         String[] elements = line.split(",");
+         expected.add(new TwitterPOJO(elements[0], elements[1], elements[2]));
+     }
+
+     List<TwitterPOJO> actual = new ArrayList<>();
+
+     TwitterPOJO pojo;
+
+     // a fresh object is passed for every record because the returned object is stored
+     while((pojo = inputFormat.nextRecord(new TwitterPOJO())) != null) {
+         actual.add(pojo);
+     }
+
+     assertEquals(expected, actual);
+ }
+
// --------------------------------------------------------------------------------------------
// Custom types for testing
// --------------------------------------------------------------------------------------------
@@ -1093,4 +1139,52 @@ public class CsvInputFormatTest {
}
}
+ public static class POJO {
+ public String table;
+ public String time;
+
+ public POJO() {
+ this("", "");
+ }
+
+ public POJO(String table, String time) {
+ this.table = table;
+ this.time = time;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof POJO) {
+ POJO other = (POJO) obj;
+ return table.equals(other.table) && time.equals(other.time);
+ } else {
+ return false;
+ }
+ }
+ }
+
+ public static class TwitterPOJO extends POJO {
+ public String tweet;
+
+ public TwitterPOJO() {
+ this("", "", "");
+ }
+
+ public TwitterPOJO(String table, String time, String tweet) {
+ super(table, time);
+ this.tweet = tweet;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof TwitterPOJO) {
+ TwitterPOJO other = (TwitterPOJO) obj;
+
+ return super.equals(other) && tweet.equals(other.tweet);
+ } else {
+ return false;
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/501a9b08/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
index 9adbed8..522cc0c 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
@@ -21,213 +21,45 @@ package org.apache.flink.api.scala.operators;
import com.google.common.base.Preconditions;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.io.GenericCsvInputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.io.CommonCsvInputFormat;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
-import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.StringUtils;
-import org.apache.flink.types.parser.FieldParser;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Arrays;
-
-public class ScalaCsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(ScalaCsvInputFormat.class);
-
- private transient Object[] parsedValues;
+public class ScalaCsvInputFormat<OUT> extends CommonCsvInputFormat<OUT> {
+ private static final long serialVersionUID = -7347888812778968640L;
private final TupleSerializerBase<OUT> tupleSerializer;
- private Class<OUT> pojoTypeClass = null;
- private String[] pojoFieldsName = null;
- private transient Field[] pojoFields = null;
- private transient PojoTypeInfo<OUT> pojoTypeInfo = null;
-
- public ScalaCsvInputFormat(Path filePath, TypeInformation<OUT> typeInfo) {
- super(filePath);
+ public ScalaCsvInputFormat(Path filePath, CompositeType<OUT> typeInfo) {
+ super(filePath, typeInfo);
- Class<?>[] classes = new Class[typeInfo.getArity()];
+ Preconditions.checkArgument(typeInfo instanceof PojoTypeInfo || typeInfo instanceof TupleTypeInfoBase,
+ "Only pojo types or tuple types are supported.");
if (typeInfo instanceof TupleTypeInfoBase) {
- TupleTypeInfoBase<OUT> tupleType = (TupleTypeInfoBase<OUT>) typeInfo;
- // We can use an empty config here, since we only use the serializer to create
- // the top-level case class
- tupleSerializer = (TupleSerializerBase<OUT>) tupleType.createSerializer(new ExecutionConfig());
-
- for (int i = 0; i < tupleType.getArity(); i++) {
- classes[i] = tupleType.getTypeAt(i).getTypeClass();
- }
+ TupleTypeInfoBase<OUT> tupleTypeInfo = (TupleTypeInfoBase<OUT>) typeInfo;
- setFieldTypes(classes);
+ tupleSerializer = (TupleSerializerBase<OUT>)tupleTypeInfo.createSerializer(new ExecutionConfig());
} else {
tupleSerializer = null;
- pojoTypeInfo = (PojoTypeInfo<OUT>) typeInfo;
- pojoTypeClass = typeInfo.getTypeClass();
- pojoFieldsName = pojoTypeInfo.getFieldNames();
-
- for (int i = 0, arity = pojoTypeInfo.getArity(); i < arity; i++) {
- classes[i] = pojoTypeInfo.getTypeAt(i).getTypeClass();
- }
-
- setFieldTypes(classes);
- setOrderOfPOJOFields(pojoFieldsName);
- }
- }
-
- public void setOrderOfPOJOFields(String[] fieldsOrder) {
- Preconditions.checkNotNull(pojoTypeClass, "Field order can only be specified if output type is a POJO.");
- Preconditions.checkNotNull(fieldsOrder);
-
- int includedCount = 0;
- for (boolean isIncluded : fieldIncluded) {
- if (isIncluded) {
- includedCount++;
- }
- }
-
- Preconditions.checkArgument(includedCount == fieldsOrder.length,
- "The number of selected POJO fields should be the same as that of CSV fields.");
-
- for (String field : fieldsOrder) {
- Preconditions.checkNotNull(field, "The field name cannot be null.");
- Preconditions.checkArgument(pojoTypeInfo.getFieldIndex(field) != -1,
- "The given field name isn't matched to POJO fields.");
- }
-
- pojoFieldsName = Arrays.copyOfRange(fieldsOrder, 0, fieldsOrder.length);
- }
-
- public void setFieldTypes(Class<?>[] fieldTypes) {
- if (fieldTypes == null || fieldTypes.length == 0) {
- throw new IllegalArgumentException("Field types must not be null or empty.");
- }
-
- setFieldTypesGeneric(fieldTypes);
- }
-
- public void setFields(int[] sourceFieldIndices, Class<?>[] fieldTypes) {
- Preconditions.checkNotNull(sourceFieldIndices);
- Preconditions.checkNotNull(fieldTypes);
-
- checkForMonotonousOrder(sourceFieldIndices, fieldTypes);
-
- setFieldsGeneric(sourceFieldIndices, fieldTypes);
- }
-
- public void setFields(boolean[] sourceFieldMask, Class<?>[] fieldTypes) {
- Preconditions.checkNotNull(sourceFieldMask);
- Preconditions.checkNotNull(fieldTypes);
-
- setFieldsGeneric(sourceFieldMask, fieldTypes);
- }
-
- public Class<?>[] getFieldTypes() {
- return super.getGenericFieldTypes();
- }
-
- @Override
- public void open(FileInputSplit split) throws IOException {
- super.open(split);
-
- @SuppressWarnings("unchecked")
- FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) getFieldParsers();
-
- //throw exception if no field parsers are available
- if (fieldParsers.length == 0) {
- throw new IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to parse input");
- }
-
- // create the value holders
- this.parsedValues = new Object[fieldParsers.length];
- for (int i = 0; i < fieldParsers.length; i++) {
- this.parsedValues[i] = fieldParsers[i].createValue();
}
-
- // left to right evaluation makes access [0] okay
- // this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default
- if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
- this.lineDelimiterIsLinebreak = true;
- }
-
- // for POJO type
- if (pojoTypeClass != null) {
- pojoFields = new Field[pojoFieldsName.length];
- for (int i = 0; i < pojoFieldsName.length; i++) {
- try {
- pojoFields[i] = pojoTypeClass.getDeclaredField(pojoFieldsName[i]);
- pojoFields[i].setAccessible(true);
- } catch (NoSuchFieldException e) {
- throw new RuntimeException("There is no field called \"" + pojoFieldsName[i] + "\" in " + pojoTypeClass.getName(), e);
- }
- }
- }
-
- this.commentCount = 0;
- this.invalidLineCount = 0;
}
@Override
- public OUT nextRecord(OUT record) throws IOException {
- OUT returnRecord = null;
- do {
- returnRecord = super.nextRecord(record);
- } while (returnRecord == null && !reachedEnd());
+ protected OUT createTuple(OUT reuse) {
+ Preconditions.checkNotNull(tupleSerializer, "The tuple serializer must be initialised." +
+ " It is not initialized if the given type was not a " +
+ TupleTypeInfoBase.class.getName() + ".");
- return returnRecord;
+ return tupleSerializer.createInstance(parsedValues);
}
@Override
- public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) {
- /*
- * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
- */
- //Find windows end line, so find carriage return before the newline
- if (this.lineDelimiterIsLinebreak == true && numBytes > 0 && bytes[offset + numBytes -1] == '\r' ) {
- //reduce the number of bytes so that the Carriage return is not taken as data
- numBytes--;
- }
-
- if (commentPrefix != null && commentPrefix.length <= numBytes) {
- //check record for comments
- boolean isComment = true;
- for (int i = 0; i < commentPrefix.length; i++) {
- if (commentPrefix[i] != bytes[offset + i]) {
- isComment = false;
- break;
- }
- }
- if (isComment) {
- this.commentCount++;
- return null;
- }
- }
-
- if (parseRecord(parsedValues, bytes, offset, numBytes)) {
- if (tupleSerializer != null) {
- return tupleSerializer.createInstance(parsedValues);
- } else {
- for (int i = 0; i < pojoFields.length; i++) {
- try {
- pojoFields[i].set(reuse, parsedValues[i]);
- } catch (IllegalAccessException e) {
- throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldsName[i] + "\"", e);
- }
- }
-
- return reuse;
- }
- } else {
- this.invalidLineCount++;
- return null;
- }
+ public String toString() {
+ return "Scala CSV Input (" + StringUtils.showControlCharacters(String.valueOf(getFieldDelimiter())) + ") " + getFilePath();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/501a9b08/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
index afcdc4a..e29a6e6 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.io.CommonCsvInputFormat;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.StringValue;
@@ -51,9 +51,9 @@ public class ScalaCsvOutputFormat<T extends Product> extends FileOutputFormat<T>
// --------------------------------------------------------------------------------------------
- public static final String DEFAULT_LINE_DELIMITER = CsvInputFormat.DEFAULT_LINE_DELIMITER;
+ public static final String DEFAULT_LINE_DELIMITER = CommonCsvInputFormat.DEFAULT_LINE_DELIMITER;
- public static final String DEFAULT_FIELD_DELIMITER = String.valueOf(CsvInputFormat.DEFAULT_FIELD_DELIMITER);
+ public static final String DEFAULT_FIELD_DELIMITER = String.valueOf(CommonCsvInputFormat.DEFAULT_FIELD_DELIMITER);
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/501a9b08/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 745b0d3..85c5410 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -20,9 +20,11 @@ package org.apache.flink.api.scala
import java.util.UUID
import com.esotericsoftware.kryo.Serializer
+import com.google.common.base.Preconditions
import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.common.{JobID, ExecutionConfig, JobExecutionResult}
+import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.io._
import org.apache.flink.api.java.operators.DataSource
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
@@ -53,11 +55,11 @@ import scala.reflect.ClassTag
*
* To get an execution environment use the methods on the companion object:
*
- * - [[ExecutionEnvironment.getExecutionEnvironment]]
- * - [[ExecutionEnvironment.createLocalEnvironment]]
- * - [[ExecutionEnvironment.createRemoteEnvironment]]
+ * - [[ExecutionEnvironment#getExecutionEnvironment]]
+ * - [[ExecutionEnvironment#createLocalEnvironment]]
+ * - [[ExecutionEnvironment#createRemoteEnvironment]]
*
- * Use [[ExecutionEnvironment.getExecutionEnvironment]] to get the correct environment depending
+ * Use [[ExecutionEnvironment#getExecutionEnvironment]] to get the correct environment depending
* on where the program is executed. If it is run inside an IDE a local environment will be
* created. If the program is submitted to a cluster a remote execution environment will
* be created.
@@ -294,7 +296,14 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
val typeInfo = implicitly[TypeInformation[T]]
- val inputFormat = new ScalaCsvInputFormat[T](new Path(filePath), typeInfo)
+ Preconditions.checkArgument(
+ typeInfo.isInstanceOf[CompositeType[T]],
+ s"The type $typeInfo has to be a tuple or pojo type.",
+ null)
+
+ val inputFormat = new ScalaCsvInputFormat[T](
+ new Path(filePath),
+ typeInfo.asInstanceOf[CompositeType[T]])
inputFormat.setDelimiter(lineDelimiter)
inputFormat.setFieldDelimiter(fieldDelimiter)
inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
@@ -334,7 +343,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
" included fields must match.")
inputFormat.setFields(includedFields, classesBuf.toArray)
} else {
- inputFormat.setFieldTypes(classesBuf.toArray)
+ inputFormat.setFieldTypes(classesBuf: _*)
}
if (pojoFields != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/501a9b08/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
index 69cd501..8ca0405 100644
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
+++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
@@ -14,6 +14,8 @@ package org.apache.flink.languagebinding.api.java.common;
import java.io.IOException;
import java.util.HashMap;
+
+import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.CsvInputFormat;
@@ -256,7 +258,14 @@ public abstract class PlanBinder<INFO extends OperationInfo> {
protected abstract INFO createOperationInfo(AbstractOperation operationIdentifier) throws IOException;
private void createCsvSource(OperationInfo info) throws IOException {
- sets.put(info.setID, env.createInput(new CsvInputFormat(new Path(info.path), info.lineDelimiter, info.fieldDelimiter, info.types), info.types).name("CsvSource"));
+ if (!(info.types instanceof CompositeType)) {
+ throw new RuntimeException("The output type of a csv source has to be a tuple or a " +
+ "pojo type. The derived type is " + info);
+ }
+
+ sets.put(info.setID, env.createInput(new CsvInputFormat(new Path(info.path),
+ info.lineDelimiter, info.fieldDelimiter, (CompositeType)info.types), info.types)
+ .name("CsvSource"));
}
private void createTextSource(OperationInfo info) throws IOException {
http://git-wip-us.apache.org/repos/asf/flink/blob/501a9b08/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
index 0d74515..e49f737 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
@@ -20,13 +20,19 @@ package org.apache.flink.api.scala.io
import java.io.{File, FileOutputStream, FileWriter, OutputStreamWriter}
import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.io.CsvInputFormatTest.TwitterPOJO
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, PojoTypeInfo}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.operators.ScalaCsvInputFormat
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.{FileInputSplit, Path}
import org.junit.Assert.{assertEquals, assertNotNull, assertNull, assertTrue, fail}
import org.junit.Test
+import scala.collection.mutable.ArrayBuffer
+
class CsvInputFormatTest {
private final val PATH: Path = new Path("an/ignored/file/")
@@ -45,7 +51,9 @@ class CsvInputFormatTest {
"#next|5|6.0|\n"
val split = createTempFile(fileContent)
val format = new ScalaCsvInputFormat[(String, Integer, Double)](
- PATH, createTypeInformation[(String, Integer, Double)])
+ PATH,
+ createTypeInformation[(String, Integer, Double)]
+ .asInstanceOf[CaseClassTypeInfo[(String, Integer, Double)]])
format.setDelimiter("\n")
format.setFieldDelimiter('|')
format.setCommentPrefix("#")
@@ -85,7 +93,9 @@ class CsvInputFormatTest {
"//next|5|6.0|\n"
val split = createTempFile(fileContent)
val format = new ScalaCsvInputFormat[(String, Integer, Double)](
- PATH, createTypeInformation[(String, Integer, Double)])
+ PATH,
+ createTypeInformation[(String, Integer, Double)]
+ .asInstanceOf[CaseClassTypeInfo[(String, Integer, Double)]])
format.setDelimiter("\n")
format.setFieldDelimiter('|')
format.setCommentPrefix("//")
@@ -121,7 +131,9 @@ class CsvInputFormatTest {
val fileContent = "abc|def|ghijk\nabc||hhg\n|||"
val split = createTempFile(fileContent)
val format = new ScalaCsvInputFormat[(String, String, String)](
- PATH, createTypeInformation[(String, String, String)])
+ PATH,
+ createTypeInformation[(String, String, String)]
+ .asInstanceOf[CaseClassTypeInfo[(String, String, String)]])
format.setDelimiter("\n")
format.setFieldDelimiter("|")
val parameters = new Configuration
@@ -161,7 +173,9 @@ class CsvInputFormatTest {
val fileContent = "abc|\"de|f\"|ghijk\n\"a|bc\"||hhg\n|||"
val split = createTempFile(fileContent)
val format = new ScalaCsvInputFormat[(String, String, String)](
- PATH, createTypeInformation[(String, String, String)])
+ PATH,
+ createTypeInformation[(String, String, String)]
+ .asInstanceOf[CaseClassTypeInfo[(String, String, String)]])
format.setDelimiter("\n")
format.enableQuotedStringParsing('"')
format.setFieldDelimiter("|")
@@ -202,7 +216,9 @@ class CsvInputFormatTest {
val fileContent = "abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n"
val split = createTempFile(fileContent)
val format = new ScalaCsvInputFormat[(String, String, String)](
- PATH, createTypeInformation[(String, String, String)])
+ PATH,
+ createTypeInformation[(String, String, String)]
+ .asInstanceOf[CaseClassTypeInfo[(String, String, String)]])
format.setDelimiter("\n")
format.setFieldDelimiter("|-")
val parameters = new Configuration
@@ -241,7 +257,8 @@ class CsvInputFormatTest {
val fileContent = "111|222|333|444|555\n666|777|888|999|000|\n"
val split = createTempFile(fileContent)
val format = new ScalaCsvInputFormat[(Int, Int, Int, Int, Int)](
- PATH, createTypeInformation[(Int, Int, Int, Int, Int)])
+ PATH, createTypeInformation[(Int, Int, Int, Int, Int)].
+ asInstanceOf[CaseClassTypeInfo[(Int, Int, Int, Int, Int)]])
format.setFieldDelimiter("|")
format.configure(new Configuration)
format.open(split)
@@ -276,7 +293,9 @@ class CsvInputFormatTest {
val fileContent = "111|x|222|x|333|x|444|x|555|x|\n" +
"666|x|777|x|888|x|999|x|000|x|\n"
val split = createTempFile(fileContent)
- val format = new ScalaCsvInputFormat[(Int, Int)](PATH, createTypeInformation[(Int, Int)])
+ val format = new ScalaCsvInputFormat[(Int, Int)](
+ PATH,
+ createTypeInformation[(Int, Int)].asInstanceOf[CaseClassTypeInfo[(Int, Int)]])
format.setFieldDelimiter("|x|")
format.configure(new Configuration)
format.open(split)
@@ -307,7 +326,7 @@ class CsvInputFormatTest {
val split = createTempFile(fileContent)
val format = new ScalaCsvInputFormat[(Int, Int, Int)](
PATH,
- createTypeInformation[(Int, Int, Int)])
+ createTypeInformation[(Int, Int, Int)].asInstanceOf[CaseClassTypeInfo[(Int, Int, Int)]])
format.setFieldDelimiter("|")
format.setFields(Array(0, 3, 7),
Array(classOf[Integer], classOf[Integer], classOf[Integer]): Array[Class[_]])
@@ -339,7 +358,7 @@ class CsvInputFormatTest {
try {
val format = new ScalaCsvInputFormat[(Int, Int, Int)](
PATH,
- createTypeInformation[(Int, Int, Int)])
+ createTypeInformation[(Int, Int, Int)].asInstanceOf[CaseClassTypeInfo[(Int, Int, Int)]])
format.setFieldDelimiter("|")
try {
format.setFields(Array(8, 1, 3),
@@ -384,7 +403,7 @@ class CsvInputFormatTest {
wrt.write(fileContent)
wrt.close()
val inputFormat = new ScalaCsvInputFormat[Tuple1[String]](new Path(tempFile.toURI.toString),
- createTypeInformation[Tuple1[String]])
+ createTypeInformation[Tuple1[String]].asInstanceOf[CaseClassTypeInfo[Tuple1[String]]])
val parameters = new Configuration
inputFormat.configure(parameters)
inputFormat.setDelimiter(lineBreakerSetup)
@@ -442,7 +461,8 @@ class CsvInputFormatTest {
def testPOJOType(): Unit = {
val fileContent = "123,HELLO,3.123\n" + "456,ABC,1.234"
val tempFile = createTempFile(fileContent)
- val typeInfo: TypeInformation[POJOItem] = createTypeInformation[POJOItem]
+ val typeInfo: PojoTypeInfo[POJOItem] = createTypeInformation[POJOItem]
+ .asInstanceOf[PojoTypeInfo[POJOItem]]
val format = new ScalaCsvInputFormat[POJOItem](PATH, typeInfo)
format.setDelimiter('\n')
@@ -457,7 +477,8 @@ class CsvInputFormatTest {
def testCaseClass(): Unit = {
val fileContent = "123,HELLO,3.123\n" + "456,ABC,1.234"
val tempFile = createTempFile(fileContent)
- val typeInfo: TypeInformation[CaseClassItem] = createTypeInformation[CaseClassItem]
+ val typeInfo: CompositeType[CaseClassItem] = createTypeInformation[CaseClassItem]
+ .asInstanceOf[CompositeType[CaseClassItem]]
val format = new ScalaCsvInputFormat[CaseClassItem](PATH, typeInfo)
format.setDelimiter('\n')
@@ -472,12 +493,13 @@ class CsvInputFormatTest {
def testPOJOTypeWithFieldMapping(): Unit = {
val fileContent = "HELLO,123,3.123\n" + "ABC,456,1.234"
val tempFile = createTempFile(fileContent)
- val typeInfo: TypeInformation[POJOItem] = createTypeInformation[POJOItem]
+ val typeInfo: PojoTypeInfo[POJOItem] = createTypeInformation[POJOItem]
+ .asInstanceOf[PojoTypeInfo[POJOItem]]
val format = new ScalaCsvInputFormat[POJOItem](PATH, typeInfo)
format.setDelimiter('\n')
format.setFieldDelimiter(',')
- format.setFieldTypes(Array(classOf[String], classOf[Integer], classOf[java.lang.Double]))
+ format.setFieldTypes(classOf[String], classOf[Integer], classOf[java.lang.Double])
format.setOrderOfPOJOFields(Array("field2", "field1", "field3"))
format.configure(new Configuration)
format.open(tempFile)
@@ -489,7 +511,8 @@ class CsvInputFormatTest {
def testPOJOTypeWithFieldSubsetAndDataSubset(): Unit = {
val fileContent = "HELLO,123,NODATA,3.123,NODATA\n" + "ABC,456,NODATA,1.234,NODATA"
val tempFile = createTempFile(fileContent)
- val typeInfo: TypeInformation[POJOItem] = createTypeInformation[POJOItem]
+ val typeInfo: PojoTypeInfo[POJOItem] = createTypeInformation[POJOItem]
+ .asInstanceOf[PojoTypeInfo[POJOItem]]
val format = new ScalaCsvInputFormat[POJOItem](PATH, typeInfo)
format.setDelimiter('\n')
@@ -502,4 +525,41 @@ class CsvInputFormatTest {
validatePOJOItem(format)
}
+
+ @Test
+ def testPOJOSubclassType(): Unit = {
+ val fileContent = "t1,foobar,tweet2\nt2,barfoo,tweet2"
+ val tempFile = createTempFile(fileContent)
+ val typeInfo: PojoTypeInfo[TwitterPOJO] = createTypeInformation[TwitterPOJO]
+ .asInstanceOf[PojoTypeInfo[TwitterPOJO]]
+ val format = new ScalaCsvInputFormat[TwitterPOJO](PATH, typeInfo)
+
+ format.setDelimiter('\n')
+ format.setFieldDelimiter(',')
+ format.configure(new Configuration)
+ format.open(tempFile)
+
+ val expected = for (line <- fileContent.split("\n")) yield {
+ val elements = line.split(",")
+ new TwitterPOJO(elements(0), elements(1), elements(2))
+ }
+
+ val actual = ArrayBuffer[TwitterPOJO]()
+ var readNextElement = true
+
+ while (readNextElement) {
+ val element = format.nextRecord(new TwitterPOJO())
+
+ if (element != null) {
+ actual += element
+ } else {
+ readNextElement = false
+ }
+ }
+
+ assert(expected.sameElements(actual))
+ }
+
+
+
}