You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2018/05/13 17:29:54 UTC
[apex-core] branch master updated: APEXCORE-807 Added renewal of
tokens before renewal expiry interval functionality into the engine,
refactored token renewal component
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-core.git
The following commit(s) were added to refs/heads/master by this push:
new a8bbec7 APEXCORE-807 Added renewal of tokens before renewal expiry interval functionality into the engine, refactored token renewal component
a8bbec7 is described below
commit a8bbec7f54e94d67106a46f8b1ca6d8e7890f126
Author: Pramod Immaneni <pr...@apache.org>
AuthorDate: Sun Feb 4 18:59:18 2018 -0800
APEXCORE-807 Added renewal of tokens before renewal expiry interval functionality into the engine, refactored token renewal component
---
.../stram/StreamingAppMasterService.java | 26 +--
.../datatorrent/stram/client/StramAppLauncher.java | 12 +-
.../datatorrent/stram/client/StramClientUtils.java | 4 +
.../stram/engine/StreamingContainer.java | 24 +--
.../stram/plan/logical/LogicalPlan.java | 2 +
.../datatorrent/stram/security/StramUserLogin.java | 69 -------
.../apache/apex/engine/security/TokenRenewer.java | 207 +++++++++++++++++++++
7 files changed, 239 insertions(+), 105 deletions(-)
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 6c640ee..e172541 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -28,7 +28,6 @@ import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -52,13 +51,12 @@ import org.apache.apex.engine.plugin.DefaultApexPluginDispatcher;
import org.apache.apex.engine.plugin.loaders.ChainedPluginLocator;
import org.apache.apex.engine.plugin.loaders.PropertyBasedPluginLocator;
import org.apache.apex.engine.plugin.loaders.ServiceLoaderBasedPluginLocator;
-import org.apache.commons.io.FileUtils;
+import org.apache.apex.engine.security.TokenRenewer;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.CompositeService;
@@ -111,7 +109,6 @@ import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.security.StramDelegationTokenIdentifier;
import com.datatorrent.stram.security.StramDelegationTokenManager;
-import com.datatorrent.stram.security.StramUserLogin;
import com.datatorrent.stram.util.ConfigUtils;
import com.datatorrent.stram.util.SecurityUtils;
import com.datatorrent.stram.webapp.AppInfo;
@@ -166,6 +163,7 @@ public class StreamingAppMasterService extends CompositeService
private ApexPluginDispatcher apexPluginDispatcher;
private final GroupingManager groupingManager = GroupingManager.getGroupingManagerInstance();
private static final long REMOVE_CONTAINER_TIMEOUT = PropertiesHelper.getLong("org.apache.apex.nodemanager.containerKill.timeout", 30 * 1000, 0, Long.MAX_VALUE);
+ private TokenRenewer tokenRenewer;
public StreamingAppMasterService(ApplicationAttemptId appAttemptID)
{
@@ -693,19 +691,10 @@ public class StreamingAppMasterService extends CompositeService
private void execute() throws YarnException, IOException
{
LOG.info("Starting ApplicationMaster");
- final Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
- LOG.info("number of tokens: {}", credentials.getAllTokens().size());
- Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
- while (iter.hasNext()) {
- Token<?> token = iter.next();
- LOG.debug("token: {}", token);
- }
final Configuration conf = getConfig();
- long tokenLifeTime = (long)(dag.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * Math.min(dag.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME), dag.getValue(LogicalPlan.RM_TOKEN_LIFE_TIME)));
- long expiryTime = System.currentTimeMillis() + tokenLifeTime;
- LOG.debug(" expiry token time {}", tokenLifeTime);
- String principal = dag.getValue(LogicalPlan.PRINCIPAL);
- String hdfsKeyTabFile = dag.getValue(LogicalPlan.KEY_TAB_FILE);
+ if (UserGroupInformation.isSecurityEnabled()) {
+ tokenRenewer = new TokenRenewer(dag, true, conf, appAttemptID.getApplicationId().toString());
+ }
// Register self with ResourceManager
RegisterApplicationMasterResponse response = amRmClient.registerApplicationMaster(appMasterHostname, 0, appMasterTrackingUrl);
@@ -778,9 +767,8 @@ public class StreamingAppMasterService extends CompositeService
loopCounter++;
final long currentTimeMillis = System.currentTimeMillis();
- if (UserGroupInformation.isSecurityEnabled() && currentTimeMillis >= expiryTime && hdfsKeyTabFile != null) {
- String applicationId = appAttemptID.getApplicationId().toString();
- expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), applicationId, conf, principal, hdfsKeyTabFile, credentials, rmAddress, true);
+ if (tokenRenewer != null) {
+ tokenRenewer.checkAndRenew();
}
if (currentTimeMillis > nodeReportUpdateTime) {
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
index 2019f48..d3079d0 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
@@ -608,13 +608,23 @@ public class StramAppLauncher
if (UserGroupInformation.isSecurityEnabled()) {
long hdfsTokenMaxLifeTime = conf.getLong(StramClientUtils.DT_HDFS_TOKEN_MAX_LIFE_TIME, conf.getLong(StramClientUtils.HDFS_TOKEN_MAX_LIFE_TIME, StramClientUtils.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT));
dag.setAttribute(LogicalPlan.HDFS_TOKEN_LIFE_TIME, hdfsTokenMaxLifeTime);
+ LOG.debug("HDFS token life time {}", hdfsTokenMaxLifeTime);
+ long hdfsTokenRenewInterval = conf.getLong(StramClientUtils.DT_HDFS_TOKEN_RENEW_INTERVAL, conf.getLong(StramClientUtils.HDFS_TOKEN_RENEW_INTERVAL, StramClientUtils.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT));
+ dag.setAttribute(LogicalPlan.HDFS_TOKEN_RENEWAL_INTERVAL, hdfsTokenRenewInterval);
+ LOG.debug("HDFS token renew interval {}", hdfsTokenRenewInterval);
long rmTokenMaxLifeTime = conf.getLong(StramClientUtils.DT_RM_TOKEN_MAX_LIFE_TIME, conf.getLong(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_KEY, YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT));
dag.setAttribute(LogicalPlan.RM_TOKEN_LIFE_TIME, rmTokenMaxLifeTime);
+ LOG.debug("RM token life time {}", rmTokenMaxLifeTime);
+ long rmTokenRenewInterval = conf.getLong(StramClientUtils.DT_RM_TOKEN_RENEW_INTERVAL, conf.getLong(YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_KEY, YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT));
+ dag.setAttribute(LogicalPlan.RM_TOKEN_RENEWAL_INTERVAL, rmTokenRenewInterval);
+ LOG.debug("RM token renew interval {}", rmTokenRenewInterval);
setTokenRefreshCredentials(dag, conf);
}
String tokenRefreshFactor = conf.get(StramClientUtils.TOKEN_ANTICIPATORY_REFRESH_FACTOR);
if (tokenRefreshFactor != null && tokenRefreshFactor.trim().length() > 0) {
- dag.setAttribute(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR, Double.parseDouble(tokenRefreshFactor));
+ double refreshFactor = Double.parseDouble(tokenRefreshFactor);
+ dag.setAttribute(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR, refreshFactor);
+ LOG.debug("Token refresh anticipatory factor {}", refreshFactor);
}
StramClient client = new StramClient(conf, dag);
try {
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
index a310ee2..d4f190f 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
@@ -111,12 +111,16 @@ public class StramClientUtils
public static final String SUBDIR_CONF = "conf";
public static final long RESOURCEMANAGER_CONNECT_MAX_WAIT_MS_OVERRIDE = 10 * 1000;
public static final String DT_HDFS_TOKEN_MAX_LIFE_TIME = StreamingApplication.DT_PREFIX + "namenode.delegation.token.max-lifetime";
+ public static final String DT_HDFS_TOKEN_RENEW_INTERVAL = StreamingApplication.DT_PREFIX + "namenode.delegation.token.renew-interval";
public static final String HDFS_TOKEN_MAX_LIFE_TIME = "dfs.namenode.delegation.token.max-lifetime";
+ public static final String HDFS_TOKEN_RENEW_INTERVAL = "dfs.namenode.delegation.token.renew-interval";
public static final String DT_RM_TOKEN_MAX_LIFE_TIME = StreamingApplication.DT_PREFIX + "resourcemanager.delegation.token.max-lifetime";
+ public static final String DT_RM_TOKEN_RENEW_INTERVAL = StreamingApplication.DT_PREFIX + "resourcemanager.delegation.token.renew-interval";
@Deprecated
public static final String KEY_TAB_FILE = StramUserLogin.DT_AUTH_PREFIX + "store.keytab";
public static final String TOKEN_ANTICIPATORY_REFRESH_FACTOR = StramUserLogin.DT_AUTH_PREFIX + "token.refresh.factor";
public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = 7 * 24 * 60 * 60 * 1000;
+ public static final long DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT = 24 * 60 * 60 * 1000;
public static final String TOKEN_REFRESH_PRINCIPAL = StramUserLogin.DT_AUTH_PREFIX + "token.refresh.principal";
public static final String TOKEN_REFRESH_KEYTAB = StramUserLogin.DT_AUTH_PREFIX + "token.refresh.keytab";
/**
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index f5aaf35..927ad6d 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -42,15 +42,13 @@ import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.engine.security.TokenRenewer;
import org.apache.apex.log.LogFileInformation;
-import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.log4j.LogManager;
@@ -106,7 +104,6 @@ import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.Operators.PortContextPair;
import com.datatorrent.stram.plan.logical.Operators.PortMappingDescriptor;
import com.datatorrent.stram.plan.logical.StreamCodecWrapperForPersistance;
-import com.datatorrent.stram.security.StramUserLogin;
import com.datatorrent.stram.stream.BufferServerPublisher;
import com.datatorrent.stram.stream.BufferServerSubscriber;
import com.datatorrent.stram.stream.FastPublisher;
@@ -164,6 +161,7 @@ public class StreamingContainer extends YarnContainerMain
private final MBassador<ContainerEvent> eventBus; // event bus for publishing container events
HashSet<Component<ContainerContext>> components;
private RequestFactory requestFactory;
+ private TokenRenewer tokenRenewer;
static {
try {
@@ -608,22 +606,16 @@ public class StreamingContainer extends YarnContainerMain
logger.debug("Entering heartbeat loop (interval is {} ms)", this.heartbeatIntervalMillis);
umbilical.log(containerId, "[" + containerId + "] Entering heartbeat loop..");
final YarnConfiguration conf = new YarnConfiguration();
- long tokenLifeTime = (long)(containerContext.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * containerContext.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME));
- long expiryTime = System.currentTimeMillis();
- final Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
- String stackTrace = null;
- Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
- while (iter.hasNext()) {
- Token<?> token = iter.next();
- logger.debug("token: {}", token);
+ if (UserGroupInformation.isSecurityEnabled()) {
+ tokenRenewer = new TokenRenewer(containerContext, false, conf, containerId);
}
- String principal = containerContext.getValue(LogicalPlan.PRINCIPAL);
- String hdfsKeyTabFile = containerContext.getValue(LogicalPlan.KEY_TAB_FILE);
+ String stackTrace = null;
while (!exitHeartbeatLoop) {
- if (UserGroupInformation.isSecurityEnabled() && System.currentTimeMillis() >= expiryTime && hdfsKeyTabFile != null) {
- expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), containerId, conf, principal, hdfsKeyTabFile, credentials, null, false);
+ if (tokenRenewer != null) {
+ tokenRenewer.checkAndRenew();
}
+
synchronized (this.heartbeatTrigger) {
try {
this.heartbeatTrigger.wait(heartbeatIntervalMillis);
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index bf4b2cb..18a9a63 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -161,7 +161,9 @@ public class LogicalPlan implements Serializable, DAG
* Then it can be moved back to DAGContext.
*/
public static Attribute<Boolean> FAST_PUBLISHER_SUBSCRIBER = new Attribute<>(false);
+ public static Attribute<Long> HDFS_TOKEN_RENEWAL_INTERVAL = new Attribute<>(86400000L);
public static Attribute<Long> HDFS_TOKEN_LIFE_TIME = new Attribute<>(604800000L);
+ public static Attribute<Long> RM_TOKEN_RENEWAL_INTERVAL = new Attribute<>(YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
public static Attribute<Long> RM_TOKEN_LIFE_TIME = new Attribute<>(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
public static Attribute<String> PRINCIPAL = new Attribute<>(null, StringCodec.String2String.getInstance());
public static Attribute<String> KEY_TAB_FILE = new Attribute<>((String)null, StringCodec.String2String.getInstance());
diff --git a/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java b/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
index 71eb825..7522906 100644
--- a/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
+++ b/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
@@ -18,28 +18,16 @@
*/
package com.datatorrent.stram.security;
-import java.io.File;
import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedExceptionAction;
-import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.stram.client.StramClientUtils;
-import com.datatorrent.stram.util.FSUtil;
-
/**
* <p>StramUserLogin class.</p>
*
@@ -85,63 +73,6 @@ public class StramUserLogin
}
}
- public static long refreshTokens(long tokenLifeTime, String destinationDir, String destinationFile, final Configuration conf, String principal, String hdfsKeyTabFile, final Credentials credentials, final InetSocketAddress rmAddress, final boolean renewRMToken) throws IOException
- {
- long expiryTime = System.currentTimeMillis() + tokenLifeTime;
- //renew tokens
- final String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
- if (tokenRenewer == null || tokenRenewer.length() == 0) {
- throw new IOException("Can't get Master Kerberos principal for the RM to use as renewer");
- }
-
- File keyTabFile;
- try (FileSystem fs = FileSystem.newInstance(conf)) {
- keyTabFile = FSUtil.copyToLocalFileSystem(fs, destinationDir, destinationFile, hdfsKeyTabFile, conf);
- }
-
- if (principal == null) {
- principal = UserGroupInformation.getCurrentUser().getUserName();
- }
- UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTabFile.getAbsolutePath());
- try {
- ugi.doAs(new PrivilegedExceptionAction<Object>()
- {
- @Override
- public Object run() throws Exception
- {
-
- Credentials creds = new Credentials();
- try (FileSystem fs1 = FileSystem.newInstance(conf)) {
- fs1.addDelegationTokens(tokenRenewer, creds);
- }
- if (renewRMToken) {
- try (YarnClient yarnClient = StramClientUtils.createYarnClient(conf)) {
- new StramClientUtils.ClientRMHelper(yarnClient, conf).addRMDelegationToken(tokenRenewer, creds);
- }
- }
- credentials.addAll(creds);
-
- return null;
- }
- });
- UserGroupInformation.getCurrentUser().addCredentials(credentials);
- } catch (InterruptedException e) {
- LOG.error("Error while renewing tokens ", e);
- expiryTime = System.currentTimeMillis();
- } catch (IOException e) {
- LOG.error("Error while renewing tokens ", e);
- expiryTime = System.currentTimeMillis();
- }
- LOG.debug("number of tokens: {}", credentials.getAllTokens().size());
- Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
- while (iter.hasNext()) {
- Token<?> token = iter.next();
- LOG.debug("updated token: {}", token);
- }
- keyTabFile.delete();
- return expiryTime;
- }
-
public static String getPrincipal()
{
return principal;
diff --git a/engine/src/main/java/org/apache/apex/engine/security/TokenRenewer.java b/engine/src/main/java/org/apache/apex/engine/security/TokenRenewer.java
new file mode 100644
index 0000000..cda7c98
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/security/TokenRenewer.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.security;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.stram.client.StramClientUtils;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.util.FSUtil;
+
+public class TokenRenewer
+{
+
+ // The constant is not available hence defining here. If in future it is available this can be removed
+ private static final Text HDFS_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN");
+
+ private static final Logger logger = LoggerFactory.getLogger(TokenRenewer.class);
+
+ boolean renewRMToken;
+ Configuration conf;
+ String destinationFile;
+
+ long tokenLifeTime;
+ long tokenRenewalInterval;
+ String principal;
+ String hdfsKeyTabFile;
+ InetSocketAddress rmAddress;
+
+ long expiryTime;
+ long renewTime;
+ Credentials credentials;
+
+ public TokenRenewer(Context context, boolean renewRMToken, Configuration conf, String destinationFile) throws IOException
+ {
+ this.renewRMToken = renewRMToken;
+ this.destinationFile = destinationFile;
+ this.conf = conf;
+
+ if (renewRMToken) {
+ tokenLifeTime = (long)(context.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * Math.min(context.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME), context.getValue(LogicalPlan.RM_TOKEN_LIFE_TIME)));
+ tokenRenewalInterval = (long)(context.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * Math.min(context.getValue(LogicalPlan.HDFS_TOKEN_RENEWAL_INTERVAL), context.getValue(LogicalPlan.RM_TOKEN_RENEWAL_INTERVAL)));
+ rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
+ } else {
+ tokenLifeTime = (long)(context.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * context.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME));
+ tokenRenewalInterval = (long)(context.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * context.getValue(LogicalPlan.HDFS_TOKEN_RENEWAL_INTERVAL));
+ }
+
+ principal = context.getValue(LogicalPlan.PRINCIPAL);
+ hdfsKeyTabFile = context.getValue(LogicalPlan.KEY_TAB_FILE);
+
+ expiryTime = System.currentTimeMillis() + tokenLifeTime;
+ renewTime = expiryTime;
+
+ logger.debug("token life time {} renewal interval {}", tokenLifeTime, tokenRenewalInterval);
+ logger.debug("Token expiry time {} renew time {}", expiryTime, renewTime);
+
+ credentials = UserGroupInformation.getCurrentUser().getCredentials();
+ // Check credentials are proper at RM
+ if (renewRMToken) {
+ renewTokens(false, true);
+ }
+ }
+
+ public void checkAndRenew() throws IOException
+ {
+ boolean renew = false;
+ boolean refresh = false;
+ long currentTimeMillis = System.currentTimeMillis();
+ if (currentTimeMillis >= expiryTime && hdfsKeyTabFile != null) {
+ refresh = true;
+ } else if (currentTimeMillis >= renewTime) {
+ renew = true;
+ }
+ if (refresh || renew) {
+ long updateTime = renewTokens(refresh, false);
+ if (refresh) {
+ expiryTime = updateTime;
+ renewTime = currentTimeMillis + tokenRenewalInterval;
+ logger.debug("Token expiry time {} renew time {}", expiryTime, renewTime);
+ } else {
+ renewTime = updateTime;
+ logger.debug("Token renew time {}", renewTime);
+ }
+ }
+ }
+
+ private long renewTokens(final boolean refresh, boolean checkOnly) throws IOException
+ {
+ logger.info("{}", checkOnly ? "Checking renewal" : (refresh ? "Refreshing tokens" : "Renewing tokens"));
+ long expiryTime = System.currentTimeMillis() + (refresh ? tokenLifeTime : tokenRenewalInterval);
+
+ final String tokenRenewer = UserGroupInformation.getCurrentUser().getUserName();
+ logger.debug("Token renewer {}", tokenRenewer);
+
+ File keyTabFile = null;
+ try (FileSystem fs = FileSystem.newInstance(conf)) {
+ String destinationDir = FileUtils.getTempDirectoryPath();
+ keyTabFile = FSUtil.copyToLocalFileSystem(fs, destinationDir, destinationFile, hdfsKeyTabFile, conf);
+
+ if (principal == null) {
+ //principal = UserGroupInformation.getCurrentUser().getUserName();
+ principal = UserGroupInformation.getLoginUser().getUserName();
+ }
+ logger.debug("Principal {}", principal);
+ UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTabFile.getAbsolutePath());
+ if (!checkOnly) {
+ try {
+ UserGroupInformation currUGI = UserGroupInformation.createProxyUser(tokenRenewer, ugi);
+ currUGI.doAs(new PrivilegedExceptionAction<Object>()
+ {
+ @Override
+ public Object run() throws Exception
+ {
+
+ if (refresh) {
+ Credentials creds = new Credentials();
+ try (FileSystem fs1 = FileSystem.newInstance(conf)) {
+ logger.info("Refreshing fs tokens");
+ fs1.addDelegationTokens(tokenRenewer, creds);
+ logger.info("Refreshed tokens");
+ }
+ if (renewRMToken) {
+ try (YarnClient yarnClient = StramClientUtils.createYarnClient(conf)) {
+ logger.info("Refreshing rm tokens");
+ new StramClientUtils.ClientRMHelper(yarnClient, conf).addRMDelegationToken(tokenRenewer, creds);
+ logger.info("Refreshed tokens");
+ }
+ }
+ credentials.addAll(creds);
+ } else {
+ Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
+ for (Token<? extends TokenIdentifier> token : tokens) {
+ logger.debug("Token {}", token);
+ if (token.getKind().equals(HDFS_TOKEN_KIND) || (renewRMToken && token.getKind().equals(RMDelegationTokenIdentifier.KIND_NAME))) {
+ logger.info("Renewing token {}", token.getKind());
+ token.renew(conf);
+ logger.info("Renewed token");
+ }
+ }
+ }
+
+ return null;
+ }
+ });
+ UserGroupInformation.getCurrentUser().addCredentials(credentials);
+ } catch (InterruptedException e) {
+ logger.error("Error while renewing tokens ", e);
+ expiryTime = System.currentTimeMillis();
+ } catch (IOException e) {
+ logger.error("Error while renewing tokens ", e);
+ expiryTime = System.currentTimeMillis();
+ }
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("number of tokens: {}", credentials.getAllTokens().size());
+ Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
+ while (iter.hasNext()) {
+ Token<?> token = iter.next();
+ logger.debug("updated token: {}", token);
+ }
+ }
+ } finally {
+ if (keyTabFile != null) {
+ keyTabFile.delete();
+ }
+ }
+ return expiryTime;
+ }
+
+}
--
To stop receiving notification emails like this one, please contact
thw@apache.org.