You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/08/08 08:13:30 UTC

[2/4] flink git commit: [hotfix][Kafka] Refactor properties for KafkaTestEnvironment setup

[hotfix][Kafka] Refactor properties for KafkaTestEnvironment setup


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e11a5919
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e11a5919
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e11a5919

Branch: refs/heads/master
Commit: e11a591956ba608308ccf81e13030291f150739b
Parents: b7f96f7
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Mon Aug 7 15:53:35 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Tue Aug 8 10:13:02 2017 +0200

----------------------------------------------------------------------
 .../kafka/KafkaTestEnvironmentImpl.java         | 33 ++++++-------
 .../kafka/KafkaTestEnvironmentImpl.java         | 19 ++++----
 .../kafka/KafkaTestEnvironmentImpl.java         | 35 +++++++-------
 .../kafka/KafkaShortRetentionTestBase.java      |  2 +-
 .../connectors/kafka/KafkaTestBase.java         |  2 +-
 .../connectors/kafka/KafkaTestEnvironment.java  | 49 ++++++++++++++++++--
 6 files changed, 88 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index d3b45a9..9f1d379 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -79,8 +79,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	private String zookeeperConnectionString;
 	private String brokerConnectionString = "";
 	private Properties standardProps;
-	private Properties additionalServerProperties;
-	private boolean secureMode = false;
+	private Config config;
 	// 6 seconds is default. Seems to be too small for travis. 30 seconds
 	private int zkTimeout = 30000;
 
