You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/09/07 06:19:28 UTC

[4/6] flink git commit: [FLINK-7440] [kinesis] Eagerly check that provided schema and partitioner are serializable in FlinkKinesisProducer

[FLINK-7440] [kinesis] Eagerly check that provided schema and partitioner are serializable in FlinkKinesisProducer

This commit also adds a test to verify that the FlinkKinesisProducer is
serializable.

This closes #4537.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/98737f9a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/98737f9a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/98737f9a

Branch: refs/heads/master
Commit: 98737f9a875f1899cb14b3dcef1bd2ac1c6530ba
Parents: eaafb61
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Aug 14 15:35:43 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Sep 7 12:54:19 2017 +0800

----------------------------------------------------------------------
 .../kinesis/FlinkKinesisProducer.java           |  19 ++-
 .../kinesis/FlinkKinesisProducerTest.java       | 161 +++++++++++++++++++
 2 files changed, 175 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/98737f9a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
index 04d7055..e0d3e38 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.connectors.kinesis;
 
-import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
@@ -25,6 +24,7 @@ import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerial
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.PropertiesUtil;
 
 import com.amazonaws.services.kinesis.producer.Attempt;
@@ -35,14 +35,15 @@ import com.amazonaws.services.kinesis.producer.UserRecordResult;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.List;
-import java.util.Objects;
 import java.util.Properties;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -123,7 +124,11 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
 		// check the configuration properties for any conflicting settings
 		KinesisConfigUtil.validateProducerConfiguration(this.configProps);
 
-		ClosureCleaner.ensureSerializable(Objects.requireNonNull(schema));
+		checkNotNull(schema, "serialization schema cannot be null");
+		checkArgument(
+			InstantiationUtil.isSerializable(schema),
+			"The provided serialization schema is not serializable: " + schema.getClass().getName() + ". " +
+				"Please check that it does not contain references to non-serializable instances.");
 		this.schema = schema;
 	}
 
@@ -154,8 +159,12 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
 	}
 
 	public void setCustomPartitioner(KinesisPartitioner<OUT> partitioner) {
-		Objects.requireNonNull(partitioner);
-		ClosureCleaner.ensureSerializable(partitioner);
+		checkNotNull(partitioner, "partitioner cannot be null");
+		checkArgument(
+			InstantiationUtil.isSerializable(partitioner),
+			"The provided custom partitioner is not serializable: " + partitioner.getClass().getName() + ". " +
+				"Please check that it does not contain references to non-serializable instances.");
+
 		this.customPartitioner = partitioner;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/98737f9a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
new file mode 100644
index 0000000..ac03cfe
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.nio.ByteBuffer;
+import java.util.Properties;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Suite of {@link FlinkKinesisProducer} tests.
+ *
+ * <p>The tests verify that the producer eagerly rejects non-serializable serialization schemas
+ * and custom partitioners, and that a configured producer instance is itself serializable.
+ */
+public class FlinkKinesisProducerTest {
+
+	@Rule
+	public final ExpectedException exception = ExpectedException.none();
+
+	// ----------------------------------------------------------------------
+	// Tests to verify serializability
+	// ----------------------------------------------------------------------
+
+	/** Creating a producer with a non-serializable serialization schema must fail eagerly. */
+	@Test
+	public void testCreateWithNonSerializableDeserializerFails() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("The provided serialization schema is not serializable");
+
+		new FlinkKinesisProducer<>(new NonSerializableSerializationSchema(), createTestConfig());
+	}
+
+	/** Creating a producer with a serializable serialization schema must not throw. */
+	@Test
+	public void testCreateWithSerializableDeserializer() {
+		new FlinkKinesisProducer<>(new SerializableSerializationSchema(), createTestConfig());
+	}
+
+	/** Setting a non-serializable custom partitioner on a producer must fail eagerly. */
+	@Test
+	public void testConfigureWithNonSerializableCustomPartitionerFails() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("The provided custom partitioner is not serializable");
+
+		new FlinkKinesisProducer<>(new SimpleStringSchema(), createTestConfig())
+			.setCustomPartitioner(new NonSerializableCustomPartitioner());
+	}
+
+	/** Setting a serializable custom partitioner on a producer must not throw. */
+	@Test
+	public void testConfigureWithSerializableCustomPartitioner() {
+		new FlinkKinesisProducer<>(new SimpleStringSchema(), createTestConfig())
+			.setCustomPartitioner(new SerializableCustomPartitioner());
+	}
+
+	/** A producer created with a serializable schema must itself be serializable. */
+	@Test
+	public void testConsumerIsSerializable() {
+		FlinkKinesisProducer<String> producer =
+			new FlinkKinesisProducer<>(new SimpleStringSchema(), createTestConfig());
+		assertTrue(InstantiationUtil.isSerializable(producer));
+	}
+
+	// ----------------------------------------------------------------------
+	// Utility methods
+	// ----------------------------------------------------------------------
+
+	/**
+	 * Creates the producer configuration (AWS region plus placeholder credentials) used by all tests.
+	 * A fresh instance is returned on every call so that tests cannot influence each other.
+	 */
+	private static Properties createTestConfig() {
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		return testConfig;
+	}
+
+	// ----------------------------------------------------------------------
+	// Utility test classes
+	// ----------------------------------------------------------------------
+
+	/**
+	 * A non-serializable {@link KinesisSerializationSchema} used for testing. It is an inner (non-static)
+	 * class and therefore references the enclosing test instance, which is not serializable.
+	 */
+	private final class NonSerializableSerializationSchema implements KinesisSerializationSchema<String> {
+		@Override
+		public ByteBuffer serialize(String element) {
+			return ByteBuffer.wrap(element.getBytes());
+		}
+
+		@Override
+		public String getTargetStream(String element) {
+			return "test-stream";
+		}
+	}
+
+	/**
+	 * A static, serializable {@link KinesisSerializationSchema}.
+	 */
+	private static final class SerializableSerializationSchema implements KinesisSerializationSchema<String> {
+		@Override
+		public ByteBuffer serialize(String element) {
+			return ByteBuffer.wrap(element.getBytes());
+		}
+
+		@Override
+		public String getTargetStream(String element) {
+			return "test-stream";
+		}
+	}
+
+	/**
+	 * A non-serializable {@link KinesisPartitioner} used for testing. It is an inner (non-static)
+	 * class and therefore references the enclosing test instance, which is not serializable.
+	 */
+	private final class NonSerializableCustomPartitioner extends KinesisPartitioner<String> {
+		@Override
+		public String getPartitionId(String element) {
+			return "test-partition";
+		}
+	}
+
+	/**
+	 * A static, serializable {@link KinesisPartitioner}.
+	 */
+	private static final class SerializableCustomPartitioner extends KinesisPartitioner<String> {
+		@Override
+		public String getPartitionId(String element) {
+			return "test-partition";
+		}
+	}
+}