You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/21 09:52:45 UTC

[12/50] [abbrv] flink git commit: [FLINK-3929] Added Keytab based Kerberos support to enable secure Flink cluster deployment (addresses HDFS, Kafka and ZK services)

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 0dbe865..213ba4a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -65,6 +65,9 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	private String brokerConnectionString = "";
 	private Properties standardProps;
 	private Properties additionalServerProperties;
+	private boolean secureMode = false;
+	// 6 seconds is the default, which seems to be too small for Travis, so use 30 seconds.
+	private String zkTimeout = "30000";
 
 	public String getBrokerConnectionString() {
 		return brokerConnectionString;
@@ -131,8 +134,22 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public void prepare(int numKafkaServers, Properties additionalServerProperties) {
+	public boolean isSecureRunSupported() {
+		return true;
+	}
+
+	@Override
+	public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
+
+		//increase the timeout since in Travis ZK connection takes long time for secure connection.
+		if(secureMode) {
+			//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
+			numKafkaServers = 1;
+			zkTimeout = String.valueOf(Integer.parseInt(zkTimeout) * 15);
+		}
+
 		this.additionalServerProperties = additionalServerProperties;
+		this.secureMode = secureMode;
 		File tempDir = new File(System.getProperty("java.io.tmpdir"));
 
 		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
@@ -155,6 +172,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			LOG.info("Starting Zookeeper");
 			zookeeper = new TestingServer(-1, tmpZkDir);
 			zookeeperConnectionString = zookeeper.getConnectString();
+			LOG.info("zookeeperConnectionString: {}", zookeeperConnectionString);
 
 			LOG.info("Starting KafkaServer");
 			brokers = new ArrayList<>(numKafkaServers);
@@ -163,7 +181,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
 
 				SocketServer socketServer = brokers.get(i).socketServer();
-				brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
+				if(secureMode) {
+					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
+				} else {
+					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
+				}
 			}
 
 			LOG.info("ZK and KafkaServer started.");
@@ -173,15 +195,18 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			fail("Test setup failed: " + t.getMessage());
 		}
 
+		LOG.info("brokerConnectionString --> {}", brokerConnectionString);
+
 		standardProps = new Properties();
 		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
 		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
 		standardProps.setProperty("group.id", "flink-tests");
 		standardProps.setProperty("auto.commit.enable", "false");
-		standardProps.setProperty("zookeeper.session.timeout.ms", "30000"); // 6 seconds is default. Seems to be too small for travis.
-		standardProps.setProperty("zookeeper.connection.timeout.ms", "30000");
+		standardProps.setProperty("zookeeper.session.timeout.ms", zkTimeout);
+		standardProps.setProperty("zookeeper.connection.timeout.ms", zkTimeout);
 		standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.9 value)
 		standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
+
 	}
 
 	@Override
@@ -196,6 +221,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		if (zookeeper != null) {
 			try {
 				zookeeper.stop();
+				zookeeper.close();
 			}
 			catch (Exception e) {
 				LOG.warn("ZK.stop() failed", e);
@@ -224,6 +250,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	public ZkUtils getZkUtils() {
+		LOG.info("In getZKUtils:: zookeeperConnectionString = {}", zookeeperConnectionString);
 		ZkClient creator = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
 				Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
 		return ZkUtils.apply(creator, false);
@@ -241,23 +268,37 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			zkUtils.close();
 		}
 
+		LOG.info("Topic {} create request is successfully posted", topic);
+
 		// validate that the topic has been created
-		final long deadline = System.currentTimeMillis() + 30000;
+		final long deadline = System.currentTimeMillis() + Integer.parseInt(zkTimeout);
 		do {
 			try {
-				Thread.sleep(100);
+				if(secureMode) {
+					//increase wait time since in Travis ZK timeout occurs frequently
+					int wait = Integer.parseInt(zkTimeout) / 100;
+					LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
+					Thread.sleep(wait);
+				} else {
+					Thread.sleep(100);
+				}
+
 			} catch (InterruptedException e) {
 				// restore interrupted state
 			}
 			// we could use AdminUtils.topicExists(zkUtils, topic) here, but it's results are
 			// not always correct.
 
+			LOG.info("Validating if the topic {} has been created or not", topic);
+
 			// create a new ZK utils connection
 			ZkUtils checkZKConn = getZkUtils();
 			if(AdminUtils.topicExists(checkZKConn, topic)) {
+				LOG.info("topic {} has been created successfully", topic);
 				checkZKConn.close();
 				return;
 			}
+			LOG.info("topic {} has not been created yet. Will check again...", topic);
 			checkZKConn.close();
 		}
 		while (System.currentTimeMillis() < deadline);
@@ -296,8 +337,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
 
 		// for CI stability, increase zookeeper session timeout
-		kafkaProperties.put("zookeeper.session.timeout.ms", "30000");
-		kafkaProperties.put("zookeeper.connection.timeout.ms", "30000");
+		kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
+		kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
 		if(additionalServerProperties != null) {
 			kafkaProperties.putAll(additionalServerProperties);
 		}
@@ -307,6 +348,15 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		for (int i = 1; i <= numTries; i++) {
 			int kafkaPort = NetUtils.getAvailablePort();
 			kafkaProperties.put("port", Integer.toString(kafkaPort));
+
+			//to support secure kafka cluster
+			if(secureMode) {
+				LOG.info("Adding Kafka secure configurations");
+				kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+				kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+				kafkaProperties.putAll(getSecureProperties());
+			}
+
 			KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
 
 			try {
@@ -329,4 +379,19 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
 	}
 
+	public Properties getSecureProperties() {
+		Properties prop = new Properties();
+		if(secureMode) {
+			prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
+			prop.put("security.protocol", "SASL_PLAINTEXT");
+			prop.put("sasl.kerberos.service.name", "kafka");
+
+			//add special timeout for Travis
+			prop.setProperty("zookeeper.session.timeout.ms", zkTimeout);
+			prop.setProperty("zookeeper.connection.timeout.ms", zkTimeout);
+			prop.setProperty("metadata.fetch.timeout.ms","120000");
+		}
+		return prop;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
index fbeb110..4ac1773 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
@@ -28,3 +28,5 @@ log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
 log4j.logger.org.apache.zookeeper=OFF, testlogger
 log4j.logger.state.change.logger=OFF, testlogger
 log4j.logger.kafka=OFF, testlogger
+
+log4j.logger.org.apache.directory=OFF, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
index 49d630f..ef71bde 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
@@ -161,6 +161,14 @@ under the License.
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-minikdc</artifactId>
+			<version>${minikdc.version}</version>
+			<scope>test</scope>
+		</dependency>
+
 	</dependencies>
 
 	<dependencyManagement>
@@ -187,6 +195,17 @@ under the License.
 					</execution>
 				</executions>
 			</plugin>
+			<!--
+            https://issues.apache.org/jira/browse/DIRSHARED-134
+            Required to pull the Mini-KDC transitive dependency
+            -->
+			<plugin>
+				<groupId>org.apache.felix</groupId>
+				<artifactId>maven-bundle-plugin</artifactId>
+				<version>3.0.1</version>
+				<inherited>true</inherited>
+				<extensions>true</extensions>
+			</plugin>
 		</plugins>
 	</build>
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 920f15b..a87ff8a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -181,6 +181,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			properties.setProperty("session.timeout.ms", "2000");
 			properties.setProperty("fetch.max.wait.ms", "2000");
 			properties.setProperty("heartbeat.interval.ms", "1000");
+			properties.putAll(secureProps);
 			FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer("doesntexist", new SimpleStringSchema(), properties);
 			DataStream<String> stream = see.addSource(source);
 			stream.print();
@@ -275,6 +276,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		});
 		Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
 		producerProperties.setProperty("retries", "3");
+		producerProperties.putAll(secureProps);
 		FlinkKafkaProducerBase<Tuple2<Long, String>> prod = kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null);
 		stream.addSink(prod);
 
@@ -283,7 +285,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		List<String> topics = new ArrayList<>();
 		topics.add(topic);
 		topics.add(additionalEmptyTopic);
-		FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topics, sourceSchema, standardProps);
+
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+		FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topics, sourceSchema, props);
 
 		DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(source).setParallelism(parallelism);
 
