You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/09/20 20:08:59 UTC
[2/5] flink git commit: [FLINK-3929] conditionally skip RollingSinkSecuredITCase
[FLINK-3929] conditionally skip RollingSinkSecuredITCase
- for now, we skip this test class unless it runs against Hadoop version 3.x.x or later.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/285b6f74
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/285b6f74
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/285b6f74
Branch: refs/heads/master
Commit: 285b6f74e0416b200229bd67cb7521e4a6871bbc
Parents: 25a622f
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Sep 1 12:49:53 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Sep 20 22:03:29 2016 +0200
----------------------------------------------------------------------
.../connectors/fs/RollingSinkSecuredITCase.java | 37 +++++++++++++++-----
.../connectors/kafka/KafkaTestBase.java | 8 +++--
2 files changed, 35 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/285b6f74/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index 86cedaf..930ddd2 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -31,9 +31,10 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.VersionInfo;
import org.junit.AfterClass;
+import org.junit.Assume;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,15 +58,31 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
/**
* Tests for running {@link RollingSink} in a secure environment; this class is an extension of {@link RollingSinkITCase}.
+ * Note: only executed for Hadoop version >= 3.x.x
*/
-
-//The test is disabled since MiniDFS secure run requires lower order ports to be used.
-//We can enable the test when the fix is available (HDFS-9213)
-@Ignore
public class RollingSinkSecuredITCase extends RollingSinkITCase {
protected static final Logger LOG = LoggerFactory.getLogger(RollingSinkSecuredITCase.class);
+ /**
+ * Skips all tests if the Hadoop version doesn't match.
+ * We can't run this test class until HDFS-9213 is fixed which allows a secure DataNode
+ * to bind to non-privileged ports for testing.
+ * For now, we skip this test class unless the Hadoop major version is at least 3 (i.e. 3.x.x or later).
+ */
+ private static void skipIfHadoopVersionIsNotAppropriate() {
+ // Skips all tests if the Hadoop version doesn't match
+ String hadoopVersionString = VersionInfo.getVersion();
+ String[] split = hadoopVersionString.split("\\.");
+ if (split.length != 3) {
+ throw new IllegalStateException("Hadoop version was not of format 'X.X.X': " + hadoopVersionString);
+ }
+ Assume.assumeTrue(
+ // check whether we're running Hadoop version >= 3.x.x
+ Integer.parseInt(split[0]) >= 3
+ );
+ }
+
/*
* override super class static methods to avoid creating MiniDFS and MiniFlink with wrong configurations
* and out-of-order sequence for secure cluster
@@ -85,6 +102,8 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
@BeforeClass
public static void startSecureCluster() throws Exception {
+ skipIfHadoopVersionIsNotAppropriate();
+
LOG.info("starting secure cluster environment for testing");
dataDir = tempFolder.newFolder();
@@ -143,7 +162,9 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
TestStreamEnvironment.unsetAsContext();
stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
- hdfsCluster.shutdown();
+ if (hdfsCluster != null) {
+ hdfsCluster.shutdown();
+ }
SecureTestEnvironment.cleanup();
}
@@ -229,4 +250,4 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
@Override
public void testDateTimeRollingStringWriter() throws Exception {}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/285b6f74/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index afdd158..5cec4f0 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -110,7 +110,7 @@ public abstract class KafkaTestBase extends TestLogger {
}
protected static Configuration getFlinkConfiguration() {
- Configuration flinkConfig = new Configuration();;
+ Configuration flinkConfig = new Configuration();
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
@@ -134,7 +134,11 @@ public abstract class KafkaTestBase extends TestLogger {
brokerConnectionStrings = kafkaServer.getBrokerConnectionString();
- if(kafkaServer.isSecureRunSupported() && secureMode) {
+ if (secureMode) {
+ if (!kafkaServer.isSecureRunSupported()) {
+ throw new IllegalStateException(
+ "Attempting to test in secure mode but secure mode not supported by the KafkaTestEnvironment.");
+ }
secureProps = kafkaServer.getSecureProperties();
}