You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2017/04/08 18:08:32 UTC
[09/24] twill git commit: (TWILL-171) Clone the HDFS delegation token in HA
mode.
(TWILL-171) Clone the HDFS delegation token in HA mode.
- This is for working around HDFS-9276
This closes #42 on Github.
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/d2a503ac
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/d2a503ac
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/d2a503ac
Branch: refs/heads/site
Commit: d2a503ac7032fb0874f414acad16015854e54e56
Parents: d1f1122
Author: Terence Yim <ch...@apache.org>
Authored: Mon Mar 27 10:51:26 2017 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Mar 27 12:37:55 2017 -0700
----------------------------------------------------------------------
.../appmaster/ApplicationMasterMain.java | 2 +-
.../appmaster/ApplicationMasterService.java | 8 +++--
.../internal/container/TwillContainerMain.java | 2 +-
.../container/TwillContainerService.java | 7 +++--
.../internal/yarn/AbstractYarnTwillService.java | 18 +++++++++--
.../apache/twill/internal/yarn/YarnUtils.java | 33 ++++++++++++++++++++
6 files changed, 60 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/twill/blob/d2a503ac/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index 81c61ac..445656d 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -84,7 +84,7 @@ public final class ApplicationMasterMain extends ServiceMain {
final YarnAMClient amClient = new VersionDetectYarnAMClientFactory(conf).create();
ApplicationMasterService service =
- new ApplicationMasterService(runId, zkClientService, twillRuntimeSpec, amClient,
+ new ApplicationMasterService(runId, zkClientService, twillRuntimeSpec, amClient, conf,
createAppLocation(conf, twillRuntimeSpec.getFsUser(),
twillRuntimeSpec.getTwillAppDir()));
TrackerService trackerService = new TrackerService(service);
http://git-wip-us.apache.org/repos/asf/twill/blob/d2a503ac/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 0f647cd..523ffce 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -37,6 +37,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@@ -138,9 +139,10 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
private Queue<RunnableContainerRequest> runnableContainerRequests;
private ExecutorService instanceChangeExecutor;
- public ApplicationMasterService(RunId runId, ZKClient zkClient, TwillRuntimeSpecification twillRuntimeSpec,
- YarnAMClient amClient, Location applicationLocation) throws Exception {
- super(zkClient, runId, applicationLocation);
+ public ApplicationMasterService(RunId runId, ZKClient zkClient,
+ TwillRuntimeSpecification twillRuntimeSpec, YarnAMClient amClient,
+ Configuration config, Location applicationLocation) throws Exception {
+ super(zkClient, runId, config, applicationLocation);
this.runId = runId;
this.twillRuntimeSpec = twillRuntimeSpec;
http://git-wip-us.apache.org/repos/asf/twill/blob/d2a503ac/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
index 2baaca1..e6d86a5 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -124,7 +124,7 @@ public final class TwillContainerMain extends ServiceMain {
ZKClient containerZKClient = getContainerZKClient(zkClientService, appRunId, runnableName);
Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
TwillContainerService service = new TwillContainerService(context, containerInfo, containerZKClient,
- runId, runnableSpec, getClassLoader(),
+ runId, runnableSpec, getClassLoader(), conf,
createAppLocation(conf, twillRuntimeSpec.getFsUser(),
twillRuntimeSpec.getTwillAppDir()),
defaultLogLevels, logLevels);
http://git-wip-us.apache.org/repos/asf/twill/blob/d2a503ac/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
index 58298a0..6335f9f 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+import org.apache.hadoop.conf.Configuration;
import org.apache.twill.api.Command;
import org.apache.twill.api.RunId;
import org.apache.twill.api.TwillRunnable;
@@ -70,9 +71,9 @@ public final class TwillContainerService extends AbstractYarnTwillService {
TwillContainerService(BasicTwillContext context, ContainerInfo containerInfo, ZKClient zkClient,
RunId runId, TwillRunnableSpecification specification, ClassLoader classLoader,
- Location applicationLocation, Map<String, String> defaultLogLevels,
- Map<String, String> logLevels) {
- super(zkClient, runId, applicationLocation);
+ Configuration config, Location applicationLocation,
+ Map<String, String> defaultLogLevels, Map<String, String> logLevels) {
+ super(zkClient, runId, config, applicationLocation);
this.specification = specification;
this.classLoader = classLoader;
http://git-wip-us.apache.org/repos/asf/twill/blob/d2a503ac/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java
index 64f81b4..44cabdc 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java
@@ -17,6 +17,7 @@
*/
package org.apache.twill.internal.yarn;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.twill.api.RunId;
@@ -40,11 +41,15 @@ import java.io.IOException;
public abstract class AbstractYarnTwillService extends AbstractTwillService {
private static final Logger LOG = LoggerFactory.getLogger(AbstractYarnTwillService.class);
- protected final Location applicationLocation;
+
+ private final Configuration config;
+ private final Location applicationLocation;
protected volatile Credentials credentials;
- protected AbstractYarnTwillService(ZKClient zkClient, RunId runId, Location applicationLocation) {
+ protected AbstractYarnTwillService(ZKClient zkClient, RunId runId,
+ Configuration config, Location applicationLocation) {
super(zkClient, runId);
+ this.config = config;
this.applicationLocation = applicationLocation;
}
@@ -83,11 +88,20 @@ public abstract class AbstractYarnTwillService extends AbstractTwillService {
try {
Credentials credentials = new Credentials();
Location location = getSecureStoreLocation();
+
+ // If the secure store location cannot be determined, simply ignore the message.
+ if (location == null) {
+ return true;
+ }
+
try (DataInputStream input = new DataInputStream(new BufferedInputStream(location.getInputStream()))) {
credentials.readTokenStorageStream(input);
}
UserGroupInformation.getCurrentUser().addCredentials(credentials);
+
+ // Clone the HDFS tokens for HA NameNode. This is to work around bug HDFS-9276.
+ YarnUtils.cloneHaNnCredentials(config);
this.credentials = credentials;
LOG.info("Secure store updated from {}.", location);
http://git-wip-us.apache.org/repos/asf/twill/blob/d2a503ac/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
index 9574554..ff8f4bb 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
@@ -21,9 +21,12 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
@@ -48,8 +51,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
+import java.net.URI;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -178,6 +183,34 @@ public class YarnUtils {
}
/**
+ * Clones the current user's delegation token to each individual host behind the same logical address.
+ *
+ * @param config the hadoop configuration, used to look up the default file system scheme and HA name node addresses
+ * @throws IOException if it fails to get information for the current user.
+ */
+ public static void cloneHaNnCredentials(Configuration config) throws IOException {
+ String scheme = URI.create(config.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+ CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT)).getScheme();
+
+ // Loop through all name services. Each name service could have multiple name nodes associated with it.
+ for (Map.Entry<String, Map<String, InetSocketAddress>> entry : DFSUtil.getHaNnRpcAddresses(config).entrySet()) {
+ String nsId = entry.getKey();
+ Map<String, InetSocketAddress> addressesInNN = entry.getValue();
+ if (!HAUtil.isHAEnabled(config, nsId) || addressesInNN == null || addressesInNN.isEmpty()) {
+ continue;
+ }
+
+ // The client may have a delegation token set for the logical
+ // URI of the cluster. Clone this token to apply to each of the
+ // underlying IPC addresses so that the IPC code can find it.
+ URI uri = URI.create(scheme + "://" + nsId);
+
+ LOG.info("Cloning delegation token for uri {}", uri);
+ HAUtil.cloneDelegationTokenForLogicalUri(UserGroupInformation.getCurrentUser(), uri, addressesInNN.values());
+ }
+ }
+
+ /**
* Encodes the given {@link Credentials} as bytes.
*/
public static ByteBuffer encodeCredentials(Credentials credentials) {