You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2016/09/01 19:57:01 UTC
[1/2] apex-core git commit: APEXCORE-515 Providing principal for token refresh
Repository: apex-core
Updated Branches:
refs/heads/APEXCORE-515 [created] c13b0dd41
APEXCORE-515 Providing principal for token refresh
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/dd5e95a0
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/dd5e95a0
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/dd5e95a0
Branch: refs/heads/APEXCORE-515
Commit: dd5e95a090214bdcff3232cbdcad3dc59c24eff9
Parents: d651edc
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Wed Aug 24 17:25:25 2016 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Thu Sep 1 12:26:43 2016 -0700
----------------------------------------------------------------------
.../stram/StreamingAppMasterService.java | 3 +-
.../stram/client/StramAppLauncher.java | 28 +++++++++---------
.../stram/engine/StreamingContainer.java | 3 +-
.../stram/plan/logical/LogicalPlan.java | 1 +
.../stram/security/StramUserLogin.java | 20 +++++++++----
.../stram/client/StramAppLauncherTest.java | 30 ++++++++++++++------
6 files changed, 54 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/dd5e95a0/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 43ab743..15b6402 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -676,6 +676,7 @@ public class StreamingAppMasterService extends CompositeService
long tokenLifeTime = (long)(dag.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * Math.min(dag.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME), dag.getValue(LogicalPlan.RM_TOKEN_LIFE_TIME)));
long expiryTime = System.currentTimeMillis() + tokenLifeTime;
LOG.debug(" expiry token time {}", tokenLifeTime);
+ String principal = dag.getValue(LogicalPlan.PRINCIPAL);
String hdfsKeyTabFile = dag.getValue(LogicalPlan.KEY_TAB_FILE);
// Register self with ResourceManager
@@ -753,7 +754,7 @@ public class StreamingAppMasterService extends CompositeService
if (UserGroupInformation.isSecurityEnabled() && currentTimeMillis >= expiryTime && hdfsKeyTabFile != null) {
String applicationId = appAttemptID.getApplicationId().toString();
- expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), applicationId, conf, hdfsKeyTabFile, credentials, rmAddress, true);
+ expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), applicationId, conf, principal, hdfsKeyTabFile, credentials, rmAddress, true);
}
if (currentTimeMillis > nodeReportUpdateTime) {
http://git-wip-us.apache.org/repos/asf/apex-core/blob/dd5e95a0/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
index 5024c38..619252f 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
@@ -560,14 +560,12 @@ public class StramAppLauncher
return cl;
}
- private void setTokenRefreshKeytab(LogicalPlan dag, Configuration conf) throws IOException
+ private void setTokenRefreshCredentials(LogicalPlan dag, Configuration conf) throws IOException
{
- String keytabPath;
- if ((keytabPath = conf.get(StramClientUtils.KEY_TAB_FILE)) == null) {
- String keytab;
- if ((keytab = StramUserLogin.getKeytab()) == null) {
- keytab = conf.get(StramUserLogin.DT_AUTH_KEYTAB);
- }
+ String principal = StramUserLogin.getPrincipal();
+ String keytabPath = conf.get(StramClientUtils.KEY_TAB_FILE);
+ if (keytabPath == null) {
+ String keytab = StramUserLogin.getKeytab();
if (keytab != null) {
Path localKeyTabPath = new Path(keytab);
try (FileSystem fs = StramClientUtils.newFileSystemInstance(conf)) {
@@ -579,10 +577,11 @@ public class StramAppLauncher
}
}
}
- if (keytabPath != null) {
+ if ((principal != null) && (keytabPath != null)) {
+ dag.setAttribute(LogicalPlan.PRINCIPAL, principal);
dag.setAttribute(LogicalPlan.KEY_TAB_FILE, keytabPath);
} else {
- LOG.warn("No keytab specified for refreshing tokens, application may not be able to run indefinitely");
+ LOG.warn("Credentials for refreshing tokens not available, application may not be able to run indefinitely");
}
}
@@ -600,13 +599,12 @@ public class StramAppLauncher
Configuration conf = propertiesBuilder.conf;
conf.setEnum(StreamingApplication.ENVIRONMENT, StreamingApplication.Environment.CLUSTER);
LogicalPlan dag = appConfig.createApp(propertiesBuilder);
- long hdfsTokenMaxLifeTime = conf.getLong(StramClientUtils.DT_HDFS_TOKEN_MAX_LIFE_TIME, conf.getLong(StramClientUtils.HDFS_TOKEN_MAX_LIFE_TIME, StramClientUtils.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT));
- dag.setAttribute(LogicalPlan.HDFS_TOKEN_LIFE_TIME, hdfsTokenMaxLifeTime);
- long rmTokenMaxLifeTime = conf.getLong(StramClientUtils.DT_RM_TOKEN_MAX_LIFE_TIME, conf.getLong(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_KEY, YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT));
- dag.setAttribute(LogicalPlan.RM_TOKEN_LIFE_TIME, rmTokenMaxLifeTime);
- // TODO:- Need to see if other token refresh attributes are needed if security is not enabled
if (UserGroupInformation.isSecurityEnabled()) {
- setTokenRefreshKeytab(dag, conf);
+ long hdfsTokenMaxLifeTime = conf.getLong(StramClientUtils.DT_HDFS_TOKEN_MAX_LIFE_TIME, conf.getLong(StramClientUtils.HDFS_TOKEN_MAX_LIFE_TIME, StramClientUtils.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT));
+ dag.setAttribute(LogicalPlan.HDFS_TOKEN_LIFE_TIME, hdfsTokenMaxLifeTime);
+ long rmTokenMaxLifeTime = conf.getLong(StramClientUtils.DT_RM_TOKEN_MAX_LIFE_TIME, conf.getLong(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_KEY, YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT));
+ dag.setAttribute(LogicalPlan.RM_TOKEN_LIFE_TIME, rmTokenMaxLifeTime);
+ setTokenRefreshCredentials(dag, conf);
}
String tokenRefreshFactor = conf.get(StramClientUtils.TOKEN_ANTICIPATORY_REFRESH_FACTOR);
if (tokenRefreshFactor != null && tokenRefreshFactor.trim().length() > 0) {
http://git-wip-us.apache.org/repos/asf/apex-core/blob/dd5e95a0/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 27688e3..2232bbf 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -616,11 +616,12 @@ public class StreamingContainer extends YarnContainerMain
Token<?> token = iter.next();
logger.debug("token: {}", token);
}
+ String principal = containerContext.getValue(LogicalPlan.PRINCIPAL);
String hdfsKeyTabFile = containerContext.getValue(LogicalPlan.KEY_TAB_FILE);
while (!exitHeartbeatLoop) {
if (UserGroupInformation.isSecurityEnabled() && System.currentTimeMillis() >= expiryTime && hdfsKeyTabFile != null) {
- expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), containerId, conf, hdfsKeyTabFile, credentials, null, false);
+ expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), containerId, conf, principal, hdfsKeyTabFile, credentials, null, false);
}
synchronized (this.heartbeatTrigger) {
try {
http://git-wip-us.apache.org/repos/asf/apex-core/blob/dd5e95a0/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index b301f9e..580d1bc 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -157,6 +157,7 @@ public class LogicalPlan implements Serializable, DAG
public static Attribute<Boolean> FAST_PUBLISHER_SUBSCRIBER = new Attribute<>(false);
public static Attribute<Long> HDFS_TOKEN_LIFE_TIME = new Attribute<>(604800000L);
public static Attribute<Long> RM_TOKEN_LIFE_TIME = new Attribute<>(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
+ public static Attribute<String> PRINCIPAL = new Attribute<String>(null, new StringCodec.String2String());
public static Attribute<String> KEY_TAB_FILE = new Attribute<>((String)null, new StringCodec.String2String());
public static Attribute<Double> TOKEN_REFRESH_ANTICIPATORY_FACTOR = new Attribute<>(0.7);
/**
http://git-wip-us.apache.org/repos/asf/apex-core/blob/dd5e95a0/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java b/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
index b2fa4e7..0c1d0c9 100644
--- a/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
+++ b/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
@@ -49,7 +49,7 @@ public class StramUserLogin
{
private static final Logger LOG = LoggerFactory.getLogger(StramUserLogin.class);
public static final String DT_AUTH_PREFIX = StreamingApplication.DT_PREFIX + "authentication.";
- private static final String DT_AUTH_PRINCIPAL = DT_AUTH_PREFIX + "principal";
+ public static final String DT_AUTH_PRINCIPAL = DT_AUTH_PREFIX + "principal";
public static final String DT_AUTH_KEYTAB = DT_AUTH_PREFIX + "keytab";
private static String principal;
private static String keytab;
@@ -57,12 +57,17 @@ public class StramUserLogin
public static void attemptAuthentication(Configuration conf) throws IOException
{
if (UserGroupInformation.isSecurityEnabled()) {
- String userPrincipal = conf.get(DT_AUTH_PRINCIPAL);
- String userKeytab = conf.get(DT_AUTH_KEYTAB);
- authenticate(userPrincipal, userKeytab);
+ authenticate(conf);
}
}
+ public static void authenticate(Configuration conf) throws IOException
+ {
+ String userPrincipal = conf.get(DT_AUTH_PRINCIPAL);
+ String userKeytab = conf.get(DT_AUTH_KEYTAB);
+ authenticate(userPrincipal, userKeytab);
+ }
+
public static void authenticate(String principal, String keytab) throws IOException
{
if ((principal != null) && !principal.isEmpty()
@@ -79,7 +84,7 @@ public class StramUserLogin
}
}
- public static long refreshTokens(long tokenLifeTime, String destinationDir, String destinationFile, final Configuration conf, String hdfsKeyTabFile, final Credentials credentials, final InetSocketAddress rmAddress, final boolean renewRMToken) throws IOException
+ public static long refreshTokens(long tokenLifeTime, String destinationDir, String destinationFile, final Configuration conf, String principal, String hdfsKeyTabFile, final Credentials credentials, final InetSocketAddress rmAddress, final boolean renewRMToken) throws IOException
{
long expiryTime = System.currentTimeMillis() + tokenLifeTime;
//renew tokens
@@ -93,7 +98,10 @@ public class StramUserLogin
keyTabFile = FSUtil.copyToLocalFileSystem(fs, destinationDir, destinationFile, hdfsKeyTabFile, conf);
}
- UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(UserGroupInformation.getCurrentUser().getUserName(), keyTabFile.getAbsolutePath());
+ if (principal == null) {
+ principal = UserGroupInformation.getCurrentUser().getUserName();
+ }
+ UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTabFile.getAbsolutePath());
try {
ugi.doAs(new PrivilegedExceptionAction<Object>()
{
http://git-wip-us.apache.org/repos/asf/apex-core/blob/dd5e95a0/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java b/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java
index b1856e1..ad1dbd6 100644
--- a/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java
@@ -42,9 +42,7 @@ import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.security.StramUserLogin;
import static org.powermock.api.mockito.PowerMockito.method;
-import static org.powermock.api.mockito.PowerMockito.spy;
import static org.powermock.api.mockito.PowerMockito.suppress;
-import static org.powermock.api.mockito.PowerMockito.when;
/**
* StramAppLauncher Test
@@ -52,12 +50,18 @@ import static org.powermock.api.mockito.PowerMockito.when;
@RunWith(Enclosed.class)
public class StramAppLauncherTest
{
- @PrepareForTest({StramAppLauncher.class, StramUserLogin.class})
+
+ private static final String SET_TOKEN_REFRESH_CREDENTIALS_METHOD = "setTokenRefreshCredentials";
+
+ @PrepareForTest({StramAppLauncher.class})
@PowerMockIgnore({"javax.xml.*", "org.w3c.*", "org.apache.hadoop.*", "org.apache.log4j.*"})
public static class RefreshTokenTests
{
File workspace;
File sourceKeytab;
+ File dfsDir;
+
+ static final String principal = "username/group@domain";
@Rule
public PowerMockRule rule = new PowerMockRule();
@@ -77,6 +81,7 @@ public class StramAppLauncherTest
} catch (IOException e) {
throw new RuntimeException(e);
}
+ dfsDir = new File(workspace, "dst");
suppress(method(StramAppLauncher.class, "init"));
}
@@ -92,17 +97,24 @@ public class StramAppLauncherTest
public void testGetTokenRefreshKeytab() throws Exception
{
Configuration conf = new Configuration(false);
- conf.set(StramClientUtils.KEY_TAB_FILE, sourceKeytab.getPath());
+ File storeKeytab = new File(dfsDir, "keytab2");
+ conf.set(StramClientUtils.KEY_TAB_FILE, storeKeytab.getPath());
+ StramUserLogin.authenticate(principal, sourceKeytab.getPath());
LogicalPlan dag = applyTokenRefreshKeytab(FileSystem.newInstance(conf), conf);
- Assert.assertEquals("Token refresh keytab path", sourceKeytab.getPath(), dag.getValue(LogicalPlan.KEY_TAB_FILE));
+ Assert.assertEquals("Token refresh principal", principal, dag.getValue(LogicalPlan.PRINCIPAL));
+ Assert.assertEquals("Token refresh keytab path", storeKeytab.getPath(), dag.getValue(LogicalPlan.KEY_TAB_FILE));
}
@Test
public void testUserLoginTokenRefreshKeytab() throws Exception
{
Configuration conf = new Configuration(false);
+ /*
spy(StramUserLogin.class);
+ when(StramUserLogin.getPrincipal()).thenReturn(principal);
when(StramUserLogin.getKeytab()).thenReturn(sourceKeytab.getPath());
+ */
+ StramUserLogin.authenticate(principal, sourceKeytab.getPath());
testDFSTokenPath(conf);
}
@@ -110,25 +122,27 @@ public class StramAppLauncherTest
public void testAuthPropTokenRefreshKeytab() throws Exception
{
Configuration conf = new Configuration(false);
+ conf.set(StramUserLogin.DT_AUTH_PRINCIPAL, principal);
conf.set(StramUserLogin.DT_AUTH_KEYTAB, sourceKeytab.getPath());
+ StramUserLogin.authenticate(conf);
testDFSTokenPath(conf);
}
private void testDFSTokenPath(Configuration conf) throws Exception
{
FileSystem fs = FileSystem.newInstance(conf);
- File dfsDir = new File(workspace, "dst");
conf.set(StramClientUtils.DT_DFS_ROOT_DIR, dfsDir.getAbsolutePath());
LogicalPlan dag = applyTokenRefreshKeytab(fs, conf);
+ Assert.assertEquals("Token refresh principal", principal, dag.getValue(LogicalPlan.PRINCIPAL));
Assert.assertEquals("Token refresh keytab path", new Path(fs.getUri().getScheme(), fs.getUri().getAuthority(),
- new File(dfsDir, "keytab").getAbsolutePath()).toString(), dag.getValue(LogicalPlan.KEY_TAB_FILE));
+ new File(dfsDir, sourceKeytab.getName()).getAbsolutePath()).toString(), dag.getValue(LogicalPlan.KEY_TAB_FILE));
}
private LogicalPlan applyTokenRefreshKeytab(FileSystem fs, Configuration conf) throws Exception
{
LogicalPlan dag = new LogicalPlan();
StramAppLauncher appLauncher = new StramAppLauncher(fs, conf);
- Whitebox.invokeMethod(appLauncher, "setTokenRefreshKeytab", dag, conf);
+ Whitebox.invokeMethod(appLauncher, SET_TOKEN_REFRESH_CREDENTIALS_METHOD, dag, conf);
return dag;
}
}
[2/2] apex-core git commit: Merge branch 'APEXCORE-515' of https://github.com/PramodSSImmaneni/apex-core into APEXCORE-515
Posted by vr...@apache.org.
Merge branch 'APEXCORE-515' of https://github.com/PramodSSImmaneni/apex-core into APEXCORE-515
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/c13b0dd4
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/c13b0dd4
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/c13b0dd4
Branch: refs/heads/APEXCORE-515
Commit: c13b0dd417e805601169f090fdeebf5b42c78651
Parents: ae0ec24 dd5e95a
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Thu Sep 1 12:54:37 2016 -0700
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Thu Sep 1 12:54:37 2016 -0700
----------------------------------------------------------------------
.../stram/StreamingAppMasterService.java | 3 +-
.../stram/client/StramAppLauncher.java | 28 +++++++++---------
.../stram/engine/StreamingContainer.java | 3 +-
.../stram/plan/logical/LogicalPlan.java | 1 +
.../stram/security/StramUserLogin.java | 20 +++++++++----
.../stram/client/StramAppLauncherTest.java | 30 ++++++++++++++------
6 files changed, 54 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/c13b0dd4/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------