@@ -371,7 +377,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
 		env.getConfig().disableSysoutLogging();
 
-		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+
+		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
 
 		env
 				.addSource(kafkaSource)
@@ -416,7 +426,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
 		env.getConfig().disableSysoutLogging();
 
-		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
 
 		env
 				.addSource(kafkaSource)
@@ -463,7 +476,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		env.getConfig().disableSysoutLogging();
 		env.setBufferTimeout(0);
 
-		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
 
 		env
 			.addSource(kafkaSource)
@@ -506,7 +522,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 					env.enableCheckpointing(100);
 					env.getConfig().disableSysoutLogging();
 
-					FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), standardProps);
+					Properties props = new Properties();
+					props.putAll(standardProps);
+					props.putAll(secureProps);
+					FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
 
 					env.addSource(source).addSink(new DiscardingSink<String>());
 
@@ -577,7 +596,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 					env.enableCheckpointing(100);
 					env.getConfig().disableSysoutLogging();
 
-					FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), standardProps);
+					Properties props = new Properties();
+					props.putAll(standardProps);
+					props.putAll(secureProps);
+					FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
 
 					env.addSource(source).addSink(new DiscardingSink<String>());
 
@@ -629,7 +651,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		env.setParallelism(12); // needs to be more that the mini cluster has slots
 		env.getConfig().disableSysoutLogging();
 
-		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
 
 		env
 				.addSource(kafkaSource)
@@ -700,15 +725,19 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		Tuple2WithTopicSchema schema = new Tuple2WithTopicSchema(env.getConfig());
 
-		stream.addSink(kafkaServer.getProducer("dummy", schema, standardProps, null));
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+
+		stream.addSink(kafkaServer.getProducer("dummy", schema, props, null));
 
 		env.execute("Write to topics");
 
 		// run second job consuming from multiple topics
 		env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
 		env.getConfig().disableSysoutLogging();
-		
-		stream = env.addSource(kafkaServer.getConsumer(topics, schema, standardProps));
+
+		stream = env.addSource(kafkaServer.getConsumer(topics, schema, props));
 
 		stream.flatMap(new FlatMapFunction<Tuple3<Integer, Integer, String>, Integer>() {
 			Map<String, Integer> countPerTopic = new HashMap<>(NUM_TOPICS);
@@ -787,6 +816,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		// Produce serialized JSON data
 		createTestTopic(topic, 1, 1);
 
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+
 		StreamExecutionEnvironment env = StreamExecutionEnvironment
 				.createRemoteEnvironment("localhost", flinkPort);
 		env.getConfig().disableSysoutLogging();
@@ -805,7 +838,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}).addSink(kafkaServer.getProducer(
 				topic,
 				new ByteArraySerializationSchema(),
-				standardProps,
+				props,
 				null));
 
 		// Execute blocks
@@ -940,6 +973,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 14));
 		consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024 * 14)); // for the new fetcher
 		consumerProps.setProperty("queued.max.message.chunks", "1");
+		consumerProps.putAll(secureProps);
 
 		FlinkKafkaConsumerBase<Tuple2<Long, byte[]>> source = kafkaServer.getConsumer(topic, serSchema, consumerProps);
 		DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(source);
@@ -969,6 +1003,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		Properties producerProps = new Properties();
 		producerProps.setProperty("max.request.size", Integer.toString(1024 * 1024 * 15));
 		producerProps.setProperty("retries", "3");
+		producerProps.putAll(secureProps);
 		producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionStrings);
 
 		DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
@@ -1047,8 +1082,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		env.setRestartStrategy(RestartStrategies.noRestart());
 		env.getConfig().disableSysoutLogging();
 
-
-		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
 
 		env
 				.addSource(kafkaSource)
@@ -1097,6 +1134,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		KeyedSerializationSchema<Tuple2<Long, PojoValue>> schema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig());
 		Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
 		producerProperties.setProperty("retries", "3");
+		producerProperties.putAll(secureProps);
 		kvStream.addSink(kafkaServer.getProducer(topic, schema, producerProperties, null));
 		env.execute("Write KV to Kafka");
 
@@ -1110,7 +1148,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		KeyedDeserializationSchema<Tuple2<Long, PojoValue>> readSchema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig());
 
-		DataStream<Tuple2<Long, PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, readSchema, standardProps));
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+		DataStream<Tuple2<Long, PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, readSchema, props));
 		fromKafka.flatMap(new RichFlatMapFunction<Tuple2<Long,PojoValue>, Object>() {
 			long counter = 0;
 			@Override
@@ -1178,6 +1219,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
 		producerProperties.setProperty("retries", "3");
+		producerProperties.putAll(secureProps);
+
 		kvStream.addSink(kafkaServer.getProducer(topic, schema, producerProperties, null));
 
 		env.execute("Write deletes to Kafka");
@@ -1189,7 +1232,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
 		env.getConfig().disableSysoutLogging();
 
-		DataStream<Tuple2<byte[], PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+		DataStream<Tuple2<byte[], PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, schema, props));
 
 		fromKafka.flatMap(new RichFlatMapFunction<Tuple2<byte[], PojoValue>, Object>() {
 			long counter = 0;
@@ -1226,7 +1272,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
 		env1.getConfig().disableSysoutLogging();
 
-		DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, new FixedNumberDeserializationSchema(ELEMENT_COUNT), standardProps));
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+
+		DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, new FixedNumberDeserializationSchema(ELEMENT_COUNT), props));
 		fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer,Integer>, Void>() {
 			@Override
 			public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {
@@ -1262,8 +1312,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 					env1.getConfig().disableSysoutLogging();
 					env1.disableOperatorChaining(); // let the source read everything into the network buffers
 
+					Properties props = new Properties();
+					props.putAll(standardProps);
+					props.putAll(secureProps);
+
 					TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
-					DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
+					DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, props));
 					fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
 						@Override
 						public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {// no op
@@ -1288,7 +1342,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 						}
 					});
 
