You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/09/07 06:19:27 UTC

[3/6] flink git commit: [FLINK-7440] [kinesis] Eagerly check serializability of deserialization schema in FlinkKinesisConsumer

[FLINK-7440] [kinesis] Eagerly check serializability of deserialization schema in FlinkKinesisConsumer

This commit also adds tests for verifying that the FlinkKinesisConsumer
itself is serializable.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eaafb61f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eaafb61f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eaafb61f

Branch: refs/heads/master
Commit: eaafb61f8f4e69ec683f8e4cab5c1c6f34a9b4ff
Parents: 36412c6
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Aug 14 15:16:54 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Sep 7 12:54:12 2017 +0800

----------------------------------------------------------------------
 .../kinesis/FlinkKinesisConsumer.java           |  8 ++-
 .../kinesis/FlinkKinesisConsumerTest.java       | 74 ++++++++++++++++++++
 2 files changed, 81 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/eaafb61f/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 5689229..a3681ec 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -42,6 +42,7 @@ import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeseri
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.util.InstantiationUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -173,7 +174,12 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 		// check the configuration properties for any conflicting settings
 		KinesisConfigUtil.validateConsumerConfiguration(this.configProps);
 
-		this.deserializer = checkNotNull(deserializer, "deserializer can not be null");
+		checkNotNull(deserializer, "deserializer can not be null");
+		checkArgument(
+			InstantiationUtil.isSerializable(deserializer),
+			"The provided deserialization schema is not serializable: " + deserializer.getClass().getName() + ". " +
+				"Please check that it does not contain references to non-serializable instances.");
+		this.deserializer = deserializer;
 
 		if (LOG.isInfoEnabled()) {
 			StringBuilder sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/flink/blob/eaafb61f/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 69d30cd..6af4c62 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
@@ -40,14 +41,17 @@ import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumbe
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.util.InstantiationUtil;
 
 import com.amazonaws.services.kinesis.model.HashKeyRange;
 import com.amazonaws.services.kinesis.model.SequenceNumberRange;
 import com.amazonaws.services.kinesis.model.Shard;
+
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -60,6 +64,7 @@ import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -534,6 +539,44 @@ public class FlinkKinesisConsumerTest {
 	}
 
 	// ----------------------------------------------------------------------
+	// Tests to verify serializability
+	// ----------------------------------------------------------------------
+
+	@Test
+	public void testCreateWithNonSerializableDeserializerFails() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("The provided deserialization schema is not serializable");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+		new FlinkKinesisConsumer<>("test-stream", new NonSerializableDeserializationSchema(), testConfig);
+	}
+
+	@Test
+	public void testCreateWithSerializableDeserializer() {
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+		new FlinkKinesisConsumer<>("test-stream", new SerializableDeserializationSchema(), testConfig);
+	}
+
+	@Test
+	public void testConsumerIsSerializable() {
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+		FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("test-stream", new SimpleStringSchema(), testConfig);
+		assertTrue(InstantiationUtil.isSerializable(consumer));
+	}
+
+	// ----------------------------------------------------------------------
 	// Tests related to state initialization
 	// ----------------------------------------------------------------------
 
@@ -1030,4 +1073,35 @@ public class FlinkKinesisConsumerTest {
 
 		return fakeRestoredState;
 	}
+
+	/**
+	 * A non-serializable {@link KinesisDeserializationSchema} (because it is a nested class with reference
+	 * to the enclosing class, which is not serializable) used for testing.
+	 */
+	private final class NonSerializableDeserializationSchema implements KinesisDeserializationSchema<String> {
+		@Override
+		public String deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId) throws IOException {
+			return new String(recordValue);
+		}
+
+		@Override
+		public TypeInformation<String> getProducedType() {
+			return BasicTypeInfo.STRING_TYPE_INFO;
+		}
+	}
+
+	/**
+	 * A static, serializable {@link KinesisDeserializationSchema}.
+	 */
+	private static final class SerializableDeserializationSchema implements KinesisDeserializationSchema<String> {
+		@Override
+		public String deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId) throws IOException {
+			return new String(recordValue);
+		}
+
+		@Override
+		public TypeInformation<String> getProducedType() {
+			return BasicTypeInfo.STRING_TYPE_INFO;
+		}
+	}
 }