You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/04/23 04:45:01 UTC

[09/17] flink git commit: [FLINK-9198] [core] Improve AbstractDeserializationSchema to eagerly extract generic types.

[FLINK-9198] [core] Improve AbstractDeserializationSchema to eagerly extract generic types.

This results in better error messages.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0e94b6a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0e94b6a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0e94b6a

Branch: refs/heads/release-1.5
Commit: e0e94b6afebcb94719dac2510eadf5085fa464e7
Parents: 6de1d12
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Apr 17 20:46:26 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Apr 22 22:42:30 2018 +0200

----------------------------------------------------------------------
 .../AbstractDeserializationSchema.java          | 123 ++++++++++++++++++-
 .../serialization/DeserializationSchema.java    |   8 +-
 .../AbstractDeserializationSchemaTest.java      |  39 +++++-
 3 files changed, 164 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e0e94b6a/flink-core/src/main/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.java
index 871b7b1..a88f3cf 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.java
@@ -19,11 +19,16 @@
 package org.apache.flink.api.common.serialization;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The deserialization schema describes how to turn the byte messages delivered by certain
  * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are
@@ -32,12 +37,121 @@ import java.io.IOException;
  * <p>This base variant of the deserialization schema produces the type information
  * automatically by extracting it from the generic class arguments.
  *
