You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/11 20:19:44 UTC
[6/8] flink git commit: [FLINK-5364] [security] Rework JAAS
configuration to support user-supplied entries
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-runtime/src/main/resources/flink-jaas.conf
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/flink-jaas.conf b/flink-runtime/src/main/resources/flink-jaas.conf
index 7f0f06b..d287ff4 100644
--- a/flink-runtime/src/main/resources/flink-jaas.conf
+++ b/flink-runtime/src/main/resources/flink-jaas.conf
@@ -1,3 +1,4 @@
+/**
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -17,10 +18,6 @@
################################################################################
# We are using this file as an workaround for the Kafka and ZK SASL implementation
# since they explicitly look for java.security.auth.login.config property
-# The file itself is not used by the application since the internal implementation
-# uses a process-wide in-memory java security configuration object.
# Please do not edit/delete this file - See FLINK-3929
-sample {
- useKeyTab=false
- useTicketCache=true;
-}
+**/
+
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java
index 0570f28..1847ec4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework.overlays;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.junit.Rule;
@@ -46,7 +47,7 @@ public class KeytabOverlayTest extends ContainerOverlayTestBase {
ContainerSpecification spec = new ContainerSpecification();
overlay.configure(spec);
- assertEquals(TARGET_PATH.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.SECURITY_KEYTAB_KEY, null));
+ assertEquals(TARGET_PATH.getPath(), spec.getDynamicConfiguration().getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB));
checkArtifact(spec, TARGET_PATH);
}
@@ -64,7 +65,7 @@ public class KeytabOverlayTest extends ContainerOverlayTestBase {
final Configuration conf = new Configuration();
File keytab = tempFolder.newFile();
- conf.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytab.getAbsolutePath());
+ conf.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytab.getAbsolutePath());
KeytabOverlay.Builder builder = KeytabOverlay.newBuilder().fromEnvironment(conf);
assertEquals(builder.keytabPath, keytab);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java
deleted file mode 100644
index 89e5ef9..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.security;
-
-import org.apache.hadoop.security.authentication.util.KerberosUtil;
-import org.junit.Test;
-
-import javax.security.auth.login.AppConfigurationEntry;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for the {@link JaasConfiguration}.
- */
-public class JaasConfigurationTest {
-
- @Test
- public void testInvalidKerberosParams() {
- String keytab = "user.keytab";
- String principal = null;
- try {
- new JaasConfiguration(keytab, principal);
- } catch(RuntimeException re) {
- assertEquals("Both keytab and principal are required and cannot be empty",re.getMessage());
- }
- }
-
- @Test
- public void testDefaultAceEntry() {
- JaasConfiguration conf = new JaasConfiguration(null,null);
- javax.security.auth.login.Configuration.setConfiguration(conf);
- final AppConfigurationEntry[] entry = conf.getAppConfigurationEntry("test");
- AppConfigurationEntry ace = entry[0];
- assertEquals(ace.getLoginModuleName(), KerberosUtil.getKrb5LoginModuleName());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java
new file mode 100644
index 0000000..4c899e8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.junit.Test;
+
+import javax.security.auth.login.AppConfigurationEntry;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for the {@link KerberosUtils}.
+ */
+public class KerberosUtilsTest {
+
+ @Test
+ public void testTicketCacheEntry() {
+ AppConfigurationEntry entry = KerberosUtils.ticketCacheEntry();
+ assertNotNull(entry);
+ }
+
+ @Test
+ public void testKeytabEntry() {
+ String keytab = "user.keytab";
+ String principal = "user";
+ AppConfigurationEntry entry = KerberosUtils.keytabEntry(keytab, principal);
+ assertNotNull(entry);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
index 2648a7a..c5624f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
@@ -18,97 +18,64 @@
package org.apache.flink.runtime.security;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.OperatingSystem;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.flink.runtime.security.modules.SecurityModule;
import org.junit.AfterClass;
-import org.junit.Assert;
import org.junit.Test;
-import java.lang.reflect.Method;
+import java.util.Collections;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
/**
* Tests for the {@link SecurityUtils}.
*/
public class SecurityUtilsTest {
- @AfterClass
- public static void afterClass() {
- SecurityUtils.clearContext();
- System.setProperty(SecurityUtils.JAVA_SECURITY_AUTH_LOGIN_CONFIG, "");
- }
+ static class TestSecurityModule implements SecurityModule {
+ boolean installed;
- @Test
- public void testCreateInsecureHadoopCtx() {
- SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(new Configuration());
- try {
- SecurityUtils.install(sc);
- assertEquals(UserGroupInformation.getLoginUser().getUserName(), getOSUserName());
- } catch (Exception e) {
- fail(e.getMessage());
+ @Override
+ public void install(SecurityUtils.SecurityConfiguration configuration) throws SecurityInstallException {
+ installed = true;
}
- }
- @Test
- public void testInvalidUGIContext() {
- try {
- new HadoopSecurityContext(null);
- } catch (RuntimeException re) {
- assertEquals("UGI passed cannot be null",re.getMessage());
+ @Override
+ public void uninstall() throws SecurityInstallException {
+ installed = false;
}
}
+ @AfterClass
+ public static void afterClass() {
+ SecurityUtils.uninstall();
+ }
+
@Test
- /**
- * The Jaas configuration file provided should not be overridden.
- */
- public void testJaasPropertyOverride() throws Exception {
- String confFile = "jaas.conf";
- System.setProperty(SecurityUtils.JAVA_SECURITY_AUTH_LOGIN_CONFIG, confFile);
+ public void testModuleInstall() throws Exception {
+ SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(
+ new Configuration(), new org.apache.hadoop.conf.Configuration(),
+ Collections.singletonList(TestSecurityModule.class));
- SecurityUtils.install(new SecurityUtils.SecurityConfiguration(new Configuration()));
+ SecurityUtils.install(sc);
+ assertEquals(1, SecurityUtils.getInstalledModules().size());
+ TestSecurityModule testModule = (TestSecurityModule) SecurityUtils.getInstalledModules().get(0);
+ assertTrue(testModule.installed);
- Assert.assertEquals(
- confFile,
- System.getProperty(SecurityUtils.JAVA_SECURITY_AUTH_LOGIN_CONFIG));
+ SecurityUtils.uninstall();
+ assertNull(SecurityUtils.getInstalledModules());
+ assertFalse(testModule.installed);
}
+ @Test
+ public void testSecurityContext() throws Exception {
+ SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(
+ new Configuration(), new org.apache.hadoop.conf.Configuration(),
+ Collections.singletonList(TestSecurityModule.class));
- private String getOSUserName() throws Exception {
- String userName = "";
- OperatingSystem os = OperatingSystem.getCurrentOperatingSystem();
- String className;
- String methodName;
-
- switch(os) {
- case LINUX:
- case MAC_OS:
- className = "com.sun.security.auth.module.UnixSystem";
- methodName = "getUsername";
- break;
- case WINDOWS:
- className = "com.sun.security.auth.module.NTSystem";
- methodName = "getName";
- break;
- case SOLARIS:
- className = "com.sun.security.auth.module.SolarisSystem";
- methodName = "getUsername";
- break;
- case FREE_BSD:
- case UNKNOWN:
- default:
- className = null;
- methodName = null;
- }
+ SecurityUtils.install(sc);
+ assertEquals(HadoopSecurityContext.class, SecurityUtils.getInstalledContext().getClass());
- if( className != null ){
- Class<?> c = Class.forName( className );
- Method method = c.getDeclaredMethod( methodName );
- Object o = c.newInstance();
- userName = (String) method.invoke( o );
- }
- return userName;
+ SecurityUtils.uninstall();
+ assertEquals(NoOpSecurityContext.class, SecurityUtils.getInstalledContext().getClass());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
index de715c6..10450c3 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
@@ -18,10 +18,10 @@
package org.apache.flink.test.util;
-import org.apache.flink.configuration.ConfigConstants;
+
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.hadoop.minikdc.MiniKdc;
import org.junit.rules.TemporaryFolder;
@@ -30,10 +30,6 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.File;
-import java.io.FileWriter;
-import java.io.BufferedWriter;
-import java.io.PrintWriter;
-import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -111,11 +107,11 @@ public class SecureTestEnvironment {
//the context can be reinitialized with Hadoop configuration by calling
//ctx.setHadoopConfiguration() for the UGI implementation to work properly.
//See Yarn test case module for reference
- createJaasConfig(baseDirForSecureRun);
Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
- flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, testKeytab);
- flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, testPrincipal);
- flinkConfig.setBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE, false);
+ flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, testKeytab);
+ flinkConfig.setBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE, false);
+ flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, testPrincipal);
+ flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS, "Client,KafkaClient");
SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig);
TestingSecurityContext.install(ctx, getClientSecurityConfigurationMap());
@@ -178,8 +174,8 @@ public class SecureTestEnvironment {
conf = flinkConf;
}
- conf.setString(ConfigConstants.SECURITY_KEYTAB_KEY , testKeytab);
- conf.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY , testPrincipal);
+ conf.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB , testKeytab);
+ conf.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL , testPrincipal);
return conf;
}
@@ -190,22 +186,19 @@ public class SecureTestEnvironment {
if(testZkServerPrincipal != null ) {
TestingSecurityContext.ClientSecurityConfiguration zkServer =
- new TestingSecurityContext.ClientSecurityConfiguration(testZkServerPrincipal, testKeytab,
- "Server", "zk-server");
+ new TestingSecurityContext.ClientSecurityConfiguration(testZkServerPrincipal, testKeytab);
clientSecurityConfigurationMap.put("Server",zkServer);
}
if(testZkClientPrincipal != null ) {
TestingSecurityContext.ClientSecurityConfiguration zkClient =
- new TestingSecurityContext.ClientSecurityConfiguration(testZkClientPrincipal, testKeytab,
- "Client", "zk-client");
+ new TestingSecurityContext.ClientSecurityConfiguration(testZkClientPrincipal, testKeytab);
clientSecurityConfigurationMap.put("Client",zkClient);
}
if(testKafkaServerPrincipal != null ) {
TestingSecurityContext.ClientSecurityConfiguration kafkaServer =
- new TestingSecurityContext.ClientSecurityConfiguration(testKafkaServerPrincipal, testKeytab,
- "KafkaServer", "kafka-server");
+ new TestingSecurityContext.ClientSecurityConfiguration(testKafkaServerPrincipal, testKeytab);
clientSecurityConfigurationMap.put("KafkaServer",kafkaServer);
}
@@ -220,23 +213,4 @@ public class SecureTestEnvironment {
return hadoopServicePrincipal;
}
- /*
- * Helper method to create a temporary JAAS configuration file to get around the Kafka and ZK SASL
- * implementation lookup java.security.auth.login.config
- */
- private static void createJaasConfig(File baseDirForSecureRun) {
-
- try(FileWriter fw = new FileWriter(new File(baseDirForSecureRun, SecurityUtils.JAAS_CONF_FILENAME), true);
- BufferedWriter bw = new BufferedWriter(fw);
- PrintWriter out = new PrintWriter(bw))
- {
- out.println("sample {");
- out.println("useKeyTab=false");
- out.println("useTicketCache=true;");
- out.println("};");
- } catch (IOException e) {
- throw new RuntimeException("Exception occured while trying to create JAAS config.", e);
- }
-
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java
deleted file mode 100644
index 25b2362..0000000
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.util;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.security.JaasConfiguration;
-import org.apache.hadoop.security.authentication.util.KerberosUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.login.AppConfigurationEntry;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * {@link TestingJaasConfiguration} for handling the integration test case since it requires to manage
- * client principal as well as server principals of Hadoop/ZK which expects the host name to be populated
- * in specific way (localhost vs 127.0.0.1). This provides an abstraction to handle more than one Login Module
- * since the default {@link JaasConfiguration} behavior only supports global/unique principal identifier
- */
-
-@Internal
-public class TestingJaasConfiguration extends JaasConfiguration {
-
- private static final Logger LOG = LoggerFactory.getLogger(TestingJaasConfiguration.class);
-
- public Map<String, TestingSecurityContext.ClientSecurityConfiguration> clientSecurityConfigurationMap;
-
- TestingJaasConfiguration(String keytab, String principal, Map<String,
- TestingSecurityContext.ClientSecurityConfiguration> clientSecurityConfigurationMap) {
- super(keytab, principal);
- this.clientSecurityConfigurationMap = clientSecurityConfigurationMap;
- }
-
- @Override
- public AppConfigurationEntry[] getAppConfigurationEntry(String applicationName) {
-
- LOG.debug("In TestingJaasConfiguration - Application Requested: {}", applicationName);
-
- AppConfigurationEntry[] appConfigurationEntry = super.getAppConfigurationEntry(applicationName);
-
- if(clientSecurityConfigurationMap != null && clientSecurityConfigurationMap.size() > 0) {
-
- if(clientSecurityConfigurationMap.containsKey(applicationName)) {
-
- LOG.debug("In TestingJaasConfiguration - Application: {} found in the supplied context", applicationName);
-
- TestingSecurityContext.ClientSecurityConfiguration conf = clientSecurityConfigurationMap.get(applicationName);
-
- if(appConfigurationEntry != null && appConfigurationEntry.length > 0) {
-
- for(int count=0; count < appConfigurationEntry.length; count++) {
-
- AppConfigurationEntry ace = appConfigurationEntry[count];
-
- if (ace.getOptions().containsKey("keyTab")) {
-
- String keyTab = conf.getKeytab();
- String principal = conf.getPrincipal();
-
- LOG.debug("In TestingJaasConfiguration - Application: {} from the supplied context will " +
- "use Client Specific Keytab: {} and Principal: {}", applicationName, keyTab, principal);
-
- Map<String, String> newKeytabKerberosOptions = new HashMap<>();
- newKeytabKerberosOptions.putAll(getKeytabKerberosOptions());
-
- newKeytabKerberosOptions.put("keyTab", keyTab);
- newKeytabKerberosOptions.put("principal", principal);
-
- AppConfigurationEntry keytabKerberosAce = new AppConfigurationEntry(
- KerberosUtil.getKrb5LoginModuleName(),
- AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
- newKeytabKerberosOptions);
- appConfigurationEntry = new AppConfigurationEntry[] {keytabKerberosAce};
-
- LOG.debug("---->Login Module is using Keytab based configuration<------");
- LOG.debug("Login Module Name: " + keytabKerberosAce.getLoginModuleName());
- LOG.debug("Control Flag: " + keytabKerberosAce.getControlFlag());
- LOG.debug("Options: " + keytabKerberosAce.getOptions());
- }
- }
- }
- }
-
- }
-
- return appConfigurationEntry;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
index 4343013..ff1810b 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
@@ -19,10 +19,16 @@
package org.apache.flink.test.util;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.security.DynamicConfiguration;
+import org.apache.flink.runtime.security.KerberosUtils;
import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.security.modules.JaasModule;
+import javax.security.auth.login.AppConfigurationEntry;
import java.util.Map;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
/*
* Test security context to support handling both client and server principals in MiniKDC
* This class is used only in integration test code for connectors like Kafka, HDFS etc.,
@@ -36,21 +42,20 @@ public class TestingSecurityContext {
SecurityUtils.install(config);
- // establish the JAAS config for Test environment
- TestingJaasConfiguration jaasConfig = new TestingJaasConfiguration(config.getKeytab(),
- config.getPrincipal(), clientSecurityConfigurationMap);
- javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
+ // install dynamic JAAS entries
+ checkArgument(config.getSecurityModules().contains(JaasModule.class));
+ DynamicConfiguration jaasConf = (DynamicConfiguration) javax.security.auth.login.Configuration.getConfiguration();
+ for(Map.Entry<String,ClientSecurityConfiguration> e : clientSecurityConfigurationMap.entrySet()) {
+ AppConfigurationEntry entry = KerberosUtils.keytabEntry(e.getValue().getKeytab(), e.getValue().getPrincipal());
+ jaasConf.addAppConfigurationEntry(e.getKey(), entry);
+ }
}
public static class ClientSecurityConfiguration {
- private String principal;
-
- private String keytab;
+ private final String principal;
- private String moduleName;
-
- private String jaasServiceName;
+ private final String keytab;
public String getPrincipal() {
return principal;
@@ -60,21 +65,10 @@ public class TestingSecurityContext {
return keytab;
}
- public String getModuleName() {
- return moduleName;
- }
-
- public String getJaasServiceName() {
- return jaasServiceName;
- }
-
- public ClientSecurityConfiguration(String principal, String keytab, String moduleName, String jaasServiceName) {
+ public ClientSecurityConfiguration(String principal, String keytab) {
this.principal = principal;
this.keytab = keytab;
- this.moduleName = moduleName;
- this.jaasServiceName = jaasServiceName;
}
-
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
index 45fd8d0..d3558a9 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
@@ -18,8 +18,8 @@
package org.apache.flink.yarn;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.test.util.SecureTestEnvironment;
import org.apache.flink.test.util.TestingSecurityContext;
@@ -53,13 +53,13 @@ public class YARNSessionFIFOSecuredITCase extends YARNSessionFIFOITCase {
SecureTestEnvironment.getTestKeytab());
Configuration flinkConfig = new Configuration();
- flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,
+ flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB,
SecureTestEnvironment.getTestKeytab());
- flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,
+ flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL,
SecureTestEnvironment.getHadoopServicePrincipal());
- SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig);
- ctx.setHadoopConfiguration(yarnConfiguration);
+ SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig,
+ yarnConfiguration);
try {
TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap());
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index dc7cca3..ca8a0da 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.test.util.TestBaseUtils;
@@ -424,8 +425,8 @@ public abstract class YarnTestBase extends TestLogger {
LOG.info("writing keytab: " + keytab + " and principal: " + principal + " to config file");
out.println("");
out.println("#Security Configurations Auto Populated ");
- out.println(ConfigConstants.SECURITY_KEYTAB_KEY + ": " + keytab);
- out.println(ConfigConstants.SECURITY_PRINCIPAL_KEY + ": " + principal);
+ out.println(SecurityOptions.KERBEROS_LOGIN_KEYTAB.key() + ": " + keytab);
+ out.println(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL.key() + ": " + principal);
out.println("");
} catch (IOException e) {
throw new RuntimeException("Exception occured while trying to append the security configurations.", e);
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index cf970b0..6cf3997 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -22,12 +22,12 @@ import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -407,6 +407,18 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
@Override
public YarnClusterClient deploy() {
try {
+ if(UserGroupInformation.isSecurityEnabled()) {
+ // note: UGI::hasKerberosCredentials inaccurately reports false
+ // for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
+ // so we check only in ticket cache scenario.
+ boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
+ UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
+ if (useTicketCache && !loginUser.hasKerberosCredentials()) {
+ LOG.error("Hadoop security is enabled but the login user does not have Kerberos credentials");
+ throw new RuntimeException("Hadoop security is enabled but the login user " +
+ "does not have Kerberos credentials");
+ }
+ }
return deployInternal();
} catch (Exception e) {
throw new RuntimeException("Couldn't deploy Yarn cluster", e);
@@ -604,12 +616,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
}
- //check if there is a JAAS config file
- File jaasConfigFile = new File(configurationDirectory + File.separator + SecurityUtils.JAAS_CONF_FILENAME);
- if (jaasConfigFile.exists() && jaasConfigFile.isFile()) {
- effectiveShipFiles.add(jaasConfigFile);
- }
-
addLibFolderToShipFiles(effectiveShipFiles);
// add the user jar to the classpath of the to-be-created cluster
@@ -773,7 +779,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
// setup security tokens
LocalResource keytabResource = null;
Path remotePathKeytab = null;
- String keytab = flinkConfiguration.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
+ String keytab = flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
if(keytab != null) {
LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
keytabResource = Records.newRecord(LocalResource.class);
@@ -816,7 +822,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
if(keytabResource != null) {
appMasterEnv.put(YarnConfigKeys.KEYTAB_PATH, remotePathKeytab.toString() );
- String principal = flinkConfiguration.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
+ String principal = flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal );
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
index 1c8bad7..a8aeb07 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.util.Preconditions;
@@ -103,22 +104,28 @@ public abstract class AbstractYarnFlinkApplicationMasterRunner {
LOG.debug("YARN dynamic properties: {}", dynamicProperties);
final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties);
- if(keytabPath != null && remoteKeytabPrincipal != null) {
- flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
- flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
+ if (keytabPath != null && remoteKeytabPrincipal != null) {
+ flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
+ flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
}
- SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(flinkConfig);
+ org.apache.hadoop.conf.Configuration hadoopConfiguration = null;
//To support Yarn Secure Integration Test Scenario
File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
- if(krb5Conf.exists() && krb5Conf.canRead()) {
+ if (krb5Conf.exists() && krb5Conf.canRead()) {
String krb5Path = krb5Conf.getAbsolutePath();
LOG.info("KRB5 Conf: {}", krb5Path);
- org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
- conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
- conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
- sc.setHadoopConfiguration(conf);
+ hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
+ hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+ }
+
+ SecurityUtils.SecurityConfiguration sc;
+ if(hadoopConfiguration != null) {
+ sc = new SecurityUtils.SecurityConfiguration(flinkConfig, hadoopConfiguration);
+ } else {
+ sc = new SecurityUtils.SecurityConfiguration(flinkConfig);
}
SecurityUtils.install(sc);
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index d1ef553..71be589 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -168,22 +169,28 @@ public class YarnApplicationMasterRunner {
LOG.debug("YARN dynamic properties: {}", dynamicProperties);
final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties);
- if(keytabPath != null && remoteKeytabPrincipal != null) {
- flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
- flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
+ if (keytabPath != null && remoteKeytabPrincipal != null) {
+ flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
+ flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
}
- SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(flinkConfig);
+ org.apache.hadoop.conf.Configuration hadoopConfiguration = null;
//To support Yarn Secure Integration Test Scenario
File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
if(krb5Conf.exists() && krb5Conf.canRead()) {
String krb5Path = krb5Conf.getAbsolutePath();
LOG.info("KRB5 Conf: {}", krb5Path);
- org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
- conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
- conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
- sc.setHadoopConfiguration(conf);
+ hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
+ hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+ }
+
+ SecurityUtils.SecurityConfiguration sc;
+ if(hadoopConfiguration != null) {
+ sc = new SecurityUtils.SecurityConfiguration(flinkConfig, hadoopConfiguration);
+ } else {
+ sc = new SecurityUtils.SecurityConfiguration(flinkConfig);
}
SecurityUtils.install(sc);
@@ -250,14 +257,14 @@ public class YarnApplicationMasterRunner {
final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
String keytabPath = null;
- if(remoteKeytabPath != null) {
+ if (remoteKeytabPath != null) {
File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
keytabPath = f.getAbsolutePath();
LOG.info("keytabPath: {}", keytabPath);
}
- if(keytabPath != null && remoteKeytabPrincipal != null) {
- config.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
- config.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
+ if (keytabPath != null && remoteKeytabPrincipal != null) {
+ config.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
+ config.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
}
// Hadoop/Yarn configuration (loads config data automatically from classpath files)
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index dc8c604..414c3de 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -139,22 +140,28 @@ public class YarnTaskExecutorRunner {
LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
currentUser.getShortUserName(), yarnClientUsername);
- SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
+ org.apache.hadoop.conf.Configuration hadoopConfiguration = null;
//To support Yarn Secure Integration Test Scenario
File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
- if(krb5Conf.exists() && krb5Conf.canRead()) {
+ if (krb5Conf.exists() && krb5Conf.canRead()) {
String krb5Path = krb5Conf.getAbsolutePath();
LOG.info("KRB5 Conf: {}", krb5Path);
- org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
- conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
- conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
- sc.setHadoopConfiguration(conf);
+ hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
+ hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
}
- if(keytabPath != null && remoteKeytabPrincipal != null) {
- configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
- configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
+ SecurityUtils.SecurityConfiguration sc;
+ if (hadoopConfiguration != null) {
+ sc = new SecurityUtils.SecurityConfiguration(configuration, hadoopConfiguration);
+ } else {
+ sc = new SecurityUtils.SecurityConfiguration(configuration);
+ }
+
+ if (keytabPath != null && remoteKeytabPrincipal != null) {
+ configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
+ configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
}
SecurityUtils.install(sc);
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
index 015eb1b..4a780e0 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Callable;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.taskmanager.TaskManager;
@@ -111,22 +112,28 @@ public class YarnTaskManagerRunner {
try {
- SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
+ org.apache.hadoop.conf.Configuration hadoopConfiguration = null;
//To support Yarn Secure Integration Test Scenario
File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
- if(krb5Conf.exists() && krb5Conf.canRead()) {
+ if (krb5Conf.exists() && krb5Conf.canRead()) {
String krb5Path = krb5Conf.getAbsolutePath();
LOG.info("KRB5 Conf: {}", krb5Path);
- org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
- conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
- conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
- sc.setHadoopConfiguration(conf);
+ hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
+ hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
}
- if(keytabPath != null && remoteKeytabPrincipal != null) {
- configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
- configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
+ SecurityUtils.SecurityConfiguration sc;
+ if (hadoopConfiguration != null) {
+ sc = new SecurityUtils.SecurityConfiguration(configuration, hadoopConfiguration);
+ } else {
+ sc = new SecurityUtils.SecurityConfiguration(configuration);
+ }
+
+ if (keytabPath != null && remoteKeytabPrincipal != null) {
+ configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
+ configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
}
SecurityUtils.install(sc);