You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/08/08 08:13:30 UTC
[2/4] flink git commit: [hotfix][Kafka] Refactor properties for
KafkaTestEnvironment setup
[hotfix][Kafka] Refactor properties for KafkaTestEnvironment setup
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e11a5919
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e11a5919
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e11a5919
Branch: refs/heads/master
Commit: e11a591956ba608308ccf81e13030291f150739b
Parents: b7f96f7
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Mon Aug 7 15:53:35 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Tue Aug 8 10:13:02 2017 +0200
----------------------------------------------------------------------
.../kafka/KafkaTestEnvironmentImpl.java | 33 ++++++-------
.../kafka/KafkaTestEnvironmentImpl.java | 19 ++++----
.../kafka/KafkaTestEnvironmentImpl.java | 35 +++++++-------
.../kafka/KafkaShortRetentionTestBase.java | 2 +-
.../connectors/kafka/KafkaTestBase.java | 2 +-
.../connectors/kafka/KafkaTestEnvironment.java | 49 ++++++++++++++++++--
6 files changed, 88 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index d3b45a9..9f1d379 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -79,8 +79,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
private String zookeeperConnectionString;
private String brokerConnectionString = "";
private Properties standardProps;
- private Properties additionalServerProperties;
- private boolean secureMode = false;
+ private Config config;
// 6 seconds is default. Seems to be too small for travis. 30 seconds
private int zkTimeout = 30000;
@@ -96,7 +95,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
@Override
public Properties getSecureProperties() {
Properties prop = new Properties();
- if (secureMode) {
+ if (config.isSecureMode()) {
prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
prop.put("security.protocol", "SASL_PLAINTEXT");
prop.put("sasl.kerberos.service.name", "kafka");
@@ -215,26 +214,24 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
- public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
+ public void prepare(Config config) {
//increase the timeout since in Travis ZK connection takes long time for secure connection.
- if (secureMode) {
+ if (config.isSecureMode()) {
//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
- numKafkaServers = 1;
+ config.setKafkaServersNumber(1);
zkTimeout = zkTimeout * 15;
}
+ this.config = config;
- this.additionalServerProperties = additionalServerProperties;
- this.secureMode = secureMode;
File tempDir = new File(System.getProperty("java.io.tmpdir"));
-
tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
- tmpKafkaDirs = new ArrayList<>(numKafkaServers);
- for (int i = 0; i < numKafkaServers; i++) {
+ tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
+ for (int i = 0; i < config.getKafkaServersNumber(); i++) {
File tmpDir = new File(tmpKafkaParent, "server-" + i);
assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
tmpKafkaDirs.add(tmpDir);
@@ -249,12 +246,12 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
LOG.info("Starting KafkaServer");
- brokers = new ArrayList<>(numKafkaServers);
+ brokers = new ArrayList<>(config.getKafkaServersNumber());
- for (int i = 0; i < numKafkaServers; i++) {
+ for (int i = 0; i < config.getKafkaServersNumber(); i++) {
brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
- if (secureMode) {
+ if (config.isSecureMode()) {
brokerConnectionString += hostAndPortToUrlString(
KafkaTestEnvironment.KAFKA_HOST,
brokers.get(i).socketServer().boundPort(
@@ -347,7 +344,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
final long deadline = System.nanoTime() + 30_000_000_000L;
do {
try {
- if (secureMode) {
+ if (config.isSecureMode()) {
//increase wait time since in Travis ZK timeout occurs frequently
int wait = zkTimeout / 100;
LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
@@ -407,8 +404,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
// for CI stability, increase zookeeper session timeout
kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
- if (additionalServerProperties != null) {
- kafkaProperties.putAll(additionalServerProperties);
+ if (config.getKafkaServerProperties() != null) {
+ kafkaProperties.putAll(config.getKafkaServerProperties());
}
final int numTries = 5;
@@ -418,7 +415,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
kafkaProperties.put("port", Integer.toString(kafkaPort));
//to support secure kafka cluster
- if (secureMode) {
+ if (config.isSecureMode()) {
LOG.info("Adding Kafka secure configurations");
kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index ab976e1..af5ad67 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -84,7 +84,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
private String zookeeperConnectionString;
private String brokerConnectionString = "";
private Properties standardProps;
- private Properties additionalServerProperties;
+
+ private Config config;
public String getBrokerConnectionString() {
return brokerConnectionString;
@@ -206,8 +207,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
- public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
- this.additionalServerProperties = additionalServerProperties;
+ public void prepare(Config config) {
+ this.config = config;
File tempDir = new File(System.getProperty("java.io.tmpdir"));
tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
@@ -224,8 +225,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
fail("cannot create kafka temp dir: " + e.getMessage());
}
- tmpKafkaDirs = new ArrayList<>(numKafkaServers);
- for (int i = 0; i < numKafkaServers; i++) {
+ tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
+ for (int i = 0; i < config.getKafkaServersNumber(); i++) {
File tmpDir = new File(tmpKafkaParent, "server-" + i);
assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
tmpKafkaDirs.add(tmpDir);
@@ -240,9 +241,9 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
zookeeperConnectionString = zookeeper.getConnectString();
LOG.info("Starting KafkaServer");
- brokers = new ArrayList<>(numKafkaServers);
+ brokers = new ArrayList<>(config.getKafkaServersNumber());
- for (int i = 0; i < numKafkaServers; i++) {
+ for (int i = 0; i < config.getKafkaServersNumber(); i++) {
brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
SocketServer socketServer = brokers.get(i).socketServer();
@@ -391,8 +392,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
// for CI stability, increase zookeeper session timeout
kafkaProperties.put("zookeeper.session.timeout.ms", "30000");
kafkaProperties.put("zookeeper.connection.timeout.ms", "30000");
- if (additionalServerProperties != null) {
- kafkaProperties.putAll(additionalServerProperties);
+ if (config.getKafkaServerProperties() != null) {
+ kafkaProperties.putAll(config.getKafkaServerProperties());
}
final int numTries = 5;
http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index df95420..517f096 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -78,11 +78,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
private String zookeeperConnectionString;
private String brokerConnectionString = "";
private Properties standardProps;
- private Properties additionalServerProperties;
- private boolean secureMode = false;
// 6 seconds is default. Seems to be too small for travis. 30 seconds
private String zkTimeout = "30000";
+ private Config config;
+
public String getBrokerConnectionString() {
return brokerConnectionString;
}
@@ -200,27 +200,24 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
- public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
-
+ public void prepare(Config config) {
//increase the timeout since in Travis ZK connection takes long time for secure connection.
- if (secureMode) {
+ if (config.isSecureMode()) {
//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
- numKafkaServers = 1;
+ config.setKafkaServersNumber(1);
zkTimeout = String.valueOf(Integer.parseInt(zkTimeout) * 15);
}
+ this.config = config;
- this.additionalServerProperties = additionalServerProperties;
- this.secureMode = secureMode;
File tempDir = new File(System.getProperty("java.io.tmpdir"));
-
tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
- tmpKafkaDirs = new ArrayList<>(numKafkaServers);
- for (int i = 0; i < numKafkaServers; i++) {
+ tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
+ for (int i = 0; i < config.getKafkaServersNumber(); i++) {
File tmpDir = new File(tmpKafkaParent, "server-" + i);
assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
tmpKafkaDirs.add(tmpDir);
@@ -236,13 +233,13 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
LOG.info("zookeeperConnectionString: {}", zookeeperConnectionString);
LOG.info("Starting KafkaServer");
- brokers = new ArrayList<>(numKafkaServers);
+ brokers = new ArrayList<>(config.getKafkaServersNumber());
- for (int i = 0; i < numKafkaServers; i++) {
+ for (int i = 0; i < config.getKafkaServersNumber(); i++) {
brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
SocketServer socketServer = brokers.get(i).socketServer();
- if (secureMode) {
+ if (this.config.isSecureMode()) {
brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
} else {
brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
@@ -335,7 +332,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
final long deadline = System.nanoTime() + Integer.parseInt(zkTimeout) * 1_000_000L;
do {
try {
- if (secureMode) {
+ if (config.isSecureMode()) {
//increase wait time since in Travis ZK timeout occurs frequently
int wait = Integer.parseInt(zkTimeout) / 100;
LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
@@ -400,8 +397,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
// for CI stability, increase zookeeper session timeout
kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
- if (additionalServerProperties != null) {
- kafkaProperties.putAll(additionalServerProperties);
+ if (config.getKafkaServerProperties() != null) {
+ kafkaProperties.putAll(config.getKafkaServerProperties());
}
final int numTries = 5;
@@ -411,7 +408,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
kafkaProperties.put("port", Integer.toString(kafkaPort));
//to support secure kafka cluster
- if (secureMode) {
+ if (config.isSecureMode()) {
LOG.info("Adding Kafka secure configurations");
kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
@@ -442,7 +439,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
public Properties getSecureProperties() {
Properties prop = new Properties();
- if (secureMode) {
+ if (config.isSecureMode()) {
prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
prop.put("security.protocol", "SASL_PLAINTEXT");
prop.put("sasl.kerberos.service.name", "kafka");
http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index d5c9276..3163f52 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -98,7 +98,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
specificProperties.setProperty("log.retention.minutes", "0");
specificProperties.setProperty("log.retention.ms", "250");
specificProperties.setProperty("log.retention.check.interval.ms", "100");
- kafkaServer.prepare(1, specificProperties, false);
+ kafkaServer.prepare(kafkaServer.createConfig().setKafkaServerProperties(specificProperties));
standardProps = kafkaServer.getStandardProperties();
http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index c484a4b..8eb0351 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -135,7 +135,7 @@ public abstract class KafkaTestBase extends TestLogger {
LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
- kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS, secureMode);
+ kafkaServer.prepare(kafkaServer.createConfig().setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS).setSecureMode(secureMode));
standardProps = kafkaServer.getStandardProperties();
http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 50eff23..ea292a9 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -38,15 +38,56 @@ import java.util.Properties;
* Abstract class providing a Kafka test environment.
*/
public abstract class KafkaTestEnvironment {
+ /**
+ * Configuration class for {@link KafkaTestEnvironment}.
+ */
+ public static class Config {
+ private int kafkaServersNumber = 1;
+ private Properties kafkaServerProperties = null;
+ private boolean secureMode = false;
+
+ /**
+ * Please use {@link KafkaTestEnvironment#createConfig()} method.
+ */
+ private Config() {
+ }
+
+ public int getKafkaServersNumber() {
+ return kafkaServersNumber;
+ }
+
+ public Config setKafkaServersNumber(int kafkaServersNumber) {
+ this.kafkaServersNumber = kafkaServersNumber;
+ return this;
+ }
+
+ public Properties getKafkaServerProperties() {
+ return kafkaServerProperties;
+ }
+
+ public Config setKafkaServerProperties(Properties kafkaServerProperties) {
+ this.kafkaServerProperties = kafkaServerProperties;
+ return this;
+ }
+
+ public boolean isSecureMode() {
+ return secureMode;
+ }
+
+ public Config setSecureMode(boolean secureMode) {
+ this.secureMode = secureMode;
+ return this;
+ }
+ }
protected static final String KAFKA_HOST = "localhost";
- public abstract void prepare(int numKafkaServers, Properties kafkaServerProperties, boolean secureMode);
-
- public void prepare(int numberOfKafkaServers, boolean secureMode) {
- this.prepare(numberOfKafkaServers, null, secureMode);
+ public static Config createConfig() {
+ return new Config();
}
+ public abstract void prepare(Config config);
+
public abstract void shutdown();
public abstract void deleteTestTopic(String topic);