You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/11/27 17:51:21 UTC

incubator-flink git commit: Removed TypeSerializerFactory from ScalaCsvInputFormat. The TypeSerializer is now serialized directly.

Repository: incubator-flink
Updated Branches:
  refs/heads/master 1a7a50f82 -> 3d242fd7a


Removed TypeSerializerFactory from ScalaCsvInputFormat. The TypeSerializer is now serialized directly.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/3d242fd7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/3d242fd7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/3d242fd7

Branch: refs/heads/master
Commit: 3d242fd7aea6add18465d628a258be11def2d0af
Parents: 1a7a50f
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Nov 27 17:32:23 2014 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Nov 27 17:49:57 2014 +0100

----------------------------------------------------------------------
 .../scala/operators/ScalaCsvInputFormat.java    | 21 +++-----------------
 1 file changed, 3 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3d242fd7/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
index b2075c6..abe46a0 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
@@ -23,11 +23,7 @@ import com.google.common.base.Preconditions;
 
 import org.apache.flink.api.common.io.GenericCsvInputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
-import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
-import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
@@ -50,26 +46,17 @@ public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFor
 	// It is set when open so that readRecord does not have to evaluate it
 	private boolean lineDelimiterIsLinebreak = false;
 
-	private final TypeSerializerFactory<OUT> serializerFactory;
-
-	private transient TupleSerializerBase<OUT> serializer;
+	private final TupleSerializerBase<OUT> serializer;
 	
 	public ScalaCsvInputFormat(Path filePath, TypeInformation<OUT> typeInfo) {
 		super(filePath);
 
-		TypeSerializer<OUT> serializer = typeInfo.createSerializer();
-		if (serializer.isStateful()) {
-			serializerFactory = new RuntimeStatefulSerializerFactory<OUT>(
-					serializer, typeInfo.getTypeClass());
-		} else {
-			serializerFactory = new RuntimeStatelessSerializerFactory<OUT>(
-					serializer, typeInfo.getTypeClass());
-		}
-
 		if (!(typeInfo.isTupleType())) {
 			throw new UnsupportedOperationException("This only works on tuple types.");
 		}
 		TupleTypeInfoBase<OUT> tupleType = (TupleTypeInfoBase<OUT>) typeInfo;
+		serializer = (TupleSerializerBase<OUT>)tupleType.createSerializer();
+
 		Class[] classes = new Class[tupleType.getArity()];
 		for (int i = 0; i < tupleType.getArity(); i++) {
 			classes[i] = tupleType.getTypeAt(i).getTypeClass();
@@ -117,8 +104,6 @@ public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFor
 		if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
 			this.lineDelimiterIsLinebreak = true;
 		}
-
-		serializer = (TupleSerializerBase<OUT>)serializerFactory.getSerializer();
 	}
 
 	@Override