You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/04/23 04:45:01 UTC
[09/17] flink git commit: [FLINK-9198] [core] Improve
AbstractDeserializationSchema to eagerly extract generic types.
[FLINK-9198] [core] Improve AbstractDeserializationSchema to eagerly extract generic types.
This results in better error messages.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0e94b6a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0e94b6a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0e94b6a
Branch: refs/heads/release-1.5
Commit: e0e94b6afebcb94719dac2510eadf5085fa464e7
Parents: 6de1d12
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Apr 17 20:46:26 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Apr 22 22:42:30 2018 +0200
----------------------------------------------------------------------
.../AbstractDeserializationSchema.java | 123 ++++++++++++++++++-
.../serialization/DeserializationSchema.java | 8 +-
.../AbstractDeserializationSchemaTest.java | 39 +++++-
3 files changed, 164 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e0e94b6a/flink-core/src/main/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.java
index 871b7b1..a88f3cf 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.java
@@ -19,11 +19,16 @@
package org.apache.flink.api.common.serialization;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.FlinkRuntimeException;
import java.io.IOException;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* The deserialization schema describes how to turn the byte messages delivered by certain
* data sources (for example Apache Kafka) into data types (Java/Scala objects) that are
@@ -32,12 +37,121 @@ import java.io.IOException;
* <p>This base variant of the deserialization schema produces the type information
* automatically by extracting it from the generic class arguments.
*
+ * <h3>Common Use</h3>
+ *
+ * <p>To write a deserialization schema for a specific type, simply extend this class and declare
+ * the type in the class signature. Flink will reflectively determine the type and create the
+ * proper TypeInformation:
+ *
+ * <pre>{@code
+ * public class MyDeserializationSchema extends AbstractDeserializationSchema<MyType> {
+ *
+ * public MyType deserialize(byte[] message) throws IOException {
+ * ...
+ * }
+ * }
+ * }</pre>
+ *
+ * <h3>Generic Use</h3>
+ *
+ * <p>If you want to write a more generic DeserializationSchema that works for different types,
+ * you need to pass the TypeInformation (or an equivalent hint) to the constructor:
+ *
+ * <pre>{@code
+ * public class MyGenericSchema<T> extends AbstractDeserializationSchema<T> {
+ *
+ * public MyGenericSchema(Class<T> type) {
+ * super(type);
+ * }
+ *
+ * public T deserialize(byte[] message) throws IOException {
+ * ...
+ * }
+ * }
+ * }</pre>
+ *
* @param <T> The type created by the deserialization schema.
*/
@PublicEvolving
public abstract class AbstractDeserializationSchema<T> implements DeserializationSchema<T> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
+
+ /** The type produced by this {@code DeserializationSchema}. */
+ private final TypeInformation<T> type;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a new AbstractDeserializationSchema and tries to infer the type returned by this
+ * DeserializationSchema.
+ *
+ * <p>This constructor is usable whenever the DeserializationSchema concretely defines
+ * its type, without generic variables:
+ *
+ * <pre>{@code
+ * public class MyDeserializationSchema extends AbstractDeserializationSchema<MyType> {
+ *
+ * public MyType deserialize(byte[] message) throws IOException {
+ * ...
+ * }
+ * }
+ * }</pre>
+ */
+ protected AbstractDeserializationSchema() {
+ try {
+ this.type = TypeExtractor.createTypeInfo(
+ AbstractDeserializationSchema.class, getClass(), 0, null, null);
+ } catch (InvalidTypesException e) {
+ // Chain the extractor's exception as the cause so its details are not lost.
+ throw new FlinkRuntimeException(
+ "The implementation of AbstractDeserializationSchema is using a generic variable. " +
+ "This is not supported, because due to Java's generic type erasure, it will not be possible to " +
+ "determine the full type at runtime. For generic implementations, please pass the TypeInformation " +
+ "or type class explicitly to the constructor.", e);
+ }
+ }
+
+ /**
+ * Creates an AbstractDeserializationSchema that returns the TypeInformation
+ * indicated by the given class. This constructor is only necessary when creating a generic
+ * implementation, see {@link AbstractDeserializationSchema Generic Use}.
+ *
+ * <p>This constructor may fail if the class is generic. In that case, please
+ * use the constructor that accepts a {@link #AbstractDeserializationSchema(TypeHint) TypeHint},
+ * or a {@link #AbstractDeserializationSchema(TypeInformation) TypeInformation}.
+ *
+ * @param type The class of the produced type.
+ */
+ protected AbstractDeserializationSchema(Class<T> type) {
+ checkNotNull(type, "type");
+ this.type = TypeInformation.of(type);
+ }
+
+ /**
+ * Creates an AbstractDeserializationSchema that returns the TypeInformation
+ * indicated by the given type hint. This constructor is only necessary when creating a generic
+ * implementation, see {@link AbstractDeserializationSchema Generic Use}.
+ *
+ * @param typeHint The TypeHint for the produced type.
+ */
+ protected AbstractDeserializationSchema(TypeHint<T> typeHint) {
+ checkNotNull(typeHint, "typeHint");
+ this.type = typeHint.getTypeInfo();
+ }
+
+ /**
+ * Creates an AbstractDeserializationSchema that returns the given TypeInformation
+ * for the produced type. This constructor is only necessary when creating a generic
+ * implementation, see {@link AbstractDeserializationSchema Generic Use}.
+ *
+ * @param typeInfo The TypeInformation for the produced type.
+ */
+ protected AbstractDeserializationSchema(TypeInformation<T> typeInfo) {
+ this.type = checkNotNull(typeInfo, "typeInfo");
+ }
+
+ // ------------------------------------------------------------------------
/**
* De-serializes the byte message.
@@ -63,8 +177,13 @@ public abstract class AbstractDeserializationSchema<T> implements Deserializatio
return false;
}
+ /**
+ * Gets the type produced by this deserializer.
+ * This is the type that was passed to the constructor, or reflectively inferred
+ * (if the no-argument constructor was called).
+ */
@Override
public TypeInformation<T> getProducedType() {
- return TypeExtractor.createTypeInfo(AbstractDeserializationSchema.class, getClass(), 0, null, null);
+ return type;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e0e94b6a/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
index 9de4743..cacddde 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
@@ -28,9 +28,15 @@ import java.io.Serializable;
* data sources (for example Apache Kafka) into data types (Java/Scala objects) that are
* processed by Flink.
*
- * <p>Note: In most cases, one should start from {@link AbstractDeserializationSchema}, which
+ * <p>In addition, the DeserializationSchema describes the produced type ({@link #getProducedType()}),
+ * which lets Flink create internal serializers and structures to handle the type.
+ *
+ * <p><b>Note:</b> In most cases, one should start from {@link AbstractDeserializationSchema}, which
* takes care of producing the return type information automatically.
*
+ * <p>A DeserializationSchema must be {@link Serializable} because its instances are often part of
+ * an operator or transformation function.
+ *
* @param <T> The type created by the deserialization schema.
*/
@Public
http://git-wip-us.apache.org/repos/asf/flink/blob/e0e94b6a/flink-core/src/test/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchemaTest.java b/flink-core/src/test/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchemaTest.java
index ec241b4..68655f8 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchemaTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchemaTest.java
@@ -18,10 +18,11 @@
package org.apache.flink.api.common.serialization;
-import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.JSONPObject;
@@ -81,13 +82,29 @@ public class AbstractDeserializationSchemaTest {
@Test
public void testTypeExtractionRawException() {
try {
- new RawSchema().getProducedType();
+ new RawSchema();
fail();
- } catch (InvalidTypesException e) {
+ } catch (FlinkRuntimeException e) {
// expected
}
}
+ @Test
+ public void testTypeExtractionGenericException() {
+ try {
+ new GenericSchema<>();
+ fail();
+ } catch (FlinkRuntimeException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testIndirectGenericExtension() {
+ TypeInformation<String> type = new IndirectExtension().getProducedType();
+ assertEquals(BasicTypeInfo.STRING_TYPE_INFO, type);
+ }
+
// ------------------------------------------------------------------------
// Test types
// ------------------------------------------------------------------------
@@ -116,4 +133,20 @@ public class AbstractDeserializationSchemaTest {
throw new UnsupportedOperationException();
}
}
+
+ private static class GenericSchema<T> extends AbstractDeserializationSchema<T> {
+
+ @Override
+ public T deserialize(byte[] message) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private static class IndirectExtension extends GenericSchema<String> {
+
+ @Override
+ public String deserialize(byte[] message) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ }
}