-					fromGen.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null));
+					fromGen.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(schema), props, null));
 
 					env1.execute("Metrics test job");
 				} catch(Throwable t) {
@@ -1403,6 +1457,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		final TypeInformationSerializationSchema<Tuple2<Integer, Integer>> deser =
 				new TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig());
 
+		cc.putAll(secureProps);
 		// create the consumer
 		FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deser, cc);
 
@@ -1505,6 +1560,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			// the producer must not produce duplicates
 			Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
 			producerProperties.setProperty("retries", "0");
+			producerProperties.putAll(secureProps);
 			
 			stream.addSink(kafkaServer.getProducer(
 							topicName, serSchema, producerProperties,
@@ -1537,7 +1593,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			
 			Properties readProps = (Properties) standardProps.clone();
 			readProps.setProperty("group.id", "flink-tests-validator");
-			
+			readProps.putAll(secureProps);
 			FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deserSchema, readProps);
 
 			readEnv
@@ -1672,6 +1728,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		newProps.setProperty("group.id", "topic-printer"+ UUID.randomUUID().toString());
 		newProps.setProperty("auto.offset.reset", "smallest");
 		newProps.setProperty("zookeeper.connect", standardProps.getProperty("zookeeper.connect"));
+		newProps.putAll(secureProps);
 
 		ConsumerConfig printerConfig = new ConsumerConfig(newProps);
 		printTopic(topicName, printerConfig, deserializer, elements);
@@ -1893,8 +1950,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		TypeInformationSerializationSchema<Tuple2<Long, Integer>> sourceSchema =
 			new TypeInformationSerializationSchema<>(inputTypeInfo, env.getConfig());
 
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
 		FlinkKafkaConsumerBase<Tuple2<Long, Integer>> source = kafkaServer
-			.getConsumer(topics, sourceSchema, standardProps)
+			.getConsumer(topics, sourceSchema, props)
 			.assignTimestampsAndWatermarks(new TestPunctuatedTSExtractor());
 
 		DataStreamSource<Tuple2<Long, Integer>> consuming = env.setParallelism(1).addSource(source);

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 14e74f1..5bcf406 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -34,6 +34,7 @@ import org.apache.flink.test.util.SuccessException;
 
 
 import java.io.Serializable;
+import java.util.Properties;
 
 import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.assertEquals;
@@ -102,17 +103,24 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 				}
 			})
 			.setParallelism(1);
+
+			Properties props = new Properties();
+			props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings));
+			props.putAll(secureProps);
 			
 			// sink partitions into 
 			stream.addSink(kafkaServer.getProducer(topic,
 					new KeyedSerializationSchemaWrapper<>(serSchema),
-					FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings),
+					props,
 					new CustomPartitioner(parallelism)))
 			.setParallelism(parallelism);
 
 			// ------ consuming topology ---------
