You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/04/22 20:40:41 UTC

[05/17] flink git commit: [hotfix] [core] Cleanup TypeInformationSerializationSchema

[hotfix] [core] Cleanup TypeInformationSerializationSchema

Prevents the class from dropping the TypeInformation when it is serialized, and adds a convenience constructor.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/36767dd8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/36767dd8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/36767dd8

Branch: refs/heads/master
Commit: 36767dd84ce85b7806bed373912024da01cda867
Parents: 6bd65f9
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Apr 17 17:44:48 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Apr 22 16:28:34 2018 +0200

----------------------------------------------------------------------
 .../TypeInformationSerializationSchema.java     | 40 ++++++++++----------
 1 file changed, 19 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/36767dd8/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java
index 217a889..78da3fa 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java
@@ -27,6 +27,8 @@ import org.apache.flink.core.memory.DataOutputSerializer;
 
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A serialization and deserialization schema that uses Flink's serialization stack to
  * transform typed from and to byte arrays.
@@ -38,6 +40,9 @@ public class TypeInformationSerializationSchema<T> implements DeserializationSch
 
 	private static final long serialVersionUID = -5359448468131559102L;
 
+	/** The type information, to be returned by {@link #getProducedType()}. */
+	private final TypeInformation<T> typeInfo;
+
 	/** The serializer for the actual de-/serialization. */
 	private final TypeSerializer<T> serializer;
 
@@ -47,13 +52,6 @@ public class TypeInformationSerializationSchema<T> implements DeserializationSch
 	/** The reusable input deserialization buffer. */
 	private transient DataInputDeserializer dis;
 
-	/**
-	 * The type information, to be returned by {@link #getProducedType()}. It is transient, because
-	 * it is not serializable. Note that this means that the type information is not available at
-	 * runtime, but only prior to the first serialization / deserialization.
-	 */
-	private transient TypeInformation<T> typeInfo;
-
 	// ------------------------------------------------------------------------
 
 	/**
@@ -63,10 +61,21 @@ public class TypeInformationSerializationSchema<T> implements DeserializationSch
 	 * @param ec The execution config, which is used to parametrize the type serializers.
 	 */
 	public TypeInformationSerializationSchema(TypeInformation<T> typeInfo, ExecutionConfig ec) {
-		this.typeInfo = typeInfo;
+		this.typeInfo = checkNotNull(typeInfo, "typeInfo");
 		this.serializer = typeInfo.createSerializer(ec);
 	}
 
+	/**
+	 * Creates a new de-/serialization schema for the given type.
+	 *
+	 * @param typeInfo The type information for the type de-/serialized by this schema.
+	 * @param serializer The serializer to use for de-/serialization.
+	 */
+	public TypeInformationSerializationSchema(TypeInformation<T> typeInfo, TypeSerializer<T> serializer) {
+		this.typeInfo = checkNotNull(typeInfo, "typeInfo");
+		this.serializer = checkNotNull(serializer, "serializer");
+	}
+
 	// ------------------------------------------------------------------------
 
 	@Override
@@ -108,24 +117,13 @@ public class TypeInformationSerializationSchema<T> implements DeserializationSch
 			throw new RuntimeException("Unable to serialize record", e);
 		}
 
-		byte[] ret = dos.getByteArray();
-		if (ret.length != dos.length()) {
-			byte[] n = new byte[dos.length()];
-			System.arraycopy(ret, 0, n, 0, dos.length());
-			ret = n;
-		}
+		byte[] ret = dos.getCopyOfBuffer();
 		dos.clear();
 		return ret;
 	}
 
 	@Override
 	public TypeInformation<T> getProducedType() {
-		if (typeInfo != null) {
-			return typeInfo;
-		}
-		else {
-			throw new IllegalStateException(
-					"The type information is not available after this class has been serialized and distributed.");
-		}
+		return typeInfo;
 	}
 }