@@ -96,7 +95,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	@Override
 	public Properties getSecureProperties() {
 		Properties prop = new Properties();
-		if (secureMode) {
+		if (config.isSecureMode()) {
 			prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
 			prop.put("security.protocol", "SASL_PLAINTEXT");
 			prop.put("sasl.kerberos.service.name", "kafka");
@@ -215,26 +214,24 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
+	public void prepare(Config config) {
 		//increase the timeout since in Travis ZK connection takes long time for secure connection.
-		if (secureMode) {
+		if (config.isSecureMode()) {
 			//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
-			numKafkaServers = 1;
+			config.setKafkaServersNumber(1);
 			zkTimeout = zkTimeout * 15;
 		}
+		this.config = config;
 
-		this.additionalServerProperties = additionalServerProperties;
-		this.secureMode = secureMode;
 		File tempDir = new File(System.getProperty("java.io.tmpdir"));
-
 		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
 		assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
 
 		tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
 		assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
 
-		tmpKafkaDirs = new ArrayList<>(numKafkaServers);
-		for (int i = 0; i < numKafkaServers; i++) {
+		tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
+		for (int i = 0; i < config.getKafkaServersNumber(); i++) {
 			File tmpDir = new File(tmpKafkaParent, "server-" + i);
 			assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
 			tmpKafkaDirs.add(tmpDir);
@@ -249,12 +246,12 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
 
 			LOG.info("Starting KafkaServer");
-			brokers = new ArrayList<>(numKafkaServers);
+			brokers = new ArrayList<>(config.getKafkaServersNumber());
 
-			for (int i = 0; i < numKafkaServers; i++) {
+			for (int i = 0; i < config.getKafkaServersNumber(); i++) {
 				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
 
-				if (secureMode) {
+				if (config.isSecureMode()) {
 					brokerConnectionString += hostAndPortToUrlString(
 							KafkaTestEnvironment.KAFKA_HOST,
 							brokers.get(i).socketServer().boundPort(
@@ -347,7 +344,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		final long deadline = System.nanoTime() + 30_000_000_000L;
 		do {
 			try {
-				if (secureMode) {
+				if (config.isSecureMode()) {
 					//increase wait time since in Travis ZK timeout occurs frequently
 					int wait = zkTimeout / 100;
 					LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
@@ -407,8 +404,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		// for CI stability, increase zookeeper session timeout
 		kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
 		kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
-		if (additionalServerProperties != null) {
-			kafkaProperties.putAll(additionalServerProperties);
+		if (config.getKafkaServerProperties() != null) {
+			kafkaProperties.putAll(config.getKafkaServerProperties());
 		}
 
 		final int numTries = 5;
@@ -418,7 +415,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			kafkaProperties.put("port", Integer.toString(kafkaPort));
 
 			//to support secure kafka cluster
-			if (secureMode) {
+			if (config.isSecureMode()) {
 				LOG.info("Adding Kafka secure configurations");
 				kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
 				kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);

http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index ab976e1..af5ad67 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -84,7 +84,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	private String zookeeperConnectionString;
 	private String brokerConnectionString = "";
 	private Properties standardProps;
-	private Properties additionalServerProperties;
+
+	private Config config;
 
 	public String getBrokerConnectionString() {
 		return brokerConnectionString;
@@ -206,8 +207,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
-		this.additionalServerProperties = additionalServerProperties;
+	public void prepare(Config config) {
+		this.config = config;
 		File tempDir = new File(System.getProperty("java.io.tmpdir"));
 
 		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
@@ -224,8 +225,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			fail("cannot create kafka temp dir: " + e.getMessage());
 		}
 
-		tmpKafkaDirs = new ArrayList<>(numKafkaServers);
-		for (int i = 0; i < numKafkaServers; i++) {
+		tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
+		for (int i = 0; i < config.getKafkaServersNumber(); i++) {
 			File tmpDir = new File(tmpKafkaParent, "server-" + i);
 			assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
 			tmpKafkaDirs.add(tmpDir);
@@ -240,9 +241,9 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			zookeeperConnectionString = zookeeper.getConnectString();
 
 			LOG.info("Starting KafkaServer");
-			brokers = new ArrayList<>(numKafkaServers);
+			brokers = new ArrayList<>(config.getKafkaServersNumber());
 
-			for (int i = 0; i < numKafkaServers; i++) {
+			for (int i = 0; i < config.getKafkaServersNumber(); i++) {
 				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
 				SocketServer socketServer = brokers.get(i).socketServer();
 
@@ -391,8 +392,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		// for CI stability, increase zookeeper session timeout
 		kafkaProperties.put("zookeeper.session.timeout.ms", "30000");
 		kafkaProperties.put("zookeeper.connection.timeout.ms", "30000");
-		if (additionalServerProperties != null) {
-			kafkaProperties.putAll(additionalServerProperties);
+		if (config.getKafkaServerProperties() != null) {
+			kafkaProperties.putAll(config.getKafkaServerProperties());
 		}
 
 		final int numTries = 5;

http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index df95420..517f096 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -78,11 +78,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	private String zookeeperConnectionString;
 	private String brokerConnectionString = "";
 	private Properties standardProps;
-	private Properties additionalServerProperties;
-	private boolean secureMode = false;
 	// 6 seconds is default. Seems to be too small for travis. 30 seconds
 	private String zkTimeout = "30000";
 
+	private Config config;
+
 	public String getBrokerConnectionString() {
 		return brokerConnectionString;
 	}
@@ -200,27 +200,24 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
-
+	public void prepare(Config config) {
 		//increase the timeout since in Travis ZK connection takes long time for secure connection.
-		if (secureMode) {
+		if (config.isSecureMode()) {
 			//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
-			numKafkaServers = 1;
+			config.setKafkaServersNumber(1);
 			zkTimeout = String.valueOf(Integer.parseInt(zkTimeout) * 15);
 		}
+		this.config = config;
 
-		this.additionalServerProperties = additionalServerProperties;
-		this.secureMode = secureMode;
 		File tempDir = new File(System.getProperty("java.io.tmpdir"));
-
 		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
 		assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
 
 		tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
 		assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
 
-		tmpKafkaDirs = new ArrayList<>(numKafkaServers);
-		for (int i = 0; i < numKafkaServers; i++) {
+		tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
+		for (int i = 0; i < config.getKafkaServersNumber(); i++) {
 			File tmpDir = new File(tmpKafkaParent, "server-" + i);
 			assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
 			tmpKafkaDirs.add(tmpDir);
@@ -236,13 +233,13 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			LOG.info("zookeeperConnectionString: {}", zookeeperConnectionString);
 
 			LOG.info("Starting KafkaServer");
-			brokers = new ArrayList<>(numKafkaServers);
+			brokers = new ArrayList<>(config.getKafkaServersNumber());
 
-			for (int i = 0; i < numKafkaServers; i++) {
+			for (int i = 0; i < config.getKafkaServersNumber(); i++) {
 				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
 
 				SocketServer socketServer = brokers.get(i).socketServer();
-				if (secureMode) {
+				if (this.config.isSecureMode()) {
 					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
 				} else {
 					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
@@ -335,7 +332,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		final long deadline = System.nanoTime() + Integer.parseInt(zkTimeout) * 1_000_000L;
 		do {
 			try {
-				if (secureMode) {
+				if (config.isSecureMode()) {
 					//increase wait time since in Travis ZK timeout occurs frequently
 					int wait = Integer.parseInt(zkTimeout) / 100;
 					LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
@@ -400,8 +397,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		// for CI stability, increase zookeeper session timeout
 		kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
 		kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
-		if (additionalServerProperties != null) {
-			kafkaProperties.putAll(additionalServerProperties);
+		if (config.getKafkaServerProperties() != null) {
+			kafkaProperties.putAll(config.getKafkaServerProperties());
 		}
 
 		final int numTries = 5;
@@ -411,7 +408,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			kafkaProperties.put("port", Integer.toString(kafkaPort));
 
 			//to support secure kafka cluster
-			if (secureMode) {
+			if (config.isSecureMode()) {
 				LOG.info("Adding Kafka secure configurations");
 				kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
 				kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
@@ -442,7 +439,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 	public Properties getSecureProperties() {
 		Properties prop = new Properties();
-		if (secureMode) {
+		if (config.isSecureMode()) {
 			prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
 			prop.put("security.protocol", "SASL_PLAINTEXT");
 			prop.put("sasl.kerberos.service.name", "kafka");

http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index d5c9276..3163f52 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -98,7 +98,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 		specificProperties.setProperty("log.retention.minutes", "0");
 		specificProperties.setProperty("log.retention.ms", "250");
 		specificProperties.setProperty("log.retention.check.interval.ms", "100");
-		kafkaServer.prepare(1, specificProperties, false);
+		kafkaServer.prepare(kafkaServer.createConfig().setKafkaServerProperties(specificProperties));
 
 		standardProps = kafkaServer.getStandardProperties();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index c484a4b..8eb0351 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -135,7 +135,7 @@ public abstract class KafkaTestBase extends TestLogger {
 
 		LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
 
-		kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS, secureMode);
+		kafkaServer.prepare(kafkaServer.createConfig().setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS).setSecureMode(secureMode));
 
 		standardProps = kafkaServer.getStandardProperties();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 50eff23..ea292a9 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -38,15 +38,56 @@ import java.util.Properties;
  * Abstract class providing a Kafka test environment.
  */
 public abstract class KafkaTestEnvironment {
+	/**
+	 * Configuration class for {@link KafkaTestEnvironment}.
+	 */
+	public static class Config {
+		private int kafkaServersNumber = 1;
+		private Properties kafkaServerProperties = null;
+		private boolean secureMode = false;
+
+		/**
+		 * Please use {@link KafkaTestEnvironment#createConfig()} method.
+		 */
+		private Config() {
+		}
+
+		public int getKafkaServersNumber() {
+			return kafkaServersNumber;
+		}
+
+		public Config setKafkaServersNumber(int kafkaServersNumber) {
+			this.kafkaServersNumber = kafkaServersNumber;
+			return this;
+		}
+
+		public Properties getKafkaServerProperties() {
+			return kafkaServerProperties;
+		}
+
+		public Config setKafkaServerProperties(Properties kafkaServerProperties) {
+			this.kafkaServerProperties = kafkaServerProperties;
+			return this;
+		}
+
+		public boolean isSecureMode() {
+			return secureMode;
+		}
+
+		public Config setSecureMode(boolean secureMode) {
+			this.secureMode = secureMode;
+			return this;
+		}
+	}
 
 	protected static final String KAFKA_HOST = "localhost";
 
-	public abstract void prepare(int numKafkaServers, Properties kafkaServerProperties, boolean secureMode);
-
-	public void prepare(int numberOfKafkaServers, boolean secureMode) {
-		this.prepare(numberOfKafkaServers, null, secureMode);
+	public static Config createConfig() {
+		return new Config();
 	}
 
+	public abstract void prepare(Config config);
+
 	public abstract void shutdown();
 
 	public abstract void deleteTestTopic(String topic);