You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/21 09:52:43 UTC
[10/50] [abbrv] flink git commit: [FLINK-3929] additional fixes for keytab security
[FLINK-3929] additional fixes for keytab security
- load flink-jaas.conf from classpath
- avoid using undocumented flink base dir config entry
- enable test cases to run on MacOS
- unify suffix of secure test cases
- fix error logging and reporting
This closes #2275
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68709b08
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68709b08
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68709b08
Branch: refs/heads/flip-6
Commit: 68709b087570402cacb7bc3088e0eb35d83c8d32
Parents: 285b6f7
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Sep 20 15:41:35 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Sep 20 22:03:29 2016 +0200
----------------------------------------------------------------------
.../org/apache/flink/client/CliFrontend.java | 3 -
.../src/main/flink-bin/conf/flink-jaas.conf | 26 --------
.../flink/runtime/security/SecurityContext.java | 67 ++++++++------------
.../src/main/resources/flink-jaas.conf | 26 ++++++++
.../flink/runtime/taskmanager/TaskManager.scala | 2 -
.../runtime/security/SecurityContextTest.java | 4 +-
.../connectors/fs/RollingSinkSecuredITCase.java | 1 -
.../kafka/Kafka09SecureRunITCase.java | 62 ------------------
.../kafka/Kafka09SecuredRunITCase.java | 62 ++++++++++++++++++
.../flink/test/util/SecureTestEnvironment.java | 23 +++----
.../org/apache/flink/yarn/YarnTestBase.java | 3 +-
.../flink/yarn/YarnApplicationMasterRunner.java | 1 -
.../flink/yarn/YarnTaskManagerRunner.java | 5 +-
.../flink/yarn/cli/FlinkYarnSessionCli.java | 32 +++-------
14 files changed, 138 insertions(+), 179 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 575ffad..0711758 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -161,9 +161,6 @@ public class CliFrontend {
"filesystem scheme from configuration.", e);
}
- this.config.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDirectory.getAbsolutePath()
- + ".." + File.separator);
-
this.clientTimeout = AkkaUtils.getClientTimeout(config);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-dist/src/main/flink-bin/conf/flink-jaas.conf
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/flink-jaas.conf b/flink-dist/src/main/flink-bin/conf/flink-jaas.conf
deleted file mode 100644
index d476e24..0000000
--- a/flink-dist/src/main/flink-bin/conf/flink-jaas.conf
+++ /dev/null
@@ -1,26 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-# We are using this file as an workaround for the Kafka and ZK SASL implementation
-# since they explicitly look for java.security.auth.login.config property
-# The file itself is not used by the application since the internal implementation
-# uses a process-wide in-memory java security configuration object.
-# Please do not edit/delete this file - See FLINK-3929
-sample {
- useKeyTab=false
- useTicketCache=true;
-};
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
index 4b8b69b..be6611f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
@@ -22,7 +22,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.util.Preconditions;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@@ -33,7 +34,12 @@ import org.slf4j.LoggerFactory;
import javax.security.auth.Subject;
import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
import java.lang.reflect.Method;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
@@ -170,15 +176,12 @@ public class SecurityContext {
* Kafka current code behavior.
*/
private static void populateSystemSecurityProperties(Configuration configuration) {
+ Preconditions.checkNotNull(configuration, "The supplied configuation was null");
//required to be empty for Kafka but we will override the property
//with pseudo JAAS configuration file if SASL auth is enabled for ZK
System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, "");
- if(configuration == null) {
- return;
- }
-
boolean disableSaslClient = configuration.getBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE,
ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE);
if(disableSaslClient) {
@@ -188,46 +191,26 @@ public class SecurityContext {
return;
}
- String baseDir = configuration.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, null);
- if(baseDir == null) {
- String message = "SASL auth is enabled for ZK but unable to locate pseudo Jaas config " +
- "since " + ConfigConstants.FLINK_BASE_DIR_PATH_KEY + " is not provided";
- LOG.error(message);
- throw new IllegalConfigurationException(message);
- }
-
- File f = new File(baseDir);
- if(!f.exists() || !f.isDirectory()) {
- LOG.error("Invalid flink base directory {} configuration provided", baseDir);
- throw new IllegalConfigurationException("Invalid flink base directory configuration provided");
- }
-
- File jaasConfigFile = new File(f, JAAS_CONF_FILENAME);
-
- if (!jaasConfigFile.exists() || !jaasConfigFile.isFile()) {
-
- //check if there is a conf directory
- File confDir = new File(f, "conf");
- if(!confDir.exists() || !confDir.isDirectory()) {
- LOG.error("Could not locate " + JAAS_CONF_FILENAME);
- throw new IllegalConfigurationException("Could not locate " + JAAS_CONF_FILENAME);
- }
-
- jaasConfigFile = new File(confDir, JAAS_CONF_FILENAME);
-
- if (!jaasConfigFile.exists() || !jaasConfigFile.isFile()) {
- LOG.error("Could not locate " + JAAS_CONF_FILENAME);
- throw new IllegalConfigurationException("Could not locate " + JAAS_CONF_FILENAME);
- }
+ // load Jaas config file to initialize SASL
+ final File jaasConfFile;
+ try {
+ Path jaasConfPath = Files.createTempFile(JAAS_CONF_FILENAME, "");
+ InputStream jaasConfStream = SecurityContext.class.getClassLoader().getResourceAsStream(JAAS_CONF_FILENAME);
+ Files.copy(jaasConfStream, jaasConfPath, StandardCopyOption.REPLACE_EXISTING);
+ jaasConfFile = jaasConfPath.toFile();
+ jaasConfFile.deleteOnExit();
+ } catch (IOException e) {
+ throw new RuntimeException("SASL auth is enabled for ZK but unable to " +
+ "locate pseudo Jaas config provided with Flink", e);
}
LOG.info("Enabling {} property with pseudo JAAS config file: {}",
- JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfigFile);
+ JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath());
//ZK client module lookup the configuration to handle SASL.
//https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900
- System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfigFile.getAbsolutePath());
- System.setProperty(ZOOKEEPER_SASL_CLIENT,"true");
+ System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath());
+ System.setProperty(ZOOKEEPER_SASL_CLIENT, "true");
String zkSaslServiceName = configuration.getString(ConfigConstants.ZOOKEEPER_SASL_SERVICE_NAME, null);
if(!StringUtils.isBlank(zkSaslServiceName)) {
@@ -250,6 +233,10 @@ public class SecurityContext {
String principal;
+ public SecurityConfiguration() {
+ this.flinkConf = GlobalConfiguration.loadConfiguration();
+ }
+
public String getKeytab() {
return keytab;
}
@@ -310,4 +297,4 @@ public class SecurityContext {
T run() throws Exception;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-runtime/src/main/resources/flink-jaas.conf
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/flink-jaas.conf b/flink-runtime/src/main/resources/flink-jaas.conf
new file mode 100644
index 0000000..7f0f06b
--- /dev/null
+++ b/flink-runtime/src/main/resources/flink-jaas.conf
@@ -0,0 +1,26 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+# We are using this file as a workaround for the Kafka and ZK SASL implementation
+# since they explicitly look for java.security.auth.login.config property
+# The file itself is not used by the application since the internal implementation
+# uses a process-wide in-memory java security configuration object.
+# Please do not edit/delete this file - See FLINK-3929
+sample {
+ useKeyTab=false
+ useTicketCache=true;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 8534ee1..9e2feb5 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1583,8 +1583,6 @@ object TaskManager {
}
}
- conf.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, cliConfig.getConfigDir() + "/..")
-
conf
}
http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
index 5f3d76a..3c48e8f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
@@ -35,7 +35,7 @@ public class SecurityContextTest {
SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
try {
SecurityContext.install(sc);
- assertEquals(UserGroupInformation.getLoginUser().getUserName(),getOSUserName());
+ assertEquals(UserGroupInformation.getLoginUser().getUserName(), getOSUserName());
} catch (Exception e) {
fail(e.getMessage());
}
@@ -59,7 +59,7 @@ public class SecurityContextTest {
if( osName.contains( "windows" ) ){
className = "com.sun.security.auth.module.NTSystem";
}
- else if( osName.contains( "linux" ) ){
+ else if( osName.contains( "linux" ) || osName.contains( "mac" ) ){
className = "com.sun.security.auth.module.UnixSystem";
}
else if( osName.contains( "solaris" ) || osName.contains( "sunos" ) ){
http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index 930ddd2..051175a 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -227,7 +227,6 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM);
} catch (Exception e) {
- LOG.error("Exception occured while creating MiniFlink cluster. Reason: {}", e);
throw new RuntimeException(e);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java
deleted file mode 100644
index d12ec65..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.test.util.SecureTestEnvironment;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/*
- * Kafka Secure Connection (kerberos) IT test case
- */
-public class Kafka09SecureRunITCase extends KafkaConsumerTestBase {
-
- protected static final Logger LOG = LoggerFactory.getLogger(Kafka09SecureRunITCase.class);
-
- @BeforeClass
- public static void prepare() throws IOException, ClassNotFoundException {
- LOG.info("-------------------------------------------------------------------------");
- LOG.info(" Starting Kafka09SecureRunITCase ");
- LOG.info("-------------------------------------------------------------------------");
-
- SecureTestEnvironment.prepare(tempFolder);
- SecureTestEnvironment.populateFlinkSecureConfigurations(getFlinkConfiguration());
-
- startClusters(true);
- }
-
- @AfterClass
- public static void shutDownServices() {
- shutdownClusters();
- SecureTestEnvironment.cleanup();
- }
-
-
- //timeout interval is large since in Travis, ZK connection timeout occurs frequently
- //The timeout for the test case is 2 times timeout of ZK connection
- @Test(timeout = 600000)
- public void testMultipleTopics() throws Exception {
- runProduceConsumeMultipleTopics();
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
new file mode 100644
index 0000000..e748537
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.test.util.SecureTestEnvironment;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/*
+ * Kafka Secure Connection (kerberos) IT test case
+ */
+public class Kafka09SecuredRunITCase extends KafkaConsumerTestBase {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(Kafka09SecuredRunITCase.class);
+
+ @BeforeClass
+ public static void prepare() throws IOException, ClassNotFoundException {
+ LOG.info("-------------------------------------------------------------------------");
+ LOG.info(" Starting Kafka09SecuredRunITCase ");
+ LOG.info("-------------------------------------------------------------------------");
+
+ SecureTestEnvironment.prepare(tempFolder);
+ SecureTestEnvironment.populateFlinkSecureConfigurations(getFlinkConfiguration());
+
+ startClusters(true);
+ }
+
+ @AfterClass
+ public static void shutDownServices() {
+ shutdownClusters();
+ SecureTestEnvironment.cleanup();
+ }
+
+
+ //timeout interval is large since in Travis, ZK connection timeout occurs frequently
+ //The timeout for the test case is 2 times timeout of ZK connection
+ @Test(timeout = 600000)
+ public void testMultipleTopics() throws Exception {
+ runProduceConsumeMultipleTopics();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
index 00b19f1..b5e622b 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.flink.test.util;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.security.SecurityContext;
import org.apache.hadoop.minikdc.MiniKdc;
import org.junit.rules.TemporaryFolder;
@@ -60,12 +61,10 @@ public class SecureTestEnvironment {
private static String hadoopServicePrincipal = null;
- private static File baseDirForSecureRun = null;
-
public static void prepare(TemporaryFolder tempFolder) {
try {
- baseDirForSecureRun = tempFolder.newFolder();
+ File baseDirForSecureRun = tempFolder.newFolder();
LOG.info("Base Directory for Secure Environment: {}", baseDirForSecureRun);
String hostName = "localhost";
@@ -113,19 +112,17 @@ public class SecureTestEnvironment {
//See Yarn test case module for reference
createJaasConfig(baseDirForSecureRun);
SecurityContext.SecurityConfiguration ctx = new SecurityContext.SecurityConfiguration();
- Configuration flinkConfig = new Configuration();
+ Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, testKeytab);
flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, testPrincipal);
flinkConfig.setBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE, false);
- flinkConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, baseDirForSecureRun.getAbsolutePath());
ctx.setFlinkConfiguration(flinkConfig);
TestingSecurityContext.install(ctx, getClientSecurityConfigurationMap());
- populateSystemEnvVariables();
+ populateJavaPropertyVariables();
} catch(Exception e) {
- LOG.error("Exception occured while preparing secure environment. Reason: {}", e);
- throw new RuntimeException(e);
+ throw new RuntimeException("Exception occured while preparing secure environment.", e);
}
}
@@ -145,14 +142,12 @@ public class SecureTestEnvironment {
testPrincipal = null;
testZkServerPrincipal = null;
hadoopServicePrincipal = null;
- baseDirForSecureRun = null;
}
- private static void populateSystemEnvVariables() {
+ private static void populateJavaPropertyVariables() {
if(LOG.isDebugEnabled()) {
- System.setProperty("FLINK_JAAS_DEBUG", "true");
System.setProperty("sun.security.krb5.debug", "true");
}
@@ -165,7 +160,6 @@ public class SecureTestEnvironment {
private static void resetSystemEnvVariables() {
System.clearProperty("java.security.krb5.conf");
- System.clearProperty("FLINK_JAAS_DEBUG");
System.clearProperty("sun.security.krb5.debug");
System.clearProperty("zookeeper.authProvider.1");
@@ -227,7 +221,7 @@ public class SecureTestEnvironment {
}
/*
- * Helper method to create a temporary JAAS configuration file to ger around the Kafka and ZK SASL
+ * Helper method to create a temporary JAAS configuration file to get around the Kafka and ZK SASL
* implementation lookup java.security.auth.login.config
*/
private static void createJaasConfig(File baseDirForSecureRun) {
@@ -241,8 +235,7 @@ public class SecureTestEnvironment {
out.println("useTicketCache=true;");
out.println("};");
} catch (IOException e) {
- LOG.error("Exception occured while trying to create JAAS config. Reason: {}", e.getMessage());
- throw new RuntimeException(e);
+ throw new RuntimeException("Exception occured while trying to create JAAS config.", e);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 605aa44..afdd400 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -427,8 +427,7 @@ public abstract class YarnTestBase extends TestLogger {
out.println(ConfigConstants.SECURITY_PRINCIPAL_KEY + ": " + principal);
out.println("");
} catch (IOException e) {
- LOG.error("Exception occured while trying to append the security configurations. Reason: {}", e.getMessage());
- throw new RuntimeException(e);
+ throw new RuntimeException("Exception occured while trying to append the security configurations.", e);
}
String configDir = tempConfPathForSecureRun.getAbsolutePath();
http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index efb658a..b27876b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -176,7 +176,6 @@ public class YarnApplicationMasterRunner {
flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
}
- flinkConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, currDir);
SecurityContext.install(sc.setFlinkConfiguration(flinkConfig));
http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
index c70a30b..21ed52e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
@@ -127,7 +127,6 @@ public class YarnTaskManagerRunner {
configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
}
- configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, currDir);
SecurityContext.install(sc.setFlinkConfiguration(configuration));
@@ -145,9 +144,9 @@ public class YarnTaskManagerRunner {
}
});
} catch(Exception e) {
- LOG.error("Exception occurred while launching Task Manager. Reason: {}", e);
+ LOG.error("Exception occurred while launching Task Manager", e);
throw new RuntimeException(e);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index b5364f0..d09340c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -24,7 +24,6 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.configuration.ConfigConstants;
@@ -463,27 +462,17 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
}
}
- public static void main(final String[] args) {
- final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", true); // no prefix for the YARN session
-
- String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv();
- GlobalConfiguration.loadConfiguration(confDirPath);
+ public static void main(final String[] args) throws Exception {
+ final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session
Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
- flinkConfiguration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, confDirPath);
- try {
- SecurityContext.install(new SecurityContext.SecurityConfiguration().setFlinkConfiguration(flinkConfiguration));
- int retCode = SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
- @Override
- public Integer run() {
- return cli.run(args);
- }
- });
- System.exit(retCode);
- } catch(Exception e) {
- e.printStackTrace();
- LOG.error("Exception Occured. Reason: {}", e);
- return;
- }
+ SecurityContext.install(new SecurityContext.SecurityConfiguration().setFlinkConfiguration(flinkConfiguration));
+ int retCode = SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+ @Override
+ public Integer run() {
+ return cli.run(args);
+ }
+ });
+ System.exit(retCode);
}
@Override
@@ -544,7 +533,6 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
try {
return yarnClusterDescriptor.deploy();
} catch (Exception e) {
- LOG.error("Error while deploying YARN cluster: "+e.getMessage(), e);
throw new RuntimeException("Error deploying the YARN cluster", e);
}