You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/04/22 20:40:41 UTC
[05/17] flink git commit: [hotfix] [core] Cleanup TypeInformationSerializationSchema
[hotfix] [core] Cleanup TypeInformationSerializationSchema
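Prevents the class from dropping the TypeInformation when it is serialized (the field is no longer transient), and adds a convenience constructor that accepts an explicit TypeSerializer.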
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/36767dd8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/36767dd8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/36767dd8
Branch: refs/heads/master
Commit: 36767dd84ce85b7806bed373912024da01cda867
Parents: 6bd65f9
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Apr 17 17:44:48 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Apr 22 16:28:34 2018 +0200
----------------------------------------------------------------------
.../TypeInformationSerializationSchema.java | 40 ++++++++++----------
1 file changed, 19 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/36767dd8/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java
index 217a889..78da3fa 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java
@@ -27,6 +27,8 @@ import org.apache.flink.core.memory.DataOutputSerializer;
import java.io.IOException;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* A serialization and deserialization schema that uses Flink's serialization stack to
* transform typed from and to byte arrays.
@@ -38,6 +40,9 @@ public class TypeInformationSerializationSchema<T> implements DeserializationSch
private static final long serialVersionUID = -5359448468131559102L;
+ /** The type information, to be returned by {@link #getProducedType()}. */
+ private final TypeInformation<T> typeInfo;
+
/** The serializer for the actual de-/serialization. */
private final TypeSerializer<T> serializer;
@@ -47,13 +52,6 @@ public class TypeInformationSerializationSchema<T> implements DeserializationSch
/** The reusable input deserialization buffer. */
private transient DataInputDeserializer dis;
- /**
- * The type information, to be returned by {@link #getProducedType()}. It is transient, because
- * it is not serializable. Note that this means that the type information is not available at
- * runtime, but only prior to the first serialization / deserialization.
- */
- private transient TypeInformation<T> typeInfo;
-
// ------------------------------------------------------------------------
/**
@@ -63,10 +61,21 @@ public class TypeInformationSerializationSchema<T> implements DeserializationSch
* @param ec The execution config, which is used to parametrize the type serializers.
*/
public TypeInformationSerializationSchema(TypeInformation<T> typeInfo, ExecutionConfig ec) {
- this.typeInfo = typeInfo;
+ this.typeInfo = checkNotNull(typeInfo, "typeInfo");
this.serializer = typeInfo.createSerializer(ec);
}
+ /**
+ * Creates a new de-/serialization schema for the given type.
+ *
+ * @param typeInfo The type information for the type de-/serialized by this schema.
+ * @param serializer The serializer to use for de-/serialization.
+ */
+ public TypeInformationSerializationSchema(TypeInformation<T> typeInfo, TypeSerializer<T> serializer) {
+ this.typeInfo = checkNotNull(typeInfo, "typeInfo");
+ this.serializer = checkNotNull(serializer, "serializer");
+ }
+
// ------------------------------------------------------------------------
@Override
@@ -108,24 +117,13 @@ public class TypeInformationSerializationSchema<T> implements DeserializationSch
throw new RuntimeException("Unable to serialize record", e);
}
- byte[] ret = dos.getByteArray();
- if (ret.length != dos.length()) {
- byte[] n = new byte[dos.length()];
- System.arraycopy(ret, 0, n, 0, dos.length());
- ret = n;
- }
+ byte[] ret = dos.getCopyOfBuffer();
dos.clear();
return ret;
}
@Override
public TypeInformation<T> getProducedType() {
- if (typeInfo != null) {
- return typeInfo;
- }
- else {
- throw new IllegalStateException(
- "The type information is not available after this class has been serialized and distributed.");
- }
+ return typeInfo;
}
}