You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/07 07:44:30 UTC

[flink] 03/07: [hotfix][kafka,test] Allow exceptions in KafkaTestEnvironment#prepare

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5b49f17236379ba0afdc6a536bc70c7e7fcd14ff
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Wed Feb 6 15:11:55 2019 +0100

    [hotfix][kafka,test] Allow exceptions in KafkaTestEnvironment#prepare
---
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 36 ++++++++---------
 .../streaming/connectors/kafka/Kafka011ITCase.java |  2 +-
 .../kafka/Kafka011ProducerAtLeastOnceITCase.java   |  2 +-
 .../kafka/Kafka011ProducerExactlyOnceITCase.java   |  2 +-
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 36 ++++++++---------
 .../streaming/connectors/kafka/Kafka08ITCase.java  |  2 +-
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 45 ++++++++--------------
 .../connectors/kafka/Kafka09SecuredRunITCase.java  |  2 +-
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 38 ++++++++----------
 .../kafka/KafkaShortRetentionTestBase.java         |  2 +-
 .../streaming/connectors/kafka/KafkaTestBase.java  |  6 +--
 .../connectors/kafka/KafkaTestEnvironment.java     |  2 +-
 .../streaming/connectors/kafka/KafkaITCase.java    |  2 +-
 .../kafka/KafkaProducerAtLeastOnceITCase.java      |  2 +-
 .../kafka/KafkaProducerExactlyOnceITCase.java      |  2 +-
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 36 ++++++++---------
 16 files changed, 89 insertions(+), 128 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 8c69f36..9e51aac 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -217,7 +217,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public void prepare(Config config) {
+	public void prepare(Config config) throws Exception {
 		//increase the timeout since in Travis ZK connection takes long time for secure connection.
 		if (config.isSecureMode()) {
 			//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
@@ -243,29 +243,23 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		zookeeper = null;
 		brokers = null;
 
-		try {
-			zookeeper = new TestingServer(-1, tmpZkDir);
-			zookeeperConnectionString = zookeeper.getConnectString();
-			LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
-
-			LOG.info("Starting KafkaServer");
-			brokers = new ArrayList<>(config.getKafkaServersNumber());
-
-			ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
-			for (int i = 0; i < config.getKafkaServersNumber(); i++) {
-				KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
-				brokers.add(kafkaServer);
-				brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName));
-				brokerConnectionString +=  ",";
-			}
+		zookeeper = new TestingServer(-1, tmpZkDir);
+		zookeeperConnectionString = zookeeper.getConnectString();
+		LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
 
-			LOG.info("ZK and KafkaServer started.");
-		}
-		catch (Throwable t) {
-			t.printStackTrace();
-			fail("Test setup failed: " + t.getMessage());
+		LOG.info("Starting KafkaServer");
+		brokers = new ArrayList<>(config.getKafkaServersNumber());
+
+		ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
+		for (int i = 0; i < config.getKafkaServersNumber(); i++) {
+			KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
+			brokers.add(kafkaServer);
+			brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName));
+			brokerConnectionString +=  ",";
 		}
 
+		LOG.info("ZK and KafkaServer started.");
+
 		standardProps = new Properties();
 		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
 		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