-			
-			FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topic, deserSchema, standardProps);
+
+			Properties consumerProps = new Properties();
+			consumerProps.putAll(standardProps);
+			consumerProps.putAll(secureProps);
+			FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topic, deserSchema, consumerProps);
 			
 			env.addSource(source).setParallelism(parallelism)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index c4949ff..9236e78 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -36,6 +36,8 @@ import org.apache.flink.util.InstantiationUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,29 +62,39 @@ public class KafkaShortRetentionTestBase implements Serializable {
 	private static Properties standardProps;
 	private static LocalFlinkMiniCluster flink;
 
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	protected static Properties secureProps = new Properties();
+
 	@BeforeClass
 	public static void prepare() throws IOException, ClassNotFoundException {
 		LOG.info("-------------------------------------------------------------------------");
 		LOG.info("    Starting KafkaShortRetentionTestBase ");
 		LOG.info("-------------------------------------------------------------------------");
 
+		Configuration flinkConfig = new Configuration();
+
 		// dynamically load the implementation for the test
 		Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
 		kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);
 
 		LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
 
+		if(kafkaServer.isSecureRunSupported()) {
+			secureProps = kafkaServer.getSecureProperties();
+		}
+
 		Properties specificProperties = new Properties();
 		specificProperties.setProperty("log.retention.hours", "0");
 		specificProperties.setProperty("log.retention.minutes", "0");
 		specificProperties.setProperty("log.retention.ms", "250");
 		specificProperties.setProperty("log.retention.check.interval.ms", "100");
-		kafkaServer.prepare(1, specificProperties);
+		kafkaServer.prepare(1, specificProperties, false);
 
 		standardProps = kafkaServer.getStandardProperties();
 
 		// start also a re-usable Flink mini cluster
-		Configuration flinkConfig = new Configuration();
 		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
 		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
 		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
@@ -98,6 +110,8 @@ public class KafkaShortRetentionTestBase implements Serializable {
 			flink.shutdown();
 		}
 		kafkaServer.shutdown();
+
+		secureProps.clear();
 	}
 
 	/**
@@ -151,12 +165,17 @@ public class KafkaShortRetentionTestBase implements Serializable {
 				running = false;
 			}
 		});
-		stream.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), standardProps, null));
+
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+
+		stream.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props, null));
 
 		// ----------- add consumer dataflow ----------
 
 		NonContinousOffsetsDeserializationSchema deserSchema = new NonContinousOffsetsDeserializationSchema();
-		FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, deserSchema, standardProps);
+		FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, deserSchema, props);
 
 		DataStreamSource<String> consuming = env.addSource(source);
 		consuming.addSink(new DiscardingSink<String>());
@@ -224,6 +243,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 
 		Properties customProps = new Properties();
 		customProps.putAll(standardProps);
+		customProps.putAll(secureProps);
 		customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception
 		FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), customProps);
 
@@ -255,6 +275,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 
 		Properties customProps = new Properties();
 		customProps.putAll(standardProps);
+		customProps.putAll(secureProps);
 		customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception
 		
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 771db17..afdd158 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -31,6 +31,8 @@ import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,61 +75,90 @@ public abstract class KafkaTestBase extends TestLogger {
 
 	protected static KafkaTestEnvironment kafkaServer;
 
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	protected static Properties secureProps = new Properties();
+
 	// ------------------------------------------------------------------------
 	//  Setup and teardown of the mini clusters
 	// ------------------------------------------------------------------------
 	
 	@BeforeClass
 	public static void prepare() throws IOException, ClassNotFoundException {
+
 		LOG.info("-------------------------------------------------------------------------");
 		LOG.info("    Starting KafkaTestBase ");
 		LOG.info("-------------------------------------------------------------------------");
-		
 
+		startClusters(false);
 
-		// dynamically load the implementation for the test
-		Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
-		kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);
+	}
 
-		LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
+	@AfterClass
+	public static void shutDownServices() {
 
-		kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS);
+		LOG.info("-------------------------------------------------------------------------");
+		LOG.info("    Shut down KafkaTestBase ");
+		LOG.info("-------------------------------------------------------------------------");
 
-		standardProps = kafkaServer.getStandardProperties();
-		brokerConnectionStrings = kafkaServer.getBrokerConnectionString();
+		shutdownClusters();
 
-		// start also a re-usable Flink mini cluster
-		Configuration flinkConfig = new Configuration();
+		LOG.info("-------------------------------------------------------------------------");
+		LOG.info("    KafkaTestBase finished");
+		LOG.info("-------------------------------------------------------------------------");
+	}
+
+	protected static Configuration getFlinkConfiguration() {
+		Configuration flinkConfig = new Configuration();;
 		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
 		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
 		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 		flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter");
 		flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
+		return flinkConfig;
+	}
+
+	protected static void startClusters(boolean secureMode) throws ClassNotFoundException {
+
+		// dynamically load the implementation for the test
+		Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
+		kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);
+
+		LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
+
+		kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS, secureMode);
+
+		standardProps = kafkaServer.getStandardProperties();
 
-		flink = new LocalFlinkMiniCluster(flinkConfig, false);
+		brokerConnectionStrings = kafkaServer.getBrokerConnectionString();
+
+		if(kafkaServer.isSecureRunSupported() && secureMode) {
+			secureProps = kafkaServer.getSecureProperties();
+		}
+
+		// start also a re-usable Flink mini cluster
+		flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
 		flink.start();
 
 		flinkPort = flink.getLeaderRPCPort();
-	}
 
-	@AfterClass
-	public static void shutDownServices() {
+	}
 
-		LOG.info("-------------------------------------------------------------------------");
-		LOG.info("    Shut down KafkaTestBase ");
-		LOG.info("-------------------------------------------------------------------------");
+	protected static void shutdownClusters() {
 
 		flinkPort = -1;
 		if (flink != null) {
 			flink.shutdown();
 		}
 
+		if(secureProps != null) {
+			secureProps.clear();
+		}
+
 		kafkaServer.shutdown();
 
-		LOG.info("-------------------------------------------------------------------------");
-		LOG.info("    KafkaTestBase finished");
-		LOG.info("-------------------------------------------------------------------------");
 	}
 
 
@@ -164,4 +195,5 @@ public abstract class KafkaTestBase extends TestLogger {
 	protected static void deleteTestTopic(String topic) {
 		kafkaServer.deleteTestTopic(topic);
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 0b1d51d..6ecde71 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -35,10 +35,10 @@ public abstract class KafkaTestEnvironment {
 
 	protected static final String KAFKA_HOST = "localhost";
 
-	public abstract void prepare(int numKafkaServers, Properties kafkaServerProperties);
+	public abstract void prepare(int numKafkaServers, Properties kafkaServerProperties, boolean secureMode);
 
-	public void prepare(int numberOfKafkaServers) {
-		this.prepare(numberOfKafkaServers, null);
+	public void prepare(int numberOfKafkaServers, boolean secureMode) {
+		this.prepare(numberOfKafkaServers, null, secureMode);
 	}
 
 	public abstract void shutdown();
@@ -51,9 +51,10 @@ public abstract class KafkaTestEnvironment {
 		this.createTestTopic(topic, numberOfPartitions, replicationFactor, new Properties());
 	}
 
-
 	public abstract Properties getStandardProperties();
 
+	public abstract Properties getSecureProperties();
+
 	public abstract String getBrokerConnectionString();
 
 	public abstract String getVersion();
@@ -86,4 +87,6 @@ public abstract class KafkaTestEnvironment {
 
 	public abstract int getBrokerId(KafkaServer server);
 
+	public abstract boolean isSecureRunSupported();
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index 5a38e56..58a5cc3 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -135,11 +135,18 @@ public class DataGenerators {
 					}
 				});
 
+		Properties props = new Properties();
+		props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()));
+		Properties secureProps = testServer.getSecureProperties();
+		if(secureProps != null) {
+			props.putAll(testServer.getSecureProperties());
+		}
+
 		stream
 				.rebalance()
 				.addSink(testServer.getProducer(topic,
 						new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())),
-						FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()),
+						props,
 						new KafkaPartitioner<Integer>() {
 							@Override
 							public int partition(Integer next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-test-utils-parent/flink-test-utils/pom.xml
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/pom.xml b/flink-test-utils-parent/flink-test-utils/pom.xml
index 18ecfde..5c99ef6 100644
--- a/flink-test-utils-parent/flink-test-utils/pom.xml
+++ b/flink-test-utils-parent/flink-test-utils/pom.xml
@@ -78,5 +78,30 @@ under the License.
 			<scope>compile</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-minikdc</artifactId>
+			<version>${minikdc.version}</version>
+		</dependency>
+
 	</dependencies>
+
+	<build>
+		<plugins>
+
+			<!--
+            https://issues.apache.org/jira/browse/DIRSHARED-134
+            Required to pull the Mini-KDC transitive dependency
+            -->
+			<plugin>
+				<groupId>org.apache.felix</groupId>
+				<artifactId>maven-bundle-plugin</artifactId>
+				<version>3.0.1</version>
+				<inherited>true</inherited>
+				<extensions>true</extensions>
+			</plugin>
+
+		</plugins>
+	</build>
+
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
index a478908..6ec6c2c 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -25,6 +25,8 @@ import org.apache.flink.test.util.TestBaseUtils;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Base class for streaming unit tests that run multiple tests and want to reuse the same
@@ -67,18 +69,22 @@ public class StreamingMultipleProgramsTestBase extends AbstractTestBase {
 		super(new Configuration());
 	}
 
+	protected static final Logger LOG = LoggerFactory.getLogger(StreamingMultipleProgramsTestBase.class);
+
 	// ------------------------------------------------------------------------
 	//  Cluster setup & teardown
 	// ------------------------------------------------------------------------
 
 	@BeforeClass
 	public static void setup() throws Exception {
+		LOG.info("In StreamingMultipleProgramsTestBase: Starting FlinkMiniCluster ");
 		cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false, false, true);
 		TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM);
 	}
 
 	@AfterClass
 	public static void teardown() throws Exception {
+		LOG.info("In StreamingMultipleProgramsTestBase: Closing FlinkMiniCluster ");
 		TestStreamEnvironment.unsetAsContext();
 		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
new file mode 100644
index 0000000..00b19f1
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.BufferedWriter;
+import java.io.PrintWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Helper class to handle the MiniKDC lifecycle.
+ * This class can be used to start/stop MiniKDC and create secure configurations for MiniDFSCluster
+ * and MiniYarn.
+ */
+
+public class SecureTestEnvironment {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(SecureTestEnvironment.class);
+
+	private static MiniKdc kdc;
+
+	private static String testKeytab = null;
+
+	private static String testPrincipal = null;
+
+	private static String testZkServerPrincipal = null;
+
+	private static String testZkClientPrincipal = null;
+
+	private static String testKafkaServerPrincipal = null;
+
+	private static String hadoopServicePrincipal = null;
+
+	private static File baseDirForSecureRun = null;
+
+	public static void prepare(TemporaryFolder tempFolder) {
+
+		try {
+			baseDirForSecureRun = tempFolder.newFolder();
+			LOG.info("Base Directory for Secure Environment: {}", baseDirForSecureRun);
+
+			String hostName = "localhost";
+			Properties kdcConf = MiniKdc.createConf();
+			if(LOG.isDebugEnabled()) {
+				kdcConf.setProperty(MiniKdc.DEBUG, "true");
+			}
+			kdcConf.setProperty(MiniKdc.KDC_BIND_ADDRESS, hostName);
+			kdc = new MiniKdc(kdcConf, baseDirForSecureRun);
+			kdc.start();
+			LOG.info("Started Mini KDC");
+
+			File keytabFile = new File(baseDirForSecureRun, "test-users.keytab");
+			testKeytab = keytabFile.getAbsolutePath();
+			testZkServerPrincipal = "zookeeper/127.0.0.1";
+			testZkClientPrincipal = "zk-client/127.0.0.1";
+			testKafkaServerPrincipal = "kafka/" + hostName;
+			hadoopServicePrincipal = "hadoop/" + hostName;
+			testPrincipal = "client/" + hostName;
+
+			kdc.createPrincipal(keytabFile, testPrincipal, testZkServerPrincipal,
+					hadoopServicePrincipal,
+					testZkClientPrincipal,
+					testKafkaServerPrincipal);
+
+			testPrincipal = testPrincipal + "@" + kdc.getRealm();
+			testZkServerPrincipal = testZkServerPrincipal + "@" + kdc.getRealm();
+			testZkClientPrincipal = testZkClientPrincipal + "@" + kdc.getRealm();
+			testKafkaServerPrincipal = testKafkaServerPrincipal + "@" + kdc.getRealm();
+			hadoopServicePrincipal = hadoopServicePrincipal + "@" + kdc.getRealm();
+
+			LOG.info("-------------------------------------------------------------------");
+			LOG.info("Test Principal: {}", testPrincipal);
+			LOG.info("Test ZK Server Principal: {}", testZkServerPrincipal);
+			LOG.info("Test ZK Client Principal: {}", testZkClientPrincipal);
+			LOG.info("Test Kafka Server Principal: {}", testKafkaServerPrincipal);
+			LOG.info("Test Hadoop Service Principal: {}", hadoopServicePrincipal);
+			LOG.info("Test Keytab: {}", testKeytab);
+			LOG.info("-------------------------------------------------------------------");
+
+			//Security Context is established to allow non-Hadoop applications that require JAAS
+			//based SASL/Kerberos authentication to work. However, for Hadoop specific applications
+			//the context can be reinitialized with Hadoop configuration by calling
+			//ctx.setHadoopConfiguration() for the UGI implementation to work properly.
+			//See Yarn test case module for reference
+			createJaasConfig(baseDirForSecureRun);
+			SecurityContext.SecurityConfiguration ctx = new SecurityContext.SecurityConfiguration();
+			Configuration flinkConfig = new Configuration();
+			flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, testKeytab);
+			flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, testPrincipal);
+			flinkConfig.setBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE, false);
+			flinkConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, baseDirForSecureRun.getAbsolutePath());
+			ctx.setFlinkConfiguration(flinkConfig);
+			TestingSecurityContext.install(ctx, getClientSecurityConfigurationMap());
+
+			populateSystemEnvVariables();
+
+		} catch(Exception e) {
+			LOG.error("Exception occured while preparing secure environment. Reason: {}", e);
+			throw new RuntimeException(e);
+		}
+
+	}
+
+	public static void cleanup() {
+
+		LOG.info("Cleaning up Secure Environment");
+
+		if( kdc != null) {
+			kdc.stop();
+			LOG.info("Stopped KDC server");
+		}
+
+		resetSystemEnvVariables();
+
+		testKeytab = null;
+		testPrincipal = null;
+		testZkServerPrincipal = null;
+		hadoopServicePrincipal = null;
+		baseDirForSecureRun = null;
+
+	}
+
+	private static void populateSystemEnvVariables() {
+
+		if(LOG.isDebugEnabled()) {
+			System.setProperty("FLINK_JAAS_DEBUG", "true");
+			System.setProperty("sun.security.krb5.debug", "true");
+		}
+
+		System.setProperty("java.security.krb5.conf", kdc.getKrb5conf().getAbsolutePath());
+
+		System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
+		System.setProperty("zookeeper.kerberos.removeHostFromPrincipal", "true");
+		System.setProperty("zookeeper.kerberos.removeRealmFromPrincipal", "true");
+	}
+
+	private static void resetSystemEnvVariables() {
+		System.clearProperty("java.security.krb5.conf");
+		System.clearProperty("FLINK_JAAS_DEBUG");
+		System.clearProperty("sun.security.krb5.debug");
+
+		System.clearProperty("zookeeper.authProvider.1");
+		System.clearProperty("zookeeper.kerberos.removeHostFromPrincipal");
+		System.clearProperty("zookeeper.kerberos.removeRealmFromPrincipal");
+	}
+
+	public static org.apache.flink.configuration.Configuration populateFlinkSecureConfigurations(
+			@Nullable org.apache.flink.configuration.Configuration flinkConf) {
+
+		org.apache.flink.configuration.Configuration conf;
+
+		if(flinkConf== null) {
+			conf = new org.apache.flink.configuration.Configuration();
+		} else {
+			conf = flinkConf;
+		}
+
+		conf.setString(ConfigConstants.SECURITY_KEYTAB_KEY , testKeytab);
+		conf.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY , testPrincipal);
+
+		return conf;
+	}
+
+	public static Map<String, TestingSecurityContext.ClientSecurityConfiguration> getClientSecurityConfigurationMap() {
+
+		Map<String, TestingSecurityContext.ClientSecurityConfiguration> clientSecurityConfigurationMap = new HashMap<>();
+
+		if(testZkServerPrincipal != null ) {
+			TestingSecurityContext.ClientSecurityConfiguration zkServer =
+					new TestingSecurityContext.ClientSecurityConfiguration(testZkServerPrincipal, testKeytab,
+							"Server", "zk-server");
+			clientSecurityConfigurationMap.put("Server",zkServer);
+		}
+
+		if(testZkClientPrincipal != null ) {
+			TestingSecurityContext.ClientSecurityConfiguration zkClient =
+					new TestingSecurityContext.ClientSecurityConfiguration(testZkClientPrincipal, testKeytab,
+							"Client", "zk-client");
+			clientSecurityConfigurationMap.put("Client",zkClient);
+		}
+
+		if(testKafkaServerPrincipal != null ) {
+			TestingSecurityContext.ClientSecurityConfiguration kafkaServer =
+					new TestingSecurityContext.ClientSecurityConfiguration(testKafkaServerPrincipal, testKeytab,
+							"KafkaServer", "kafka-server");
+			clientSecurityConfigurationMap.put("KafkaServer",kafkaServer);
+		}
+
+		return clientSecurityConfigurationMap;
+	}
+
+	public static String getTestKeytab() {
+		return testKeytab;
+	}
+
+	public static String getHadoopServicePrincipal() {
+		return hadoopServicePrincipal;
+	}
+
+	/*
+	 * Helper method to create a temporary JAAS configuration file to get around the Kafka and ZK SASL
+	 * implementations' lookup of java.security.auth.login.config
+	 */
+	private static void  createJaasConfig(File baseDirForSecureRun) {
+
+		try(FileWriter fw = new FileWriter(new File(baseDirForSecureRun,SecurityContext.JAAS_CONF_FILENAME), true);
+			BufferedWriter bw = new BufferedWriter(fw);
+			PrintWriter out = new PrintWriter(bw))
+		{
+			out.println("sample {");
+			out.println("useKeyTab=false");
+			out.println("useTicketCache=true;");
+			out.println("};");
+		} catch (IOException e) {
+			LOG.error("Exception occured while trying to create JAAS config. Reason: {}", e.getMessage());
+			throw new RuntimeException(e);
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java
new file mode 100644
index 0000000..25b2362
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.security.JaasConfiguration;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link TestingJaasConfiguration} is used by the integration tests, which need to manage a client
+ * principal as well as the server principals of Hadoop/ZK, each of which expects the host name to be
+ * populated in a specific way (localhost vs 127.0.0.1). It provides an abstraction to handle more than
+ * one login module, since the default {@link JaasConfiguration} behavior only supports a single
+ * global principal identifier.
+
+@Internal
+public class TestingJaasConfiguration extends JaasConfiguration {
+
+	private static final Logger LOG = LoggerFactory.getLogger(TestingJaasConfiguration.class);
+
+	public Map<String, TestingSecurityContext.ClientSecurityConfiguration> clientSecurityConfigurationMap;
+
+	TestingJaasConfiguration(String keytab, String principal, Map<String,
+			TestingSecurityContext.ClientSecurityConfiguration> clientSecurityConfigurationMap) {
+		super(keytab, principal);
+		this.clientSecurityConfigurationMap = clientSecurityConfigurationMap;
+	}
+
+	@Override
+	public AppConfigurationEntry[] getAppConfigurationEntry(String applicationName) {
+
+		LOG.debug("In TestingJaasConfiguration - Application Requested: {}", applicationName);
+
+		AppConfigurationEntry[] appConfigurationEntry = super.getAppConfigurationEntry(applicationName);
+
+		if(clientSecurityConfigurationMap != null && clientSecurityConfigurationMap.size() > 0) {
+
+			if(clientSecurityConfigurationMap.containsKey(applicationName)) {
+
+				LOG.debug("In TestingJaasConfiguration - Application: {} found in the supplied context", applicationName);
+
+				TestingSecurityContext.ClientSecurityConfiguration conf = clientSecurityConfigurationMap.get(applicationName);
+
+				if(appConfigurationEntry != null && appConfigurationEntry.length > 0) {
+
+					for(int count=0; count < appConfigurationEntry.length; count++) {
+
+						AppConfigurationEntry ace = appConfigurationEntry[count];
+
+						if (ace.getOptions().containsKey("keyTab")) {
+
+							String keyTab = conf.getKeytab();
+							String principal = conf.getPrincipal();
+
+							LOG.debug("In TestingJaasConfiguration - Application: {} from the supplied context will " +
+									"use Client Specific Keytab: {} and Principal: {}", applicationName, keyTab, principal);
+
+							Map<String, String> newKeytabKerberosOptions = new HashMap<>();
+							newKeytabKerberosOptions.putAll(getKeytabKerberosOptions());
+
+							newKeytabKerberosOptions.put("keyTab", keyTab);
+							newKeytabKerberosOptions.put("principal", principal);
+
+							AppConfigurationEntry keytabKerberosAce = new AppConfigurationEntry(
+									KerberosUtil.getKrb5LoginModuleName(),
+									AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+									newKeytabKerberosOptions);
+							appConfigurationEntry = new AppConfigurationEntry[] {keytabKerberosAce};
+
+							LOG.debug("---->Login Module is using Keytab based configuration<------");
+							LOG.debug("Login Module Name: " + keytabKerberosAce.getLoginModuleName());
+							LOG.debug("Control Flag: " + keytabKerberosAce.getControlFlag());
+							LOG.debug("Options: " + keytabKerberosAce.getOptions());
+						}
+					}
+				}
+			}
+
+		}
+
+		return appConfigurationEntry;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
new file mode 100644
index 0000000..5e84c7e
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.security.SecurityContext;
+
+import java.util.Map;
+
+/**
+ * Test security context to support handling both client and server principals in MiniKDC.
+ *
+ * <p>This class is used only in integration test code for connectors like Kafka, HDFS etc.
+ */
+@Internal
+public class TestingSecurityContext {
+
+	/**
+	 * Installs the given security configuration and then registers a
+	 * {@link TestingJaasConfiguration} as the current JAAS login configuration.
+	 *
+	 * @param config security configuration to install; its keytab and principal are also
+	 *               passed on to the testing JAAS configuration
+	 * @param clientSecurityConfigurationMap client security configurations handed to the
+	 *               testing JAAS configuration
+	 * @throws Exception if any of the installation steps fails
+	 */
+	public static void install(SecurityContext.SecurityConfiguration config,
+						Map<String, ClientSecurityConfiguration> clientSecurityConfigurationMap)
+			throws Exception {
+
+		SecurityContext.install(config);
+
+		// establish the JAAS config for Test environment
+		TestingJaasConfiguration jaasConfig = new TestingJaasConfiguration(config.getKeytab(),
+				config.getPrincipal(), clientSecurityConfigurationMap);
+		javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
+	}
+
+	/**
+	 * Immutable holder for a client's principal, keytab, module name and JAAS service name.
+	 */
+	public static class ClientSecurityConfiguration {
+
+		private final String principal;
+
+		private final String keytab;
+
+		private final String moduleName;
+
+		private final String jaasServiceName;
+
+		public ClientSecurityConfiguration(String principal, String keytab, String moduleName, String jaasServiceName) {
+			this.principal = principal;
+			this.keytab = keytab;
+			this.moduleName = moduleName;
+			this.jaasServiceName = jaasServiceName;
+		}
+
+		public String getPrincipal() {
+			return principal;
+		}
+
+		public String getKeytab() {
+			return keytab;
+		}
+
+		public String getModuleName() {
+			return moduleName;
+		}
+
+		public String getJaasServiceName() {
+			return jaasServiceName;
+		}
+
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index ffdca36..68e4752 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -103,6 +103,13 @@ under the License.
 			<artifactId>akka-testkit_${scala.binary.version}</artifactId>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-minikdc</artifactId>
+			<version>${minikdc.version}</version>
+		</dependency>
+
 	</dependencies>
 
 	<build>
@@ -298,6 +305,19 @@ under the License.
 					<skip>true</skip>
 				</configuration>
 			</plugin>
+
+			<!--
+            https://issues.apache.org/jira/browse/DIRSHARED-134
+            Required to pull the Mini-KDC transitive dependency
+            -->
+			<plugin>
+				<groupId>org.apache.felix</groupId>
+				<artifactId>maven-bundle-plugin</artifactId>
+				<version>3.0.1</version>
+				<inherited>true</inherited>
+				<extensions>true</extensions>
+			</plugin>
+			
 		</plugins>
 	</build>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index d03d9eb..a503115 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -180,7 +180,7 @@ public class FlinkYarnSessionCliTest {
 				Mockito.mock(YarnClient.class),
 				Mockito.mock(ApplicationReport.class),
 				config,
-				new Path("/tmp"), false);
+				new Path("/temp"), false);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index a293348..9d6ff85 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -50,14 +50,14 @@ import java.util.concurrent.TimeUnit;
 
 public class YARNHighAvailabilityITCase extends YarnTestBase {
 
-	private static TestingServer zkServer;
+	protected static TestingServer zkServer;
 
-	private static ActorSystem actorSystem;
+	protected static ActorSystem actorSystem;
 
-	private static final int numberApplicationAttempts = 10;
+	protected static final int numberApplicationAttempts = 10;
 
 	@Rule
-	public TemporaryFolder tmp = new TemporaryFolder();
+	public TemporaryFolder temp = new TemporaryFolder();
 
 	@BeforeClass
 	public static void setup() {
@@ -108,7 +108,11 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 		String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
 		flinkYarnClient.setConfigurationDirectory(confDirPath);
 
-		String fsStateHandlePath = tmp.getRoot().getPath();
+		String fsStateHandlePath = temp.getRoot().getPath();
+
+		// load the configuration
+		File configDirectory = new File(confDirPath);
+		GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
 
 		flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.loadConfiguration());
 		flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" +

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index ddea4dd..650397d 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -23,10 +23,12 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.google.common.base.Joiner;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -516,6 +518,27 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 		} catch(Throwable t) {
 			LOG.warn("Error while detached yarn session was running", t);
 			Assert.fail(t.getMessage());
+		} finally {
+
+			//cleanup the yarn-properties file
+			String confDirPath = System.getenv("FLINK_CONF_DIR");
+			File configDirectory = new File(confDirPath);
+			LOG.info("testDetachedPerJobYarnClusterInternal: Using configuration directory " + configDirectory.getAbsolutePath());
+
+			// load the configuration
+			LOG.info("testDetachedPerJobYarnClusterInternal: Trying to load configuration file");
+			GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
+
+			try {
+				File yarnPropertiesFile = FlinkYarnSessionCli.getYarnPropertiesLocation(GlobalConfiguration.loadConfiguration());
+				if(yarnPropertiesFile.exists()) {
+					LOG.info("testDetachedPerJobYarnClusterInternal: Cleaning up temporary Yarn address reference: {}", yarnPropertiesFile.getAbsolutePath());
+					yarnPropertiesFile.delete();
+				}
+			} catch (Exception e) {
+				LOG.warn("testDetachedPerJobYarnClusterInternal: Exception while deleting the JobManager address file", e);
+			}
+
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 3caa0ee..ca696f9 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -100,6 +100,9 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		while(getRunningContainers() < 2) {
 			sleep(500);
 		}
+
+		//additional sleep for the JM/TM to start and establish connection
+		sleep(2000);
 		LOG.info("Two containers are running. Killing the application");
 
 		// kill application "externally".
@@ -121,6 +124,27 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		} catch(Throwable t) {
 			LOG.warn("Killing failed", t);
 			Assert.fail();
+		} finally {
+
+			// clean up the yarn-properties file
+			String confDirPath = System.getenv("FLINK_CONF_DIR");
+			File configDirectory = new File(confDirPath);
+			LOG.info("testDetachedMode: Using configuration directory " + configDirectory.getAbsolutePath());
+
+			// load the configuration
+			LOG.info("testDetachedMode: Trying to load configuration file");
+			GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
+
+			try {
+				File yarnPropertiesFile = FlinkYarnSessionCli.getYarnPropertiesLocation(GlobalConfiguration.loadConfiguration());
+				if(yarnPropertiesFile.exists()) {
+					LOG.info("testDetachedMode: Cleaning up temporary Yarn address reference: {}", yarnPropertiesFile.getAbsolutePath());
+					yarnPropertiesFile.delete();
+				}
+			} catch (Exception e) {
+				LOG.warn("testDetachedMode: Exception while deleting the JobManager address file", e);
+			}
+
 		}
 
 		LOG.info("Finished testDetachedMode()");

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
new file mode 100644
index 0000000..0b7c230
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.test.util.SecureTestEnvironment;
+import org.apache.flink.test.util.TestingSecurityContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runs the {@link YARNSessionFIFOITCase} against a Kerberos-secured YARN cluster that is
+ * set up through {@link SecureTestEnvironment}.
+ */
+public class YARNSessionFIFOSecuredITCase extends YARNSessionFIFOITCase {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOSecuredITCase.class);
+
+	/**
+	 * Prepares the secure test environment, installs the testing security context and
+	 * starts YARN in secure mode from within that context.
+	 */
+	@BeforeClass
+	public static void setup() {
+
+		LOG.info("starting secure cluster environment for testing");
+
+		yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);
+		yarnConfiguration.setInt(YarnConfiguration.NM_PMEM_MB, 768);
+		yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
+		yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-fifo-secured");
+
+		SecureTestEnvironment.prepare(tmp);
+
+		populateYarnSecureConfigurations(yarnConfiguration, SecureTestEnvironment.getHadoopServicePrincipal(),
+				SecureTestEnvironment.getTestKeytab());
+
+		Configuration flinkConfig = new Configuration();
+		flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,
+				SecureTestEnvironment.getTestKeytab());
+		flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,
+				SecureTestEnvironment.getHadoopServicePrincipal());
+
+		SecurityContext.SecurityConfiguration ctx = new SecurityContext.SecurityConfiguration();
+		ctx.setFlinkConfiguration(flinkConfig);
+		ctx.setHadoopConfiguration(yarnConfiguration);
+		try {
+			TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap());
+
+			SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+				@Override
+				public Integer run() {
+					startYARNSecureMode(yarnConfiguration, SecureTestEnvironment.getHadoopServicePrincipal(),
+							SecureTestEnvironment.getTestKeytab());
+					return null;
+				}
+			});
+
+		} catch(Exception e) {
+			// exception messages are not SLF4J templates, so no "{}" placeholder here;
+			// the cause is attached instead
+			throw new RuntimeException("Exception occurred while setting up secure test context.", e);
+		}
+
+	}
+
+	/** Cleans up the secure test environment. */
+	@AfterClass
+	public static void teardownSecureCluster() throws Exception {
+		LOG.info("tearing down secure cluster environment");
+		SecureTestEnvironment.cleanup();
+	}
+
+	/* For secure cluster testing, it is enough to run only one test and override below test methods
+	 * to keep the overall build time minimal
+	 */
+	@Override
+	public void testQueryCluster() {}
+
+	@Override
+	public void testNonexistingQueue() {}
+
+	@Override
+	public void testResourceComputation() {}
+
+	@Override
+	public void testfullAlloc() {}
+
+	@Override
+	public void testJavaAPI() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 6270010..605aa44 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
 
 import akka.actor.Identify;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.client.program.ClusterClient;
