Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/07 07:44:33 UTC

[flink] 06/07: [FLINK-11249][kafka, test] Add FlinkKafkaProducer(011) migration tests

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 39710d3c748a0d24cda6ea8d94b539d7abeaaf3c
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Feb 8 09:26:01 2019 +0100

    [FLINK-11249][kafka,test] Add FlinkKafkaProducer(011) migration tests
    
    Add migration tests for the 0.11 and universal connectors to make sure
    that those sinks can be restored from old savepoints.
    
    Note that this doesn't test migration from the 0.11 to the universal
    connector. It only checks that the master version of the 0.11 (or
    universal) connector is compatible with state created by previous
    Flink versions.
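    
    At restore time, these tests boil down to the following sketch (using
    the harness plumbing added below in KafkaMigrationTestBase):
    
        OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness();
        testHarness.setup();
        // the binary snapshot is checked in under src/test/resources
        testHarness.initializeState(getOperatorSnapshotPath());
        testHarness.open();
        // then process more elements and assert that only records from
        // committed transactions are visible in the topic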
---
 .../kafka/FlinkKafkaProducer011MigrationTest.java  |  82 ++++++++++
 ...fka-migration-kafka-producer-flink-1.8-snapshot | Bin 0 -> 1746 bytes
 .../connectors/kafka/KafkaMigrationTestBase.java   | 171 +++++++++++++++++++++
 .../streaming/connectors/kafka/KafkaTestBase.java  |  26 +++-
 .../kafka/FlinkKafkaProducerMigrationTest.java     |  82 ++++++++++
 ...fka-migration-kafka-producer-flink-1.8-snapshot | Bin 0 -> 1731 bytes
 pom.xml                                            |   2 +
 7 files changed, 355 insertions(+), 8 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011MigrationTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011MigrationTest.java
new file mode 100644
index 0000000..4ac703e
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011MigrationTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+
+/**
+ * Tests for checking whether {@link FlinkKafkaProducer011} can restore from snapshots that were
+ * taken with previous Flink versions' {@link FlinkKafkaProducer011}.
+ *
+ * <p>To regenerate the binary snapshot files, run {@link #writeSnapshot()} on the corresponding
+ * Flink release-* branch.
+ */
+@RunWith(Parameterized.class)
+public class FlinkKafkaProducer011MigrationTest extends KafkaMigrationTestBase {
+	@Parameterized.Parameters(name = "Migration Savepoint: {0}")
+	public static Collection<MigrationVersion> parameters() {
+		return Arrays.asList(
+			MigrationVersion.v1_8);
+	}
+
+	public FlinkKafkaProducer011MigrationTest(MigrationVersion testMigrateVersion) {
+		super(testMigrateVersion);
+	}
+
+	@Override
+	protected Properties createProperties() {
+		Properties properties = new Properties();
+		properties.putAll(standardProps);
+		properties.putAll(secureProps);
+		properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-client-id");
+		properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer-transaction-id");
+		properties.put(FlinkKafkaProducer011.KEY_DISABLE_METRICS, "true");
+		return properties;
+	}
+
+	@Override
+	protected OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness() throws Exception {
+		FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>(
+			TOPIC,
+			integerKeyedSerializationSchema,
+			createProperties(),
+			FlinkKafkaProducer011.Semantic.EXACTLY_ONCE
+		).ignoreFailuresAfterTransactionTimeout();
+
+		return new OneInputStreamOperatorTestHarness<>(
+			new StreamSink<>(kafkaProducer),
+			1,
+			1,
+			0,
+			IntSerializer.INSTANCE,
+			new OperatorID(1, 1));
+	}
+}
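
Regenerating the checked-in snapshot is a manual step, described by the
javadoc in KafkaMigrationTestBase below. A sketch of the process, assuming
the 1.8 snapshot is being rewritten from the release-1.8 branch:

    // in KafkaMigrationTestBase, point the version at the target release:
    protected final Optional<MigrationVersion> flinkGenerateSavepointVersion =
        Optional.of(MigrationVersion.v1_8);

Then remove the @Ignore annotation from writeSnapshot() and run it, e.g.:

    mvn test -Dtest=FlinkKafkaProducer011MigrationTest#writeSnapshot \
        -pl flink-connectors/flink-connector-kafka-0.11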
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/resources/kafka-migration-kafka-producer-flink-1.8-snapshot b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/kafka-migration-kafka-producer-flink-1.8-snapshot
new file mode 100644
index 0000000..29c6ccc
Binary files /dev/null and b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/kafka-migration-kafka-producer-flink-1.8-snapshot differ
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaMigrationTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaMigrationTestBase.java
new file mode 100644
index 0000000..0dbc077
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaMigrationTestBase.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OperatorSnapshotUtil;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Base class for the migration tests of the exactly-once Kafka producers.
+ */
+@SuppressWarnings("serial")
+public abstract class KafkaMigrationTestBase extends KafkaTestBase {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(KafkaMigrationTestBase.class);
+	protected static final String TOPIC = "flink-kafka-producer-migration-test";
+
+	protected final MigrationVersion testMigrateVersion;
+	protected final TypeInformationSerializationSchema<Integer> integerSerializationSchema =
+		new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+	protected final KeyedSerializationSchema<Integer> integerKeyedSerializationSchema =
+		new KeyedSerializationSchemaWrapper<>(integerSerializationSchema);
+
+	/**
+	 * TODO: change this to the savepoint version to be written (e.g. {@link MigrationVersion#v1_3} for 1.3),
+	 * TODO: and remove the @Ignore annotations on the write*Snapshot() methods to generate the savepoints.
+	 * TODO: Note: generate the savepoint from the corresponding release branch, not from master.
+	 */
+	protected final Optional<MigrationVersion> flinkGenerateSavepointVersion = Optional.empty();
+
+	public KafkaMigrationTestBase(MigrationVersion testMigrateVersion) {
+		this.testMigrateVersion = checkNotNull(testMigrateVersion);
+	}
+
+	public String getOperatorSnapshotPath() {
+		return getOperatorSnapshotPath(testMigrateVersion);
+	}
+
+	public String getOperatorSnapshotPath(MigrationVersion version) {
+		return "src/test/resources/kafka-migration-kafka-producer-flink-" + version + "-snapshot";
+	}
+
+	/**
+	 * Overrides {@link KafkaTestBase}: Kafka migration tests start the Kafka/ZooKeeper clusters manually per test.
+	 */
+	@BeforeClass
+	public static void prepare() throws Exception {
+	}
+
+	/**
+	 * Overrides {@link KafkaTestBase}: Kafka migration tests shut down the Kafka/ZooKeeper clusters manually per test.
+	 */
+	@AfterClass
+	public static void shutDownServices() throws Exception {
+	}
+
+	/**
+	 * Manually run this to write binary snapshot data.
+	 */
+	@Ignore
+	@Test
+	public void writeSnapshot() throws Exception {
+		try {
+			checkState(flinkGenerateSavepointVersion.isPresent());
+			startClusters();
+
+			OperatorSubtaskState snapshot = initializeTestState();
+			OperatorSnapshotUtil.writeStateHandle(snapshot, getOperatorSnapshotPath(flinkGenerateSavepointVersion.get()));
+		}
+		finally {
+			shutdownClusters();
+		}
+	}
+
+	private OperatorSubtaskState initializeTestState() throws Exception {
+		try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness()) {
+			testHarness.setup();
+			testHarness.open();
+
+			// Create a committed transaction
+			testHarness.processElement(42, 0L);
+
+			// TODO: once stop-with-savepoint is available, replace this code with it (with
+			// stop-with-savepoint there won't be any pending transactions)
+			OperatorSubtaskState snapshot = testHarness.snapshot(0L, 1L);
+			// Approximate stop-with-savepoint by making sure notifyOfCompletedCheckpoint is called
+			testHarness.notifyOfCompletedCheckpoint(0L);
+
+			// Create a pending transaction
+			testHarness.processElement(43, 2L);
+			return snapshot;
+		}
+	}
+
+	@Test
+	public void testRestoreProducer() throws Exception {
+		try {
+			startClusters();
+
+			initializeTestState();
+
+			try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness()) {
+				initializeState(testHarness);
+
+				// Create a committed transaction
+				testHarness.processElement(44, 4L);
+				testHarness.snapshot(2L, 5L);
+				testHarness.notifyOfCompletedCheckpoint(2L);
+
+				// Create a pending transaction
+				testHarness.processElement(45, 6L);
+
+				// We should have:
+				// - transaction 42 committed
+				// - transaction 43 aborted
+				// - transaction 44 committed
+				// - transaction 45 pending
+				assertExactlyOnceForTopic(createProperties(), TOPIC, 0, Arrays.asList(42, 44));
+			}
+		}
+		finally {
+			shutdownClusters();
+		}
+	}
+
+	protected abstract OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness() throws Exception;
+
+	protected abstract Properties createProperties();
+
+	protected void initializeState(OneInputStreamOperatorTestHarness<Integer, Object> testHarness) throws Exception {
+		testHarness.setup();
+		testHarness.initializeState(getOperatorSnapshotPath());
+		testHarness.open();
+	}
+}
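
assertExactlyOnceForTopic() is provided by KafkaTestBase. Conceptually, the
check reads the topic with read_committed isolation, so records from aborted
or still-pending transactions stay invisible. A minimal sketch of that idea
(not the actual KafkaTestBase implementation; it assumes a Kafka 2.x consumer
API and the fields of KafkaMigrationTestBase, and assertCommittedRecords is a
hypothetical helper name):

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    import static org.junit.Assert.assertEquals;

    // Sketch: read back only records from committed transactions.
    private void assertCommittedRecords(List<Integer> expected) throws Exception {
        Properties consumerProps = new Properties();
        consumerProps.putAll(createProperties());
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
            TopicPartition partition = new TopicPartition(TOPIC, 0);
            consumer.assign(Collections.singletonList(partition));
            consumer.seekToBeginning(Collections.singletonList(partition));

            List<Integer> actual = new ArrayList<>();
            for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(10))) {
                // values were written with integerSerializationSchema
                actual.add(integerSerializationSchema.deserialize(record.value()));
            }
            assertEquals(expected, actual);
        }
    }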
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index bc91afb..073ba6e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -126,24 +126,29 @@ public abstract class KafkaTestBase extends TestLogger {
 		return flinkConfig;
 	}
 
+	protected static void startClusters() throws Exception {
+		startClusters(KafkaTestEnvironment.createConfig().setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS));
+	}
+
 	protected static void startClusters(boolean secureMode, boolean hideKafkaBehindProxy) throws Exception {
+		startClusters(KafkaTestEnvironment.createConfig()
+			.setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS)
+			.setSecureMode(secureMode)
+			.setHideKafkaBehindProxy(hideKafkaBehindProxy));
+	}
 
-		// dynamically load the implementation for the test
-		Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
-		kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);
+	protected static void startClusters(KafkaTestEnvironment.Config environmentConfig) throws Exception {
+		kafkaServer = constructKafkaTestEnvironment();
 
 		LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
 
-		kafkaServer.prepare(kafkaServer.createConfig()
-			.setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS)
-			.setSecureMode(secureMode)
-			.setHideKafkaBehindProxy(hideKafkaBehindProxy));
+		kafkaServer.prepare(environmentConfig);
 
 		standardProps = kafkaServer.getStandardProperties();
 
 		brokerConnectionStrings = kafkaServer.getBrokerConnectionString();
 
-		if (secureMode) {
+		if (environmentConfig.isSecureMode()) {
 			if (!kafkaServer.isSecureRunSupported()) {
 				throw new IllegalStateException(
 					"Attempting to test in secure mode but secure mode not supported by the KafkaTestEnvironment.");
@@ -152,6 +157,11 @@ public abstract class KafkaTestBase extends TestLogger {
 		}
 	}
 
+	protected static KafkaTestEnvironment constructKafkaTestEnvironment() throws Exception {
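+		// dynamically load the implementation for the test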
+		Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
+		return (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);
+	}
+
 	protected static void shutdownClusters() throws Exception {
 		if (secureProps != null) {
 			secureProps.clear();
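
The startClusters() refactoring above lets the migration tests bring the
Kafka/ZooKeeper clusters up and down per test method (startClusters() /
shutdownClusters()) instead of per class via @BeforeClass, while existing
tests keep the secure-mode overload. A hypothetical subclass needing a
non-default setup could now pass its own config directly:

    startClusters(KafkaTestEnvironment.createConfig()
        .setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS)
        .setSecureMode(true));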
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationTest.java
new file mode 100644
index 0000000..508f0b5
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+
+/**
+ * Tests for checking whether {@link FlinkKafkaProducer} can restore from snapshots that were
+ * taken with previous Flink versions' {@link FlinkKafkaProducer}.
+ *
+ * <p>To regenerate the binary snapshot files, run {@link #writeSnapshot()} on the corresponding
+ * Flink release-* branch.
+ */
+@RunWith(Parameterized.class)
+public class FlinkKafkaProducerMigrationTest extends KafkaMigrationTestBase {
+	@Parameterized.Parameters(name = "Migration Savepoint: {0}")
+	public static Collection<MigrationVersion> parameters() {
+		return Arrays.asList(
+			MigrationVersion.v1_8);
+	}
+
+	public FlinkKafkaProducerMigrationTest(MigrationVersion testMigrateVersion) {
+		super(testMigrateVersion);
+	}
+
+	@Override
+	protected Properties createProperties() {
+		Properties properties = new Properties();
+		properties.putAll(standardProps);
+		properties.putAll(secureProps);
+		properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-client-id");
+		properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer-transaction-id");
+		properties.put(FlinkKafkaProducer.KEY_DISABLE_METRICS, "true");
+		return properties;
+	}
+
+	@Override
+	protected OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness() throws Exception {
+		FlinkKafkaProducer<Integer> kafkaProducer = new FlinkKafkaProducer<>(
+			TOPIC,
+			integerKeyedSerializationSchema,
+			createProperties(),
+			FlinkKafkaProducer.Semantic.EXACTLY_ONCE
+		).ignoreFailuresAfterTransactionTimeout();
+
+		return new OneInputStreamOperatorTestHarness<>(
+			new StreamSink<>(kafkaProducer),
+			1,
+			1,
+			0,
+			IntSerializer.INSTANCE,
+			new OperatorID(1, 1));
+	}
+}
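
As future Flink releases get their own checked-in snapshots, coverage grows
by extending parameters(); a hypothetical extension, once a 1.9 snapshot
exists:

    @Parameterized.Parameters(name = "Migration Savepoint: {0}")
    public static Collection<MigrationVersion> parameters() {
        return Arrays.asList(MigrationVersion.v1_8, MigrationVersion.v1_9);
    }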
diff --git a/flink-connectors/flink-connector-kafka/src/test/resources/kafka-migration-kafka-producer-flink-1.8-snapshot b/flink-connectors/flink-connector-kafka/src/test/resources/kafka-migration-kafka-producer-flink-1.8-snapshot
new file mode 100644
index 0000000..58c832a
Binary files /dev/null and b/flink-connectors/flink-connector-kafka/src/test/resources/kafka-migration-kafka-producer-flink-1.8-snapshot differ
diff --git a/pom.xml b/pom.xml
index dd47603..235e154 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1361,6 +1361,8 @@ under the License.
 						<exclude>flink-yarn/src/test/resources/krb5.keytab</exclude>
 						<exclude>flink-end-to-end-tests/test-scripts/test-data/*</exclude>
 						<exclude>flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/config/keystore.jks</exclude>
+						<exclude>flink-connectors/flink-connector-kafka/src/test/resources/**</exclude>
+						<exclude>flink-connectors/flink-connector-kafka-0.11/src/test/resources/**</exclude>
 
 						<!-- snapshots -->
 						<exclude>**/src/test/resources/*-snapshot</exclude>