You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2017/04/08 18:08:32 UTC

[09/24] twill git commit: (TWILL-171) Clone the HDFS delegation token in HA mode.

(TWILL-171) Clone the HDFS delegation token in HA mode.

- This is for working around HDFS-9276

This closes #42 on Github.

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/d2a503ac
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/d2a503ac
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/d2a503ac

Branch: refs/heads/site
Commit: d2a503ac7032fb0874f414acad16015854e54e56
Parents: d1f1122
Author: Terence Yim <ch...@apache.org>
Authored: Mon Mar 27 10:51:26 2017 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Mar 27 12:37:55 2017 -0700

----------------------------------------------------------------------
 .../appmaster/ApplicationMasterMain.java        |  2 +-
 .../appmaster/ApplicationMasterService.java     |  8 +++--
 .../internal/container/TwillContainerMain.java  |  2 +-
 .../container/TwillContainerService.java        |  7 +++--
 .../internal/yarn/AbstractYarnTwillService.java | 18 +++++++++--
 .../apache/twill/internal/yarn/YarnUtils.java   | 33 ++++++++++++++++++++
 6 files changed, 60 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/d2a503ac/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index 81c61ac..445656d 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -84,7 +84,7 @@ public final class ApplicationMasterMain extends ServiceMain {
 
     final YarnAMClient amClient = new VersionDetectYarnAMClientFactory(conf).create();
     ApplicationMasterService service =
-      new ApplicationMasterService(runId, zkClientService, twillRuntimeSpec, amClient,
+      new ApplicationMasterService(runId, zkClientService, twillRuntimeSpec, amClient, conf,
                                    createAppLocation(conf, twillRuntimeSpec.getFsUser(),
                                                      twillRuntimeSpec.getTwillAppDir()));
     TrackerService trackerService = new TrackerService(service);

http://git-wip-us.apache.org/repos/asf/twill/blob/d2a503ac/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 0f647cd..523ffce 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -37,6 +37,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -138,9 +139,10 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
   private Queue<RunnableContainerRequest> runnableContainerRequests;
   private ExecutorService instanceChangeExecutor;
 
-  public ApplicationMasterService(RunId runId, ZKClient zkClient, TwillRuntimeSpecification twillRuntimeSpec,
-                                  YarnAMClient amClient, Location applicationLocation) throws Exception {
-    super(zkClient, runId, applicationLocation);
+  public ApplicationMasterService(RunId runId, ZKClient zkClient,
+                                  TwillRuntimeSpecification twillRuntimeSpec, YarnAMClient amClient,
+                                  Configuration config, Location applicationLocation) throws Exception {
+    super(zkClient, runId, config, applicationLocation);
 
     this.runId = runId;
     this.twillRuntimeSpec = twillRuntimeSpec;

http://git-wip-us.apache.org/repos/asf/twill/blob/d2a503ac/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
index 2baaca1..e6d86a5 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -124,7 +124,7 @@ public final class TwillContainerMain extends ServiceMain {
     ZKClient containerZKClient = getContainerZKClient(zkClientService, appRunId, runnableName);
     Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
     TwillContainerService service = new TwillContainerService(context, containerInfo, containerZKClient,
-                                                              runId, runnableSpec, getClassLoader(),
+                                                              runId, runnableSpec, getClassLoader(), conf,
                                                               createAppLocation(conf, twillRuntimeSpec.getFsUser(),
                                                                                 twillRuntimeSpec.getTwillAppDir()),
                                                               defaultLogLevels, logLevels);

http://git-wip-us.apache.org/repos/asf/twill/blob/d2a503ac/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
index 58298a0..6335f9f 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.twill.api.Command;
 import org.apache.twill.api.RunId;
 import org.apache.twill.api.TwillRunnable;
@@ -70,9 +71,9 @@ public final class TwillContainerService extends AbstractYarnTwillService {
 
   TwillContainerService(BasicTwillContext context, ContainerInfo containerInfo, ZKClient zkClient,
                         RunId runId, TwillRunnableSpecification specification, ClassLoader classLoader,
-                        Location applicationLocation, Map<String, String> defaultLogLevels,
-                        Map<String, String> logLevels) {
-    super(zkClient, runId, applicationLocation);
+                        Configuration config, Location applicationLocation,
+                        Map<String, String> defaultLogLevels, Map<String, String> logLevels) {
+    super(zkClient, runId, config, applicationLocation);
 
     this.specification = specification;
     this.classLoader = classLoader;

http://git-wip-us.apache.org/repos/asf/twill/blob/d2a503ac/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java
index 64f81b4..44cabdc 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java
@@ -17,6 +17,7 @@
  */
 package org.apache.twill.internal.yarn;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.twill.api.RunId;
@@ -40,11 +41,15 @@ import java.io.IOException;
 public abstract class AbstractYarnTwillService extends AbstractTwillService {
 
   private static final Logger LOG = LoggerFactory.getLogger(AbstractYarnTwillService.class);
-  protected final Location applicationLocation;
+
+  private final Configuration config;
+  private final Location applicationLocation;
   protected volatile Credentials credentials;
 
-  protected AbstractYarnTwillService(ZKClient zkClient, RunId runId, Location applicationLocation) {
+  protected AbstractYarnTwillService(ZKClient zkClient, RunId runId,
+                                     Configuration config, Location applicationLocation) {
     super(zkClient, runId);
+    this.config = config;
     this.applicationLocation = applicationLocation;
   }
 
@@ -83,11 +88,20 @@ public abstract class AbstractYarnTwillService extends AbstractTwillService {
     try {
       Credentials credentials = new Credentials();
       Location location = getSecureStoreLocation();
+
+      // If the secure store location cannot be determined, simply ignore the message.
+      if (location == null) {
+        return true;
+      }
+
       try (DataInputStream input = new DataInputStream(new BufferedInputStream(location.getInputStream()))) {
         credentials.readTokenStorageStream(input);
       }
 
       UserGroupInformation.getCurrentUser().addCredentials(credentials);
+
+      // Clone the HDFS tokens for HA NameNode. This is to work around bug HDFS-9276.
+      YarnUtils.cloneHaNnCredentials(config);
       this.credentials = credentials;
 
       LOG.info("Secure store updated from {}.", location);

http://git-wip-us.apache.org/repos/asf/twill/blob/d2a503ac/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
index 9574554..ff8f4bb 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
@@ -21,9 +21,12 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.Credentials;
@@ -48,8 +51,10 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -178,6 +183,34 @@ public class YarnUtils {
   }
 
   /**
+   * Clones the delegation token to individual host behind the same logical address.
+   *
+   * @param config the hadoop configuration
+   * @throws IOException if failed to get information for the current user.
+   */
+  public static void cloneHaNnCredentials(Configuration config) throws IOException {
+    String scheme = URI.create(config.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+                                          CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT)).getScheme();
+
+    // Loop through all name services. Each name service could have multiple name node associated with it.
+    for (Map.Entry<String, Map<String, InetSocketAddress>> entry : DFSUtil.getHaNnRpcAddresses(config).entrySet()) {
+      String nsId = entry.getKey();
+      Map<String, InetSocketAddress> addressesInNN = entry.getValue();
+      if (!HAUtil.isHAEnabled(config, nsId) || addressesInNN == null || addressesInNN.isEmpty()) {
+        continue;
+      }
+
+      // The client may have a delegation token set for the logical
+      // URI of the cluster. Clone this token to apply to each of the
+      // underlying IPC addresses so that the IPC code can find it.
+      URI uri = URI.create(scheme + "://" + nsId);
+
+      LOG.info("Cloning delegation token for uri {}", uri);
+      HAUtil.cloneDelegationTokenForLogicalUri(UserGroupInformation.getCurrentUser(), uri, addressesInNN.values());
+    }
+  }
+
+  /**
    * Encodes the given {@link Credentials} as bytes.
    */
   public static ByteBuffer encodeCredentials(Credentials credentials) {