You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/09/07 06:19:28 UTC
[4/6] flink git commit: [FLINK-7440] [kinesis] Eagerly check that
provided schema and partitioner are serializable in FlinkKinesisProducer
[FLINK-7440] [kinesis] Eagerly check that provided schema and partitioner are serializable in FlinkKinesisProducer
This commit also adds a test to verify that the FlinkKinesisProducer is
serializable.
This closes #4537.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/98737f9a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/98737f9a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/98737f9a
Branch: refs/heads/master
Commit: 98737f9a875f1899cb14b3dcef1bd2ac1c6530ba
Parents: eaafb61
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Aug 14 15:35:43 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Sep 7 12:54:19 2017 +0800
----------------------------------------------------------------------
.../kinesis/FlinkKinesisProducer.java | 19 ++-
.../kinesis/FlinkKinesisProducerTest.java | 161 +++++++++++++++++++
2 files changed, 175 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/98737f9a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
index 04d7055..e0d3e38 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
@@ -17,7 +17,6 @@
package org.apache.flink.streaming.connectors.kinesis;
-import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
@@ -25,6 +24,7 @@ import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerial
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.PropertiesUtil;
import com.amazonaws.services.kinesis.producer.Attempt;
@@ -35,14 +35,15 @@ import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.List;
-import java.util.Objects;
import java.util.Properties;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -123,7 +124,11 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
// check the configuration properties for any conflicting settings
KinesisConfigUtil.validateProducerConfiguration(this.configProps);
- ClosureCleaner.ensureSerializable(Objects.requireNonNull(schema));
+ checkNotNull(schema, "serialization schema cannot be null");
+ checkArgument(
+ InstantiationUtil.isSerializable(schema),
+ "The provided serialization schema is not serializable: " + schema.getClass().getName() + ". " +
+ "Please check that it does not contain references to non-serializable instances.");
this.schema = schema;
}
@@ -154,8 +159,12 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
}
public void setCustomPartitioner(KinesisPartitioner<OUT> partitioner) {
- Objects.requireNonNull(partitioner);
- ClosureCleaner.ensureSerializable(partitioner);
+ checkNotNull(partitioner, "partitioner cannot be null");
+ checkArgument(
+ InstantiationUtil.isSerializable(partitioner),
+ "The provided custom partitioner is not serializable: " + partitioner.getClass().getName() + ". " +
+ "Please check that it does not contain references to non-serializable instances.");
+
this.customPartitioner = partitioner;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/98737f9a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
new file mode 100644
index 0000000..ac03cfe
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.nio.ByteBuffer;
+import java.util.Properties;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Suite of {@link FlinkKinesisProducer} tests.
+ */
+public class FlinkKinesisProducerTest {
+
+	/** Lets the negative tests declare the exception type and message they expect. */
+	@Rule
+	public final ExpectedException exception = ExpectedException.none();
+
+	// ----------------------------------------------------------------------
+	//  Tests to verify serializability
+	// ----------------------------------------------------------------------
+
+	/** Constructing the producer with a non-serializable schema must fail with an {@link IllegalArgumentException}. */
+	@Test
+	public void testCreateWithNonSerializableSerializationSchemaFails() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("The provided serialization schema is not serializable");
+
+		new FlinkKinesisProducer<>(new NonSerializableSerializationSchema(), createTestConfig());
+	}
+
+	/** Constructing the producer with a serializable schema must succeed. */
+	@Test
+	public void testCreateWithSerializableSerializationSchema() {
+		new FlinkKinesisProducer<>(new SerializableSerializationSchema(), createTestConfig());
+	}
+
+	/** Setting a non-serializable custom partitioner must fail with an {@link IllegalArgumentException}. */
+	@Test
+	public void testConfigureWithNonSerializableCustomPartitionerFails() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("The provided custom partitioner is not serializable");
+
+		new FlinkKinesisProducer<>(new SimpleStringSchema(), createTestConfig())
+			.setCustomPartitioner(new NonSerializableCustomPartitioner());
+	}
+
+	/** Setting a serializable custom partitioner must succeed. */
+	@Test
+	public void testConfigureWithSerializableCustomPartitioner() {
+		new FlinkKinesisProducer<>(new SimpleStringSchema(), createTestConfig())
+			.setCustomPartitioner(new SerializableCustomPartitioner());
+	}
+
+	/** A producer built from a serializable schema must itself be serializable. */
+	@Test
+	public void testProducerIsSerializable() {
+		FlinkKinesisProducer<String> producer = new FlinkKinesisProducer<>(new SimpleStringSchema(), createTestConfig());
+		assertTrue(InstantiationUtil.isSerializable(producer));
+	}
+
+	// ----------------------------------------------------------------------
+	//  Test configuration
+	// ----------------------------------------------------------------------
+
+	/**
+	 * Creates the producer configuration shared by all tests.
+	 *
+	 * <p>The region and the credentials are placeholder values.
+	 *
+	 * @return a new {@link Properties} instance holding an AWS region and basic credentials
+	 */
+	private static Properties createTestConfig() {
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		return testConfig;
+	}
+
+	// ----------------------------------------------------------------------
+	//  Utility test classes
+	// ----------------------------------------------------------------------
+
+	/**
+	 * A non-serializable {@link KinesisSerializationSchema} used for testing: as a non-static inner class
+	 * it holds an implicit reference to the enclosing test class, which is not serializable.
+	 */
+	private final class NonSerializableSerializationSchema implements KinesisSerializationSchema<String> {
+		@Override
+		public ByteBuffer serialize(String element) {
+			return ByteBuffer.wrap(element.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+		}
+
+		@Override
+		public String getTargetStream(String element) {
+			return "test-stream";
+		}
+	}
+
+	/**
+	 * A static (no enclosing-instance reference), serializable {@link KinesisSerializationSchema}.
+	 */
+	private static final class SerializableSerializationSchema implements KinesisSerializationSchema<String> {
+		@Override
+		public ByteBuffer serialize(String element) {
+			return ByteBuffer.wrap(element.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+		}
+
+		@Override
+		public String getTargetStream(String element) {
+			return "test-stream";
+		}
+	}
+
+	/**
+	 * A non-serializable {@link KinesisPartitioner} used for testing: as a non-static inner class
+	 * it holds an implicit reference to the enclosing test class, which is not serializable.
+	 */
+	private final class NonSerializableCustomPartitioner extends KinesisPartitioner<String> {
+		@Override
+		public String getPartitionId(String element) {
+			return "test-partition";
+		}
+	}
+
+	/**
+	 * A static (no enclosing-instance reference), serializable {@link KinesisPartitioner}.
+	 */
+	private static final class SerializableCustomPartitioner extends KinesisPartitioner<String> {
+		@Override
+		public String getPartitionId(String element) {
+			return "test-partition";
+		}
+	}
+}