You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/09/20 20:08:59 UTC

[2/5] flink git commit: [FLINK-3929] conditionally skip RollingSinkSecuredITCase

[FLINK-3929] conditionally skip RollingSinkSecuredITCase

- for now, we skip this test class until Hadoop version 3.x.x.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/285b6f74
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/285b6f74
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/285b6f74

Branch: refs/heads/master
Commit: 285b6f74e0416b200229bd67cb7521e4a6871bbc
Parents: 25a622f
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Sep 1 12:49:53 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Sep 20 22:03:29 2016 +0200

----------------------------------------------------------------------
 .../connectors/fs/RollingSinkSecuredITCase.java | 37 +++++++++++++++-----
 .../connectors/kafka/KafkaTestBase.java         |  8 +++--
 2 files changed, 35 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/285b6f74/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index 86cedaf..930ddd2 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -31,9 +31,10 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.VersionInfo;
 import org.junit.AfterClass;
+import org.junit.Assume;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,15 +58,31 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
 
 /**
  * Tests for running {@link RollingSinkSecuredITCase} which is an extension of {@link RollingSink} in secure environment
+ * Note: only executed for Hadoop version > 3.x.x
  */
-
-//The test is disabled since MiniDFS secure run requires lower order ports to be used.
-//We can enable the test when the fix is available (HDFS-9213)
-@Ignore
 public class RollingSinkSecuredITCase extends RollingSinkITCase {
 
 	protected static final Logger LOG = LoggerFactory.getLogger(RollingSinkSecuredITCase.class);
 
+	/**
+	 * Skips all tests if the Hadoop version doesn't match.
+	 * We can't run this test class until HDFS-9213 is fixed which allows a secure DataNode
+	 * to bind to non-privileged ports for testing.
+	 * For now, we skip this test class until Hadoop version 3.x.x.
+	 */
+	private static void skipIfHadoopVersionIsNotAppropriate() {
+		// Skips all tests if the Hadoop version doesn't match
+		String hadoopVersionString = VersionInfo.getVersion();
+		String[] split = hadoopVersionString.split("\\.");
+		if (split.length != 3) {
+			throw new IllegalStateException("Hadoop version was not of format 'X.X.X': " + hadoopVersionString);
+		}
+		Assume.assumeTrue(
+			// check whether we're running Hadoop version >= 3.x.x
+			Integer.parseInt(split[0]) >= 3
+		);
+	}
+
 	/*
 	 * override super class static methods to avoid creating MiniDFS and MiniFlink with wrong configurations
 	 * and out-of-order sequence for secure cluster
@@ -85,6 +102,8 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
 	@BeforeClass
 	public static void startSecureCluster() throws Exception {
 
+		skipIfHadoopVersionIsNotAppropriate();
+
 		LOG.info("starting secure cluster environment for testing");
 
 		dataDir = tempFolder.newFolder();
@@ -143,7 +162,9 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
 		TestStreamEnvironment.unsetAsContext();
 		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
 
-		hdfsCluster.shutdown();
+		if (hdfsCluster != null) {
+			hdfsCluster.shutdown();
+		}
 
 		SecureTestEnvironment.cleanup();
 	}
@@ -229,4 +250,4 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
 	@Override
 	public void testDateTimeRollingStringWriter() throws Exception {}
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/285b6f74/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index afdd158..5cec4f0 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -110,7 +110,7 @@ public abstract class KafkaTestBase extends TestLogger {
 	}
 
 	protected static Configuration getFlinkConfiguration() {
-		Configuration flinkConfig = new Configuration();;
+		Configuration flinkConfig = new Configuration();
 		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
 		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
 		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
@@ -134,7 +134,11 @@ public abstract class KafkaTestBase extends TestLogger {
 
 		brokerConnectionStrings = kafkaServer.getBrokerConnectionString();
 
-		if(kafkaServer.isSecureRunSupported() && secureMode) {
+		if (secureMode) {
+			if (!kafkaServer.isSecureRunSupported()) {
+				throw new IllegalStateException(
+					"Attempting to test in secure mode but secure mode not supported by the KafkaTestEnvironment.");
+			}
 			secureProps = kafkaServer.getSecureProperties();
 		}