You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text version).
Posted to commits@flink.apache.org by tz...@apache.org on 2017/09/07 06:19:27 UTC
[3/6] flink git commit: [FLINK-7440] [kinesis] Eagerly check serializability of deserialization schema in FlinkKinesisConsumer
[FLINK-7440] [kinesis] Eagerly check serializability of deserialization schema in FlinkKinesisConsumer
This commit also adds tests verifying that the FlinkKinesisConsumer
itself is serializable.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eaafb61f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eaafb61f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eaafb61f
Branch: refs/heads/master
Commit: eaafb61f8f4e69ec683f8e4cab5c1c6f34a9b4ff
Parents: 36412c6
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Aug 14 15:16:54 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Sep 7 12:54:12 2017 +0800
----------------------------------------------------------------------
.../kinesis/FlinkKinesisConsumer.java | 8 ++-
.../kinesis/FlinkKinesisConsumerTest.java | 74 ++++++++++++++++++++
2 files changed, 81 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/eaafb61f/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 5689229..a3681ec 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -42,6 +42,7 @@ import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeseri
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -173,7 +174,12 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
// check the configuration properties for any conflicting settings
KinesisConfigUtil.validateConsumerConfiguration(this.configProps);
- this.deserializer = checkNotNull(deserializer, "deserializer can not be null");
+ checkNotNull(deserializer, "deserializer can not be null");
+ checkArgument(
+ InstantiationUtil.isSerializable(deserializer),
+ "The provided deserialization schema is not serializable: " + deserializer.getClass().getName() + ". " +
+ "Please check that it does not contain references to non-serializable instances.");
+ this.deserializer = deserializer;
if (LOG.isInfoEnabled()) {
StringBuilder sb = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/flink/blob/eaafb61f/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 69d30cd..6af4c62 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
@@ -40,14 +41,17 @@ import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumbe
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.util.InstantiationUtil;
import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
import com.amazonaws.services.kinesis.model.Shard;
+
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -60,6 +64,7 @@ import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -534,6 +539,44 @@ public class FlinkKinesisConsumerTest {
}
// ----------------------------------------------------------------------
+ // Tests to verify serializability
+ // ----------------------------------------------------------------------
+
+ @Test
+ public void testCreateWithNonSerializableDeserializerFails() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("The provided deserialization schema is not serializable");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+ new FlinkKinesisConsumer<>("test-stream", new NonSerializableDeserializationSchema(), testConfig);
+ }
+
+ @Test
+ public void testCreateWithSerializableDeserializer() {
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+ new FlinkKinesisConsumer<>("test-stream", new SerializableDeserializationSchema(), testConfig);
+ }
+
+ @Test
+ public void testConsumerIsSerializable() {
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+ FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("test-stream", new SimpleStringSchema(), testConfig);
+ assertTrue(InstantiationUtil.isSerializable(consumer));
+ }
+
+ // ----------------------------------------------------------------------
// Tests related to state initialization
// ----------------------------------------------------------------------
@@ -1030,4 +1073,35 @@ public class FlinkKinesisConsumerTest {
return fakeRestoredState;
}
+
+ /**
+ * A non-serializable {@link KinesisDeserializationSchema} (because it is a nested class with reference
+ * to the enclosing class, which is not serializable) used for testing.
+ */
+ private final class NonSerializableDeserializationSchema implements KinesisDeserializationSchema<String> {
+ @Override
+ public String deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId) throws IOException {
+ return new String(recordValue);
+ }
+
+ @Override
+ public TypeInformation<String> getProducedType() {
+ return BasicTypeInfo.STRING_TYPE_INFO;
+ }
+ }
+
+ /**
+ * A static, serializable {@link KinesisDeserializationSchema}.
+ */
+ private static final class SerializableDeserializationSchema implements KinesisDeserializationSchema<String> {
+ @Override
+ public String deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId) throws IOException {
+ return new String(recordValue);
+ }
+
+ @Override
+ public TypeInformation<String> getProducedType() {
+ return BasicTypeInfo.STRING_TYPE_INFO;
+ }
+ }
}