You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/21 09:52:45 UTC
[12/50] [abbrv] flink git commit: [FLINK-3929] Added Keytab based
Kerberos support to enable secure Flink cluster deployment(addresses HDHS,
Kafka and ZK services)
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 0dbe865..213ba4a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -65,6 +65,9 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
private String brokerConnectionString = "";
private Properties standardProps;
private Properties additionalServerProperties;
+ private boolean secureMode = false;
+ // 6 seconds is default. Seems to be too small for travis. 30 seconds
+ private String zkTimeout = "30000";
public String getBrokerConnectionString() {
return brokerConnectionString;
@@ -131,8 +134,22 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
- public void prepare(int numKafkaServers, Properties additionalServerProperties) {
+ public boolean isSecureRunSupported() {
+ return true;
+ }
+
+ @Override
+ public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
+
+ //increase the timeout since in Travis ZK connection takes long time for secure connection.
+ if(secureMode) {
+ //run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
+ numKafkaServers = 1;
+ zkTimeout = String.valueOf(Integer.parseInt(zkTimeout) * 15);
+ }
+
this.additionalServerProperties = additionalServerProperties;
+ this.secureMode = secureMode;
File tempDir = new File(System.getProperty("java.io.tmpdir"));
tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
@@ -155,6 +172,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
LOG.info("Starting Zookeeper");
zookeeper = new TestingServer(-1, tmpZkDir);
zookeeperConnectionString = zookeeper.getConnectString();
+ LOG.info("zookeeperConnectionString: {}", zookeeperConnectionString);
LOG.info("Starting KafkaServer");
brokers = new ArrayList<>(numKafkaServers);
@@ -163,7 +181,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
SocketServer socketServer = brokers.get(i).socketServer();
- brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
+ if(secureMode) {
+ brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
+ } else {
+ brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
+ }
}
LOG.info("ZK and KafkaServer started.");
@@ -173,15 +195,18 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
fail("Test setup failed: " + t.getMessage());
}
+ LOG.info("brokerConnectionString --> {}", brokerConnectionString);
+
standardProps = new Properties();
standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
standardProps.setProperty("bootstrap.servers", brokerConnectionString);
standardProps.setProperty("group.id", "flink-tests");
standardProps.setProperty("auto.commit.enable", "false");
- standardProps.setProperty("zookeeper.session.timeout.ms", "30000"); // 6 seconds is default. Seems to be too small for travis.
- standardProps.setProperty("zookeeper.connection.timeout.ms", "30000");
+ standardProps.setProperty("zookeeper.session.timeout.ms", zkTimeout);
+ standardProps.setProperty("zookeeper.connection.timeout.ms", zkTimeout);
standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.9 value)
standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
+
}
@Override
@@ -196,6 +221,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
if (zookeeper != null) {
try {
zookeeper.stop();
+ zookeeper.close();
}
catch (Exception e) {
LOG.warn("ZK.stop() failed", e);
@@ -224,6 +250,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
public ZkUtils getZkUtils() {
+ LOG.info("In getZKUtils:: zookeeperConnectionString = {}", zookeeperConnectionString);
ZkClient creator = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
return ZkUtils.apply(creator, false);
@@ -241,23 +268,37 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
zkUtils.close();
}
+ LOG.info("Topic {} create request is successfully posted", topic);
+
// validate that the topic has been created
- final long deadline = System.currentTimeMillis() + 30000;
+ final long deadline = System.currentTimeMillis() + Integer.parseInt(zkTimeout);
do {
try {
- Thread.sleep(100);
+ if(secureMode) {
+ //increase wait time since in Travis ZK timeout occurs frequently
+ int wait = Integer.parseInt(zkTimeout) / 100;
+ LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
+ Thread.sleep(wait);
+ } else {
+ Thread.sleep(100);
+ }
+
} catch (InterruptedException e) {
// restore interrupted state
}
// we could use AdminUtils.topicExists(zkUtils, topic) here, but it's results are
// not always correct.
+ LOG.info("Validating if the topic {} has been created or not", topic);
+
// create a new ZK utils connection
ZkUtils checkZKConn = getZkUtils();
if(AdminUtils.topicExists(checkZKConn, topic)) {
+ LOG.info("topic {} has been created successfully", topic);
checkZKConn.close();
return;
}
+ LOG.info("topic {} has not been created yet. Will check again...", topic);
checkZKConn.close();
}
while (System.currentTimeMillis() < deadline);
@@ -296,8 +337,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
// for CI stability, increase zookeeper session timeout
- kafkaProperties.put("zookeeper.session.timeout.ms", "30000");
- kafkaProperties.put("zookeeper.connection.timeout.ms", "30000");
+ kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
+ kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
if(additionalServerProperties != null) {
kafkaProperties.putAll(additionalServerProperties);
}
@@ -307,6 +348,15 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
for (int i = 1; i <= numTries; i++) {
int kafkaPort = NetUtils.getAvailablePort();
kafkaProperties.put("port", Integer.toString(kafkaPort));
+
+ //to support secure kafka cluster
+ if(secureMode) {
+ LOG.info("Adding Kafka secure configurations");
+ kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+ kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+ kafkaProperties.putAll(getSecureProperties());
+ }
+
KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
try {
@@ -329,4 +379,19 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
}
+ public Properties getSecureProperties() {
+ Properties prop = new Properties();
+ if(secureMode) {
+ prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
+ prop.put("security.protocol", "SASL_PLAINTEXT");
+ prop.put("sasl.kerberos.service.name", "kafka");
+
+ //add special timeout for Travis
+ prop.setProperty("zookeeper.session.timeout.ms", zkTimeout);
+ prop.setProperty("zookeeper.connection.timeout.ms", zkTimeout);
+ prop.setProperty("metadata.fetch.timeout.ms","120000");
+ }
+ return prop;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
index fbeb110..4ac1773 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
@@ -28,3 +28,5 @@ log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
log4j.logger.org.apache.zookeeper=OFF, testlogger
log4j.logger.state.change.logger=OFF, testlogger
log4j.logger.kafka=OFF, testlogger
+
+log4j.logger.org.apache.directory=OFF, testlogger
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
index 49d630f..ef71bde 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
@@ -161,6 +161,14 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minikdc</artifactId>
+ <version>${minikdc.version}</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<dependencyManagement>
@@ -187,6 +195,17 @@ under the License.
</execution>
</executions>
</plugin>
+ <!--
+ https://issues.apache.org/jira/browse/DIRSHARED-134
+ Required to pull the Mini-KDC transitive dependency
+ -->
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <version>3.0.1</version>
+ <inherited>true</inherited>
+ <extensions>true</extensions>
+ </plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 920f15b..a87ff8a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -181,6 +181,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
properties.setProperty("session.timeout.ms", "2000");
properties.setProperty("fetch.max.wait.ms", "2000");
properties.setProperty("heartbeat.interval.ms", "1000");
+ properties.putAll(secureProps);
FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer("doesntexist", new SimpleStringSchema(), properties);
DataStream<String> stream = see.addSource(source);
stream.print();
@@ -275,6 +276,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
});
Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
producerProperties.setProperty("retries", "3");
+ producerProperties.putAll(secureProps);
FlinkKafkaProducerBase<Tuple2<Long, String>> prod = kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null);
stream.addSink(prod);
@@ -283,7 +285,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
List<String> topics = new ArrayList<>();
topics.add(topic);
topics.add(additionalEmptyTopic);
- FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topics, sourceSchema, standardProps);
+
+ Properties props = new Properties();
+ props.putAll(standardProps);
+ props.putAll(secureProps);
+ FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topics, sourceSchema, props);
DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(source).setParallelism(parallelism);
@@ -371,7 +377,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
env.getConfig().disableSysoutLogging();
- FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
+ Properties props = new Properties();
+ props.putAll(standardProps);
+ props.putAll(secureProps);
+
+ FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
env
.addSource(kafkaSource)
@@ -416,7 +426,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
env.getConfig().disableSysoutLogging();
- FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
+ Properties props = new Properties();
+ props.putAll(standardProps);
+ props.putAll(secureProps);
+ FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
env
.addSource(kafkaSource)
@@ -463,7 +476,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
env.getConfig().disableSysoutLogging();
env.setBufferTimeout(0);
- FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
+ Properties props = new Properties();
+ props.putAll(standardProps);
+ props.putAll(secureProps);
+ FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
env
.addSource(kafkaSource)
@@ -506,7 +522,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
env.enableCheckpointing(100);
env.getConfig().disableSysoutLogging();
- FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), standardProps);
+ Properties props = new Properties();
+ props.putAll(standardProps);
+ props.putAll(secureProps);
+ FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
env.addSource(source).addSink(new DiscardingSink<String>());
@@ -577,7 +596,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
env.enableCheckpointing(100);
env.getConfig().disableSysoutLogging();
- FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), standardProps);
+ Properties props = new Properties();
+ props.putAll(standardProps);
+ props.putAll(secureProps);
+ FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
env.addSource(source).addSink(new DiscardingSink<String>());
@@ -629,7 +651,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
env.setParallelism(12); // needs to be more that the mini cluster has slots
env.getConfig().disableSysoutLogging();
- FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
+ Properties props = new Properties();
+ props.putAll(standardProps);
+ props.putAll(secureProps);
+ FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
env
.addSource(kafkaSource)
@@ -700,15 +725,19 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
Tuple2WithTopicSchema schema = new Tuple2WithTopicSchema(env.getConfig());
- stream.addSink(kafkaServer.getProducer("dummy", schema, standardProps, null));
+ Properties props = new Properties();
+ props.putAll(standardProps);
+ props.putAll(secureProps);
+
+ stream.addSink(kafkaServer.getProducer("dummy", schema, props, null));
env.execute("Write to topics");
// run second job consuming from multiple topics
env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
env.getConfig().disableSysoutLogging();
-
- stream = env.addSource(kafkaServer.getConsumer(topics, schema, standardProps));
+
+ stream = env.addSource(kafkaServer.getConsumer(topics, schema, props));
stream.flatMap(new FlatMapFunction<Tuple3<Integer, Integer, String>, Integer>() {
Map<String, Integer> countPerTopic = new HashMap<>(NUM_TOPICS);
@@ -787,6 +816,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
// Produce serialized JSON data
createTestTopic(topic, 1, 1);
+ Properties props = new Properties();
+ props.putAll(standardProps);
+ props.putAll(secureProps);
+
StreamExecutionEnvironment env = StreamExecutionEnvironment
.createRemoteEnvironment("localhost", flinkPort);
env.getConfig().disableSysoutLogging();
@@ -805,7 +838,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}).addSink(kafkaServer.getProducer(
topic,
new ByteArraySerializationSchema(),
- standardProps,
+ props,
null));
// Execute blocks
@@ -940,6 +973,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 14));
consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024 * 14)); // for the new fetcher
consumerProps.setProperty("queued.max.message.chunks", "1");
+ consumerProps.putAll(secureProps);
FlinkKafkaConsumerBase<Tuple2<Long, byte[]>> source = kafkaServer.getConsumer(topic, serSchema, consumerProps);
DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(source);
@@ -969,6 +1003,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
Properties producerProps = new Properties();
producerProps.setProperty("max.request.size", Integer.toString(1024 * 1024 * 15));
producerProps.setProperty("retries", "3");
+ producerProps.putAll(secureProps);
producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionStrings);
DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
@@ -1047,8 +1082,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
env.setRestartStrategy(RestartStrategies.noRestart());
env.getConfig().disableSysoutLogging();
-
- FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
+ Properties props = new Properties();
+ props.putAll(standardProps);
+ props.putAll(secureProps);
+ FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
env
.addSource(kafkaSource)
@@ -1097,6 +1134,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
KeyedSerializationSchema<Tuple2<Long, PojoValue>> schema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig());
Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
producerProperties.setProperty("retries", "3");
+ producerProperties.putAll(secureProps);
kvStream.addSink(kafkaServer.getProducer(topic, schema, producerProperties, null));
env.execute("Write KV to Kafka");
@@ -1110,7 +1148,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
KeyedDeserializationSchema<Tuple2<Long, PojoValue>> readSchema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig());
- DataStream<Tuple2<Long, PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, readSchema, standardProps));
+ Properties props = new Properties();
+ props.putAll(standardProps);
+ props.putAll(secureProps);
+ DataStream<Tuple2<Long, PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, readSchema, props));
fromKafka.flatMap(new RichFlatMapFunction<Tuple2<Long,PojoValue>, Object>() {
long counter = 0;
@Override
@@ -1178,6 +1219,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
producerProperties.setProperty("retries", "3");
+ producerProperties.putAll(secureProps);
+
kvStream.addSink(kafkaServer.getProducer(topic, schema, producerProperties, null));
env.execute("Write deletes to Kafka");
@@ -1189,7 +1232,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env.getConfig().disableSysoutLogging();
- DataStream<Tuple2<byte[], PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
+ Properties props = new Properties();
+ props.putAll(standardProps);
+ props.putAll(secureProps);
+ DataStream<Tuple2<byte[], PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, schema, props));
fromKafka.flatMap(new RichFlatMapFunction<Tuple2<byte[], PojoValue>, Object>() {
long counter = 0;
@@ -1226,7 +1272,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env1.getConfig().disableSysoutLogging();
- DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, new FixedNumberDeserializationSchema(ELEMENT_COUNT), standardProps));
+ Properties props = new Properties();
+ props.putAll(standardProps);
+ props.putAll(secureProps);
+
+ DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, new FixedNumberDeserializationSchema(ELEMENT_COUNT), props));
fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer,Integer>, Void>() {
@Override
public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {
@@ -1262,8 +1312,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
env1.getConfig().disableSysoutLogging();
env1.disableOperatorChaining(); // let the source read everything into the network buffers
+ Properties props = new Properties();
+ props.putAll(standardProps);
+ props.putAll(secureProps);
+
TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
- DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
+ DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, props));
fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
@Override
public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {// no op
@@ -1288,7 +1342,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
});
- fromGen.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null));
+ fromGen.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(schema), props, null));
env1.execute("Metrics test job");
} catch(Throwable t) {
@@ -1403,6 +1457,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
final TypeInformationSerializationSchema<Tuple2<Integer, Integer>> deser =
new TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig());
+ cc.putAll(secureProps);
// create the consumer
FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deser, cc);
@@ -1505,6 +1560,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
// the producer must not produce duplicates
Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
producerProperties.setProperty("retries", "0");
+ producerProperties.putAll(secureProps);
stream.addSink(kafkaServer.getProducer(
topicName, serSchema, producerProperties,
@@ -1537,7 +1593,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
Properties readProps = (Properties) standardProps.clone();
readProps.setProperty("group.id", "flink-tests-validator");
-
+ readProps.putAll(secureProps);
FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deserSchema, readProps);
readEnv
@@ -1672,6 +1728,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
newProps.setProperty("group.id", "topic-printer"+ UUID.randomUUID().toString());
newProps.setProperty("auto.offset.reset", "smallest");
newProps.setProperty("zookeeper.connect", standardProps.getProperty("zookeeper.connect"));
+ newProps.putAll(secureProps);
ConsumerConfig printerConfig = new ConsumerConfig(newProps);
printTopic(topicName, printerConfig, deserializer, elements);
@@ -1893,8 +1950,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
TypeInformationSerializationSchema<Tuple2<Long, Integer>> sourceSchema =
new TypeInformationSerializationSchema<>(inputTypeInfo, env.getConfig());
+ Properties props = new Properties();
+ props.putAll(standardProps);
+ props.putAll(secureProps);
FlinkKafkaConsumerBase<Tuple2<Long, Integer>> source = kafkaServer
- .getConsumer(topics, sourceSchema, standardProps)
+ .getConsumer(topics, sourceSchema, props)
.assignTimestampsAndWatermarks(new TestPunctuatedTSExtractor());
DataStreamSource<Tuple2<Long, Integer>> consuming = env.setParallelism(1).addSource(source);
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 14e74f1..5bcf406 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -34,6 +34,7 @@ import org.apache.flink.test.util.SuccessException;
import java.io.Serializable;
+import java.util.Properties;
import static org.apache.flink.test.util.TestUtils.tryExecute;
import static org.junit.Assert.assertEquals;
@@ -102,17 +103,24 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
}
})
.setParallelism(1);
+
+ Properties props = new Properties();
+ props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings));
+ props.putAll(secureProps);
// sink partitions into
stream.addSink(kafkaServer.getProducer(topic,
new KeyedSerializationSchemaWrapper<>(serSchema),
- FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings),
+ props,
new CustomPartitioner(parallelism)))
.setParallelism(parallelism);
// ------ consuming topology ---------
-
- FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topic, deserSchema, standardProps);
+
+ Properties consumerProps = new Properties();
+ consumerProps.putAll(standardProps);
+ consumerProps.putAll(secureProps);
+ FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topic, deserSchema, consumerProps);
env.addSource(source).setParallelism(parallelism)
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index c4949ff..9236e78 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -36,6 +36,8 @@ import org.apache.flink.util.InstantiationUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,29 +62,39 @@ public class KafkaShortRetentionTestBase implements Serializable {
private static Properties standardProps;
private static LocalFlinkMiniCluster flink;
+ @ClassRule
+ public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+ protected static Properties secureProps = new Properties();
+
@BeforeClass
public static void prepare() throws IOException, ClassNotFoundException {
LOG.info("-------------------------------------------------------------------------");
LOG.info(" Starting KafkaShortRetentionTestBase ");
LOG.info("-------------------------------------------------------------------------");
+ Configuration flinkConfig = new Configuration();
+
// dynamically load the implementation for the test
Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);
LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
+ if(kafkaServer.isSecureRunSupported()) {
+ secureProps = kafkaServer.getSecureProperties();
+ }
+
Properties specificProperties = new Properties();
specificProperties.setProperty("log.retention.hours", "0");
specificProperties.setProperty("log.retention.minutes", "0");
specificProperties.setProperty("log.retention.ms", "250");
specificProperties.setProperty("log.retention.check.interval.ms", "100");
- kafkaServer.prepare(1, specificProperties);
+ kafkaServer.prepare(1, specificProperties, false);
standardProps = kafkaServer.getStandardProperties();
// start also a re-usable Flink mini cluster
- Configuration flinkConfig = new Configuration();
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
@@ -98,6 +110,8 @@ public class KafkaShortRetentionTestBase implements Serializable {
flink.shutdown();
}
kafkaServer.shutdown();
+
+ secureProps.clear();
}
/**
@@ -151,12 +165,17 @@ public class KafkaShortRetentionTestBase implements Serializable {
running = false;
}
});
- stream.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), standardProps, null));
+
+ Properties props = new Properties();
+ props.putAll(standardProps);
+ props.putAll(secureProps);
+
+ stream.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props, null));
// ----------- add consumer dataflow ----------
NonContinousOffsetsDeserializationSchema deserSchema = new NonContinousOffsetsDeserializationSchema();
- FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, deserSchema, standardProps);
+ FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, deserSchema, props);
DataStreamSource<String> consuming = env.addSource(source);
consuming.addSink(new DiscardingSink<String>());
@@ -224,6 +243,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
Properties customProps = new Properties();
customProps.putAll(standardProps);
+ customProps.putAll(secureProps);
customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception
FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), customProps);
@@ -255,6 +275,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
Properties customProps = new Properties();
customProps.putAll(standardProps);
+ customProps.putAll(secureProps);
customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 771db17..afdd158 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -31,6 +31,8 @@ import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,61 +75,90 @@ public abstract class KafkaTestBase extends TestLogger {
protected static KafkaTestEnvironment kafkaServer;
+ @ClassRule
+ public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+ protected static Properties secureProps = new Properties();
+
// ------------------------------------------------------------------------
// Setup and teardown of the mini clusters
// ------------------------------------------------------------------------
@BeforeClass
public static void prepare() throws IOException, ClassNotFoundException {
+
LOG.info("-------------------------------------------------------------------------");
LOG.info(" Starting KafkaTestBase ");
LOG.info("-------------------------------------------------------------------------");
-
+ startClusters(false);
- // dynamically load the implementation for the test
- Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
- kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);
+ }
- LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
+ @AfterClass
+ public static void shutDownServices() {
- kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS);
+ LOG.info("-------------------------------------------------------------------------");
+ LOG.info(" Shut down KafkaTestBase ");
+ LOG.info("-------------------------------------------------------------------------");
- standardProps = kafkaServer.getStandardProperties();
- brokerConnectionStrings = kafkaServer.getBrokerConnectionString();
+ shutdownClusters();
- // start also a re-usable Flink mini cluster
- Configuration flinkConfig = new Configuration();
+ LOG.info("-------------------------------------------------------------------------");
+ LOG.info(" KafkaTestBase finished");
+ LOG.info("-------------------------------------------------------------------------");
+ }
+
+ protected static Configuration getFlinkConfiguration() {
+ Configuration flinkConfig = new Configuration();;
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter");
flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
+ return flinkConfig;
+ }
+
+ protected static void startClusters(boolean secureMode) throws ClassNotFoundException {
+
+ // dynamically load the implementation for the test
+ Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
+ kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);
+
+ LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
+
+ kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS, secureMode);
+
+ standardProps = kafkaServer.getStandardProperties();
- flink = new LocalFlinkMiniCluster(flinkConfig, false);
+ brokerConnectionStrings = kafkaServer.getBrokerConnectionString();
+
+ if(kafkaServer.isSecureRunSupported() && secureMode) {
+ secureProps = kafkaServer.getSecureProperties();
+ }
+
+ // start also a re-usable Flink mini cluster
+ flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
flink.start();
flinkPort = flink.getLeaderRPCPort();
- }
- @AfterClass
- public static void shutDownServices() {
+ }
- LOG.info("-------------------------------------------------------------------------");
- LOG.info(" Shut down KafkaTestBase ");
- LOG.info("-------------------------------------------------------------------------");
+ protected static void shutdownClusters() {
flinkPort = -1;
if (flink != null) {
flink.shutdown();
}
+ if(secureProps != null) {
+ secureProps.clear();
+ }
+
kafkaServer.shutdown();
- LOG.info("-------------------------------------------------------------------------");
- LOG.info(" KafkaTestBase finished");
- LOG.info("-------------------------------------------------------------------------");
}
@@ -164,4 +195,5 @@ public abstract class KafkaTestBase extends TestLogger {
protected static void deleteTestTopic(String topic) {
kafkaServer.deleteTestTopic(topic);
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 0b1d51d..6ecde71 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -35,10 +35,10 @@ public abstract class KafkaTestEnvironment {
protected static final String KAFKA_HOST = "localhost";
- public abstract void prepare(int numKafkaServers, Properties kafkaServerProperties);
+ public abstract void prepare(int numKafkaServers, Properties kafkaServerProperties, boolean secureMode);
- public void prepare(int numberOfKafkaServers) {
- this.prepare(numberOfKafkaServers, null);
+ public void prepare(int numberOfKafkaServers, boolean secureMode) {
+ this.prepare(numberOfKafkaServers, null, secureMode);
}
public abstract void shutdown();
@@ -51,9 +51,10 @@ public abstract class KafkaTestEnvironment {
this.createTestTopic(topic, numberOfPartitions, replicationFactor, new Properties());
}
-
public abstract Properties getStandardProperties();
+ public abstract Properties getSecureProperties();
+
public abstract String getBrokerConnectionString();
public abstract String getVersion();
@@ -86,4 +87,6 @@ public abstract class KafkaTestEnvironment {
public abstract int getBrokerId(KafkaServer server);
+ public abstract boolean isSecureRunSupported();
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index 5a38e56..58a5cc3 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -135,11 +135,18 @@ public class DataGenerators {
}
});
+ Properties props = new Properties();
+ props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()));
+ Properties secureProps = testServer.getSecureProperties();
+ if(secureProps != null) {
+ props.putAll(testServer.getSecureProperties());
+ }
+
stream
.rebalance()
.addSink(testServer.getProducer(topic,
new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())),
- FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()),
+ props,
new KafkaPartitioner<Integer>() {
@Override
public int partition(Integer next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-test-utils-parent/flink-test-utils/pom.xml
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/pom.xml b/flink-test-utils-parent/flink-test-utils/pom.xml
index 18ecfde..5c99ef6 100644
--- a/flink-test-utils-parent/flink-test-utils/pom.xml
+++ b/flink-test-utils-parent/flink-test-utils/pom.xml
@@ -78,5 +78,30 @@ under the License.
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minikdc</artifactId>
+ <version>${minikdc.version}</version>
+ </dependency>
+
</dependencies>
+
+ <build>
+ <plugins>
+
+ <!--
+ https://issues.apache.org/jira/browse/DIRSHARED-134
+ Required to pull the Mini-KDC transitive dependency
+ -->
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <version>3.0.1</version>
+ <inherited>true</inherited>
+ <extensions>true</extensions>
+ </plugin>
+
+ </plugins>
+ </build>
+
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
index a478908..6ec6c2c 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -25,6 +25,8 @@ import org.apache.flink.test.util.TestBaseUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Base class for streaming unit tests that run multiple tests and want to reuse the same
@@ -67,18 +69,22 @@ public class StreamingMultipleProgramsTestBase extends AbstractTestBase {
super(new Configuration());
}
+ protected static final Logger LOG = LoggerFactory.getLogger(StreamingMultipleProgramsTestBase.class);
+
// ------------------------------------------------------------------------
// Cluster setup & teardown
// ------------------------------------------------------------------------
@BeforeClass
public static void setup() throws Exception {
+ LOG.info("In StreamingMultipleProgramsTestBase: Starting FlinkMiniCluster ");
cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false, false, true);
TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM);
}
@AfterClass
public static void teardown() throws Exception {
+ LOG.info("In StreamingMultipleProgramsTestBase: Closing FlinkMiniCluster ");
TestStreamEnvironment.unsetAsContext();
stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
new file mode 100644
index 0000000..00b19f1
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.BufferedWriter;
+import java.io.PrintWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Helper {@link SecureTestEnvironment} to handle MiniKDC lifecycle.
+ * This class can be used to start/stop MiniKDC and create secure configurations for MiniDFSCluster
+ * and MiniYarn
+ */
+
+public class SecureTestEnvironment {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(SecureTestEnvironment.class);
+
+ private static MiniKdc kdc;
+
+ private static String testKeytab = null;
+
+ private static String testPrincipal = null;
+
+ private static String testZkServerPrincipal = null;
+
+ private static String testZkClientPrincipal = null;
+
+ private static String testKafkaServerPrincipal = null;
+
+ private static String hadoopServicePrincipal = null;
+
+ private static File baseDirForSecureRun = null;
+
+ public static void prepare(TemporaryFolder tempFolder) {
+
+ try {
+ baseDirForSecureRun = tempFolder.newFolder();
+ LOG.info("Base Directory for Secure Environment: {}", baseDirForSecureRun);
+
+ String hostName = "localhost";
+ Properties kdcConf = MiniKdc.createConf();
+ if(LOG.isDebugEnabled()) {
+ kdcConf.setProperty(MiniKdc.DEBUG, "true");
+ }
+ kdcConf.setProperty(MiniKdc.KDC_BIND_ADDRESS, hostName);
+ kdc = new MiniKdc(kdcConf, baseDirForSecureRun);
+ kdc.start();
+ LOG.info("Started Mini KDC");
+
+ File keytabFile = new File(baseDirForSecureRun, "test-users.keytab");
+ testKeytab = keytabFile.getAbsolutePath();
+ testZkServerPrincipal = "zookeeper/127.0.0.1";
+ testZkClientPrincipal = "zk-client/127.0.0.1";
+ testKafkaServerPrincipal = "kafka/" + hostName;
+ hadoopServicePrincipal = "hadoop/" + hostName;
+ testPrincipal = "client/" + hostName;
+
+ kdc.createPrincipal(keytabFile, testPrincipal, testZkServerPrincipal,
+ hadoopServicePrincipal,
+ testZkClientPrincipal,
+ testKafkaServerPrincipal);
+
+ testPrincipal = testPrincipal + "@" + kdc.getRealm();
+ testZkServerPrincipal = testZkServerPrincipal + "@" + kdc.getRealm();
+ testZkClientPrincipal = testZkClientPrincipal + "@" + kdc.getRealm();
+ testKafkaServerPrincipal = testKafkaServerPrincipal + "@" + kdc.getRealm();
+ hadoopServicePrincipal = hadoopServicePrincipal + "@" + kdc.getRealm();
+
+ LOG.info("-------------------------------------------------------------------");
+ LOG.info("Test Principal: {}", testPrincipal);
+ LOG.info("Test ZK Server Principal: {}", testZkServerPrincipal);
+ LOG.info("Test ZK Client Principal: {}", testZkClientPrincipal);
+ LOG.info("Test Kafka Server Principal: {}", testKafkaServerPrincipal);
+ LOG.info("Test Hadoop Service Principal: {}", hadoopServicePrincipal);
+ LOG.info("Test Keytab: {}", testKeytab);
+ LOG.info("-------------------------------------------------------------------");
+
+ //Security Context is established to allow non hadoop applications that requires JAAS
+ //based SASL/Kerberos authentication to work. However, for Hadoop specific applications
+ //the context can be reinitialized with Hadoop configuration by calling
+ //ctx.setHadoopConfiguration() for the UGI implementation to work properly.
+ //See Yarn test case module for reference
+ createJaasConfig(baseDirForSecureRun);
+ SecurityContext.SecurityConfiguration ctx = new SecurityContext.SecurityConfiguration();
+ Configuration flinkConfig = new Configuration();
+ flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, testKeytab);
+ flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, testPrincipal);
+ flinkConfig.setBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE, false);
+ flinkConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, baseDirForSecureRun.getAbsolutePath());
+ ctx.setFlinkConfiguration(flinkConfig);
+ TestingSecurityContext.install(ctx, getClientSecurityConfigurationMap());
+
+ populateSystemEnvVariables();
+
+ } catch(Exception e) {
+ LOG.error("Exception occured while preparing secure environment. Reason: {}", e);
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public static void cleanup() {
+
+ LOG.info("Cleaning up Secure Environment");
+
+ if( kdc != null) {
+ kdc.stop();
+ LOG.info("Stopped KDC server");
+ }
+
+ resetSystemEnvVariables();
+
+ testKeytab = null;
+ testPrincipal = null;
+ testZkServerPrincipal = null;
+ hadoopServicePrincipal = null;
+ baseDirForSecureRun = null;
+
+ }
+
+ private static void populateSystemEnvVariables() {
+
+ if(LOG.isDebugEnabled()) {
+ System.setProperty("FLINK_JAAS_DEBUG", "true");
+ System.setProperty("sun.security.krb5.debug", "true");
+ }
+
+ System.setProperty("java.security.krb5.conf", kdc.getKrb5conf().getAbsolutePath());
+
+ System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
+ System.setProperty("zookeeper.kerberos.removeHostFromPrincipal", "true");
+ System.setProperty("zookeeper.kerberos.removeRealmFromPrincipal", "true");
+ }
+
+ private static void resetSystemEnvVariables() {
+ System.clearProperty("java.security.krb5.conf");
+ System.clearProperty("FLINK_JAAS_DEBUG");
+ System.clearProperty("sun.security.krb5.debug");
+
+ System.clearProperty("zookeeper.authProvider.1");
+ System.clearProperty("zookeeper.kerberos.removeHostFromPrincipal");
+ System.clearProperty("zookeeper.kerberos.removeRealmFromPrincipal");
+ }
+
+ public static org.apache.flink.configuration.Configuration populateFlinkSecureConfigurations(
+ @Nullable org.apache.flink.configuration.Configuration flinkConf) {
+
+ org.apache.flink.configuration.Configuration conf;
+
+ if(flinkConf== null) {
+ conf = new org.apache.flink.configuration.Configuration();
+ } else {
+ conf = flinkConf;
+ }
+
+ conf.setString(ConfigConstants.SECURITY_KEYTAB_KEY , testKeytab);
+ conf.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY , testPrincipal);
+
+ return conf;
+ }
+
+ public static Map<String, TestingSecurityContext.ClientSecurityConfiguration> getClientSecurityConfigurationMap() {
+
+ Map<String, TestingSecurityContext.ClientSecurityConfiguration> clientSecurityConfigurationMap = new HashMap<>();
+
+ if(testZkServerPrincipal != null ) {
+ TestingSecurityContext.ClientSecurityConfiguration zkServer =
+ new TestingSecurityContext.ClientSecurityConfiguration(testZkServerPrincipal, testKeytab,
+ "Server", "zk-server");
+ clientSecurityConfigurationMap.put("Server",zkServer);
+ }
+
+ if(testZkClientPrincipal != null ) {
+ TestingSecurityContext.ClientSecurityConfiguration zkClient =
+ new TestingSecurityContext.ClientSecurityConfiguration(testZkClientPrincipal, testKeytab,
+ "Client", "zk-client");
+ clientSecurityConfigurationMap.put("Client",zkClient);
+ }
+
+ if(testKafkaServerPrincipal != null ) {
+ TestingSecurityContext.ClientSecurityConfiguration kafkaServer =
+ new TestingSecurityContext.ClientSecurityConfiguration(testKafkaServerPrincipal, testKeytab,
+ "KafkaServer", "kafka-server");
+ clientSecurityConfigurationMap.put("KafkaServer",kafkaServer);
+ }
+
+ return clientSecurityConfigurationMap;
+ }
+
+ public static String getTestKeytab() {
+ return testKeytab;
+ }
+
+ public static String getHadoopServicePrincipal() {
+ return hadoopServicePrincipal;
+ }
+
+ /*
+ * Helper method to create a temporary JAAS configuration file to ger around the Kafka and ZK SASL
+ * implementation lookup java.security.auth.login.config
+ */
+ private static void createJaasConfig(File baseDirForSecureRun) {
+
+ try(FileWriter fw = new FileWriter(new File(baseDirForSecureRun,SecurityContext.JAAS_CONF_FILENAME), true);
+ BufferedWriter bw = new BufferedWriter(fw);
+ PrintWriter out = new PrintWriter(bw))
+ {
+ out.println("sample {");
+ out.println("useKeyTab=false");
+ out.println("useTicketCache=true;");
+ out.println("};");
+ } catch (IOException e) {
+ LOG.error("Exception occured while trying to create JAAS config. Reason: {}", e.getMessage());
+ throw new RuntimeException(e);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java
new file mode 100644
index 0000000..25b2362
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.security.JaasConfiguration;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link TestingJaasConfiguration} for handling the integration test case since it requires to manage
+ * client principal as well as server principals of Hadoop/ZK which expects the host name to be populated
+ * in specific way (localhost vs 127.0.0.1). This provides an abstraction to handle more than one Login Module
+ * since the default {@link JaasConfiguration} behavior only supports global/unique principal identifier
+ */
+
+@Internal
+public class TestingJaasConfiguration extends JaasConfiguration {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestingJaasConfiguration.class);
+
+ public Map<String, TestingSecurityContext.ClientSecurityConfiguration> clientSecurityConfigurationMap;
+
+ TestingJaasConfiguration(String keytab, String principal, Map<String,
+ TestingSecurityContext.ClientSecurityConfiguration> clientSecurityConfigurationMap) {
+ super(keytab, principal);
+ this.clientSecurityConfigurationMap = clientSecurityConfigurationMap;
+ }
+
+ @Override
+ public AppConfigurationEntry[] getAppConfigurationEntry(String applicationName) {
+
+ LOG.debug("In TestingJaasConfiguration - Application Requested: {}", applicationName);
+
+ AppConfigurationEntry[] appConfigurationEntry = super.getAppConfigurationEntry(applicationName);
+
+ if(clientSecurityConfigurationMap != null && clientSecurityConfigurationMap.size() > 0) {
+
+ if(clientSecurityConfigurationMap.containsKey(applicationName)) {
+
+ LOG.debug("In TestingJaasConfiguration - Application: {} found in the supplied context", applicationName);
+
+ TestingSecurityContext.ClientSecurityConfiguration conf = clientSecurityConfigurationMap.get(applicationName);
+
+ if(appConfigurationEntry != null && appConfigurationEntry.length > 0) {
+
+ for(int count=0; count < appConfigurationEntry.length; count++) {
+
+ AppConfigurationEntry ace = appConfigurationEntry[count];
+
+ if (ace.getOptions().containsKey("keyTab")) {
+
+ String keyTab = conf.getKeytab();
+ String principal = conf.getPrincipal();
+
+ LOG.debug("In TestingJaasConfiguration - Application: {} from the supplied context will " +
+ "use Client Specific Keytab: {} and Principal: {}", applicationName, keyTab, principal);
+
+ Map<String, String> newKeytabKerberosOptions = new HashMap<>();
+ newKeytabKerberosOptions.putAll(getKeytabKerberosOptions());
+
+ newKeytabKerberosOptions.put("keyTab", keyTab);
+ newKeytabKerberosOptions.put("principal", principal);
+
+ AppConfigurationEntry keytabKerberosAce = new AppConfigurationEntry(
+ KerberosUtil.getKrb5LoginModuleName(),
+ AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+ newKeytabKerberosOptions);
+ appConfigurationEntry = new AppConfigurationEntry[] {keytabKerberosAce};
+
+ LOG.debug("---->Login Module is using Keytab based configuration<------");
+ LOG.debug("Login Module Name: " + keytabKerberosAce.getLoginModuleName());
+ LOG.debug("Control Flag: " + keytabKerberosAce.getControlFlag());
+ LOG.debug("Options: " + keytabKerberosAce.getOptions());
+ }
+ }
+ }
+ }
+
+ }
+
+ return appConfigurationEntry;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
new file mode 100644
index 0000000..5e84c7e
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.security.SecurityContext;
+
+import java.util.Map;
+
+/*
+ * Test security context to support handling both client and server principals in MiniKDC
+ * This class is used only in integration test code for connectors like Kafka, HDFS etc.,
+ */
+@Internal
+public class TestingSecurityContext {
+
+ public static void install(SecurityContext.SecurityConfiguration config,
+ Map<String, ClientSecurityConfiguration> clientSecurityConfigurationMap)
+ throws Exception {
+
+ SecurityContext.install(config);
+
+ // establish the JAAS config for Test environment
+ TestingJaasConfiguration jaasConfig = new TestingJaasConfiguration(config.getKeytab(),
+ config.getPrincipal(), clientSecurityConfigurationMap);
+ javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
+ }
+
+ public static class ClientSecurityConfiguration {
+
+ private String principal;
+
+ private String keytab;
+
+ private String moduleName;
+
+ private String jaasServiceName;
+
+ public String getPrincipal() {
+ return principal;
+ }
+
+ public String getKeytab() {
+ return keytab;
+ }
+
+ public String getModuleName() {
+ return moduleName;
+ }
+
+ public String getJaasServiceName() {
+ return jaasServiceName;
+ }
+
+ public ClientSecurityConfiguration(String principal, String keytab, String moduleName, String jaasServiceName) {
+ this.principal = principal;
+ this.keytab = keytab;
+ this.moduleName = moduleName;
+ this.jaasServiceName = jaasServiceName;
+ }
+
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index ffdca36..68e4752 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -103,6 +103,13 @@ under the License.
<artifactId>akka-testkit_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minikdc</artifactId>
+ <version>${minikdc.version}</version>
+ </dependency>
+
</dependencies>
<build>
@@ -298,6 +305,19 @@ under the License.
<skip>true</skip>
</configuration>
</plugin>
+
+ <!--
+ https://issues.apache.org/jira/browse/DIRSHARED-134
+ Required to pull the Mini-KDC transitive dependency
+ -->
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <version>3.0.1</version>
+ <inherited>true</inherited>
+ <extensions>true</extensions>
+ </plugin>
+
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index d03d9eb..a503115 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -180,7 +180,7 @@ public class FlinkYarnSessionCliTest {
Mockito.mock(YarnClient.class),
Mockito.mock(ApplicationReport.class),
config,
- new Path("/tmp"), false);
+ new Path("/temp"), false);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index a293348..9d6ff85 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -50,14 +50,14 @@ import java.util.concurrent.TimeUnit;
public class YARNHighAvailabilityITCase extends YarnTestBase {
- private static TestingServer zkServer;
+ protected static TestingServer zkServer;
- private static ActorSystem actorSystem;
+ protected static ActorSystem actorSystem;
- private static final int numberApplicationAttempts = 10;
+ protected static final int numberApplicationAttempts = 10;
@Rule
- public TemporaryFolder tmp = new TemporaryFolder();
+ public TemporaryFolder temp = new TemporaryFolder();
@BeforeClass
public static void setup() {
@@ -108,7 +108,11 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
flinkYarnClient.setConfigurationDirectory(confDirPath);
- String fsStateHandlePath = tmp.getRoot().getPath();
+ String fsStateHandlePath = temp.getRoot().getPath();
+
+ // load the configuration
+ File configDirectory = new File(confDirPath);
+ GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.loadConfiguration());
flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" +
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index ddea4dd..650397d 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -23,10 +23,12 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.base.Joiner;
import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -516,6 +518,27 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
} catch(Throwable t) {
LOG.warn("Error while detached yarn session was running", t);
Assert.fail(t.getMessage());
+ } finally {
+
+ //cleanup the yarn-properties file
+ String confDirPath = System.getenv("FLINK_CONF_DIR");
+ File configDirectory = new File(confDirPath);
+ LOG.info("testDetachedPerJobYarnClusterInternal: Using configuration directory " + configDirectory.getAbsolutePath());
+
+ // load the configuration
+ LOG.info("testDetachedPerJobYarnClusterInternal: Trying to load configuration file");
+ GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
+
+ try {
+ File yarnPropertiesFile = FlinkYarnSessionCli.getYarnPropertiesLocation(GlobalConfiguration.loadConfiguration());
+ if(yarnPropertiesFile.exists()) {
+ LOG.info("testDetachedPerJobYarnClusterInternal: Cleaning up temporary Yarn address reference: {}", yarnPropertiesFile.getAbsolutePath());
+ yarnPropertiesFile.delete();
+ }
+ } catch (Exception e) {
+ LOG.warn("testDetachedPerJobYarnClusterInternal: Exception while deleting the JobManager address file", e);
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 3caa0ee..ca696f9 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -100,6 +100,9 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
while(getRunningContainers() < 2) {
sleep(500);
}
+
+ //additional sleep for the JM/TM to start and establish connection
+ sleep(2000);
LOG.info("Two containers are running. Killing the application");
// kill application "externally".
@@ -121,6 +124,27 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
} catch(Throwable t) {
LOG.warn("Killing failed", t);
Assert.fail();
+ } finally {
+
+ //cleanup the yarn-properties file
+ String confDirPath = System.getenv("FLINK_CONF_DIR");
+ File configDirectory = new File(confDirPath);
+ LOG.info("testDetachedPerJobYarnClusterInternal: Using configuration directory " + configDirectory.getAbsolutePath());
+
+ // load the configuration
+ LOG.info("testDetachedPerJobYarnClusterInternal: Trying to load configuration file");
+ GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
+
+ try {
+ File yarnPropertiesFile = FlinkYarnSessionCli.getYarnPropertiesLocation(GlobalConfiguration.loadConfiguration());
+ if(yarnPropertiesFile.exists()) {
+ LOG.info("testDetachedPerJobYarnClusterInternal: Cleaning up temporary Yarn address reference: {}", yarnPropertiesFile.getAbsolutePath());
+ yarnPropertiesFile.delete();
+ }
+ } catch (Exception e) {
+ LOG.warn("testDetachedPerJobYarnClusterInternal: Exception while deleting the JobManager address file", e);
+ }
+
}
LOG.info("Finished testDetachedMode()");
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
new file mode 100644
index 0000000..0b7c230
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.test.util.SecureTestEnvironment;
+import org.apache.flink.test.util.TestingSecurityContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class YARNSessionFIFOSecuredITCase extends YARNSessionFIFOITCase {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOSecuredITCase.class);
+
+ @BeforeClass
+ public static void setup() {
+
+ LOG.info("starting secure cluster environment for testing");
+
+ yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);
+ yarnConfiguration.setInt(YarnConfiguration.NM_PMEM_MB, 768);
+ yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
+ yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-fifo-secured");
+
+ SecureTestEnvironment.prepare(tmp);
+
+ populateYarnSecureConfigurations(yarnConfiguration,SecureTestEnvironment.getHadoopServicePrincipal(),
+ SecureTestEnvironment.getTestKeytab());
+
+ Configuration flinkConfig = new Configuration();
+ flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,
+ SecureTestEnvironment.getTestKeytab());
+ flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,
+ SecureTestEnvironment.getHadoopServicePrincipal());
+
+ SecurityContext.SecurityConfiguration ctx = new SecurityContext.SecurityConfiguration();
+ ctx.setFlinkConfiguration(flinkConfig);
+ ctx.setHadoopConfiguration(yarnConfiguration);
+ try {
+ TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap());
+
+ SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+ @Override
+ public Integer run() {
+ startYARNSecureMode(yarnConfiguration, SecureTestEnvironment.getHadoopServicePrincipal(),
+ SecureTestEnvironment.getTestKeytab());
+ return null;
+ }
+ });
+
+ } catch(Exception e) {
+ throw new RuntimeException("Exception occurred while setting up secure test context. Reason: {}", e);
+ }
+
+ }
+
+ @AfterClass
+ public static void teardownSecureCluster() throws Exception {
+ LOG.info("tearing down secure cluster environment");
+ SecureTestEnvironment.cleanup();
+ }
+
+ /* For secure cluster testing, it is enough to run only one test and override below test methods
+ * to keep the overall build time minimal
+ */
+ @Override
+ public void testQueryCluster() {}
+
+ @Override
+ public void testNonexistingQueue() {}
+
+ @Override
+ public void testResourceComputation() {}
+
+ @Override
+ public void testfullAlloc() {}
+
+ @Override
+ public void testJavaAPI() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 6270010..605aa44 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
import akka.actor.Identify;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.client.program.ClusterClient;
@@ -29,6 +30,8 @@ import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.TestLogger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -62,6 +65,8 @@ import java.io.FileWriter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.PrintStream;
+import java.io.BufferedWriter;
+import java.io.PrintWriter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -123,6 +128,11 @@ public abstract class YarnTestBase extends TestLogger {
*/
protected static File flinkLibFolder;
+ /**
+ * Temporary folder where Flink configurations will be kept for secure run
+ */
+ protected static File tempConfPathForSecureRun = null;
+
static {
yarnConfiguration = new YarnConfiguration();
yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
@@ -140,6 +150,23 @@ public abstract class YarnTestBase extends TestLogger {
}
+ public static void populateYarnSecureConfigurations(Configuration conf, String principal, String keytab) {
+
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+
+ conf.set(YarnConfiguration.RM_KEYTAB, keytab);
+ conf.set(YarnConfiguration.RM_PRINCIPAL, principal);
+ conf.set(YarnConfiguration.NM_KEYTAB, keytab);
+ conf.set(YarnConfiguration.NM_PRINCIPAL, principal);
+
+ conf.set(YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY, principal);
+ conf.set(YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY,keytab);
+ conf.set(YarnConfiguration.NM_WEBAPP_SPNEGO_USER_NAME_KEY, principal);
+ conf.set(YarnConfiguration.NM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY,keytab);
+
+ conf.set("hadoop.security.auth_to_local","RULE:[1:$1] RULE:[2:$1]");
+ }
/**
* Sleep a bit between the tests (we are re-using the YARN cluster for the tests)
@@ -336,8 +363,16 @@ public abstract class YarnTestBase extends TestLogger {
return count;
}
+ public static void startYARNSecureMode(Configuration conf, String principal, String keytab) {
+ start(conf, principal, keytab);
+ }
+
public static void startYARNWithConfig(Configuration conf) {
- // set the home directory to a tmp directory. Flink on YARN is using the home dir to distribute the file
+ start(conf,null,null);
+ }
+
+ private static void start(Configuration conf, String principal, String keytab) {
+ // set the home directory to a temp directory. Flink on YARN is using the home dir to distribute the file
File homeDir = null;
try {
homeDir = tmp.newFolder();
@@ -374,7 +409,39 @@ public abstract class YarnTestBase extends TestLogger {
File flinkConfDirPath = findFile(flinkDistRootDir, new ContainsName(new String[]{"flink-conf.yaml"}));
Assert.assertNotNull(flinkConfDirPath);
- map.put(ConfigConstants.ENV_FLINK_CONF_DIR, flinkConfDirPath.getParent());
+ if(!StringUtils.isBlank(principal) && !StringUtils.isBlank(keytab)) {
+ //copy conf dir to test temporary workspace location
+ tempConfPathForSecureRun = tmp.newFolder("conf");
+
+ String confDirPath = flinkConfDirPath.getParentFile().getAbsolutePath();
+ FileUtils.copyDirectory(new File(confDirPath), tempConfPathForSecureRun);
+
+ try(FileWriter fw = new FileWriter(new File(tempConfPathForSecureRun,"flink-conf.yaml"), true);
+ BufferedWriter bw = new BufferedWriter(fw);
+ PrintWriter out = new PrintWriter(bw))
+ {
+ LOG.info("writing keytab: " + keytab + " and principal: " + principal + " to config file");
+ out.println("");
+ out.println("#Security Configurations Auto Populated ");
+ out.println(ConfigConstants.SECURITY_KEYTAB_KEY + ": " + keytab);
+ out.println(ConfigConstants.SECURITY_PRINCIPAL_KEY + ": " + principal);
+ out.println("");
+ } catch (IOException e) {
+ LOG.error("Exception occured while trying to append the security configurations. Reason: {}", e.getMessage());
+ throw new RuntimeException(e);
+ }
+
+ String configDir = tempConfPathForSecureRun.getAbsolutePath();
+
+ LOG.info("Temporary Flink configuration directory to be used for secure test: {}", configDir);
+
+ Assert.assertNotNull(configDir);
+
+ map.put(ConfigConstants.ENV_FLINK_CONF_DIR, configDir);
+
+ } else {
+ map.put(ConfigConstants.ENV_FLINK_CONF_DIR, flinkConfDirPath.getParent());
+ }
File yarnConfFile = writeYarnSiteConfigXML(conf);
map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath());
@@ -392,6 +459,7 @@ public abstract class YarnTestBase extends TestLogger {
LOG.error("setup failure", ex);
Assert.fail();
}
+
}
/**
@@ -421,7 +489,6 @@ public abstract class YarnTestBase extends TestLogger {
System.setOut(new PrintStream(outContent));
System.setErr(new PrintStream(errContent));
-
final int START_TIMEOUT_SECONDS = 60;
Runner runner = new Runner(args, type);
@@ -624,12 +691,23 @@ public abstract class YarnTestBase extends TestLogger {
@AfterClass
public static void teardown() throws Exception {
+
+ LOG.info("Stopping MiniYarn Cluster");
+ yarnCluster.stop();
+
// Unset FLINK_CONF_DIR, as it might change the behavior of other tests
Map<String, String> map = new HashMap<>(System.getenv());
map.remove(ConfigConstants.ENV_FLINK_CONF_DIR);
+ map.remove("YARN_CONF_DIR");
+ map.remove("IN_TESTS");
TestBaseUtils.setEnv(map);
- // When we are on travis, we copy the tmp files of JUnit (containing the MiniYARNCluster log files)
+ if(tempConfPathForSecureRun != null) {
+ FileUtil.fullyDelete(tempConfPathForSecureRun);
+ tempConfPathForSecureRun = null;
+ }
+
+ // When we are on travis, we copy the temp files of JUnit (containing the MiniYARNCluster log files)
// to <flinkRoot>/target/flink-yarn-tests-*.
// The files from there are picked up by the ./tools/travis_watchdog.sh script
// to upload them to Amazon S3.
@@ -646,6 +724,7 @@ public abstract class YarnTestBase extends TestLogger {
LOG.warn("Error copying the final files from {} to {}: msg: {}", src.getAbsolutePath(), target.getAbsolutePath(), e.getMessage(), e);
}
}
+
}
public static boolean isOnTravis() {
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/resources/log4j-test.properties b/flink-yarn-tests/src/test/resources/log4j-test.properties
index e94ca26..8f56c1f 100644
--- a/flink-yarn-tests/src/test/resources/log4j-test.properties
+++ b/flink-yarn-tests/src/test/resources/log4j-test.properties
@@ -34,3 +34,8 @@ log4j.logger.org.apache.hadoop=OFF
log4j.logger.org.apache.flink.runtime.leaderelection=INFO
log4j.logger.org.apache.flink.runtime.leaderretrieval=INFO
+log4j.logger.org.apache.directory=OFF
+log4j.logger.org.mortbay.log=OFF, testlogger
+log4j.logger.net.sf.ehcache=OFF
+log4j.logger.org.apache.hadoop.metrics2=OFF
+log4j.logger.org.apache.hadoop.conf.Configuration=OFF