Posted to commits@flink.apache.org by sr...@apache.org on 2017/08/08 08:13:29 UTC

[1/4] flink git commit: [hotfix][Kafka] Clean up getKafkaServer method

Repository: flink
Updated Branches:
  refs/heads/master e2d3e1f86 -> 14bcac7b9


[hotfix][Kafka] Clean up getKafkaServer method


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/14bcac7b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/14bcac7b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/14bcac7b

Branch: refs/heads/master
Commit: 14bcac7b92d9df36deca707f437177a7ce370c13
Parents: cd373ef
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Thu Aug 3 11:35:26 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Tue Aug 8 10:13:02 2017 +0200

----------------------------------------------------------------------
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 17 ++++-------------
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 14 +++++---------
 2 files changed, 9 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/14bcac7b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 5be802f..5a5caad 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -249,20 +249,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			LOG.info("Starting KafkaServer");
 			brokers = new ArrayList<>(config.getKafkaServersNumber());
 
+			ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
 			for (int i = 0; i < config.getKafkaServersNumber(); i++) {
-				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
-
-				if (config.isSecureMode()) {
-					brokerConnectionString += hostAndPortToUrlString(
-							KafkaTestEnvironment.KAFKA_HOST,
-							brokers.get(i).socketServer().boundPort(
-									ListenerName.forSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT)));
-				} else {
-					brokerConnectionString += hostAndPortToUrlString(
-							KafkaTestEnvironment.KAFKA_HOST,
-							brokers.get(i).socketServer().boundPort(
-									ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)));
-				}
+				KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
+				brokers.add(kafkaServer);
+				brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName));
 				brokerConnectionString +=  ",";
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/14bcac7b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 676e588..26b41e6 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -30,7 +30,6 @@ import org.apache.flink.util.NetUtils;
 import kafka.admin.AdminUtils;
 import kafka.api.PartitionMetadata;
 import kafka.common.KafkaException;
-import kafka.network.SocketServer;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import kafka.utils.SystemTime$;
@@ -236,15 +235,12 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			LOG.info("Starting KafkaServer");
 			brokers = new ArrayList<>(config.getKafkaServersNumber());
 
+			SecurityProtocol securityProtocol = config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT;
 			for (int i = 0; i < config.getKafkaServersNumber(); i++) {
-				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
-
-				SocketServer socketServer = brokers.get(i).socketServer();
-				if (this.config.isSecureMode()) {
-					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
-				} else {
-					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
-				}
+				KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
+				brokers.add(kafkaServer);
+				brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(securityProtocol));
+				brokerConnectionString +=  ",";
 			}
 
 			LOG.info("ZK and KafkaServer started.");


[3/4] flink git commit: [FLINK-7343][Kafka] Use NetworkFailureProxy in kafka tests

Posted by sr...@apache.org.
[FLINK-7343][Kafka] Use NetworkFailureProxy in kafka tests

We shouldn't fail KafkaServers directly, because they might not be able
to flush their data. Since we don't want to test how well Kafka implements
at-least-once/exactly-once semantics, we just simulate a network failure
between Flink and Kafka in our at-least-once tests.
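
As the KafkaProducerTestBase diff below shows, the failure is now injected as a
Runnable action (kafkaServer::blockProxyTraffic) instead of looking up a broker
by id and shutting it down. A minimal standalone sketch of that injection
pattern (all names in the sketch are illustrative, not Flink's):

public class FailureInjectionSketch {
	/** Stand-in for BrokerRestartingMapper: counts elements, then runs the injected failure. */
	static class FailingMapper {
		private final Runnable failureAction;
		private final int failAfter;
		private int count;

		FailingMapper(Runnable failureAction, int failAfter) {
			this.failureAction = failureAction;
			this.failAfter = failAfter;
		}

		int map(int value) {
			if (++count == failAfter) {
				// In the real test this runs kafkaServer::blockProxyTraffic,
				// cutting the network while the broker keeps running.
				failureAction.run();
			}
			return value;
		}
	}

	public static void main(String[] args) {
		FailingMapper mapper = new FailingMapper(() -> System.out.println("network blocked"), 3);
		for (int i = 1; i <= 5; i++) {
			mapper.map(i);
		}
	}
}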


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cd373efe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cd373efe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cd373efe

Branch: refs/heads/master
Commit: cd373efe7171d71c0797a30efa256c91ca7d2714
Parents: e11a591
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Thu Aug 3 11:27:12 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Tue Aug 8 10:13:02 2017 +0200

----------------------------------------------------------------------
 .../kafka/KafkaTestEnvironmentImpl.java         |  6 +++
 .../connectors/kafka/Kafka08ITCase.java         |  7 +++
 .../kafka/KafkaTestEnvironmentImpl.java         |  8 +++-
 .../kafka/Kafka09SecuredRunITCase.java          |  4 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |  6 +++
 .../connectors/kafka/KafkaProducerTestBase.java | 49 ++++++--------------
 .../kafka/KafkaShortRetentionTestBase.java      |  2 +-
 .../connectors/kafka/KafkaTestBase.java         | 16 +++++--
 .../connectors/kafka/KafkaTestEnvironment.java  | 37 ++++++++++++++-
 9 files changed, 89 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cd373efe/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 9f1d379..5be802f 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.networking.NetworkFailuresProxy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
@@ -414,6 +415,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			int kafkaPort = NetUtils.getAvailablePort();
 			kafkaProperties.put("port", Integer.toString(kafkaPort));
 
+			if (config.isHideKafkaBehindProxy()) {
+				NetworkFailuresProxy proxy = createProxy(KAFKA_HOST, kafkaPort);
+				kafkaProperties.put("advertised.port", proxy.getLocalPort());
+			}
+
 			//to support secure kafka cluster
 			if (config.isSecureMode()) {
 				LOG.info("Adding Kafka secure configurations");

http://git-wip-us.apache.org/repos/asf/flink/blob/cd373efe/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 91dc929..b3afa57 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.Properties;
@@ -36,6 +37,12 @@ import static org.junit.Assert.fail;
  */
 public class Kafka08ITCase extends KafkaConsumerTestBase {
 
+	@BeforeClass
+	public static void prepare() throws ClassNotFoundException {
+		// Somehow KafkaConsumer 0.8 doesn't handle broker failures if they are behind a proxy
+		prepare(false);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Suite of Tests
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/cd373efe/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index af5ad67..eb1f57e 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.networking.NetworkFailuresProxy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
@@ -84,7 +85,6 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	private String zookeeperConnectionString;
 	private String brokerConnectionString = "";
 	private Properties standardProps;
-
 	private Config config;
 
 	public String getBrokerConnectionString() {
@@ -401,6 +401,12 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		for (int i = 1; i <= numTries; i++) {
 			int kafkaPort = NetUtils.getAvailablePort();
 			kafkaProperties.put("port", Integer.toString(kafkaPort));
+
+			if (config.isHideKafkaBehindProxy()) {
+				NetworkFailuresProxy proxy = createProxy(KAFKA_HOST, kafkaPort);
+				kafkaProperties.put("advertised.port", Integer.toString(proxy.getLocalPort()));
+			}
+
 			KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
 
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/cd373efe/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
index d41cd91..b4002c7 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
@@ -41,11 +41,11 @@ public class Kafka09SecuredRunITCase extends KafkaConsumerTestBase {
 		SecureTestEnvironment.prepare(tempFolder);
 		SecureTestEnvironment.populateFlinkSecureConfigurations(getFlinkConfiguration());
 
-		startClusters(true);
+		startClusters(true, false);
 	}
 
 	@AfterClass
-	public static void shutDownServices() {
+	public static void shutDownServices() throws Exception {
 		shutdownClusters();
 		SecureTestEnvironment.cleanup();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd373efe/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 517f096..676e588 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.networking.NetworkFailuresProxy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
@@ -407,6 +408,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			int kafkaPort = NetUtils.getAvailablePort();
 			kafkaProperties.put("port", Integer.toString(kafkaPort));
 
+			if (config.isHideKafkaBehindProxy()) {
+				NetworkFailuresProxy proxy = createProxy(KAFKA_HOST, kafkaPort);
+				kafkaProperties.put("advertised.port", proxy.getLocalPort());
+			}
+
 			//to support secure kafka cluster
 			if (config.isSecureMode()) {
 				LOG.info("Adding Kafka secure configurations");

http://git-wip-us.apache.org/repos/asf/flink/blob/cd373efe/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 1af9ca8..4a61103 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -46,7 +46,6 @@ import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Preconditions;
 
 import com.google.common.collect.ImmutableSet;
-import kafka.server.KafkaServer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.Test;
 
@@ -214,7 +213,8 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 
 	/**
 	 * This test sets KafkaProducer so that it will not automatically flush the data and
-	 * and fails the broker to check whether FlinkKafkaProducer flushed records manually on snapshotState.
+	 * simulates a network failure between Flink and Kafka to check whether FlinkKafkaProducer
+	 * flushed records manually on snapshotState.
 	 */
 	protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
 		final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";
@@ -243,13 +243,12 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 		properties.setProperty("batch.size", "10240000");
 		properties.setProperty("linger.ms", "10000");
 
-		int leaderId = kafkaServer.getLeaderToShutDown(topic);
-		BrokerRestartingMapper.resetState();
+		BrokerRestartingMapper.resetState(kafkaServer::blockProxyTraffic);
 
 		// process exactly failAfterElements number of elements and then shutdown Kafka broker and fail application
 		DataStream<Integer> inputStream = env
 			.fromCollection(getIntegersSequence(numElements))
-			.map(new BrokerRestartingMapper<Integer>(leaderId, failAfterElements));
+			.map(new BrokerRestartingMapper<>(failAfterElements));
 
 		StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
 			@Override
@@ -276,10 +275,10 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 			fail("Job should fail!");
 		}
 		catch (JobExecutionException ex) {
-			assertEquals("Broker was shutdown!", ex.getCause().getMessage());
+			// ignore the error; it can be one of many, so checking the exception message/cause would be brittle
 		}
 
-		kafkaServer.restartBroker(leaderId);
+		kafkaServer.unblockProxyTraffic();
 
 		// assert that before failure we successfully snapshot/flushed all expected elements
 		assertAtLeastOnceForTopic(
@@ -438,22 +437,22 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 		public static volatile boolean restartedLeaderBefore;
 		public static volatile boolean hasBeenCheckpointedBeforeFailure;
 		public static volatile int numElementsBeforeSnapshot;
+		public static volatile Runnable shutdownAction;
 
-		private final int shutdownBrokerId;
 		private final int failCount;
 		private int numElementsTotal;
 
 		private boolean failer;
 		private boolean hasBeenCheckpointed;
 
-		public static void resetState() {
+		public static void resetState(Runnable shutdownAction) {
 			restartedLeaderBefore = false;
 			hasBeenCheckpointedBeforeFailure = false;
 			numElementsBeforeSnapshot = 0;
+			BrokerRestartingMapper.shutdownAction = shutdownAction;
 		}
 
-		public BrokerRestartingMapper(int shutdownBrokerId, int failCount) {
-			this.shutdownBrokerId = shutdownBrokerId;
+		public BrokerRestartingMapper(int failCount) {
 			this.failCount = failCount;
 		}
 
@@ -471,31 +470,9 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 
 				if (failer && numElementsTotal >= failCount) {
 					// shut down a Kafka broker
-					KafkaServer toShutDown = null;
-					for (KafkaServer server : kafkaServer.getBrokers()) {
-
-						if (kafkaServer.getBrokerId(server) == shutdownBrokerId) {
-							toShutDown = server;
-							break;
-						}
-					}
-
-					if (toShutDown == null) {
-						StringBuilder listOfBrokers = new StringBuilder();
-						for (KafkaServer server : kafkaServer.getBrokers()) {
-							listOfBrokers.append(kafkaServer.getBrokerId(server));
-							listOfBrokers.append(" ; ");
-						}
-
-						throw new Exception("Cannot find broker to shut down: " + shutdownBrokerId
-												+ " ; available brokers: " + listOfBrokers.toString());
-					} else {
-						hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
-						restartedLeaderBefore = true;
-						toShutDown.shutdown();
-						toShutDown.awaitShutdown();
-						throw new Exception("Broker was shutdown!");
-					}
+					hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
+					restartedLeaderBefore = true;
+					shutdownAction.run();
 				}
 			}
 			return value;

http://git-wip-us.apache.org/repos/asf/flink/blob/cd373efe/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 3163f52..fbf902f 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -115,7 +115,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 	}
 
 	@AfterClass
-	public static void shutDownServices() {
+	public static void shutDownServices() throws Exception {
 		TestStreamEnvironment.unsetAsContext();
 
 		if (flink != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/cd373efe/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 8eb0351..19f38e2 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -90,18 +90,21 @@ public abstract class KafkaTestBase extends TestLogger {
 
 	@BeforeClass
 	public static void prepare() throws ClassNotFoundException {
+		prepare(true);
+	}
 
+	public static void prepare(boolean hideKafkaBehindProxy) throws ClassNotFoundException {
 		LOG.info("-------------------------------------------------------------------------");
 		LOG.info("    Starting KafkaTestBase ");
 		LOG.info("-------------------------------------------------------------------------");
 
-		startClusters(false);
+		startClusters(false, hideKafkaBehindProxy);
 
 		TestStreamEnvironment.setAsContext(flink, PARALLELISM);
 	}
 
 	@AfterClass
-	public static void shutDownServices() {
+	public static void shutDownServices() throws Exception {
 
 		LOG.info("-------------------------------------------------------------------------");
 		LOG.info("    Shut down KafkaTestBase ");
@@ -127,7 +130,7 @@ public abstract class KafkaTestBase extends TestLogger {
 		return flinkConfig;
 	}
 
-	protected static void startClusters(boolean secureMode) throws ClassNotFoundException {
+	protected static void startClusters(boolean secureMode, boolean hideKafkaBehindProxy) throws ClassNotFoundException {
 
 		// dynamically load the implementation for the test
 		Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
@@ -135,7 +138,10 @@ public abstract class KafkaTestBase extends TestLogger {
 
 		LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
 
-		kafkaServer.prepare(kafkaServer.createConfig().setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS).setSecureMode(secureMode));
+		kafkaServer.prepare(kafkaServer.createConfig()
+			.setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS)
+			.setSecureMode(secureMode)
+			.setHideKafkaBehindProxy(hideKafkaBehindProxy));
 
 		standardProps = kafkaServer.getStandardProperties();
 
@@ -154,7 +160,7 @@ public abstract class KafkaTestBase extends TestLogger {
 		flink.start();
 	}
 
-	protected static void shutdownClusters() {
+	protected static void shutdownClusters() throws Exception {
 
 		if (flink != null) {
 			flink.shutdown();

http://git-wip-us.apache.org/repos/asf/flink/blob/cd373efe/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index ea292a9..21171f8 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.networking.NetworkFailuresProxy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
@@ -29,6 +30,7 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import kafka.server.KafkaServer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -45,6 +47,7 @@ public abstract class KafkaTestEnvironment {
 		private int kafkaServersNumber = 1;
 		private Properties kafkaServerProperties = null;
 		private boolean secureMode = false;
+		private boolean hideKafkaBehindProxy = false;
 
 		/**
 		 * Please use {@link KafkaTestEnvironment#createConfig()} method.
@@ -78,17 +81,32 @@ public abstract class KafkaTestEnvironment {
 			this.secureMode = secureMode;
 			return this;
 		}
+
+		public boolean isHideKafkaBehindProxy() {
+			return hideKafkaBehindProxy;
+		}
+
+		public Config setHideKafkaBehindProxy(boolean hideKafkaBehindProxy) {
+			this.hideKafkaBehindProxy = hideKafkaBehindProxy;
+			return this;
+		}
 	}
 
 	protected static final String KAFKA_HOST = "localhost";
 
+	protected final List<NetworkFailuresProxy> networkFailuresProxies = new ArrayList<>();
+
 	public static Config createConfig() {
 		return new Config();
 	}
 
 	public abstract void prepare(Config config);
 
-	public abstract void shutdown();
+	public void shutdown() throws Exception {
+		for (NetworkFailuresProxy proxy : networkFailuresProxies) {
+			proxy.close();
+		}
+	}
 
 	public abstract void deleteTestTopic(String topic);
 
@@ -168,4 +186,21 @@ public abstract class KafkaTestEnvironment {
 
 	public abstract boolean isSecureRunSupported();
 
+	public void blockProxyTraffic() {
+		for (NetworkFailuresProxy proxy : networkFailuresProxies) {
+			proxy.blockTraffic();
+		}
+	}
+
+	public void unblockProxyTraffic() {
+		for (NetworkFailuresProxy proxy : networkFailuresProxies) {
+			proxy.unblockTraffic();
+		}
+	}
+
+	protected NetworkFailuresProxy createProxy(String remoteHost, int remotePort) {
+		NetworkFailuresProxy proxy = new NetworkFailuresProxy(0, remoteHost, remotePort);
+		networkFailuresProxies.add(proxy);
+		return proxy;
+	}
 }


[4/4] flink git commit: [FLINK-7343][utils] Add network proxy utility to simulate network failures

Posted by sr...@apache.org.
[FLINK-7343][utils] Add network proxy utility to simulate network failures
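
The utility is a local TCP forwarder with a toggleable kill switch. A minimal
usage sketch, assuming an upstream service to proxy; the plain ServerSocket
below stands in for that service and is an assumption of this sketch, not part
of the patch:

import org.apache.flink.networking.NetworkFailuresProxy;

import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class ProxySketch {
	public static void main(String[] args) throws Exception {
		try (ServerSocket upstream = new ServerSocket(0);
				// bind the proxy to a random free local port (0), forwarding to the upstream service
				NetworkFailuresProxy proxy = new NetworkFailuresProxy(0, "localhost", upstream.getLocalPort())) {
			// clients connect to the proxy's port instead of the upstream port
			try (Socket client = new Socket("localhost", proxy.getLocalPort());
					Socket accepted = upstream.accept()) {
				OutputStream out = client.getOutputStream();
				out.write('x'); // forwarded through the proxy to the upstream connection
				System.out.println("upstream received: " + (char) accepted.getInputStream().read());
			}
			proxy.blockTraffic();   // closes ongoing connections and rejects new ones
			proxy.unblockTraffic(); // restores normal forwarding
		}
	}
}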


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7f96f79
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7f96f79
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7f96f79

Branch: refs/heads/master
Commit: b7f96f79e7665f10880333d816d1694a227c5437
Parents: e2d3e1f
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Tue Aug 1 18:11:27 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Tue Aug 8 10:13:02 2017 +0200

----------------------------------------------------------------------
 .../flink/networking/NetworkFailureHandler.java | 178 +++++++++++++++++++
 .../flink/networking/NetworkFailuresProxy.java  | 125 +++++++++++++
 .../org/apache/flink/networking/EchoServer.java | 113 ++++++++++++
 .../networking/NetworkFailuresProxyTest.java    | 124 +++++++++++++
 4 files changed, 540 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b7f96f79/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java
new file mode 100644
index 0000000..0ce0b12
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.networking;
+
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+/**
+ * Handler that forwards inbound traffic from the source channel to the target channel on remoteHost:remotePort,
+ * and responses in the opposite direction. All network traffic can be blocked at any time using the blocked
+ * flag.
+ */
+class NetworkFailureHandler extends SimpleChannelUpstreamHandler {
+	private static final Logger LOG = LoggerFactory.getLogger(NetworkFailureHandler.class);
+	private static final String TARGET_CHANNEL_HANDLER_NAME = "target_channel_handler";
+
+	// mapping between source and target channels, used for finding the correct target channel for a given source.
+	private final Map<Channel, Channel> sourceToTargetChannels = new ConcurrentHashMap<>();
+	private final Consumer<NetworkFailureHandler> onClose;
+	private final ClientSocketChannelFactory channelFactory;
+	private final String remoteHost;
+	private final int remotePort;
+
+	private final AtomicBoolean blocked;
+
+	public NetworkFailureHandler(
+			AtomicBoolean blocked,
+			Consumer<NetworkFailureHandler> onClose,
+			ClientSocketChannelFactory channelFactory,
+			String remoteHost,
+			int remotePort) {
+		this.blocked = blocked;
+		this.onClose = onClose;
+		this.channelFactory = channelFactory;
+		this.remoteHost = remoteHost;
+		this.remotePort = remotePort;
+	}
+
+	/**
+	 * Closes the specified channel after all queued write requests are flushed.
+	 */
+	static void closeOnFlush(Channel channel) {
+		if (channel.isConnected()) {
+			channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+		}
+	}
+
+	public void closeConnections() {
+		for (Map.Entry<Channel, Channel> entry : sourceToTargetChannels.entrySet()) {
+			// target channel is closed on source's channel channelClosed even
+			// the target channel is closed by the source channel's channelClosed event
+		}
+	}
+
+	@Override
+	public void channelOpen(ChannelHandlerContext context, ChannelStateEvent event) throws Exception {
+		// Suspend incoming traffic until connected to the remote host.
+		final Channel sourceChannel = event.getChannel();
+		sourceChannel.setReadable(false);
+
+		if (blocked.get()) {
+			sourceChannel.close();
+			return;
+		}
+
+		// Start the connection attempt.
+		ClientBootstrap targetConnectionBootstrap = new ClientBootstrap(channelFactory);
+		targetConnectionBootstrap.getPipeline().addLast(TARGET_CHANNEL_HANDLER_NAME, new TargetChannelHandler(event.getChannel(), blocked));
+		ChannelFuture connectFuture = targetConnectionBootstrap.connect(new InetSocketAddress(remoteHost, remotePort));
+		sourceToTargetChannels.put(sourceChannel, connectFuture.getChannel());
+
+		connectFuture.addListener(future -> {
+			if (future.isSuccess()) {
+				// Connection attempt succeeded:
+				// Begin to accept incoming traffic.
+				sourceChannel.setReadable(true);
+			} else {
+				// Close the connection if the connection attempt has failed.
+				sourceChannel.close();
+			}
+		});
+	}
+
+	@Override
+	public void messageReceived(ChannelHandlerContext context, MessageEvent event) throws Exception {
+		if (blocked.get()) {
+			return;
+		}
+
+		ChannelBuffer msg = (ChannelBuffer) event.getMessage();
+		Channel targetChannel = sourceToTargetChannels.get(event.getChannel());
+		if (targetChannel == null) {
+			throw new IllegalStateException("Could not find a target channel for the source channel");
+		}
+		targetChannel.write(msg);
+	}
+
+	@Override
+	public void channelClosed(ChannelHandlerContext context, ChannelStateEvent event) throws Exception {
+		Channel targetChannel = sourceToTargetChannels.get(event.getChannel());
+		if (targetChannel == null) {
+			return;
+		}
+		closeOnFlush(targetChannel);
+		sourceToTargetChannels.remove(event.getChannel());
+		onClose.accept(this);
+	}
+
+	@Override
+	public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) throws Exception {
+		LOG.error("Closing communication channel because of an exception", event.getCause());
+		closeOnFlush(event.getChannel());
+	}
+
+	private static class TargetChannelHandler extends SimpleChannelUpstreamHandler {
+		private final Channel sourceChannel;
+		private final AtomicBoolean blocked;
+
+		TargetChannelHandler(Channel sourceChannel, AtomicBoolean blocked) {
+			this.sourceChannel = sourceChannel;
+			this.blocked = blocked;
+		}
+
+		@Override
+		public void messageReceived(ChannelHandlerContext context, MessageEvent event) throws Exception {
+			if (blocked.get()) {
+				return;
+			}
+			ChannelBuffer msg = (ChannelBuffer) event.getMessage();
+			sourceChannel.write(msg);
+		}
+
+		@Override
+		public void channelClosed(ChannelHandlerContext context, ChannelStateEvent event) throws Exception {
+			closeOnFlush(sourceChannel);
+		}
+
+		@Override
+		public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) throws Exception {
+			LOG.error("Closing communication channel because of an exception", event.getCause());
+			closeOnFlush(event.getChannel());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7f96f79/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailuresProxy.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailuresProxy.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailuresProxy.java
new file mode 100644
index 0000000..7030049
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailuresProxy.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.networking;
+
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class acts as a network proxy: it listens on a local port and forwards all of the network traffic to the remote
+ * host/port. It allows simulating network failures in the communication.
+ */
+public class NetworkFailuresProxy implements AutoCloseable {
+	private static final Logger LOG = LoggerFactory.getLogger(NetworkFailuresProxy.class);
+	private static final String NETWORK_FAILURE_HANDLER_NAME = "network_failure_handler";
+
+	private final Executor executor = Executors.newCachedThreadPool();
+	private final ServerBootstrap serverBootstrap;
+	private final Channel channel;
+	private final AtomicBoolean blocked = new AtomicBoolean();
+	// collection of networkFailureHandlers so that we can call {@link NetworkFailureHandler.closeConnections} on them.
+	private final Set<NetworkFailureHandler> networkFailureHandlers = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+	public NetworkFailuresProxy(int localPort, String remoteHost, int remotePort) {
+		LOG.info("Proxying [*:{}] to [{}:{}]", localPort, remoteHost, remotePort);
+
+		// Configure the bootstrap.
+		serverBootstrap = new ServerBootstrap(
+			new NioServerSocketChannelFactory(executor, executor));
+
+		// Set up the event pipeline factory.
+		ClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(executor, executor);
+		serverBootstrap.setOption("child.tcpNoDelay", true);
+		serverBootstrap.setOption("child.keepAlive", true);
+		serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+			public ChannelPipeline getPipeline() throws Exception {
+				ChannelPipeline pipeline = Channels.pipeline();
+
+				// synchronized for a race between blocking and creating new handlers
+				synchronized (networkFailureHandlers) {
+					NetworkFailureHandler failureHandler = new NetworkFailureHandler(
+						blocked,
+						networkFailureHandler -> networkFailureHandlers.remove(networkFailureHandler),
+						channelFactory,
+						remoteHost,
+						remotePort);
+					networkFailureHandlers.add(failureHandler);
+					pipeline.addLast(NETWORK_FAILURE_HANDLER_NAME, failureHandler);
+				}
+				return pipeline;
+			}
+		});
+		channel = serverBootstrap.bind(new InetSocketAddress(localPort));
+
+	}
+
+	/**
+	 * @return local port on which {@link NetworkFailuresProxy} is listening.
+	 */
+	public int getLocalPort() {
+		return ((InetSocketAddress) channel.getLocalAddress()).getPort();
+	}
+
+	/**
+	 * Blocks all traffic, closes all ongoing connections, and closes any new incoming connections.
+	 */
+	public void blockTraffic() {
+		setTrafficBlocked(true);
+	}
+
+	/**
+	 * Resumes normal communication.
+	 */
+	public void unblockTraffic() {
+		setTrafficBlocked(false);
+	}
+
+	@Override
+	public void close() throws Exception {
+		channel.close();
+	}
+
+	private void setTrafficBlocked(boolean blocked) {
+		this.blocked.set(blocked);
+		if (blocked) {
+			// synchronized for a race between blocking and creating new handlers
+			synchronized (networkFailureHandlers) {
+				for (NetworkFailureHandler failureHandler : networkFailureHandlers) {
+					failureHandler.closeConnections();
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7f96f79/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java
new file mode 100644
index 0000000..06e77ea
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.networking;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * TCP EchoServer for test purposes.
+ */
+public class EchoServer extends Thread implements AutoCloseable {
+	private final ServerSocket serverSocket = new ServerSocket(0);
+	private final int socketTimeout;
+	private final List<EchoWorkerThread> workerThreads = Collections.synchronizedList(new ArrayList<>());
+
+	private volatile boolean close = false;
+	private Exception threadException;
+
+	public EchoServer(int socketTimeout) throws IOException {
+		serverSocket.setSoTimeout(socketTimeout);
+		this.socketTimeout = socketTimeout;
+	}
+
+	public int getLocalPort() {
+		return serverSocket.getLocalPort();
+	}
+
+	@Override
+	public void run() {
+		while (!close) {
+			try {
+				EchoWorkerThread thread = new EchoWorkerThread(serverSocket.accept(), socketTimeout);
+				thread.start();
+			} catch (IOException e) {
+				threadException = e;
+			}
+		}
+	}
+
+	@Override
+	public void close() throws Exception {
+		for (EchoWorkerThread thread : workerThreads) {
+			thread.close();
+			thread.join();
+		}
+		close = true;
+		if (threadException != null) {
+			throw threadException;
+		}
+		serverSocket.close();
+		this.join();
+	}
+
+	private static class EchoWorkerThread extends Thread implements AutoCloseable {
+		private final PrintWriter output;
+		private final BufferedReader input;
+
+		private volatile boolean close;
+		private Exception threadException;
+
+		public EchoWorkerThread(Socket clientSocket, int socketTimeout) throws IOException {
+			output = new PrintWriter(clientSocket.getOutputStream(), true);
+			input = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
+			clientSocket.setSoTimeout(socketTimeout);
+		}
+
+		@Override
+		public void run() {
+			try {
+				String inputLine;
+				while (!close && (inputLine = input.readLine()) != null) {
+					output.println(inputLine);
+				}
+			} catch (IOException e) {
+				threadException = e;
+			}
+		}
+
+		@Override
+		public void close() throws Exception {
+			close = true;
+			if (threadException != null) {
+				throw threadException;
+			}
+			input.close();
+			output.close();
+			this.join();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7f96f79/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/NetworkFailuresProxyTest.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/NetworkFailuresProxyTest.java b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/NetworkFailuresProxyTest.java
new file mode 100644
index 0000000..0046868
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/NetworkFailuresProxyTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.networking;
+
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.net.SocketException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for NetworkFailuresProxy.
+ */
+public class NetworkFailuresProxyTest {
+	public static final int SOCKET_TIMEOUT = 500_000;
+
+	@Test
+	public void testProxy() throws Exception {
+		try (
+				EchoServer echoServer = new EchoServer(SOCKET_TIMEOUT);
+				NetworkFailuresProxy proxy = new NetworkFailuresProxy(0, "localhost", echoServer.getLocalPort());
+				EchoClient echoClient = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) {
+			echoServer.start();
+
+			assertEquals("42", echoClient.write("42"));
+			assertEquals("Ala ma kota!", echoClient.write("Ala ma kota!"));
+		}
+	}
+
+	@Test
+	public void testMultipleConnections() throws Exception {
+		try (
+				EchoServer echoServer = new EchoServer(SOCKET_TIMEOUT);
+				NetworkFailuresProxy proxy = new NetworkFailuresProxy(0, "localhost", echoServer.getLocalPort());
+				EchoClient echoClient1 = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT);
+				EchoClient echoClient2 = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) {
+			echoServer.start();
+
+			assertEquals("42", echoClient1.write("42"));
+			assertEquals("Ala ma kota!", echoClient2.write("Ala ma kota!"));
+			assertEquals("Ala hat eine Katze!", echoClient1.write("Ala hat eine Katze!"));
+		}
+	}
+
+	@Test
+	public void testBlockTraffic() throws Exception {
+		try (
+				EchoServer echoServer = new EchoServer(SOCKET_TIMEOUT);
+				NetworkFailuresProxy proxy = new NetworkFailuresProxy(0, "localhost", echoServer.getLocalPort())) {
+			echoServer.start();
+
+			try (EchoClient echoClient = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) {
+				assertEquals("42", echoClient.write("42"));
+				proxy.blockTraffic();
+				try {
+					echoClient.write("Ala ma kota!");
+				} catch (SocketException ex) {
+					assertEquals("Connection reset", ex.getMessage());
+				}
+			}
+
+			try (EchoClient echoClient = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) {
+				assertEquals(null, echoClient.write("42"));
+			} catch (SocketException ex) {
+				assertEquals("Connection reset", ex.getMessage());
+			}
+
+			proxy.unblockTraffic();
+			try (EchoClient echoClient = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) {
+				assertEquals("42", echoClient.write("42"));
+				assertEquals("Ala ma kota!", echoClient.write("Ala ma kota!"));
+			}
+		}
+	}
+
+	/**
+	 * Simple echo client that sends a message over the network and waits for the answer.
+	 */
+	public static class EchoClient implements AutoCloseable {
+		private final Socket socket;
+		private final PrintWriter output;
+		private final BufferedReader input;
+
+		public EchoClient(String hostName, int portNumber, int socketTimeout) throws IOException {
+			socket = new Socket(hostName, portNumber);
+			socket.setSoTimeout(socketTimeout);
+			output = new PrintWriter(socket.getOutputStream(), true);
+			input = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+		}
+
+		public String write(String message) throws IOException {
+			output.println(message);
+			return input.readLine();
+		}
+
+		@Override
+		public void close() throws Exception {
+			input.close();
+			output.close();
+			socket.close();
+		}
+	}
+}


[2/4] flink git commit: [hotfix][Kafka] Refactor properties for KafkaTestEnvironment setup

Posted by sr...@apache.org.
[hotfix][Kafka] Refactor properties for KafkaTestEnvironment setup
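
The refactoring replaces the positional prepare(int numKafkaServers, Properties
additionalServerProperties, boolean secureMode) signature with a single fluent
Config object. A compact standalone sketch of the builder shape, mirroring the
fields visible in the diffs (a simplified re-creation, not the Flink class):

import java.util.Properties;

public class ConfigSketch {
	/** Mirrors the fluent Config this commit introduces (illustrative only). */
	static class Config {
		private int kafkaServersNumber = 1;
		private Properties kafkaServerProperties = null;
		private boolean secureMode = false;

		public int getKafkaServersNumber() { return kafkaServersNumber; }

		public Config setKafkaServersNumber(int kafkaServersNumber) {
			this.kafkaServersNumber = kafkaServersNumber;
			return this;
		}

		public Properties getKafkaServerProperties() { return kafkaServerProperties; }

		public Config setKafkaServerProperties(Properties props) {
			this.kafkaServerProperties = props;
			return this;
		}

		public boolean isSecureMode() { return secureMode; }

		public Config setSecureMode(boolean secureMode) {
			this.secureMode = secureMode;
			return this;
		}
	}

	public static void main(String[] args) {
		// one chained object replaces three positional arguments
		Config config = new Config()
			.setKafkaServersNumber(3)
			.setSecureMode(false);
		System.out.println(config.getKafkaServersNumber() + " servers, secure=" + config.isSecureMode());
	}
}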


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e11a5919
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e11a5919
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e11a5919

Branch: refs/heads/master
Commit: e11a591956ba608308ccf81e13030291f150739b
Parents: b7f96f7
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Mon Aug 7 15:53:35 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Tue Aug 8 10:13:02 2017 +0200

----------------------------------------------------------------------
 .../kafka/KafkaTestEnvironmentImpl.java         | 33 ++++++-------
 .../kafka/KafkaTestEnvironmentImpl.java         | 19 ++++----
 .../kafka/KafkaTestEnvironmentImpl.java         | 35 +++++++-------
 .../kafka/KafkaShortRetentionTestBase.java      |  2 +-
 .../connectors/kafka/KafkaTestBase.java         |  2 +-
 .../connectors/kafka/KafkaTestEnvironment.java  | 49 ++++++++++++++++++--
 6 files changed, 88 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index d3b45a9..9f1d379 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -79,8 +79,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	private String zookeeperConnectionString;
 	private String brokerConnectionString = "";
 	private Properties standardProps;
-	private Properties additionalServerProperties;
-	private boolean secureMode = false;
+	private Config config;
 	// 6 seconds is default. Seems to be too small for travis. 30 seconds
 	private int zkTimeout = 30000;
 
@@ -96,7 +95,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	@Override
 	public Properties getSecureProperties() {
 		Properties prop = new Properties();
-		if (secureMode) {
+		if (config.isSecureMode()) {
 			prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
 			prop.put("security.protocol", "SASL_PLAINTEXT");
 			prop.put("sasl.kerberos.service.name", "kafka");
@@ -215,26 +214,24 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
+	public void prepare(Config config) {
 		//increase the timeout since in Travis ZK connection takes long time for secure connection.
-		if (secureMode) {
+		if (config.isSecureMode()) {
 			//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
-			numKafkaServers = 1;
+			config.setKafkaServersNumber(1);
 			zkTimeout = zkTimeout * 15;
 		}
+		this.config = config;
 
-		this.additionalServerProperties = additionalServerProperties;
-		this.secureMode = secureMode;
 		File tempDir = new File(System.getProperty("java.io.tmpdir"));
-
 		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
 		assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
 
 		tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
 		assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
 
-		tmpKafkaDirs = new ArrayList<>(numKafkaServers);
-		for (int i = 0; i < numKafkaServers; i++) {
+		tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
+		for (int i = 0; i < config.getKafkaServersNumber(); i++) {
 			File tmpDir = new File(tmpKafkaParent, "server-" + i);
 			assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
 			tmpKafkaDirs.add(tmpDir);
@@ -249,12 +246,12 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
 
 			LOG.info("Starting KafkaServer");
-			brokers = new ArrayList<>(numKafkaServers);
+			brokers = new ArrayList<>(config.getKafkaServersNumber());
 
-			for (int i = 0; i < numKafkaServers; i++) {
+			for (int i = 0; i < config.getKafkaServersNumber(); i++) {
 				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
 
-				if (secureMode) {
+				if (config.isSecureMode()) {
 					brokerConnectionString += hostAndPortToUrlString(
 							KafkaTestEnvironment.KAFKA_HOST,
 							brokers.get(i).socketServer().boundPort(
@@ -347,7 +344,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		final long deadline = System.nanoTime() + 30_000_000_000L;
 		do {
 			try {
-				if (secureMode) {
+				if (config.isSecureMode()) {
 					//increase wait time since in Travis ZK timeout occurs frequently
 					int wait = zkTimeout / 100;
 					LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
@@ -407,8 +404,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		// for CI stability, increase zookeeper session timeout
 		kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
 		kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
-		if (additionalServerProperties != null) {
-			kafkaProperties.putAll(additionalServerProperties);
+		if (config.getKafkaServerProperties() != null) {
+			kafkaProperties.putAll(config.getKafkaServerProperties());
 		}
 
 		final int numTries = 5;
@@ -418,7 +415,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			kafkaProperties.put("port", Integer.toString(kafkaPort));
 
 			//to support a secure Kafka cluster
-			if (secureMode) {
+			if (config.isSecureMode()) {
 				LOG.info("Adding Kafka secure configurations");
 				kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
 				kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);

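One behavioural detail of the hunk above is worth calling out: in secure mode, prepare() now mutates the Config instance it receives (via setKafkaServersNumber(1)), so the caller's object is updated as a side effect. A minimal sketch of the observable behaviour, assuming the fluent API introduced later in this commit (kafkaServer stands for any of the implementations; the assertion is illustrative only):

	KafkaTestEnvironment.Config config = KafkaTestEnvironment.createConfig()
		.setKafkaServersNumber(3)
		.setSecureMode(true);
	kafkaServer.prepare(config);
	// prepare() forced a single broker to keep Travis within its ZK limits:
	assert config.getKafkaServersNumber() == 1;

Snapshotting the values inside prepare(), or handing in a defensive copy, would avoid surprising callers that reuse the Config.
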
http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index ab976e1..af5ad67 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -84,7 +84,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	private String zookeeperConnectionString;
 	private String brokerConnectionString = "";
 	private Properties standardProps;
-	private Properties additionalServerProperties;
+
+	private Config config;
 
 	public String getBrokerConnectionString() {
 		return brokerConnectionString;
@@ -206,8 +207,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
-		this.additionalServerProperties = additionalServerProperties;
+	public void prepare(Config config) {
+		this.config = config;
 		File tempDir = new File(System.getProperty("java.io.tmpdir"));
 
 		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
@@ -224,8 +225,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			fail("cannot create kafka temp dir: " + e.getMessage());
 		}
 
-		tmpKafkaDirs = new ArrayList<>(numKafkaServers);
-		for (int i = 0; i < numKafkaServers; i++) {
+		tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
+		for (int i = 0; i < config.getKafkaServersNumber(); i++) {
 			File tmpDir = new File(tmpKafkaParent, "server-" + i);
 			assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
 			tmpKafkaDirs.add(tmpDir);
@@ -240,9 +241,9 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			zookeeperConnectionString = zookeeper.getConnectString();
 
 			LOG.info("Starting KafkaServer");
-			brokers = new ArrayList<>(numKafkaServers);
+			brokers = new ArrayList<>(config.getKafkaServersNumber());
 
-			for (int i = 0; i < numKafkaServers; i++) {
+			for (int i = 0; i < config.getKafkaServersNumber(); i++) {
 				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
 				SocketServer socketServer = brokers.get(i).socketServer();
 
@@ -391,8 +392,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		// for CI stability, increase zookeeper session timeout
 		kafkaProperties.put("zookeeper.session.timeout.ms", "30000");
 		kafkaProperties.put("zookeeper.connection.timeout.ms", "30000");
-		if (additionalServerProperties != null) {
-			kafkaProperties.putAll(additionalServerProperties);
+		if (config.getKafkaServerProperties() != null) {
+			kafkaProperties.putAll(config.getKafkaServerProperties());
 		}
 
 		final int numTries = 5;

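Note that in the hunks above the 0.8 implementation stores the whole Config but never consults isSecureMode(); unlike the 0.9/0.10 variants, there was no secureMode field to migrate, since SASL/SSL support only arrived with Kafka 0.9. A typical call for this version therefore only sets the server count and extra broker properties, e.g. (a hedged sketch, not taken from this commit):

	kafkaServer.prepare(kafkaServer.createConfig()
		.setKafkaServersNumber(1)
		.setKafkaServerProperties(new Properties()));
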
http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index df95420..517f096 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -78,11 +78,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	private String zookeeperConnectionString;
 	private String brokerConnectionString = "";
 	private Properties standardProps;
-	private Properties additionalServerProperties;
-	private boolean secureMode = false;
 	// 6 seconds is the default. That seems too small for Travis, so use 30 seconds.
 	private String zkTimeout = "30000";
 
+	private Config config;
+
 	public String getBrokerConnectionString() {
 		return brokerConnectionString;
 	}
@@ -200,27 +200,24 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
-
+	public void prepare(Config config) {
 		//increase the timeout, since on Travis the ZK connection takes a long time when secured.
-		if (secureMode) {
+		if (config.isSecureMode()) {
 			//run only one Kafka server to avoid multiple ZK connections from many instances (Travis timeouts)
-			numKafkaServers = 1;
+			config.setKafkaServersNumber(1);
 			zkTimeout = String.valueOf(Integer.parseInt(zkTimeout) * 15);
 		}
+		this.config = config;
 
-		this.additionalServerProperties = additionalServerProperties;
-		this.secureMode = secureMode;
 		File tempDir = new File(System.getProperty("java.io.tmpdir"));
-
 		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
 		assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
 
 		tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
 		assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
 
-		tmpKafkaDirs = new ArrayList<>(numKafkaServers);
-		for (int i = 0; i < numKafkaServers; i++) {
+		tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
+		for (int i = 0; i < config.getKafkaServersNumber(); i++) {
 			File tmpDir = new File(tmpKafkaParent, "server-" + i);
 			assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
 			tmpKafkaDirs.add(tmpDir);
@@ -236,13 +233,13 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			LOG.info("zookeeperConnectionString: {}", zookeeperConnectionString);
 
 			LOG.info("Starting KafkaServer");
-			brokers = new ArrayList<>(numKafkaServers);
+			brokers = new ArrayList<>(config.getKafkaServersNumber());
 
-			for (int i = 0; i < numKafkaServers; i++) {
+			for (int i = 0; i < config.getKafkaServersNumber(); i++) {
 				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
 
 				SocketServer socketServer = brokers.get(i).socketServer();
-				if (secureMode) {
+				if (config.isSecureMode()) {
 					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
 				} else {
 					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
@@ -335,7 +332,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		final long deadline = System.nanoTime() + Integer.parseInt(zkTimeout) * 1_000_000L;
 		do {
 			try {
-				if (secureMode) {
+				if (config.isSecureMode()) {
 					//increase the wait time, since ZK timeouts occur frequently on Travis
 					int wait = Integer.parseInt(zkTimeout) / 100;
 					LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
@@ -400,8 +397,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		// for CI stability, increase zookeeper session timeout
 		kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
 		kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
-		if (additionalServerProperties != null) {
-			kafkaProperties.putAll(additionalServerProperties);
+		if (config.getKafkaServerProperties() != null) {
+			kafkaProperties.putAll(config.getKafkaServerProperties());
 		}
 
 		final int numTries = 5;
@@ -411,7 +408,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			kafkaProperties.put("port", Integer.toString(kafkaPort));
 
 			//to support a secure Kafka cluster
-			if (secureMode) {
+			if (config.isSecureMode()) {
 				LOG.info("Adding Kafka secure configurations");
 				kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
 				kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
@@ -442,7 +439,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 	public Properties getSecureProperties() {
 		Properties prop = new Properties();
-		if (secureMode) {
+		if (config.isSecureMode()) {
 			prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
 			prop.put("security.protocol", "SASL_PLAINTEXT");
 			prop.put("sasl.kerberos.service.name", "kafka");

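The 0.9 implementation keeps zkTimeout as a String, so the secure-mode scaling shown above round-trips through Integer.parseInt: 30000 ms * 15 = 450000 ms, i.e. 7.5 minutes. A minimal sketch of that arithmetic (the 0.10 variant stores an int and avoids the parsing):

	String zkTimeout = "30000";
	// secure mode: scale the ZK timeout by 15, as in prepare() above
	zkTimeout = String.valueOf(Integer.parseInt(zkTimeout) * 15);	// "450000"
	// later, when polling for a topic:
	final long deadline = System.nanoTime() + Integer.parseInt(zkTimeout) * 1_000_000L;
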
http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index d5c9276..3163f52 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -98,7 +98,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 		specificProperties.setProperty("log.retention.minutes", "0");
 		specificProperties.setProperty("log.retention.ms", "250");
 		specificProperties.setProperty("log.retention.check.interval.ms", "100");
-		kafkaServer.prepare(1, specificProperties, false);
+		kafkaServer.prepare(kafkaServer.createConfig().setKafkaServerProperties(specificProperties));
 
 		standardProps = kafkaServer.getStandardProperties();
 

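This call site shows the migration pattern applied throughout the commit: the old three-argument prepare(1, specificProperties, false) becomes a fluent Config that relies on the defaults (one server, secure mode off) declared in the Config class further down:

	// before: kafkaServer.prepare(1, specificProperties, false);
	kafkaServer.prepare(kafkaServer.createConfig()
		.setKafkaServerProperties(specificProperties));
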
http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index c484a4b..8eb0351 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -135,7 +135,7 @@ public abstract class KafkaTestBase extends TestLogger {
 
 		LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
 
-		kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS, secureMode);
+		kafkaServer.prepare(kafkaServer.createConfig().setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS).setSecureMode(secureMode));
 
 		standardProps = kafkaServer.getStandardProperties();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 50eff23..ea292a9 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -38,15 +38,56 @@ import java.util.Properties;
  * Abstract class providing a Kafka test environment.
  */
 public abstract class KafkaTestEnvironment {
+	/**
+	 * Configuration class for {@link KafkaTestEnvironment}.
+	 */
+	public static class Config {
+		private int kafkaServersNumber = 1;
+		private Properties kafkaServerProperties = null;
+		private boolean secureMode = false;
+
+		/**
+		 * Use the {@link KafkaTestEnvironment#createConfig()} factory method instead.
+		 */
+		private Config() {
+		}
+
+		public int getKafkaServersNumber() {
+			return kafkaServersNumber;
+		}
+
+		public Config setKafkaServersNumber(int kafkaServersNumber) {
+			this.kafkaServersNumber = kafkaServersNumber;
+			return this;
+		}
+
+		public Properties getKafkaServerProperties() {
+			return kafkaServerProperties;
+		}
+
+		public Config setKafkaServerProperties(Properties kafkaServerProperties) {
+			this.kafkaServerProperties = kafkaServerProperties;
+			return this;
+		}
+
+		public boolean isSecureMode() {
+			return secureMode;
+		}
+
+		public Config setSecureMode(boolean secureMode) {
+			this.secureMode = secureMode;
+			return this;
+		}
+	}
 
 	protected static final String KAFKA_HOST = "localhost";
 
-	public abstract void prepare(int numKafkaServers, Properties kafkaServerProperties, boolean secureMode);
-
-	public void prepare(int numberOfKafkaServers, boolean secureMode) {
-		this.prepare(numberOfKafkaServers, null, secureMode);
+	public static Config createConfig() {
+		return new Config();
 	}
 
+	public abstract void prepare(Config config);
+
 	public abstract void shutdown();
 
 	public abstract void deleteTestTopic(String topic);
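
For completeness, a minimal end-to-end sketch of the new API, assuming one of the concrete KafkaTestEnvironmentImpl classes above is on the test classpath (the try/finally scaffolding is illustrative and not part of this commit):

	KafkaTestEnvironmentImpl kafkaServer = new KafkaTestEnvironmentImpl();
	kafkaServer.prepare(KafkaTestEnvironment.createConfig()
		.setKafkaServersNumber(3)
		.setSecureMode(false));
	try {
		String brokers = kafkaServer.getBrokerConnectionString();
		// ... run the test against the brokers ...
	} finally {
		kafkaServer.shutdown();
	}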