index 1678134..3677daa 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
@@ -54,7 +54,7 @@ import java.util.Optional;
 public class Kafka011ITCase extends KafkaConsumerTestBase {
 
 	@BeforeClass
-	public static void prepare() throws ClassNotFoundException {
+	public static void prepare() throws Exception {
 		KafkaProducerTestBase.prepare();
 		((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE);
 	}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java
index ad63662..84efe25 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java
@@ -27,7 +27,7 @@ import org.junit.BeforeClass;
 public class Kafka011ProducerAtLeastOnceITCase extends KafkaProducerTestBase {
 
 	@BeforeClass
-	public static void prepare() throws ClassNotFoundException {
+	public static void prepare() throws Exception {
 		KafkaProducerTestBase.prepare();
 		((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE);
 	}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
index 5038b7f..8fb1599 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
@@ -27,7 +27,7 @@ import org.junit.Test;
 @SuppressWarnings("serial")
 public class Kafka011ProducerExactlyOnceITCase extends KafkaProducerTestBase {
 	@BeforeClass
-	public static void prepare() throws ClassNotFoundException {
+	public static void prepare() throws Exception {
 		KafkaProducerTestBase.prepare();
 		((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
 	}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 57dc663..539bc59 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -234,7 +234,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public void prepare(Config config) {
+	public void prepare(Config config) throws Exception {
 		//increase the timeout since in Travis ZK connection takes long time for secure connection.
 		if (config.isSecureMode()) {
 			//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
@@ -260,29 +260,23 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		zookeeper = null;
 		brokers = null;
 
-		try {
-			zookeeper = new TestingServer(-1, tmpZkDir);
-			zookeeperConnectionString = zookeeper.getConnectString();
-			LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
-
-			LOG.info("Starting KafkaServer");
-			brokers = new ArrayList<>(config.getKafkaServersNumber());
-
-			ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
-			for (int i = 0; i < config.getKafkaServersNumber(); i++) {
-				KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
-				brokers.add(kafkaServer);
-				brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName));
-				brokerConnectionString +=  ",";
-			}
+		zookeeper = new TestingServer(-1, tmpZkDir);
+		zookeeperConnectionString = zookeeper.getConnectString();
+		LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
 
-			LOG.info("ZK and KafkaServer started.");
-		}
-		catch (Throwable t) {
-			t.printStackTrace();
-			fail("Test setup failed: " + t.getMessage());
+		LOG.info("Starting KafkaServer");
+		brokers = new ArrayList<>(config.getKafkaServersNumber());
+
+		ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
+		for (int i = 0; i < config.getKafkaServersNumber(); i++) {
+			KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
+			brokers.add(kafkaServer);
+			brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName));
+			brokerConnectionString +=  ",";
 		}
 
+		LOG.info("ZK and KafkaServer started.");
+
 		standardProps = new Properties();
 		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
 		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index a250d86..e360c19 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -38,7 +38,7 @@ import static org.junit.Assert.fail;
 public class Kafka08ITCase extends KafkaConsumerTestBase {
 
 	@BeforeClass
-	public static void prepare() throws ClassNotFoundException {
+	public static void prepare() throws Exception {
 		// Somehow KafkaConsumer 0.8 doesn't handle broker failures if they are behind a proxy
 		prepare(false);
 	}
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index cddef88..15c83e4 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -53,7 +53,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.IOException;
 import java.net.BindException;
 import java.nio.file.Files;
 import java.util.ArrayList;
@@ -206,23 +205,15 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public void prepare(Config config) {
+	public void prepare(Config config) throws Exception {
 		this.config = config;
 		File tempDir = new File(System.getProperty("java.io.tmpdir"));
 
 		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
-		try {
-			Files.createDirectories(tmpZkDir.toPath());
-		} catch (IOException e) {
-			fail("cannot create zookeeper temp dir: " + e.getMessage());
-		}
+		Files.createDirectories(tmpZkDir.toPath());
 
 		tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir" + (UUID.randomUUID().toString()));
-		try {
-			Files.createDirectories(tmpKafkaParent.toPath());
-		} catch (IOException e) {
-			fail("cannot create kafka temp dir: " + e.getMessage());
-		}
+		Files.createDirectories(tmpKafkaParent.toPath());
 
 		tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
 		for (int i = 0; i < config.getKafkaServersNumber(); i++) {
@@ -234,29 +225,23 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		zookeeper = null;
 		brokers = null;
 
-		try {
-			LOG.info("Starting Zookeeper");
-			zookeeper = new TestingServer(-1, tmpZkDir);
-			zookeeperConnectionString = zookeeper.getConnectString();
-
-			LOG.info("Starting KafkaServer");
-			brokers = new ArrayList<>(config.getKafkaServersNumber());
+		LOG.info("Starting Zookeeper");
+		zookeeper = new TestingServer(-1, tmpZkDir);
+		zookeeperConnectionString = zookeeper.getConnectString();
 
-			for (int i = 0; i < config.getKafkaServersNumber(); i++) {
-				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
-				SocketServer socketServer = brokers.get(i).socketServer();
+		LOG.info("Starting KafkaServer");
+		brokers = new ArrayList<>(config.getKafkaServersNumber());
 
-				String host = socketServer.host() == null ? "localhost" : socketServer.host();
-				brokerConnectionString += hostAndPortToUrlString(host, socketServer.port()) + ",";
-			}
+		for (int i = 0; i < config.getKafkaServersNumber(); i++) {
+			brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
+			SocketServer socketServer = brokers.get(i).socketServer();
 
-			LOG.info("ZK and KafkaServer started.");
-		}
-		catch (Throwable t) {
-			t.printStackTrace();
-			fail("Test setup failed: " + t.getMessage());
+			String host = socketServer.host() == null ? "localhost" : socketServer.host();
+			brokerConnectionString += hostAndPortToUrlString(host, socketServer.port()) + ",";
 		}
 
+		LOG.info("ZK and KafkaServer started.");
+
 		standardProps = new Properties();
 		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
 		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
index b4002c7..8cd61cc 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
@@ -33,7 +33,7 @@ public class Kafka09SecuredRunITCase extends KafkaConsumerTestBase {
 	protected static final Logger LOG = LoggerFactory.getLogger(Kafka09SecuredRunITCase.class);
 
 	@BeforeClass
-	public static void prepare() throws ClassNotFoundException {
+	public static void prepare() throws Exception {
 		LOG.info("-------------------------------------------------------------------------");
 		LOG.info("    Starting Kafka09SecuredRunITCase ");
 		LOG.info("-------------------------------------------------------------------------");
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 2ed600b..7a5c9e5 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -199,7 +199,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public void prepare(Config config) {
+	public void prepare(Config config) throws Exception {
 		//increase the timeout since in Travis ZK connection takes long time for secure connection.
 		if (config.isSecureMode()) {
 			//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
@@ -225,30 +225,24 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		zookeeper = null;
 		brokers = null;
 
-		try {
-			LOG.info("Starting Zookeeper");
-			zookeeper = new TestingServer(-1, tmpZkDir);
-			zookeeperConnectionString = zookeeper.getConnectString();
-			LOG.info("zookeeperConnectionString: {}", zookeeperConnectionString);
-
-			LOG.info("Starting KafkaServer");
-			brokers = new ArrayList<>(config.getKafkaServersNumber());
-
-			SecurityProtocol securityProtocol = config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT;
-			for (int i = 0; i < config.getKafkaServersNumber(); i++) {
-				KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
-				brokers.add(kafkaServer);
-				brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(securityProtocol));
-				brokerConnectionString +=  ",";
-			}
+		LOG.info("Starting Zookeeper");
+		zookeeper = new TestingServer(-1, tmpZkDir);
+		zookeeperConnectionString = zookeeper.getConnectString();
+		LOG.info("zookeeperConnectionString: {}", zookeeperConnectionString);
 
-			LOG.info("ZK and KafkaServer started.");
-		}
-		catch (Throwable t) {
-			t.printStackTrace();
-			fail("Test setup failed: " + t.getMessage());
+		LOG.info("Starting KafkaServer");
+		brokers = new ArrayList<>(config.getKafkaServersNumber());
+
+		SecurityProtocol securityProtocol = config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT;
+		for (int i = 0; i < config.getKafkaServersNumber(); i++) {
+			KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
+			brokers.add(kafkaServer);
+			brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(securityProtocol));
+			brokerConnectionString +=  ",";
 		}
 
+		LOG.info("ZK and KafkaServer started.");
+
 		LOG.info("brokerConnectionString --> {}", brokerConnectionString);
 
 		standardProps = new Properties();
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 93bd7bc..42cfb89 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -89,7 +89,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 	}
 
 	@BeforeClass
-	public static void prepare() throws ClassNotFoundException {
+	public static void prepare() throws Exception {
 		LOG.info("-------------------------------------------------------------------------");
 		LOG.info("    Starting KafkaShortRetentionTestBase ");
 		LOG.info("-------------------------------------------------------------------------");
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 712c0a8..bc91afb 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -88,11 +88,11 @@ public abstract class KafkaTestBase extends TestLogger {
 	// ------------------------------------------------------------------------
 
 	@BeforeClass
-	public static void prepare() throws ClassNotFoundException {
+	public static void prepare() throws Exception {
 		prepare(true);
 	}
 
-	public static void prepare(boolean hideKafkaBehindProxy) throws ClassNotFoundException {
+	public static void prepare(boolean hideKafkaBehindProxy) throws Exception {
 		LOG.info("-------------------------------------------------------------------------");
 		LOG.info("    Starting KafkaTestBase ");
 		LOG.info("-------------------------------------------------------------------------");
@@ -126,7 +126,7 @@ public abstract class KafkaTestBase extends TestLogger {
 		return flinkConfig;
 	}
 
-	protected static void startClusters(boolean secureMode, boolean hideKafkaBehindProxy) throws ClassNotFoundException {
+	protected static void startClusters(boolean secureMode, boolean hideKafkaBehindProxy) throws Exception {
 
 		// dynamically load the implementation for the test
 		Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 53501fb..a787f27 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -99,7 +99,7 @@ public abstract class KafkaTestEnvironment {
 		return new Config();
 	}
 
-	public abstract void prepare(Config config);
+	public abstract void prepare(Config config) throws Exception;
 
 	public void shutdown() throws Exception {
 		for (NetworkFailuresProxy proxy : networkFailuresProxies) {
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index bc945a2..8729319 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -54,7 +54,7 @@ import java.util.Optional;
 public class KafkaITCase extends KafkaConsumerTestBase {
 
 	@BeforeClass
-	public static void prepare() throws ClassNotFoundException {
+	public static void prepare() throws Exception {
 		KafkaProducerTestBase.prepare();
 		((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
 	}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerAtLeastOnceITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerAtLeastOnceITCase.java
index c07b84f..6d82d3e 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerAtLeastOnceITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerAtLeastOnceITCase.java
@@ -27,7 +27,7 @@ import org.junit.BeforeClass;
 public class KafkaProducerAtLeastOnceITCase extends KafkaProducerTestBase {
 
 	@BeforeClass
-	public static void prepare() throws ClassNotFoundException {
+	public static void prepare() throws Exception {
 		KafkaProducerTestBase.prepare();
 		((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
 	}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExactlyOnceITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExactlyOnceITCase.java
index 45a0312..1752b27 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExactlyOnceITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExactlyOnceITCase.java
@@ -27,7 +27,7 @@ import org.junit.Test;
 @SuppressWarnings("serial")
 public class KafkaProducerExactlyOnceITCase extends KafkaProducerTestBase {
 	@BeforeClass
-	public static void prepare() throws ClassNotFoundException {
+	public static void prepare() throws Exception {
 		KafkaProducerTestBase.prepare();
 		((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
 	}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 0e9036d..ad68d59 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -93,7 +93,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public void prepare(Config config) {
+	public void prepare(Config config) throws Exception {
 		//increase the timeout since in Travis ZK connection takes long time for secure connection.
 		if (config.isSecureMode()) {
 			//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
@@ -119,29 +119,23 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		zookeeper = null;
 		brokers = null;
 
-		try {
-			zookeeper = new TestingServer(-1, tmpZkDir);
-			zookeeperConnectionString = zookeeper.getConnectString();
-			LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
-
-			LOG.info("Starting KafkaServer");
-			brokers = new ArrayList<>(config.getKafkaServersNumber());
-
-			ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
-			for (int i = 0; i < config.getKafkaServersNumber(); i++) {
-				KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
-				brokers.add(kafkaServer);
-				brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName));
-				brokerConnectionString +=  ",";
-			}
+		zookeeper = new TestingServer(-1, tmpZkDir);
+		zookeeperConnectionString = zookeeper.getConnectString();
+		LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
 
-			LOG.info("ZK and KafkaServer started.");
-		}
-		catch (Throwable t) {
-			t.printStackTrace();
-			fail("Test setup failed: " + t.getMessage());
+		LOG.info("Starting KafkaServer");
+		brokers = new ArrayList<>(config.getKafkaServersNumber());
+
+		ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
+		for (int i = 0; i < config.getKafkaServersNumber(); i++) {
+			KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
+			brokers.add(kafkaServer);
+			brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName));
+			brokerConnectionString +=  ",";
 		}
 
+		LOG.info("ZK and KafkaServer started.");
+
 		standardProps = new Properties();
 		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
 		standardProps.setProperty("bootstrap.servers", brokerConnectionString);