You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/11/27 17:51:21 UTC
incubator-flink git commit: Removed TypeSerializerFactory from ScalaCsvInputFormat. The TypeSerializer is now serialized directly.
Repository: incubator-flink
Updated Branches:
refs/heads/master 1a7a50f82 -> 3d242fd7a
Removed TypeSerializerFactory from ScalaCsvInputFormat. The TypeSerializer is now serialized directly.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/3d242fd7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/3d242fd7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/3d242fd7
Branch: refs/heads/master
Commit: 3d242fd7aea6add18465d628a258be11def2d0af
Parents: 1a7a50f
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Nov 27 17:32:23 2014 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Nov 27 17:49:57 2014 +0100
----------------------------------------------------------------------
.../scala/operators/ScalaCsvInputFormat.java | 21 +++-----------------
1 file changed, 3 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3d242fd7/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
index b2075c6..abe46a0 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
@@ -23,11 +23,7 @@ import com.google.common.base.Preconditions;
import org.apache.flink.api.common.io.GenericCsvInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
-import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
-import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
@@ -50,26 +46,17 @@ public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFor
// It is set when open so that readRecord does not have to evaluate it
private boolean lineDelimiterIsLinebreak = false;
- private final TypeSerializerFactory<OUT> serializerFactory;
-
- private transient TupleSerializerBase<OUT> serializer;
+ private final TupleSerializerBase<OUT> serializer;
public ScalaCsvInputFormat(Path filePath, TypeInformation<OUT> typeInfo) {
super(filePath);
- TypeSerializer<OUT> serializer = typeInfo.createSerializer();
- if (serializer.isStateful()) {
- serializerFactory = new RuntimeStatefulSerializerFactory<OUT>(
- serializer, typeInfo.getTypeClass());
- } else {
- serializerFactory = new RuntimeStatelessSerializerFactory<OUT>(
- serializer, typeInfo.getTypeClass());
- }
-
if (!(typeInfo.isTupleType())) {
throw new UnsupportedOperationException("This only works on tuple types.");
}
TupleTypeInfoBase<OUT> tupleType = (TupleTypeInfoBase<OUT>) typeInfo;
+ serializer = (TupleSerializerBase<OUT>)tupleType.createSerializer();
+
Class[] classes = new Class[tupleType.getArity()];
for (int i = 0; i < tupleType.getArity(); i++) {
classes[i] = tupleType.getTypeAt(i).getTypeClass();
@@ -117,8 +104,6 @@ public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFor
if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
this.lineDelimiterIsLinebreak = true;
}
-
- serializer = (TupleSerializerBase<OUT>)serializerFactory.getSerializer();
}
@Override