@@ -29,6 +30,8 @@ import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.TestLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -62,6 +65,8 @@ import java.io.FileWriter;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.io.BufferedWriter;
+import java.io.PrintWriter;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -123,6 +128,11 @@ public abstract class YarnTestBase extends TestLogger {
 	 */
 	protected static File flinkLibFolder;
 
+	/**
+	 * Temporary folder where Flink configurations will be kept for secure run
+	 */
+	protected static File tempConfPathForSecureRun = null;
+
 	static {
 		yarnConfiguration = new YarnConfiguration();
 		yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
@@ -140,6 +150,23 @@ public abstract class YarnTestBase extends TestLogger {
 	}
 
 
+	public static void populateYarnSecureConfigurations(Configuration conf, String principal, String keytab) {
+		// switch Hadoop security to Kerberos authentication with authorization enabled
+		conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+		conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+		// ResourceManager and NodeManager are both given the same keytab and principal
+		conf.set(YarnConfiguration.RM_KEYTAB, keytab);
+		conf.set(YarnConfiguration.RM_PRINCIPAL, principal);
+		conf.set(YarnConfiguration.NM_KEYTAB, keytab);
+		conf.set(YarnConfiguration.NM_PRINCIPAL, principal);
+		// the RM and NM web apps reuse that principal and keytab for SPNEGO
+		conf.set(YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY, principal);
+		conf.set(YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY,keytab);
+		conf.set(YarnConfiguration.NM_WEBAPP_SPNEGO_USER_NAME_KEY, principal);
+		conf.set(YarnConfiguration.NM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY,keytab);
+		// auth_to_local: map one- and two-component principals to their first component
+		conf.set("hadoop.security.auth_to_local","RULE:[1:$1] RULE:[2:$1]");
+	}
 
 	/**
 	 * Sleep a bit between the tests (we are re-using the YARN cluster for the tests)
@@ -336,8 +363,16 @@ public abstract class YarnTestBase extends TestLogger {
 		return count;
 	}
 
+	public static void startYARNSecureMode(Configuration conf, String principal, String keytab) {
+		start(conf, principal, keytab);
+	}
+
 	public static void startYARNWithConfig(Configuration conf) {
-		// set the home directory to a tmp directory. Flink on YARN is using the home dir to distribute the file
+		start(conf,null,null);
+	}
+
+	private static void start(Configuration conf, String principal, String keytab) {
+		// set the home directory to a temp directory. Flink on YARN is using the home dir to distribute the file
 		File homeDir = null;
 		try {
 			homeDir = tmp.newFolder();
@@ -374,7 +409,39 @@ public abstract class YarnTestBase extends TestLogger {
 			File flinkConfDirPath = findFile(flinkDistRootDir, new ContainsName(new String[]{"flink-conf.yaml"}));
 			Assert.assertNotNull(flinkConfDirPath);
 
-			map.put(ConfigConstants.ENV_FLINK_CONF_DIR, flinkConfDirPath.getParent());
+			if(!StringUtils.isBlank(principal) && !StringUtils.isBlank(keytab)) {
+				//copy conf dir to test temporary workspace location
+				tempConfPathForSecureRun = tmp.newFolder("conf");
+
+				String confDirPath = flinkConfDirPath.getParentFile().getAbsolutePath();
+				FileUtils.copyDirectory(new File(confDirPath), tempConfPathForSecureRun);
+
+				try(FileWriter fw = new FileWriter(new File(tempConfPathForSecureRun,"flink-conf.yaml"), true);
+					BufferedWriter bw = new BufferedWriter(fw);
+					PrintWriter out = new PrintWriter(bw))
+				{
+					LOG.info("writing keytab: " + keytab + " and principal: " + principal + " to config file");
+					out.println("");
+					out.println("#Security Configurations Auto Populated ");
+					out.println(ConfigConstants.SECURITY_KEYTAB_KEY + ": " + keytab);
+					out.println(ConfigConstants.SECURITY_PRINCIPAL_KEY + ": " + principal);
+					out.println("");
+				} catch (IOException e) {
+					LOG.error("Exception occured while trying to append the security configurations. Reason: {}", e.getMessage());
+					throw new RuntimeException(e);
+				}
+
+				String configDir = tempConfPathForSecureRun.getAbsolutePath();
+
+				LOG.info("Temporary Flink configuration directory to be used for secure test: {}", configDir);
+
+				Assert.assertNotNull(configDir);
+
+				map.put(ConfigConstants.ENV_FLINK_CONF_DIR, configDir);
+
+			} else {
+				map.put(ConfigConstants.ENV_FLINK_CONF_DIR, flinkConfDirPath.getParent());
+			}
 
 			File yarnConfFile = writeYarnSiteConfigXML(conf);
 			map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath());
@@ -392,6 +459,7 @@ public abstract class YarnTestBase extends TestLogger {
 			LOG.error("setup failure", ex);
 			Assert.fail();
 		}
+
 	}
 
 	/**
@@ -421,7 +489,6 @@ public abstract class YarnTestBase extends TestLogger {
 		System.setOut(new PrintStream(outContent));
 		System.setErr(new PrintStream(errContent));
 
-
 		final int START_TIMEOUT_SECONDS = 60;
 
 		Runner runner = new Runner(args, type);
@@ -624,12 +691,23 @@ public abstract class YarnTestBase extends TestLogger {
 
 	@AfterClass
 	public static void teardown() throws Exception {
+
+		LOG.info("Stopping MiniYarn Cluster");
+		yarnCluster.stop();
+
 		// Unset FLINK_CONF_DIR, as it might change the behavior of other tests
 		Map<String, String> map = new HashMap<>(System.getenv());
 		map.remove(ConfigConstants.ENV_FLINK_CONF_DIR);
+		map.remove("YARN_CONF_DIR");
+		map.remove("IN_TESTS");
 		TestBaseUtils.setEnv(map);
 
-		// When we are on travis, we copy the tmp files of JUnit (containing the MiniYARNCluster log files)
+		if(tempConfPathForSecureRun != null) {
+			FileUtil.fullyDelete(tempConfPathForSecureRun);
+			tempConfPathForSecureRun = null;
+		}
+
+		// When we are on travis, we copy the temp files of JUnit (containing the MiniYARNCluster log files)
 		// to <flinkRoot>/target/flink-yarn-tests-*.
 		// The files from there are picked up by the ./tools/travis_watchdog.sh script
 		// to upload them to Amazon S3.
@@ -646,6 +724,7 @@ public abstract class YarnTestBase extends TestLogger {
 				LOG.warn("Error copying the final files from {} to {}: msg: {}", src.getAbsolutePath(), target.getAbsolutePath(), e.getMessage(), e);
 			}
 		}
+
 	}
 
 	public static boolean isOnTravis() {

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/resources/log4j-test.properties b/flink-yarn-tests/src/test/resources/log4j-test.properties
index e94ca26..8f56c1f 100644
--- a/flink-yarn-tests/src/test/resources/log4j-test.properties
+++ b/flink-yarn-tests/src/test/resources/log4j-test.properties
@@ -34,3 +34,8 @@ log4j.logger.org.apache.hadoop=OFF
 log4j.logger.org.apache.flink.runtime.leaderelection=INFO
 log4j.logger.org.apache.flink.runtime.leaderretrieval=INFO
 
+log4j.logger.org.apache.directory=OFF
+log4j.logger.org.mortbay.log=OFF, testlogger
+log4j.logger.net.sf.ehcache=OFF
+log4j.logger.org.apache.hadoop.metrics2=OFF
+log4j.logger.org.apache.hadoop.conf.Configuration=OFF