You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/07 07:44:30 UTC
[flink] 03/07: [hotfix][kafka,test] Allow exceptions in KafkaTestEnvironment#prepare
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5b49f17236379ba0afdc6a536bc70c7e7fcd14ff
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Wed Feb 6 15:11:55 2019 +0100
[hotfix][kafka,test] Allow exceptions in KafkaTestEnvironment#prepare
---
.../connectors/kafka/KafkaTestEnvironmentImpl.java | 36 ++++++++---------
.../streaming/connectors/kafka/Kafka011ITCase.java | 2 +-
.../kafka/Kafka011ProducerAtLeastOnceITCase.java | 2 +-
.../kafka/Kafka011ProducerExactlyOnceITCase.java | 2 +-
.../connectors/kafka/KafkaTestEnvironmentImpl.java | 36 ++++++++---------
.../streaming/connectors/kafka/Kafka08ITCase.java | 2 +-
.../connectors/kafka/KafkaTestEnvironmentImpl.java | 45 ++++++++--------------
.../connectors/kafka/Kafka09SecuredRunITCase.java | 2 +-
.../connectors/kafka/KafkaTestEnvironmentImpl.java | 38 ++++++++----------
.../kafka/KafkaShortRetentionTestBase.java | 2 +-
.../streaming/connectors/kafka/KafkaTestBase.java | 6 +--
.../connectors/kafka/KafkaTestEnvironment.java | 2 +-
.../streaming/connectors/kafka/KafkaITCase.java | 2 +-
.../kafka/KafkaProducerAtLeastOnceITCase.java | 2 +-
.../kafka/KafkaProducerExactlyOnceITCase.java | 2 +-
.../connectors/kafka/KafkaTestEnvironmentImpl.java | 36 ++++++++---------
16 files changed, 89 insertions(+), 128 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 8c69f36..9e51aac 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -217,7 +217,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
- public void prepare(Config config) {
+ public void prepare(Config config) throws Exception {
//increase the timeout since in Travis ZK connection takes long time for secure connection.
if (config.isSecureMode()) {
//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
@@ -243,29 +243,23 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
zookeeper = null;
brokers = null;
- try {
- zookeeper = new TestingServer(-1, tmpZkDir);
- zookeeperConnectionString = zookeeper.getConnectString();
- LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
-
- LOG.info("Starting KafkaServer");
- brokers = new ArrayList<>(config.getKafkaServersNumber());
-
- ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
- for (int i = 0; i < config.getKafkaServersNumber(); i++) {
- KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
- brokers.add(kafkaServer);
- brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName));
- brokerConnectionString += ",";
- }
+ zookeeper = new TestingServer(-1, tmpZkDir);
+ zookeeperConnectionString = zookeeper.getConnectString();
+ LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
- LOG.info("ZK and KafkaServer started.");
- }
- catch (Throwable t) {
- t.printStackTrace();
- fail("Test setup failed: " + t.getMessage());
+ LOG.info("Starting KafkaServer");
+ brokers = new ArrayList<>(config.getKafkaServersNumber());
+
+ ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
+ for (int i = 0; i < config.getKafkaServersNumber(); i++) {
+ KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
+ brokers.add(kafkaServer);
+ brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName));
+ brokerConnectionString += ",";
}
+ LOG.info("ZK and KafkaServer started.");
+
standardProps = new Properties();
standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
standardProps.setProperty("bootstrap.servers", brokerConnectionString);
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
index 1678134..3677daa 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
@@ -54,7 +54,7 @@ import java.util.Optional;
public class Kafka011ITCase extends KafkaConsumerTestBase {
@BeforeClass
- public static void prepare() throws ClassNotFoundException {
+ public static void prepare() throws Exception {
KafkaProducerTestBase.prepare();
((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE);
}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java
index ad63662..84efe25 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java
@@ -27,7 +27,7 @@ import org.junit.BeforeClass;
public class Kafka011ProducerAtLeastOnceITCase extends KafkaProducerTestBase {
@BeforeClass
- public static void prepare() throws ClassNotFoundException {
+ public static void prepare() throws Exception {
KafkaProducerTestBase.prepare();
((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE);
}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
index 5038b7f..8fb1599 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
@@ -27,7 +27,7 @@ import org.junit.Test;
@SuppressWarnings("serial")
public class Kafka011ProducerExactlyOnceITCase extends KafkaProducerTestBase {
@BeforeClass
- public static void prepare() throws ClassNotFoundException {
+ public static void prepare() throws Exception {
KafkaProducerTestBase.prepare();
((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 57dc663..539bc59 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -234,7 +234,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
- public void prepare(Config config) {
+ public void prepare(Config config) throws Exception {
//increase the timeout since in Travis ZK connection takes long time for secure connection.
if (config.isSecureMode()) {
//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
@@ -260,29 +260,23 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
zookeeper = null;
brokers = null;
- try {
- zookeeper = new TestingServer(-1, tmpZkDir);
- zookeeperConnectionString = zookeeper.getConnectString();
- LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
-
- LOG.info("Starting KafkaServer");
- brokers = new ArrayList<>(config.getKafkaServersNumber());
-
- ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
- for (int i = 0; i < config.getKafkaServersNumber(); i++) {
- KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
- brokers.add(kafkaServer);
- brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName));
- brokerConnectionString += ",";
- }
+ zookeeper = new TestingServer(-1, tmpZkDir);
+ zookeeperConnectionString = zookeeper.getConnectString();
+ LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
- LOG.info("ZK and KafkaServer started.");
- }
- catch (Throwable t) {
- t.printStackTrace();
- fail("Test setup failed: " + t.getMessage());
+ LOG.info("Starting KafkaServer");
+ brokers = new ArrayList<>(config.getKafkaServersNumber());
+
+ ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
+ for (int i = 0; i < config.getKafkaServersNumber(); i++) {
+ KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
+ brokers.add(kafkaServer);
+ brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName));
+ brokerConnectionString += ",";
}
+ LOG.info("ZK and KafkaServer started.");
+
standardProps = new Properties();
standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
standardProps.setProperty("bootstrap.servers", brokerConnectionString);
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index a250d86..e360c19 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -38,7 +38,7 @@ import static org.junit.Assert.fail;
public class Kafka08ITCase extends KafkaConsumerTestBase {
@BeforeClass
- public static void prepare() throws ClassNotFoundException {
+ public static void prepare() throws Exception {
// Somehow KafkaConsumer 0.8 doesn't handle broker failures if they are behind a proxy
prepare(false);
}
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index cddef88..15c83e4 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -53,7 +53,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.IOException;
import java.net.BindException;
import java.nio.file.Files;
import java.util.ArrayList;
@@ -206,23 +205,15 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
- public void prepare(Config config) {
+ public void prepare(Config config) throws Exception {
this.config = config;
File tempDir = new File(System.getProperty("java.io.tmpdir"));
tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
- try {
- Files.createDirectories(tmpZkDir.toPath());
- } catch (IOException e) {
- fail("cannot create zookeeper temp dir: " + e.getMessage());
- }
+ Files.createDirectories(tmpZkDir.toPath());
tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir" + (UUID.randomUUID().toString()));
- try {
- Files.createDirectories(tmpKafkaParent.toPath());
- } catch (IOException e) {
- fail("cannot create kafka temp dir: " + e.getMessage());
- }
+ Files.createDirectories(tmpKafkaParent.toPath());
tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
for (int i = 0; i < config.getKafkaServersNumber(); i++) {
@@ -234,29 +225,23 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
zookeeper = null;
brokers = null;
- try {
- LOG.info("Starting Zookeeper");
- zookeeper = new TestingServer(-1, tmpZkDir);
- zookeeperConnectionString = zookeeper.getConnectString();
-
- LOG.info("Starting KafkaServer");
- brokers = new ArrayList<>(config.getKafkaServersNumber());
+ LOG.info("Starting Zookeeper");
+ zookeeper = new TestingServer(-1, tmpZkDir);
+ zookeeperConnectionString = zookeeper.getConnectString();
- for (int i = 0; i < config.getKafkaServersNumber(); i++) {
- brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
- SocketServer socketServer = brokers.get(i).socketServer();
+ LOG.info("Starting KafkaServer");
+ brokers = new ArrayList<>(config.getKafkaServersNumber());
- String host = socketServer.host() == null ? "localhost" : socketServer.host();
- brokerConnectionString += hostAndPortToUrlString(host, socketServer.port()) + ",";
- }
+ for (int i = 0; i < config.getKafkaServersNumber(); i++) {
+ brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
+ SocketServer socketServer = brokers.get(i).socketServer();
- LOG.info("ZK and KafkaServer started.");
- }
- catch (Throwable t) {
- t.printStackTrace();
- fail("Test setup failed: " + t.getMessage());
+ String host = socketServer.host() == null ? "localhost" : socketServer.host();
+ brokerConnectionString += hostAndPortToUrlString(host, socketServer.port()) + ",";
}
+ LOG.info("ZK and KafkaServer started.");
+
standardProps = new Properties();
standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
standardProps.setProperty("bootstrap.servers", brokerConnectionString);
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
index b4002c7..8cd61cc 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
@@ -33,7 +33,7 @@ public class Kafka09SecuredRunITCase extends KafkaConsumerTestBase {
protected static final Logger LOG = LoggerFactory.getLogger(Kafka09SecuredRunITCase.class);
@BeforeClass
- public static void prepare() throws ClassNotFoundException {
+ public static void prepare() throws Exception {
LOG.info("-------------------------------------------------------------------------");
LOG.info(" Starting Kafka09SecuredRunITCase ");
LOG.info("-------------------------------------------------------------------------");
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 2ed600b..7a5c9e5 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -199,7 +199,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
- public void prepare(Config config) {
+ public void prepare(Config config) throws Exception {
//increase the timeout since in Travis ZK connection takes long time for secure connection.
if (config.isSecureMode()) {
//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
@@ -225,30 +225,24 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
zookeeper = null;
brokers = null;
- try {
- LOG.info("Starting Zookeeper");
- zookeeper = new TestingServer(-1, tmpZkDir);
- zookeeperConnectionString = zookeeper.getConnectString();
- LOG.info("zookeeperConnectionString: {}", zookeeperConnectionString);
-
- LOG.info("Starting KafkaServer");
- brokers = new ArrayList<>(config.getKafkaServersNumber());
-
- SecurityProtocol securityProtocol = config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT;
- for (int i = 0; i < config.getKafkaServersNumber(); i++) {
- KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
- brokers.add(kafkaServer);
- brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(securityProtocol));
- brokerConnectionString += ",";
- }
+ LOG.info("Starting Zookeeper");
+ zookeeper = new TestingServer(-1, tmpZkDir);
+ zookeeperConnectionString = zookeeper.getConnectString();
+ LOG.info("zookeeperConnectionString: {}", zookeeperConnectionString);
- LOG.info("ZK and KafkaServer started.");
- }
- catch (Throwable t) {
- t.printStackTrace();
- fail("Test setup failed: " + t.getMessage());
+ LOG.info("Starting KafkaServer");
+ brokers = new ArrayList<>(config.getKafkaServersNumber());
+
+ SecurityProtocol securityProtocol = config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT;
+ for (int i = 0; i < config.getKafkaServersNumber(); i++) {
+ KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
+ brokers.add(kafkaServer);
+ brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(securityProtocol));
+ brokerConnectionString += ",";
}
+ LOG.info("ZK and KafkaServer started.");
+
LOG.info("brokerConnectionString --> {}", brokerConnectionString);
standardProps = new Properties();
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 93bd7bc..42cfb89 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -89,7 +89,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
}
@BeforeClass
- public static void prepare() throws ClassNotFoundException {
+ public static void prepare() throws Exception {
LOG.info("-------------------------------------------------------------------------");
LOG.info(" Starting KafkaShortRetentionTestBase ");
LOG.info("-------------------------------------------------------------------------");
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 712c0a8..bc91afb 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -88,11 +88,11 @@ public abstract class KafkaTestBase extends TestLogger {
// ------------------------------------------------------------------------
@BeforeClass
- public static void prepare() throws ClassNotFoundException {
+ public static void prepare() throws Exception {
prepare(true);
}
- public static void prepare(boolean hideKafkaBehindProxy) throws ClassNotFoundException {
+ public static void prepare(boolean hideKafkaBehindProxy) throws Exception {
LOG.info("-------------------------------------------------------------------------");
LOG.info(" Starting KafkaTestBase ");
LOG.info("-------------------------------------------------------------------------");
@@ -126,7 +126,7 @@ public abstract class KafkaTestBase extends TestLogger {
return flinkConfig;
}
- protected static void startClusters(boolean secureMode, boolean hideKafkaBehindProxy) throws ClassNotFoundException {
+ protected static void startClusters(boolean secureMode, boolean hideKafkaBehindProxy) throws Exception {
// dynamically load the implementation for the test
Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 53501fb..a787f27 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -99,7 +99,7 @@ public abstract class KafkaTestEnvironment {
return new Config();
}
- public abstract void prepare(Config config);
+ public abstract void prepare(Config config) throws Exception;
public void shutdown() throws Exception {
for (NetworkFailuresProxy proxy : networkFailuresProxies) {
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index bc945a2..8729319 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -54,7 +54,7 @@ import java.util.Optional;
public class KafkaITCase extends KafkaConsumerTestBase {
@BeforeClass
- public static void prepare() throws ClassNotFoundException {
+ public static void prepare() throws Exception {
KafkaProducerTestBase.prepare();
((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerAtLeastOnceITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerAtLeastOnceITCase.java
index c07b84f..6d82d3e 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerAtLeastOnceITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerAtLeastOnceITCase.java
@@ -27,7 +27,7 @@ import org.junit.BeforeClass;
public class KafkaProducerAtLeastOnceITCase extends KafkaProducerTestBase {
@BeforeClass
- public static void prepare() throws ClassNotFoundException {
+ public static void prepare() throws Exception {
KafkaProducerTestBase.prepare();
((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExactlyOnceITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExactlyOnceITCase.java
index 45a0312..1752b27 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExactlyOnceITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExactlyOnceITCase.java
@@ -27,7 +27,7 @@ import org.junit.Test;
@SuppressWarnings("serial")
public class KafkaProducerExactlyOnceITCase extends KafkaProducerTestBase {
@BeforeClass
- public static void prepare() throws ClassNotFoundException {
+ public static void prepare() throws Exception {
KafkaProducerTestBase.prepare();
((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 0e9036d..ad68d59 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -93,7 +93,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
- public void prepare(Config config) {
+ public void prepare(Config config) throws Exception {
//increase the timeout since in Travis ZK connection takes long time for secure connection.
if (config.isSecureMode()) {
//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
@@ -119,29 +119,23 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
zookeeper = null;
brokers = null;
- try {
- zookeeper = new TestingServer(-1, tmpZkDir);
- zookeeperConnectionString = zookeeper.getConnectString();
- LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
-
- LOG.info("Starting KafkaServer");
- brokers = new ArrayList<>(config.getKafkaServersNumber());
-
- ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
- for (int i = 0; i < config.getKafkaServersNumber(); i++) {
- KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
- brokers.add(kafkaServer);
- brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName));
- brokerConnectionString += ",";
- }
+ zookeeper = new TestingServer(-1, tmpZkDir);
+ zookeeperConnectionString = zookeeper.getConnectString();
+ LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
- LOG.info("ZK and KafkaServer started.");
- }
- catch (Throwable t) {
- t.printStackTrace();
- fail("Test setup failed: " + t.getMessage());
+ LOG.info("Starting KafkaServer");
+ brokers = new ArrayList<>(config.getKafkaServersNumber());
+
+ ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
+ for (int i = 0; i < config.getKafkaServersNumber(); i++) {
+ KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
+ brokers.add(kafkaServer);
+ brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName));
+ brokerConnectionString += ",";
}
+ LOG.info("ZK and KafkaServer started.");
+
standardProps = new Properties();
standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
standardProps.setProperty("bootstrap.servers", brokerConnectionString);