+ * <h3>Common Use</h3>
+ *
+ * <p>To write a deserialization schema for a specific type, simply extend this class and declare
+ * the type in the class signature. Flink will reflectively determine the type and create the
+ * proper TypeInformation:
+ *
+ * <pre>{@code
+ * public class MyDeserializationSchema extends AbstractDeserializationSchema<MyType> {
+ *
+ *     public MyType deserialize(byte[] message) throws IOException {
+ *         ...
+ *     }
+ * }
+ * }</pre>
+ *
+ * <h3>Generic Use</h3>
+ *
+ * <p>If you want to write a more generic DeserializationSchema that works for different types,
+ * you need to pass the TypeInformation (or an equivalent hint) to the constructor:
+ *
+ * <pre>{@code
+ * public class MyGenericSchema<T> extends AbstractDeserializationSchema<T> {
+ *
+ *     public MyGenericSchema(Class<T> type) {
+ *         super(type);
+ *     }
+ *
+ *     public T deserialize(byte[] message) throws IOException {
+ *         ...
+ *     }
+ * }
+ * }</pre>
+ *
  * @param <T> The type created by the deserialization schema.
  */
 @PublicEvolving
 public abstract class AbstractDeserializationSchema<T> implements DeserializationSchema<T> {
 
-	private static final long serialVersionUID = 1L;
+	private static final long serialVersionUID = 2L;
+
+	/** The type produced by this {@code DeserializationSchema}. */
+	private final TypeInformation<T> type;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new AbstractDeserializationSchema and tries to infer the type returned by this
+	 * DeserializationSchema.
+	 *
+	 * <p>This constructor is usable whenever the DeserializationSchema concretely defines
+	 * its type, without generic variables:
+	 *
+	 * <pre>{@code
+	 * public class MyDeserializationSchema extends AbstractDeserializationSchema<MyType> {
+	 *
+	 *     public MyType deserialize(byte[] message) throws IOException {
+	 *         ...
+	 *     }
+	 * }
+	 * }</pre>
+	 */
+	protected AbstractDeserializationSchema() {
+		try {
+			this.type = TypeExtractor.createTypeInfo(
+					AbstractDeserializationSchema.class, getClass(), 0, null, null);
+		}
+		catch (InvalidTypesException e) { // chain 'e' as the cause so the extractor's own diagnosis stays in the stack trace
+			throw new FlinkRuntimeException(
+					"The implementation of AbstractDeserializationSchema is using a generic variable. " +
+					"This is not supported, because due to Java's generic type erasure, it will not be possible to " +
+					"determine the full type at runtime. For generic implementations, please pass the TypeInformation " +
+					"or type class explicitly to the constructor.", e);
+		}
+	}
+
+	/**
+	 * Creates an AbstractDeserializationSchema that returns the TypeInformation
+	 * indicated by the given class. This constructor is only necessary when creating a generic
+	 * implementation, see {@link AbstractDeserializationSchema Generic Use}.
+	 *
+	 * <p>This constructor may fail if the class is generic. In that case, please
+	 * use the constructor that accepts a {@link #AbstractDeserializationSchema(TypeHint) TypeHint},
+	 * or a {@link #AbstractDeserializationSchema(TypeInformation) TypeInformation}.
+	 *
+	 * @param type The class of the produced type.
+	 */
+	protected AbstractDeserializationSchema(Class<T> type) {
+		checkNotNull(type, "type");
+		this.type = TypeInformation.of(type);
+	}
+
+	/**
+	 * Creates an AbstractDeserializationSchema that returns the TypeInformation
+	 * indicated by the given type hint. This constructor is only necessary when creating a generic
+	 * implementation, see {@link AbstractDeserializationSchema Generic Use}.
+	 *
+	 * @param typeHint The TypeHint for the produced type.
+	 */
+	protected AbstractDeserializationSchema(TypeHint<T> typeHint) {
+		checkNotNull(typeHint, "typeHint");
+		this.type = typeHint.getTypeInfo();
+	}
+
+	/**
+	 * Creates an AbstractDeserializationSchema that returns the given TypeInformation
+	 * for the produced type. This constructor is only necessary when creating a generic
+	 * implementation, see {@link AbstractDeserializationSchema Generic Use}.
+	 *
+	 * @param typeInfo The TypeInformation for the produced type.
+	 */
+	protected AbstractDeserializationSchema(TypeInformation<T> typeInfo) {
+		this.type = checkNotNull(typeInfo, "typeInfo");
+	}
+
+	// ------------------------------------------------------------------------
 
 	/**
 	 * De-serializes the byte message.
@@ -63,8 +177,13 @@ public abstract class AbstractDeserializationSchema<T> implements Deserializatio
 		return false;
 	}
 
+	/**
+	 * Gets the type produced by this deserializer.
+	 * This is the type that was passed to the constructor, or reflectively inferred
+	 * (if the default constructor was called).
+	 */
 	@Override
 	public TypeInformation<T> getProducedType() {
-		return TypeExtractor.createTypeInfo(AbstractDeserializationSchema.class, getClass(), 0, null, null);
+		return type;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e0e94b6a/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
index 9de4743..cacddde 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
@@ -28,9 +28,15 @@ import java.io.Serializable;
  * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are
  * processed by Flink.
  *
- * <p>Note: In most cases, one should start from {@link AbstractDeserializationSchema}, which
+ * <p>In addition, the DeserializationSchema describes the produced type ({@link #getProducedType()}),
+ * which lets Flink create internal serializers and structures to handle the type.
+ *
+ * <p><b>Note:</b> In most cases, one should start from {@link AbstractDeserializationSchema}, which
  * takes care of producing the return type information automatically.
  *
+ * <p>A DeserializationSchema must be {@link Serializable} because its instances are often part of
+ * an operator or transformation function.
+ *
  * @param <T> The type created by the deserialization schema.
  */
 @Public

http://git-wip-us.apache.org/repos/asf/flink/blob/e0e94b6a/flink-core/src/test/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchemaTest.java b/flink-core/src/test/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchemaTest.java
index ec241b4..68655f8 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchemaTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchemaTest.java
@@ -18,10 +18,11 @@
 
 package org.apache.flink.api.common.serialization;
 
-import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.JSONPObject;
 
@@ -81,13 +82,29 @@ public class AbstractDeserializationSchemaTest {
 	@Test
 	public void testTypeExtractionRawException() {
 		try {
-			new RawSchema().getProducedType();
+			new RawSchema();
 			fail();
-		} catch (InvalidTypesException e) {
+		} catch (FlinkRuntimeException e) {
 			// expected
 		}
 	}
 
+	@Test
+	public void testTypeExtractionGenericException() {
+		try {
+			new GenericSchema<>();
+			fail();
+		} catch (FlinkRuntimeException e) {
+			// expected
+		}
+	}
+
+	@Test
+	public void testIndirectGenericExtension() {
+		TypeInformation<String> type = new IndirectExtension().getProducedType();
+		assertEquals(BasicTypeInfo.STRING_TYPE_INFO, type);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Test types
 	// ------------------------------------------------------------------------
@@ -116,4 +133,20 @@ public class AbstractDeserializationSchemaTest {
 			throw new UnsupportedOperationException();
 		}
 	}
+
+	private static class GenericSchema<T> extends AbstractDeserializationSchema<T> {
+
+		@Override
+		public T deserialize(byte[] message) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+	}
+
+	private static class IndirectExtension extends GenericSchema<String> {
+
+		@Override
+		public String deserialize(byte[] message) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+	}
 }