Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/07 07:44:27 UTC

[flink] branch master updated (a3cf3f1 -> b20e57d)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from a3cf3f1  [FLINK-12232][hive] Support database related operations in HiveCatalog
     new a9e5395  [hotfix][kafka,test] Improve error message
     new f487d8c  [hotfix][kafka,test] Add missing shutdown call propagation
     new 5b49f17  [hotfix][kafka,test] Allow exceptions in KafkaTestEnvironment#prepare
     new 3b1976a  [hotfix][kafka,test] Make brokers list final and avoid potential null pointer exceptions
     new a47b276  [hotfix][kafka,test] Synchronize 0.11 KafkaTestEnvironmentImpl with universal
     new 39710d3  [FLINK-11249][kafka,test] Add FlinkKafkaProducer(011) migration tests
     new b20e57d  [hotfix][kafka,test] Handle shutdownCluster even if it wasn't initialized

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  36 ++-
 .../kafka/FlinkKafkaProducer011MigrationTest.java  |  82 +++++++
 .../streaming/connectors/kafka/Kafka011ITCase.java |   2 +-
 .../kafka/Kafka011ProducerAtLeastOnceITCase.java   |   2 +-
 .../kafka/Kafka011ProducerExactlyOnceITCase.java   |   2 +-
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 256 ++++++++++-----------
 ...fka-migration-kafka-producer-flink-1.8-snapshot | Bin 0 -> 1746 bytes
 .../streaming/connectors/kafka/Kafka08ITCase.java  |   2 +-
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  45 ++--
 .../connectors/kafka/Kafka09SecuredRunITCase.java  |   2 +-
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  38 ++-
 .../connectors/kafka/KafkaMigrationTestBase.java   | 171 ++++++++++++++
 .../kafka/KafkaShortRetentionTestBase.java         |   2 +-
 .../streaming/connectors/kafka/KafkaTestBase.java  |  47 ++--
 .../connectors/kafka/KafkaTestEnvironment.java     |   2 +-
 .../kafka/FlinkKafkaProducerMigrationTest.java     |  82 +++++++
 .../streaming/connectors/kafka/KafkaITCase.java    |   2 +-
 .../kafka/KafkaProducerAtLeastOnceITCase.java      |   2 +-
 .../kafka/KafkaProducerExactlyOnceITCase.java      |   2 +-
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  40 ++--
 ...fka-migration-kafka-producer-flink-1.8-snapshot | Bin 0 -> 1731 bytes
 pom.xml                                            |   2 +
 22 files changed, 572 insertions(+), 247 deletions(-)
 create mode 100644 flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011MigrationTest.java
 create mode 100644 flink-connectors/flink-connector-kafka-0.11/src/test/resources/kafka-migration-kafka-producer-flink-1.8-snapshot
 create mode 100644 flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaMigrationTestBase.java
 create mode 100644 flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationTest.java
 create mode 100644 flink-connectors/flink-connector-kafka/src/test/resources/kafka-migration-kafka-producer-flink-1.8-snapshot


[flink] 06/07: [FLINK-11249][kafka, test] Add FlinkKafkaProducer(011) migration tests

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 39710d3c748a0d24cda6ea8d94b539d7abeaaf3c
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Feb 8 09:26:01 2019 +0100

    [FLINK-11249][kafka,test] Add FlinkKafkaProducer(011) migration tests
    
    Add migration tests for the 0.11 and universal connectors to make sure
    that those sinks can be restarted from old savepoints.
    
    Note that this doesn't test migration from the 0.11 to the universal
    connector. It only checks that the master version of the 0.11 (or
    universal) connector is compatible with state created by previous
    Flink versions.
---
 .../kafka/FlinkKafkaProducer011MigrationTest.java  |  82 ++++++++++
 ...fka-migration-kafka-producer-flink-1.8-snapshot | Bin 0 -> 1746 bytes
 .../connectors/kafka/KafkaMigrationTestBase.java   | 171 +++++++++++++++++++++
 .../streaming/connectors/kafka/KafkaTestBase.java  |  26 +++-
 .../kafka/FlinkKafkaProducerMigrationTest.java     |  82 ++++++++++
 ...fka-migration-kafka-producer-flink-1.8-snapshot | Bin 0 -> 1731 bytes
 pom.xml                                            |   2 +
 7 files changed, 355 insertions(+), 8 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011MigrationTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011MigrationTest.java
new file mode 100644
index 0000000..4ac703e
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011MigrationTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+
+/**
+ * Tests for checking whether {@link FlinkKafkaProducer011} can restore from snapshots that were
+ * taken with previous Flink versions' {@link FlinkKafkaProducer011}.
+ *
+ * <p>To regenerate the binary snapshot files, run {@link #writeSnapshot()} on the corresponding
+ * Flink release-* branch.
+ */
+@RunWith(Parameterized.class)
+public class FlinkKafkaProducer011MigrationTest extends KafkaMigrationTestBase {
+	@Parameterized.Parameters(name = "Migration Savepoint: {0}")
+	public static Collection<MigrationVersion> parameters() {
+		return Arrays.asList(
+			MigrationVersion.v1_8);
+	}
+
+	public FlinkKafkaProducer011MigrationTest(MigrationVersion testMigrateVersion) {
+		super(testMigrateVersion);
+	}
+
+	@Override
+	protected Properties createProperties() {
+		Properties properties = new Properties();
+		properties.putAll(standardProps);
+		properties.putAll(secureProps);
+		properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-client-id");
+		properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer-transaction-id");
+		properties.put(FlinkKafkaProducer011.KEY_DISABLE_METRICS, "true");
+		return properties;
+	}
+
+	@Override
+	protected OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness() throws Exception {
+		FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>(
+			TOPIC,
+			integerKeyedSerializationSchema,
+			createProperties(),
+			FlinkKafkaProducer011.Semantic.EXACTLY_ONCE
+		).ignoreFailuresAfterTransactionTimeout();
+
+		return new OneInputStreamOperatorTestHarness<>(
+			new StreamSink<>(kafkaProducer),
+			1,
+			1,
+			0,
+			IntSerializer.INSTANCE,
+			new OperatorID(1, 1));
+	}
+}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/resources/kafka-migration-kafka-producer-flink-1.8-snapshot b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/kafka-migration-kafka-producer-flink-1.8-snapshot
new file mode 100644
index 0000000..29c6ccc
Binary files /dev/null and b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/kafka-migration-kafka-producer-flink-1.8-snapshot differ
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaMigrationTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaMigrationTestBase.java
new file mode 100644
index 0000000..0dbc077
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaMigrationTestBase.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OperatorSnapshotUtil;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The base class with migration tests for the Kafka Exactly-Once Producer.
+ */
+@SuppressWarnings("serial")
+public abstract class KafkaMigrationTestBase extends KafkaTestBase {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(KafkaMigrationTestBase.class);
+	protected static final String TOPIC = "flink-kafka-producer-migration-test";
+
+	protected final MigrationVersion testMigrateVersion;
+	protected final TypeInformationSerializationSchema<Integer> integerSerializationSchema =
+		new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+	protected final KeyedSerializationSchema<Integer> integerKeyedSerializationSchema =
+		new KeyedSerializationSchemaWrapper<>(integerSerializationSchema);
+
+	/**
+	 * TODO change this to the corresponding savepoint version to be written (e.g. {@link MigrationVersion#v1_3} for 1.3)
+	 * TODO and remove all @Ignore annotations on write*Snapshot() methods to generate savepoints
+	 * TODO Note: You should generate the savepoint based on the release branch instead of master.
+	 */
+	protected final Optional<MigrationVersion> flinkGenerateSavepointVersion = Optional.empty();
+
+	public KafkaMigrationTestBase(MigrationVersion testMigrateVersion) {
+		this.testMigrateVersion = checkNotNull(testMigrateVersion);
+	}
+
+	public String getOperatorSnapshotPath() {
+		return getOperatorSnapshotPath(testMigrateVersion);
+	}
+
+	public String getOperatorSnapshotPath(MigrationVersion version) {
+		return "src/test/resources/kafka-migration-kafka-producer-flink-" + version + "-snapshot";
+	}
+
+	/**
+	 * Overrides {@link KafkaTestBase}. Kafka migration tests start the Kafka/ZooKeeper cluster manually.
+	 */
+	@BeforeClass
+	public static void prepare() throws Exception {
+	}
+
+	/**
+	 * Overrides {@link KafkaTestBase}. Kafka migration tests start the Kafka/ZooKeeper cluster manually.
+	 */
+	@AfterClass
+	public static void shutDownServices() throws Exception {
+	}
+
+	/**
+	 * Manually run this to write binary snapshot data.
+	 */
+	@Ignore
+	@Test
+	public void writeSnapshot() throws Exception {
+		try {
+			checkState(flinkGenerateSavepointVersion.isPresent());
+			startClusters();
+
+			OperatorSubtaskState snapshot = initializeTestState();
+			OperatorSnapshotUtil.writeStateHandle(snapshot, getOperatorSnapshotPath(flinkGenerateSavepointVersion.get()));
+		}
+		finally {
+			shutdownClusters();
+		}
+	}
+
+	private OperatorSubtaskState initializeTestState() throws Exception {
+		try (OneInputStreamOperatorTestHarness testHarness = createTestHarness()) {
+			testHarness.setup();
+			testHarness.open();
+
+			// Create a committed transaction
+			testHarness.processElement(42, 0L);
+
+			// TODO: when stop with savepoint is available, replace this code with it (with stop with savepoint
+			// there won't be any pending transactions)
+			OperatorSubtaskState snapshot = testHarness.snapshot(0L, 1L);
+			// We kind of simulate stop with savepoint by making sure that notifyOfCompletedCheckpoint is called
+			testHarness.notifyOfCompletedCheckpoint(0L);
+
+			// Create a pending transaction
+			testHarness.processElement(43, 2L);
+			return snapshot;
+		}
+	}
+
+	@SuppressWarnings("warning")
+	@Test
+	public void testRestoreProducer() throws Exception {
+		try {
+			startClusters();
+
+			initializeTestState();
+
+			try (OneInputStreamOperatorTestHarness testHarness = createTestHarness()) {
+				initializeState(testHarness);
+
+				// Create a committed transaction
+				testHarness.processElement(44, 4L);
+				testHarness.snapshot(2L, 5L);
+				testHarness.notifyOfCompletedCheckpoint(2L);
+
+				// Create a pending transaction
+				testHarness.processElement(45, 6L);
+
+				// We should have:
+				// - committed transaction 42
+				// - transaction 43 aborted
+				// - committed transaction 44
+				// - transaction 45 pending
+				assertExactlyOnceForTopic(createProperties(), TOPIC, 0, Arrays.asList(42, 44));
+			}
+		}
+		finally {
+			shutdownClusters();
+		}
+	}
+
+	protected abstract OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness() throws Exception;
+
+	protected abstract Properties createProperties();
+
+	protected void initializeState(OneInputStreamOperatorTestHarness testHarness) throws Exception {
+		testHarness.setup();
+		testHarness.initializeState(getOperatorSnapshotPath());
+		testHarness.open();
+	}
+}
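
The TODO in KafkaMigrationTestBase above describes how the binary
savepoints are regenerated. As a minimal sketch (assuming a release-1.8
checkout; the field and the writeSnapshot() test are the ones added
above), one would set the version and un-ignore writeSnapshot():

    // on the release-1.8 branch, not on master:
    protected final Optional<MigrationVersion> flinkGenerateSavepointVersion =
        Optional.of(MigrationVersion.v1_8);

    // and remove the @Ignore annotation so that writeSnapshot() runs,
    // writing src/test/resources/kafka-migration-kafka-producer-flink-1.8-snapshot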
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index bc91afb..073ba6e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -126,24 +126,29 @@ public abstract class KafkaTestBase extends TestLogger {
 		return flinkConfig;
 	}
 
+	protected static void startClusters() throws Exception {
+		startClusters(KafkaTestEnvironment.createConfig().setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS));
+	}
+
 	protected static void startClusters(boolean secureMode, boolean hideKafkaBehindProxy) throws Exception {
+		startClusters(KafkaTestEnvironment.createConfig()
+			.setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS)
+			.setSecureMode(secureMode)
+			.setHideKafkaBehindProxy(hideKafkaBehindProxy));
+	}
 
-		// dynamically load the implementation for the test
-		Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
-		kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);
+	protected static void startClusters(KafkaTestEnvironment.Config environmentConfig) throws Exception {
+		kafkaServer = constructKafkaTestEnvironment();
 
 		LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
 
-		kafkaServer.prepare(kafkaServer.createConfig()
-			.setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS)
-			.setSecureMode(secureMode)
-			.setHideKafkaBehindProxy(hideKafkaBehindProxy));
+		kafkaServer.prepare(environmentConfig);
 
 		standardProps = kafkaServer.getStandardProperties();
 
 		brokerConnectionStrings = kafkaServer.getBrokerConnectionString();
 
-		if (secureMode) {
+		if (environmentConfig.isSecureMode()) {
 			if (!kafkaServer.isSecureRunSupported()) {
 				throw new IllegalStateException(
 					"Attempting to test in secure mode but secure mode not supported by the KafkaTestEnvironment.");
@@ -152,6 +157,11 @@ public abstract class KafkaTestBase extends TestLogger {
 		}
 	}
 
+	protected static KafkaTestEnvironment constructKafkaTestEnvironment() throws Exception {
+		Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
+		return (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);
+	}
+
 	protected static void shutdownClusters() throws Exception {
 		if (secureProps != null) {
 			secureProps.clear();
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationTest.java
new file mode 100644
index 0000000..508f0b5
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+
+/**
+ * Tests for checking whether {@link FlinkKafkaProducer} can restore from snapshots that were
+ * taken with previous Flink versions' {@link FlinkKafkaProducer}.
+ *
+ * <p>To regenerate the binary snapshot files, run {@link #writeSnapshot()} on the corresponding
+ * Flink release-* branch.
+ */
+@RunWith(Parameterized.class)
+public class FlinkKafkaProducerMigrationTest extends KafkaMigrationTestBase {
+	@Parameterized.Parameters(name = "Migration Savepoint: {0}")
+	public static Collection<MigrationVersion> parameters() {
+		return Arrays.asList(
+			MigrationVersion.v1_8);
+	}
+
+	public FlinkKafkaProducerMigrationTest(MigrationVersion testMigrateVersion) {
+		super(testMigrateVersion);
+	}
+
+	@Override
+	protected Properties createProperties() {
+		Properties properties = new Properties();
+		properties.putAll(standardProps);
+		properties.putAll(secureProps);
+		properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-client-id");
+		properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer-transaction-id");
+		properties.put(FlinkKafkaProducer.KEY_DISABLE_METRICS, "true");
+		return properties;
+	}
+
+	@Override
+	protected OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness() throws Exception {
+		FlinkKafkaProducer<Integer> kafkaProducer = new FlinkKafkaProducer<>(
+			TOPIC,
+			integerKeyedSerializationSchema,
+			createProperties(),
+			FlinkKafkaProducer.Semantic.EXACTLY_ONCE
+		).ignoreFailuresAfterTransactionTimeout();
+
+		return new OneInputStreamOperatorTestHarness<>(
+			new StreamSink<>(kafkaProducer),
+			1,
+			1,
+			0,
+			IntSerializer.INSTANCE,
+			new OperatorID(1, 1));
+	}
+}
diff --git a/flink-connectors/flink-connector-kafka/src/test/resources/kafka-migration-kafka-producer-flink-1.8-snapshot b/flink-connectors/flink-connector-kafka/src/test/resources/kafka-migration-kafka-producer-flink-1.8-snapshot
new file mode 100644
index 0000000..58c832a
Binary files /dev/null and b/flink-connectors/flink-connector-kafka/src/test/resources/kafka-migration-kafka-producer-flink-1.8-snapshot differ
diff --git a/pom.xml b/pom.xml
index dd47603..235e154 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1361,6 +1361,8 @@ under the License.
 						<exclude>flink-yarn/src/test/resources/krb5.keytab</exclude>
 						<exclude>flink-end-to-end-tests/test-scripts/test-data/*</exclude>
 						<exclude>flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/config/keystore.jks</exclude>
+						<exclude>flink-connectors/flink-connector-kafka/src/test/resources/**</exclude>
+						<exclude>flink-connectors/flink-connector-kafka-0.11/src/test/resources/**</exclude>
 
 						<!-- snapshots -->
 						<exclude>**/src/test/resources/*-snapshot</exclude>


[flink] 03/07: [hotfix][kafka, test] Allow exceptions in KafkaTestEnvironment#prepare

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5b49f17236379ba0afdc6a536bc70c7e7fcd14ff
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Wed Feb 6 15:11:55 2019 +0100

    [hotfix][kafka,test] Allow exceptions in KafkaTestEnvironment#prepare
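    
    Previously each KafkaTestEnvironmentImpl#prepare caught Throwable and
    reported it via fail(), hiding the original stack trace. Declaring
    "throws Exception" lets JUnit surface the real setup failure directly.
    A minimal sketch of the pattern change (names as in the diffs below):
    
        @Override
        public void prepare(Config config) throws Exception {
            zookeeper = new TestingServer(-1, tmpZkDir);
            // a setup failure now propagates to JUnit as-is instead of
            // being swallowed and re-reported as fail("Test setup failed")
        }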
---
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 36 ++++++++---------
 .../streaming/connectors/kafka/Kafka011ITCase.java |  2 +-
 .../kafka/Kafka011ProducerAtLeastOnceITCase.java   |  2 +-
 .../kafka/Kafka011ProducerExactlyOnceITCase.java   |  2 +-
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 36 ++++++++---------
 .../streaming/connectors/kafka/Kafka08ITCase.java  |  2 +-
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 45 ++++++++--------------
 .../connectors/kafka/Kafka09SecuredRunITCase.java  |  2 +-
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 38 ++++++++----------
 .../kafka/KafkaShortRetentionTestBase.java         |  2 +-
 .../streaming/connectors/kafka/KafkaTestBase.java  |  6 +--
 .../connectors/kafka/KafkaTestEnvironment.java     |  2 +-
 .../streaming/connectors/kafka/KafkaITCase.java    |  2 +-
 .../kafka/KafkaProducerAtLeastOnceITCase.java      |  2 +-
 .../kafka/KafkaProducerExactlyOnceITCase.java      |  2 +-
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 36 ++++++++---------
 16 files changed, 89 insertions(+), 128 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 8c69f36..9e51aac 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -217,7 +217,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public void prepare(Config config) {
+	public void prepare(Config config) throws Exception {
 		//increase the timeout since in Travis ZK connection takes long time for secure connection.
 		if (config.isSecureMode()) {
 			//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
@@ -243,29 +243,23 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		zookeeper = null;
 		brokers = null;
 
-		try {
-			zookeeper = new TestingServer(-1, tmpZkDir);
-			zookeeperConnectionString = zookeeper.getConnectString();
-			LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
-
-			LOG.info("Starting KafkaServer");
-			brokers = new ArrayList<>(config.getKafkaServersNumber());
-
-			ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
-			for (int i = 0; i < config.getKafkaServersNumber(); i++) {
-				KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
-				brokers.add(kafkaServer);
-				brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName));
-				brokerConnectionString +=  ",";
-			}
+		zookeeper = new TestingServer(-1, tmpZkDir);
+		zookeeperConnectionString = zookeeper.getConnectString();
+		LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
 
-			LOG.info("ZK and KafkaServer started.");
-		}
-		catch (Throwable t) {
-			t.printStackTrace();
-			fail("Test setup failed: " + t.getMessage());
+		LOG.info("Starting KafkaServer");
+		brokers = new ArrayList<>(config.getKafkaServersNumber());
+
+		ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
+		for (int i = 0; i < config.getKafkaServersNumber(); i++) {
+			KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
+			brokers.add(kafkaServer);
+			brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName));
+			brokerConnectionString +=  ",";
 		}
 
+		LOG.info("ZK and KafkaServer started.");
+
 		standardProps = new Properties();
 		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
 		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
index 1678134..3677daa 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
@@ -54,7 +54,7 @@ import java.util.Optional;
 public class Kafka011ITCase extends KafkaConsumerTestBase {
 
 	@BeforeClass
-	public static void prepare() throws ClassNotFoundException {
+	public static void prepare() throws Exception {
 		KafkaProducerTestBase.prepare();
 		((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE);
 	}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java
index ad63662..84efe25 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java
@@ -27,7 +27,7 @@ import org.junit.BeforeClass;
 public class Kafka011ProducerAtLeastOnceITCase extends KafkaProducerTestBase {
 
 	@BeforeClass
-	public static void prepare() throws ClassNotFoundException {
+	public static void prepare() throws Exception {
 		KafkaProducerTestBase.prepare();
 		((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE);
 	}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
index 5038b7f..8fb1599 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
@@ -27,7 +27,7 @@ import org.junit.Test;
 @SuppressWarnings("serial")
 public class Kafka011ProducerExactlyOnceITCase extends KafkaProducerTestBase {
 	@BeforeClass
-	public static void prepare() throws ClassNotFoundException {
+	public static void prepare() throws Exception {
 		KafkaProducerTestBase.prepare();
 		((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
 	}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 57dc663..539bc59 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -234,7 +234,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public void prepare(Config config) {
+	public void prepare(Config config) throws Exception {
 		//increase the timeout since in Travis ZK connection takes long time for secure connection.
 		if (config.isSecureMode()) {
 			//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
@@ -260,29 +260,23 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		zookeeper = null;
 		brokers = null;
 
-		try {
-			zookeeper = new TestingServer(-1, tmpZkDir);
-			zookeeperConnectionString = zookeeper.getConnectString();
-			LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
-
-			LOG.info("Starting KafkaServer");
-			brokers = new ArrayList<>(config.getKafkaServersNumber());
-
-			ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
-			for (int i = 0; i < config.getKafkaServersNumber(); i++) {
-				KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
-				brokers.add(kafkaServer);
-				brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName));
-				brokerConnectionString +=  ",";
-			}
+		zookeeper = new TestingServer(-1, tmpZkDir);
+		zookeeperConnectionString = zookeeper.getConnectString();
+		LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
 
-			LOG.info("ZK and KafkaServer started.");
-		}
-		catch (Throwable t) {
-			t.printStackTrace();
-			fail("Test setup failed: " + t.getMessage());
+		LOG.info("Starting KafkaServer");
+		brokers = new ArrayList<>(config.getKafkaServersNumber());
+
+		ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
+		for (int i = 0; i < config.getKafkaServersNumber(); i++) {
+			KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
+			brokers.add(kafkaServer);
+			brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName));
+			brokerConnectionString +=  ",";
 		}
 
+		LOG.info("ZK and KafkaServer started.");
+
 		standardProps = new Properties();
 		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
 		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index a250d86..e360c19 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -38,7 +38,7 @@ import static org.junit.Assert.fail;
 public class Kafka08ITCase extends KafkaConsumerTestBase {
 
 	@BeforeClass
-	public static void prepare() throws ClassNotFoundException {
+	public static void prepare() throws Exception {
 		// Somehow KafkaConsumer 0.8 doesn't handle broker failures if they are behind a proxy
 		prepare(false);
 	}
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index cddef88..15c83e4 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -53,7 +53,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.IOException;
 import java.net.BindException;
 import java.nio.file.Files;
 import java.util.ArrayList;
@@ -206,23 +205,15 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public void prepare(Config config) {
+	public void prepare(Config config) throws Exception {
 		this.config = config;
 		File tempDir = new File(System.getProperty("java.io.tmpdir"));
 
 		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
-		try {
-			Files.createDirectories(tmpZkDir.toPath());
-		} catch (IOException e) {
-			fail("cannot create zookeeper temp dir: " + e.getMessage());
-		}
+		Files.createDirectories(tmpZkDir.toPath());
 
 		tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir" + (UUID.randomUUID().toString()));
-		try {
-			Files.createDirectories(tmpKafkaParent.toPath());
-		} catch (IOException e) {
-			fail("cannot create kafka temp dir: " + e.getMessage());
-		}
+		Files.createDirectories(tmpKafkaParent.toPath());
 
 		tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
 		for (int i = 0; i < config.getKafkaServersNumber(); i++) {
@@ -234,29 +225,23 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		zookeeper = null;
 		brokers = null;
 
-		try {
-			LOG.info("Starting Zookeeper");
-			zookeeper = new TestingServer(-1, tmpZkDir);
-			zookeeperConnectionString = zookeeper.getConnectString();
-
-			LOG.info("Starting KafkaServer");
-			brokers = new ArrayList<>(config.getKafkaServersNumber());
+		LOG.info("Starting Zookeeper");
+		zookeeper = new TestingServer(-1, tmpZkDir);
+		zookeeperConnectionString = zookeeper.getConnectString();
 
-			for (int i = 0; i < config.getKafkaServersNumber(); i++) {
-				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
-				SocketServer socketServer = brokers.get(i).socketServer();
+		LOG.info("Starting KafkaServer");
+		brokers = new ArrayList<>(config.getKafkaServersNumber());
 
-				String host = socketServer.host() == null ? "localhost" : socketServer.host();
-				brokerConnectionString += hostAndPortToUrlString(host, socketServer.port()) + ",";
-			}
+		for (int i = 0; i < config.getKafkaServersNumber(); i++) {
+			brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
+			SocketServer socketServer = brokers.get(i).socketServer();
 
-			LOG.info("ZK and KafkaServer started.");
-		}
-		catch (Throwable t) {
-			t.printStackTrace();
-			fail("Test setup failed: " + t.getMessage());
+			String host = socketServer.host() == null ? "localhost" : socketServer.host();
+			brokerConnectionString += hostAndPortToUrlString(host, socketServer.port()) + ",";
 		}
 
+		LOG.info("ZK and KafkaServer started.");
+
 		standardProps = new Properties();
 		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
 		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
index b4002c7..8cd61cc 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
@@ -33,7 +33,7 @@ public class Kafka09SecuredRunITCase extends KafkaConsumerTestBase {
 	protected static final Logger LOG = LoggerFactory.getLogger(Kafka09SecuredRunITCase.class);
 
 	@BeforeClass
-	public static void prepare() throws ClassNotFoundException {
+	public static void prepare() throws Exception {
 		LOG.info("-------------------------------------------------------------------------");
 		LOG.info("    Starting Kafka09SecuredRunITCase ");
 		LOG.info("-------------------------------------------------------------------------");
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 2ed600b..7a5c9e5 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -199,7 +199,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public void prepare(Config config) {
+	public void prepare(Config config) throws Exception {
 		//increase the timeout since in Travis ZK connection takes long time for secure connection.
 		if (config.isSecureMode()) {
 			//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
@@ -225,30 +225,24 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		zookeeper = null;
 		brokers = null;
 
-		try {
-			LOG.info("Starting Zookeeper");
-			zookeeper = new TestingServer(-1, tmpZkDir);
-			zookeeperConnectionString = zookeeper.getConnectString();
-			LOG.info("zookeeperConnectionString: {}", zookeeperConnectionString);
-
-			LOG.info("Starting KafkaServer");
-			brokers = new ArrayList<>(config.getKafkaServersNumber());
-
-			SecurityProtocol securityProtocol = config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT;
-			for (int i = 0; i < config.getKafkaServersNumber(); i++) {
-				KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
-				brokers.add(kafkaServer);
-				brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(securityProtocol));
-				brokerConnectionString +=  ",";
-			}
+		LOG.info("Starting Zookeeper");
+		zookeeper = new TestingServer(-1, tmpZkDir);
+		zookeeperConnectionString = zookeeper.getConnectString();
+		LOG.info("zookeeperConnectionString: {}", zookeeperConnectionString);
 
-			LOG.info("ZK and KafkaServer started.");
-		}
-		catch (Throwable t) {
-			t.printStackTrace();
-			fail("Test setup failed: " + t.getMessage());
+		LOG.info("Starting KafkaServer");
+		brokers = new ArrayList<>(config.getKafkaServersNumber());
+
+		SecurityProtocol securityProtocol = config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT;
+		for (int i = 0; i < config.getKafkaServersNumber(); i++) {
+			KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
+			brokers.add(kafkaServer);
+			brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(securityProtocol));
+			brokerConnectionString +=  ",";
 		}
 
+		LOG.info("ZK and KafkaServer started.");
+
 		LOG.info("brokerConnectionString --> {}", brokerConnectionString);
 
 		standardProps = new Properties();
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 93bd7bc..42cfb89 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -89,7 +89,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 	}
 
 	@BeforeClass
-	public static void prepare() throws ClassNotFoundException {
+	public static void prepare() throws Exception {
 		LOG.info("-------------------------------------------------------------------------");
 		LOG.info("    Starting KafkaShortRetentionTestBase ");
 		LOG.info("-------------------------------------------------------------------------");
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 712c0a8..bc91afb 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -88,11 +88,11 @@ public abstract class KafkaTestBase extends TestLogger {
 	// ------------------------------------------------------------------------
 
 	@BeforeClass
-	public static void prepare() throws ClassNotFoundException {
+	public static void prepare() throws Exception {
 		prepare(true);
 	}
 
-	public static void prepare(boolean hideKafkaBehindProxy) throws ClassNotFoundException {
+	public static void prepare(boolean hideKafkaBehindProxy) throws Exception {
 		LOG.info("-------------------------------------------------------------------------");
 		LOG.info("    Starting KafkaTestBase ");
 		LOG.info("-------------------------------------------------------------------------");
@@ -126,7 +126,7 @@ public abstract class KafkaTestBase extends TestLogger {
 		return flinkConfig;
 	}
 
-	protected static void startClusters(boolean secureMode, boolean hideKafkaBehindProxy) throws ClassNotFoundException {
+	protected static void startClusters(boolean secureMode, boolean hideKafkaBehindProxy) throws Exception {
 
 		// dynamically load the implementation for the test
 		Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 53501fb..a787f27 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -99,7 +99,7 @@ public abstract class KafkaTestEnvironment {
 		return new Config();
 	}
 
-	public abstract void prepare(Config config);
+	public abstract void prepare(Config config) throws Exception;
 
 	public void shutdown() throws Exception {
 		for (NetworkFailuresProxy proxy : networkFailuresProxies) {
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index bc945a2..8729319 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -54,7 +54,7 @@ import java.util.Optional;
 public class KafkaITCase extends KafkaConsumerTestBase {
 
 	@BeforeClass
-	public static void prepare() throws ClassNotFoundException {
+	public static void prepare() throws Exception {
 		KafkaProducerTestBase.prepare();
 		((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
 	}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerAtLeastOnceITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerAtLeastOnceITCase.java
index c07b84f..6d82d3e 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerAtLeastOnceITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerAtLeastOnceITCase.java
@@ -27,7 +27,7 @@ import org.junit.BeforeClass;
 public class KafkaProducerAtLeastOnceITCase extends KafkaProducerTestBase {
 
 	@BeforeClass
-	public static void prepare() throws ClassNotFoundException {
+	public static void prepare() throws Exception {
 		KafkaProducerTestBase.prepare();
 		((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
 	}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExactlyOnceITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExactlyOnceITCase.java
index 45a0312..1752b27 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExactlyOnceITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExactlyOnceITCase.java
@@ -27,7 +27,7 @@ import org.junit.Test;
 @SuppressWarnings("serial")
 public class KafkaProducerExactlyOnceITCase extends KafkaProducerTestBase {
 	@BeforeClass
-	public static void prepare() throws ClassNotFoundException {
+	public static void prepare() throws Exception {
 		KafkaProducerTestBase.prepare();
 		((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
 	}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 0e9036d..ad68d59 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -93,7 +93,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public void prepare(Config config) {
+	public void prepare(Config config) throws Exception {
 		//increase the timeout since in Travis ZK connection takes long time for secure connection.
 		if (config.isSecureMode()) {
 			//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
@@ -119,29 +119,23 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		zookeeper = null;
 		brokers = null;
 
-		try {
-			zookeeper = new TestingServer(-1, tmpZkDir);
-			zookeeperConnectionString = zookeeper.getConnectString();
-			LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
-
-			LOG.info("Starting KafkaServer");
-			brokers = new ArrayList<>(config.getKafkaServersNumber());
-
-			ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
-			for (int i = 0; i < config.getKafkaServersNumber(); i++) {
-				KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
-				brokers.add(kafkaServer);
-				brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName));
-				brokerConnectionString +=  ",";
-			}
+		zookeeper = new TestingServer(-1, tmpZkDir);
+		zookeeperConnectionString = zookeeper.getConnectString();
+		LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
 
-			LOG.info("ZK and KafkaServer started.");
-		}
-		catch (Throwable t) {
-			t.printStackTrace();
-			fail("Test setup failed: " + t.getMessage());
+		LOG.info("Starting KafkaServer");
+		brokers = new ArrayList<>(config.getKafkaServersNumber());
+
+		ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
+		for (int i = 0; i < config.getKafkaServersNumber(); i++) {
+			KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
+			brokers.add(kafkaServer);
+			brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName));
+			brokerConnectionString +=  ",";
 		}
 
+		LOG.info("ZK and KafkaServer started.");
+
 		standardProps = new Properties();
 		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
 		standardProps.setProperty("bootstrap.servers", brokerConnectionString);


[flink] 05/07: [hotfix][kafka, test] Synchronize 0.11 KafkaTestEnvironmentImpl with universal

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a47b276314e24fc9f153bc163d8470ce24e36b2f
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Feb 8 15:28:33 2019 +0100

    [hotfix][kafka,test] Synchronize 0.11 KafkaTestEnvironmentImpl with universal
    
    This is a pure refactoring that reorders methods so that the two KafkaTestEnvironmentImpl implementations are more closely aligned.
---
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 244 +++++++++++----------
 1 file changed, 125 insertions(+), 119 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 160adf5..ae237f6 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -85,16 +85,126 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	// 6 seconds is default. Seems to be too small for travis. 30 seconds
 	private int zkTimeout = 30000;
 	private Config config;
-
-	public String getBrokerConnectionString() {
-		return brokerConnectionString;
-	}
+	private static final int DELETE_TIMEOUT_SECONDS = 30;
 
 	public void setProducerSemantic(FlinkKafkaProducer011.Semantic producerSemantic) {
 		this.producerSemantic = producerSemantic;
 	}
 
 	@Override
+	public void prepare(Config config) throws Exception {
+		//increase the timeout since in Travis ZK connection takes long time for secure connection.
+		if (config.isSecureMode()) {
+			//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
+			config.setKafkaServersNumber(1);
+			zkTimeout = zkTimeout * 15;
+		}
+		this.config = config;
+
+		File tempDir = new File(System.getProperty("java.io.tmpdir"));
+		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
+		assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
+
+		tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
+		assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
+
+		tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
+		for (int i = 0; i < config.getKafkaServersNumber(); i++) {
+			File tmpDir = new File(tmpKafkaParent, "server-" + i);
+			assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
+			tmpKafkaDirs.add(tmpDir);
+		}
+
+		zookeeper = null;
+		brokers.clear();
+
+		zookeeper = new TestingServer(-1, tmpZkDir);
+		zookeeperConnectionString = zookeeper.getConnectString();
+		LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
+
+		LOG.info("Starting KafkaServer");
+
+		ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
+		for (int i = 0; i < config.getKafkaServersNumber(); i++) {
+			KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
+			brokers.add(kafkaServer);
+			brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName));
+			brokerConnectionString +=  ",";
+		}
+
+		LOG.info("ZK and KafkaServer started.");
+
+		standardProps = new Properties();
+		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
+		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
+		standardProps.setProperty("group.id", "flink-tests");
+		standardProps.setProperty("enable.auto.commit", "false");
+		standardProps.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout));
+		standardProps.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout));
+		standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning.
+		standardProps.setProperty("max.partition.fetch.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
+	}
+
+	@Override
+	public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) {
+		// create topic with one client
+		LOG.info("Creating topic {}", topic);
+
+		ZkUtils zkUtils = getZkUtils();
+		try {
+			AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor, topicConfig, kafka.admin.RackAwareMode.Enforced$.MODULE$);
+		} finally {
+			zkUtils.close();
+		}
+
+		// validate that the topic has been created
+		final long deadline = System.nanoTime() + 30_000_000_000L;
+		do {
+			try {
+				if (config.isSecureMode()) {
+					// increase the wait time since ZK timeouts occur frequently in Travis
+					int wait = zkTimeout / 100;
+					LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
+					Thread.sleep(wait);
+				} else {
+					Thread.sleep(100);
+				}
+			} catch (InterruptedException e) {
+				// swallow the interruption and retry until the deadline
+			}
+			// we could use AdminUtils.topicExists(zkUtils, topic) here, but its results are
+			// not always correct.
+
+			// create a new ZK utils connection
+			ZkUtils checkZKConn = getZkUtils();
+			if (AdminUtils.topicExists(checkZKConn, topic)) {
+				checkZKConn.close();
+				return;
+			}
+			checkZKConn.close();
+		}
+		while (System.nanoTime() < deadline);
+		fail("Test topic could not be created");
+	}
+
+	@Override
+	public void deleteTestTopic(String topic) {
+		ZkUtils zkUtils = getZkUtils();
+		try {
+			LOG.info("Deleting topic {}", topic);
+
+			ZkClient zk = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
+				Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
+
+			AdminUtils.deleteTopic(zkUtils, topic);
+
+			zk.close();
+		} finally {
+			zkUtils.close();
+		}
+	}
+
+	@Override
 	public Properties getStandardProperties() {
 		return standardProps;
 	}
@@ -116,6 +226,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public String getBrokerConnectionString() {
+		return brokerConnectionString;
+	}
+
+	@Override
 	public String getVersion() {
 		return "0.11";
 	}
@@ -183,7 +298,12 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	@Override
 	public <T> DataStreamSink<T> writeToKafkaWithTimestamps(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props) {
 		FlinkKafkaProducer011<T> prod = new FlinkKafkaProducer011<>(
-			topic, serSchema, props, Optional.of(new FlinkFixedPartitioner<>()), producerSemantic, FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+			topic,
+			serSchema,
+			props,
+			Optional.of(new FlinkFixedPartitioner<>()),
+			producerSemantic,
+			FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
 
 		prod.setWriteTimestampToKafka(true);
 
@@ -234,60 +354,6 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public void prepare(Config config) throws Exception {
-		//increase the timeout since in Travis ZK connection takes long time for secure connection.
-		if (config.isSecureMode()) {
-			//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
-			config.setKafkaServersNumber(1);
-			zkTimeout = zkTimeout * 15;
-		}
-		this.config = config;
-
-		File tempDir = new File(System.getProperty("java.io.tmpdir"));
-		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
-		assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
-
-		tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
-		assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
-
-		tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
-		for (int i = 0; i < config.getKafkaServersNumber(); i++) {
-			File tmpDir = new File(tmpKafkaParent, "server-" + i);
-			assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
-			tmpKafkaDirs.add(tmpDir);
-		}
-
-		zookeeper = null;
-		brokers.clear();
-
-		zookeeper = new TestingServer(-1, tmpZkDir);
-		zookeeperConnectionString = zookeeper.getConnectString();
-		LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
-
-		LOG.info("Starting KafkaServer");
-
-		ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
-		for (int i = 0; i < config.getKafkaServersNumber(); i++) {
-			KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
-			brokers.add(kafkaServer);
-			brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName));
-			brokerConnectionString +=  ",";
-		}
-
-		LOG.info("ZK and KafkaServer started.");
-
-		standardProps = new Properties();
-		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
-		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
-		standardProps.setProperty("group.id", "flink-tests");
-		standardProps.setProperty("enable.auto.commit", "false");
-		standardProps.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout));
-		standardProps.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout));
-		standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.11 value)
-		standardProps.setProperty("max.partition.fetch.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
-	}
-
-	@Override
 	public void shutdown() throws Exception {
 		for (KafkaServer broker : brokers) {
 			if (broker != null) {
@@ -333,65 +399,6 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		return ZkUtils.apply(creator, false);
 	}
 
-	@Override
-	public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) {
-		// create topic with one client
-		LOG.info("Creating topic {}", topic);
-
-		ZkUtils zkUtils = getZkUtils();
-		try {
-			AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor, topicConfig, kafka.admin.RackAwareMode.Enforced$.MODULE$);
-		} finally {
-			zkUtils.close();
-		}
-
-		// validate that the topic has been created
-		final long deadline = System.nanoTime() + 30_000_000_000L;
-		do {
-			try {
-				if (config.isSecureMode()) {
-					//increase wait time since in Travis ZK timeout occurs frequently
-					int wait = zkTimeout / 100;
-					LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
-					Thread.sleep(wait);
-				} else {
-					Thread.sleep(100);
-				}
-			} catch (InterruptedException e) {
-				// restore interrupted state
-			}
-			// we could use AdminUtils.topicExists(zkUtils, topic) here, but its results are
-			// not always correct.
-
-			// create a new ZK utils connection
-			ZkUtils checkZKConn = getZkUtils();
-			if (AdminUtils.topicExists(checkZKConn, topic)) {
-				checkZKConn.close();
-				return;
-			}
-			checkZKConn.close();
-		}
-		while (System.nanoTime() < deadline);
-		fail("Test topic could not be created");
-	}
-
-	@Override
-	public void deleteTestTopic(String topic) {
-		ZkUtils zkUtils = getZkUtils();
-		try {
-			LOG.info("Deleting topic {}", topic);
-
-			ZkClient zk = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
-				Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
-
-			AdminUtils.deleteTopic(zkUtils, topic);
-
-			zk.close();
-		} finally {
-			zkUtils.close();
-		}
-	}
-
 	/**
 	 * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed).
 	 */
@@ -486,5 +493,4 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			offsetClient.close();
 		}
 	}
-
 }

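The deadline loop in createTestTopic above is a general pattern for eventually-consistent checks in integration tests: AdminUtils.topicExists can lag behind the actual topic creation, so the test sleeps, re-checks with a fresh connection, and only fails once a deadline has passed. Below is a minimal, self-contained sketch of that pattern; WaitUtil and waitUntil are illustrative names, not part of the Flink code base.

import java.util.function.BooleanSupplier;

public final class WaitUtil {

	private WaitUtil() {}

	/**
	 * Polls the condition until it holds or the deadline passes,
	 * mirroring the structure of createTestTopic above.
	 */
	public static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
		final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
		do {
			if (condition.getAsBoolean()) {
				return true;
			}
			try {
				Thread.sleep(pollMillis);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt(); // give up and let the caller decide
				return false;
			}
		} while (System.nanoTime() < deadline);
		return false;
	}
}

A call site in the spirit of the topic check would be waitUntil(() -> topicExists(topic), 30_000L, 100L), failing the test when it returns false.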

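The standardProps assembled in prepare() are ordinary Kafka consumer settings, so a test can hand them straight to a KafkaConsumer to read back produced records. A hedged sketch under that assumption; the ReadBack wrapper and the byte-array deserializer choice are illustrative, not part of the test base.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public final class ReadBack {

	private ReadBack() {}

	public static void printAll(Properties standardProps, String topic) {
		Properties props = new Properties();
		props.putAll(standardProps);
		props.setProperty("key.deserializer",
			"org.apache.kafka.common.serialization.ByteArrayDeserializer");
		props.setProperty("value.deserializer",
			"org.apache.kafka.common.serialization.ByteArrayDeserializer");

		try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
			consumer.subscribe(Collections.singletonList(topic));
			// poll(long) is used because it exists in both the 0.11 and
			// universal consumer APIs touched by these commits
			ConsumerRecords<byte[], byte[]> records = consumer.poll(5000L);
			for (ConsumerRecord<byte[], byte[]> record : records) {
				System.out.println(record.partition() + "/" + record.offset());
			}
		}
	}
}
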
[flink] 07/07: [hotfix][kafka, test] Handle shutdownCluster even if it wasn't initialized

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b20e57d29387a84c85836e62b6c02f24fefccd7c
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Feb 11 13:39:05 2019 +0100

    [hotfix][kafka,test] Handle shutdownCluster even if it wasn't initialized
    
    Previously, a null pointer exception thrown from the @AfterClass shutdown call could hide
    the original underlying issue if a failure prevented kafkaServer from being constructed.
---
 .../org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java    | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 073ba6e..c86dd08 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -167,7 +167,9 @@ public abstract class KafkaTestBase extends TestLogger {
 			secureProps.clear();
 		}
 
-		kafkaServer.shutdown();
+		if (kafkaServer != null) {
+			kafkaServer.shutdown();
+		}
 	}
 
 	// ------------------------------------------------------------------------

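The guard above follows the standard rule for static JUnit 4 fixtures: @BeforeClass can fail partway through and leave fields unassigned, so every @AfterClass step has to tolerate null. A sketch of the full shape with illustrative names; Cluster and createCluster() are placeholders, not the real Flink types.

import org.junit.AfterClass;
import org.junit.BeforeClass;

public abstract class ClusterTestBase {

	interface Cluster {
		void shutdown() throws Exception;
	}

	private static Cluster cluster;

	@BeforeClass
	public static void prepare() throws Exception {
		cluster = createCluster(); // may throw before the field is assigned
	}

	@AfterClass
	public static void shutDownServices() throws Exception {
		// a setup failure must not be masked by an NPE during teardown
		if (cluster != null) {
			cluster.shutdown();
		}
	}

	private static Cluster createCluster() throws Exception {
		throw new UnsupportedOperationException("placeholder for real cluster startup");
	}
}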

[flink] 04/07: [hotfix][kafka, test] Make brokers list final and avoid potential null pointer exceptions

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3b1976a5c8df28f737b04a7cb6cf96312d363c7d
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Feb 8 13:35:13 2019 +0100

    [hotfix][kafka,test] Make brokers list final and avoid potential null pointer exceptions
---
 .../flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java   | 5 ++---
 .../flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java   | 5 ++---
 2 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 539bc59..160adf5 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -73,10 +73,10 @@ import static org.junit.Assert.fail;
 public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 	protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
+	private final List<KafkaServer> brokers = new ArrayList<>();
 	private File tmpZkDir;
 	private File tmpKafkaParent;
 	private List<File> tmpKafkaDirs;
-	private List<KafkaServer> brokers;
 	private TestingServer zookeeper;
 	private String zookeeperConnectionString;
 	private String brokerConnectionString = "";
@@ -258,14 +258,13 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		}
 
 		zookeeper = null;
-		brokers = null;
+		brokers.clear();
 
 		zookeeper = new TestingServer(-1, tmpZkDir);
 		zookeeperConnectionString = zookeeper.getConnectString();
 		LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
 
 		LOG.info("Starting KafkaServer");
-		brokers = new ArrayList<>(config.getKafkaServersNumber());
 
 		ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
 		for (int i = 0; i < config.getKafkaServersNumber(); i++) {
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index ad68d59..d6a7705 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -74,10 +74,10 @@ import static org.junit.Assert.fail;
 public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 	protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
+	private final List<KafkaServer> brokers = new ArrayList<>();
 	private File tmpZkDir;
 	private File tmpKafkaParent;
 	private List<File> tmpKafkaDirs;
-	private List<KafkaServer> brokers;
 	private TestingServer zookeeper;
 	private String zookeeperConnectionString;
 	private String brokerConnectionString = "";
@@ -117,14 +117,13 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		}
 
 		zookeeper = null;
-		brokers = null;
+		brokers.clear();
 
 		zookeeper = new TestingServer(-1, tmpZkDir);
 		zookeeperConnectionString = zookeeper.getConnectString();
 		LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
 
 		LOG.info("Starting KafkaServer");
-		brokers = new ArrayList<>(config.getKafkaServersNumber());
 
 		ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
 		for (int i = 0; i < config.getKafkaServersNumber(); i++) {

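Making brokers a final, always-non-null list removes the null checks that a nullable field forces on every reader: iteration stays safe even when prepare() never ran, and resetting between runs becomes a clear() instead of a reassignment. A small sketch of the idiom with illustrative names; Broker stands in for KafkaServer.

import java.util.ArrayList;
import java.util.List;

class BrokerRegistry {

	static class Broker {
		void shutdown() {}
	}

	// final and always non-null, so callers never need a null check
	private final List<Broker> brokers = new ArrayList<>();

	void reset() {
		brokers.clear(); // instead of brokers = null or a fresh list
	}

	void shutdownAll() {
		for (Broker broker : brokers) { // safe even before any broker was added
			broker.shutdown();
		}
	}
}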

[flink] 01/07: [hotfix][kafka,test] Improve error message

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a9e5395e5bd4980e1451b84ae7116746b81ff109
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Feb 7 13:35:10 2019 +0100

    [hotfix][kafka,test] Improve error message
    
    If the expected or actual element lists are small enough, print them directly instead of printing only the element count.
---
 .../flink/streaming/connectors/kafka/KafkaTestBase.java       | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index d23523f..712c0a8 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -274,6 +274,15 @@ public abstract class KafkaTestBase extends TestLogger {
 			}
 		}
 
-		fail(String.format("Expected number of elements: <%s>, but was: <%s>", expectedElements.size(), actualElements.size()));
+		fail(String.format("Expected %s, but was: %s", formatElements(expectedElements), formatElements(actualElements)));
+	}
+
+	private String formatElements(List<Integer> elements) {
+		if (elements.size() > 50) {
+			return String.format("number of elements: <%s>", elements.size());
+		}
+		else {
+			return String.format("elements: <%s>", elements);
+		}
 	}
 }

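After this change a failing comparison of small lists prints the lists themselves, while large ones still fall back to the count. A stand-alone sketch that reproduces the helper so both outputs can be seen directly; FormatElementsDemo is an illustrative wrapper, not part of the test base.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FormatElementsDemo {

	public static void main(String[] args) {
		List<Integer> small = Arrays.asList(1, 2, 3);
		List<Integer> large = new ArrayList<>();
		for (int i = 0; i < 100; i++) {
			large.add(i);
		}
		System.out.println(formatElements(small)); // elements: <[1, 2, 3]>
		System.out.println(formatElements(large)); // number of elements: <100>
	}

	// same logic as the helper added to KafkaTestBase above
	private static String formatElements(List<Integer> elements) {
		if (elements.size() > 50) {
			return String.format("number of elements: <%s>", elements.size());
		} else {
			return String.format("elements: <%s>", elements);
		}
	}
}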

[flink] 02/07: [hotfix][kafka, test] Add missing shutdown call propagation

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f487d8ca54dc3de030b3332825e3faa028f6e590
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Feb 7 14:47:11 2019 +0100

    [hotfix][kafka,test] Add missing shutdown call propagation
---
 .../flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java     | 3 ++-
 .../flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java     | 1 +
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 40728ba..57dc663 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -295,7 +295,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public void shutdown() {
+	public void shutdown() throws Exception {
 		for (KafkaServer broker : brokers) {
 			if (broker != null) {
 				broker.shutdown();
@@ -331,6 +331,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 				// ignore
 			}
 		}
+		super.shutdown();
 	}
 
 	public ZkUtils getZkUtils() {
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 90e9e5d..0e9036d 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -359,6 +359,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 				// ignore
 			}
 		}
+		super.shutdown();
 	}
 
 	protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception {
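A missing super call like the ones added here is easy to overlook because the override still compiles and runs; the base-class cleanup is simply skipped. A minimal sketch of the propagation pattern with illustrative class names; BaseTestEnvironment stands in for the real KafkaTestEnvironment.

abstract class BaseTestEnvironment {

	public void shutdown() throws Exception {
		// base-class cleanup, e.g. deleting temporary directories
	}
}

class SomeTestEnvironmentImpl extends BaseTestEnvironment {

	@Override
	public void shutdown() throws Exception {
		// subclass cleanup first: stop brokers, stop ZooKeeper, ...
		super.shutdown(); // propagate so the base cleanup still runs
	}
}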