You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ga...@apache.org on 2016/05/19 17:35:29 UTC
[1/2] incubator-apex-core git commit: APEXCORE-460 Considering keytab
passed in from command line as well for token refresh. Added unit tests.
Repository: incubator-apex-core
Updated Branches:
refs/heads/master 7813d860d -> 8f0d31544
APEXCORE-460 Considering keytab passed in from command line as well for token refresh. Added unit tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/976a5317
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/976a5317
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/976a5317
Branch: refs/heads/master
Commit: 976a5317917e44757a631411ccb29913e1dc955c
Parents: 87907df
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Thu May 19 00:01:50 2016 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Thu May 19 00:04:41 2016 -0700
----------------------------------------------------------------------
engine/pom.xml | 18 +++
.../stram/client/StramAppLauncher.java | 41 ++++--
.../stram/client/StramAppLauncherTest.java | 136 +++++++++++++++++++
3 files changed, 184 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/976a5317/engine/pom.xml
----------------------------------------------------------------------
diff --git a/engine/pom.xml b/engine/pom.xml
index 09b046e..a56909e 100644
--- a/engine/pom.xml
+++ b/engine/pom.xml
@@ -363,6 +363,24 @@
<version>2.10</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ <version>1.6.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4-rule</artifactId>
+ <version>1.6.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-classloading-xstream</artifactId>
+ <version>1.6.5</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
<profile>
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/976a5317/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
index bd58e35..5024c38 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tools.ant.DirectoryScanner;
@@ -559,6 +560,32 @@ public class StramAppLauncher
return cl;
}
+ private void setTokenRefreshKeytab(LogicalPlan dag, Configuration conf) throws IOException
+ {
+ String keytabPath;
+ if ((keytabPath = conf.get(StramClientUtils.KEY_TAB_FILE)) == null) {
+ String keytab;
+ if ((keytab = StramUserLogin.getKeytab()) == null) {
+ keytab = conf.get(StramUserLogin.DT_AUTH_KEYTAB);
+ }
+ if (keytab != null) {
+ Path localKeyTabPath = new Path(keytab);
+ try (FileSystem fs = StramClientUtils.newFileSystemInstance(conf)) {
+ Path destPath = new Path(StramClientUtils.getDTDFSRootDir(fs, conf), localKeyTabPath.getName());
+ if (!fs.exists(destPath)) {
+ fs.copyFromLocalFile(false, false, localKeyTabPath, destPath);
+ }
+ keytabPath = destPath.toString();
+ }
+ }
+ }
+ if (keytabPath != null) {
+ dag.setAttribute(LogicalPlan.KEY_TAB_FILE, keytabPath);
+ } else {
+ LOG.warn("No keytab specified for refreshing tokens, application may not be able to run indefinitely");
+ }
+ }
+
/**
* Submit application to the cluster and return the app id.
* Sets the context class loader for application dependencies.
@@ -577,17 +604,9 @@ public class StramAppLauncher
dag.setAttribute(LogicalPlan.HDFS_TOKEN_LIFE_TIME, hdfsTokenMaxLifeTime);
long rmTokenMaxLifeTime = conf.getLong(StramClientUtils.DT_RM_TOKEN_MAX_LIFE_TIME, conf.getLong(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_KEY, YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT));
dag.setAttribute(LogicalPlan.RM_TOKEN_LIFE_TIME, rmTokenMaxLifeTime);
- if (conf.get(StramClientUtils.KEY_TAB_FILE) != null) {
- dag.setAttribute(LogicalPlan.KEY_TAB_FILE, conf.get(StramClientUtils.KEY_TAB_FILE));
- } else if (conf.get(StramUserLogin.DT_AUTH_KEYTAB) != null) {
- Path localKeyTabPath = new Path(conf.get(StramUserLogin.DT_AUTH_KEYTAB));
- try (FileSystem fs = StramClientUtils.newFileSystemInstance(conf)) {
- Path destPath = new Path(StramClientUtils.getDTDFSRootDir(fs, conf), localKeyTabPath.getName());
- if (!fs.exists(destPath)) {
- fs.copyFromLocalFile(false, false, localKeyTabPath, destPath);
- }
- dag.setAttribute(LogicalPlan.KEY_TAB_FILE, destPath.toString());
- }
+ // TODO:- Need to see if other token refresh attributes are needed if security is not enabled
+ if (UserGroupInformation.isSecurityEnabled()) {
+ setTokenRefreshKeytab(dag, conf);
}
String tokenRefreshFactor = conf.get(StramClientUtils.TOKEN_ANTICIPATORY_REFRESH_FACTOR);
if (tokenRefreshFactor != null && tokenRefreshFactor.trim().length() > 0) {
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/976a5317/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java b/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java
new file mode 100644
index 0000000..b1856e1
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.client;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;
+import org.powermock.reflect.Whitebox;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.security.StramUserLogin;
+
+import static org.powermock.api.mockito.PowerMockito.method;
+import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.powermock.api.mockito.PowerMockito.suppress;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * StramAppLauncher Test
+ */
+@RunWith(Enclosed.class)
+public class StramAppLauncherTest
+{
+ @PrepareForTest({StramAppLauncher.class, StramUserLogin.class})
+ @PowerMockIgnore({"javax.xml.*", "org.w3c.*", "org.apache.hadoop.*", "org.apache.log4j.*"})
+ public static class RefreshTokenTests
+ {
+ File workspace;
+ File sourceKeytab;
+
+ @Rule
+ public PowerMockRule rule = new PowerMockRule();
+
+ @Rule
+ public TestWatcher setup = new TestWatcher()
+ {
+ @Override
+ protected void starting(Description description)
+ {
+ super.starting(description);
+ workspace = new File("target/" + description.getClassName() + "/" + description.getMethodName());
+ try {
+ FileUtils.forceMkdir(workspace);
+ sourceKeytab = new File(workspace, "src/keytab");
+ FileUtils.touch(sourceKeytab);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ suppress(method(StramAppLauncher.class, "init"));
+ }
+
+ @Override
+ protected void finished(Description description)
+ {
+ FileUtils.deleteQuietly(workspace);
+ super.finished(description);
+ }
+ };
+
+ @Test
+ public void testGetTokenRefreshKeytab() throws Exception
+ {
+ Configuration conf = new Configuration(false);
+ conf.set(StramClientUtils.KEY_TAB_FILE, sourceKeytab.getPath());
+ LogicalPlan dag = applyTokenRefreshKeytab(FileSystem.newInstance(conf), conf);
+ Assert.assertEquals("Token refresh keytab path", sourceKeytab.getPath(), dag.getValue(LogicalPlan.KEY_TAB_FILE));
+ }
+
+ @Test
+ public void testUserLoginTokenRefreshKeytab() throws Exception
+ {
+ Configuration conf = new Configuration(false);
+ spy(StramUserLogin.class);
+ when(StramUserLogin.getKeytab()).thenReturn(sourceKeytab.getPath());
+ testDFSTokenPath(conf);
+ }
+
+ @Test
+ public void testAuthPropTokenRefreshKeytab() throws Exception
+ {
+ Configuration conf = new Configuration(false);
+ conf.set(StramUserLogin.DT_AUTH_KEYTAB, sourceKeytab.getPath());
+ testDFSTokenPath(conf);
+ }
+
+ private void testDFSTokenPath(Configuration conf) throws Exception
+ {
+ FileSystem fs = FileSystem.newInstance(conf);
+ File dfsDir = new File(workspace, "dst");
+ conf.set(StramClientUtils.DT_DFS_ROOT_DIR, dfsDir.getAbsolutePath());
+ LogicalPlan dag = applyTokenRefreshKeytab(fs, conf);
+ Assert.assertEquals("Token refresh keytab path", new Path(fs.getUri().getScheme(), fs.getUri().getAuthority(),
+ new File(dfsDir, "keytab").getAbsolutePath()).toString(), dag.getValue(LogicalPlan.KEY_TAB_FILE));
+ }
+
+ private LogicalPlan applyTokenRefreshKeytab(FileSystem fs, Configuration conf) throws Exception
+ {
+ LogicalPlan dag = new LogicalPlan();
+ StramAppLauncher appLauncher = new StramAppLauncher(fs, conf);
+ Whitebox.invokeMethod(appLauncher, "setTokenRefreshKeytab", dag, conf);
+ return dag;
+ }
+ }
+
+}
[2/2] incubator-apex-core git commit: Merge branch 'APEXCORE-460' of
https://github.com/PramodSSImmaneni/incubator-apex-core
Posted by ga...@apache.org.
Merge branch 'APEXCORE-460' of https://github.com/PramodSSImmaneni/incubator-apex-core
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/8f0d3154
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/8f0d3154
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/8f0d3154
Branch: refs/heads/master
Commit: 8f0d31544b259baea506d6d9c4260ac11f3b0b95
Parents: 7813d86 976a531
Author: gaurav gupta <ga...@cisco.com>
Authored: Thu May 19 10:32:37 2016 -0700
Committer: gaurav gupta <ga...@cisco.com>
Committed: Thu May 19 10:32:37 2016 -0700
----------------------------------------------------------------------
engine/pom.xml | 18 +++
.../stram/client/StramAppLauncher.java | 41 ++++--
.../stram/client/StramAppLauncherTest.java | 136 +++++++++++++++++++
3 files changed, 184 insertions(+), 11 deletions(-)
----------------------------------------------------------------------