You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2017/02/21 22:26:08 UTC

hive git commit: HIVE-15971: LLAP: logs urls should use daemon container id instead of fake container id

Repository: hive
Updated Branches:
  refs/heads/master de532b1f9 -> d5bb76cf2


HIVE-15971: LLAP: logs urls should use daemon container id instead of fake container id


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d5bb76cf
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d5bb76cf
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d5bb76cf

Branch: refs/heads/master
Commit: d5bb76cf2da3934d1de6b3087ac4bfafa2b2cb6f
Parents: de532b1
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Tue Feb 21 14:25:47 2017 -0800
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Tue Feb 21 14:25:47 2017 -0800

----------------------------------------------------------------------
 .../llap/registry/impl/LlapRegistryService.java | 13 +--
 .../llap/tezplugins/LlapTaskCommunicator.java   | 91 +++++++++++---------
 2 files changed, 57 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d5bb76cf/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
index 5a94db9..610c0a5 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener;
 import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.slf4j.Logger;
@@ -57,17 +58,17 @@ public class LlapRegistryService extends AbstractService {
     String hosts = HiveConf.getTrimmedVar(conf, HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
     Preconditions.checkNotNull(hosts, ConfVars.LLAP_DAEMON_SERVICE_HOSTS.toString() + " must be defined");
     LlapRegistryService registry;
-    // TODO: this is not going to work with multiple users.
     if (hosts.startsWith("@")) {
       // Caching instances only in case of the YARN registry. Each host based list will get it's own copy.
-      String name = hosts.substring(1);
-      if (yarnRegistries.containsKey(name) && yarnRegistries.get(name).isInState(STATE.STARTED)) {
-        registry = yarnRegistries.get(name);
-      } else {
+      String appName = hosts.substring(1);
+      String userName = HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_USER, RegistryUtils.currentUser());
+      String key = appName + "-" + userName;
+      registry = yarnRegistries.get(key);
+      if (registry == null || !registry.isInState(STATE.STARTED)) {
         registry = new LlapRegistryService(false);
         registry.init(conf);
         registry.start();
-        yarnRegistries.put(name, registry);
+        yarnRegistries.put(key, registry);
       }
     } else {
       registry = new LlapRegistryService(false);

http://git-wip-us.apache.org/repos/asf/hive/blob/d5bb76cf/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index 3aae7a4..e593b33 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -14,12 +14,12 @@
 
 package org.apache.hadoop.hive.llap.tezplugins;
 
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
 import org.apache.hadoop.io.Writable;
 
 import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray;
 
 import java.io.IOException;
-import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.HashSet;
 import java.util.Map;
@@ -75,7 +75,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.security.JobTokenSecretManager;
@@ -119,7 +119,8 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
   private final Token<LlapTokenIdentifier> token;
   private final String user;
   private String amHost;
-  private URI timelineServerUri;
+  private String timelineServerUri;
+  private int nmPort;
 
   // These two structures track the list of known nodes, and the list of nodes which are sending in keep-alive heartbeats.
   // Primarily for debugging purposes a.t.m, since there's some unexplained TASK_TIMEOUTS which are currently being observed.
@@ -149,7 +150,6 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     Preconditions.checkState((token != null) == UserGroupInformation.isSecurityEnabled());
 
     // Not closing this at the moment at shutdown, since this could be a shared instance.
-    // TODO: this is unused.
     serviceRegistry = LlapRegistryService.getClient(conf);
 
     umbilical = new LlapTaskUmbilicalProtocolImpl(getUmbilical());
@@ -191,18 +191,10 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
         + "fileCleanupDelay=" + deleteDelayOnDagComplete
         + ", numCommunicatorThreads=" + numThreads);
     this.communicator.init(conf);
-    if (YarnConfiguration.useHttps(conf)) {
-      timelineServerUri = URI
-        .create(JOINER.join("https://", conf.get(
-          YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
-          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS),
-          RESOURCE_URI_STR));
-    } else {
-      timelineServerUri = URI.create(JOINER.join("http://", conf.get(
-        YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
-        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS),
-        RESOURCE_URI_STR));
-    }
+    String scheme = WebAppUtils.getHttpSchemePrefix(conf);
+    String ahsUrl = WebAppUtils.getAHSWebAppURLWithoutScheme(conf);
+    this.timelineServerUri = WebAppUtils.getURLWithScheme(scheme, ahsUrl);
+    this.nmPort = Integer.valueOf(WebAppUtils.getNMWebAppURLWithoutScheme(conf).split(":")[1]);
   }
 
   @Override
@@ -540,37 +532,54 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
 
   @Override
   public String getInProgressLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) {
-    String url = "";
-    if (timelineServerUri != null && containerNodeId != null) {
-      LlapNodeId llapNodeId = LlapNodeId.getInstance(containerNodeId.getHost(), containerNodeId.getPort());
-      BiMap<ContainerId, TezTaskAttemptID> biMap = entityTracker.getContainerAttemptMapForNode(llapNodeId);
-      ContainerId containerId = biMap.inverse().get(attemptID);
-      if (containerId != null) {
-        String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString();
-        String filename = currentHiveQueryId + "-" + dagId + ".log";
-        // YARN-6011 provides a webservice to get the logs
-        url = PATH_JOINER.join(timelineServerUri.toString(), "containers", containerId.toString(), "logs",
-          filename);
-      }
-    }
-    return url;
+    return constructLogUrl(attemptID, containerNodeId, false);
   }
 
   @Override
   public String getCompletedLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) {
-    String url = "";
-    if (timelineServerUri != null && containerNodeId != null) {
-      LlapNodeId llapNodeId = LlapNodeId.getInstance(containerNodeId.getHost(), containerNodeId.getPort());
-      BiMap<ContainerId, TezTaskAttemptID> biMap = entityTracker.getContainerAttemptMapForNode(llapNodeId);
-      ContainerId containerId = biMap.inverse().get(attemptID);
-      if (containerId != null) {
-        String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString();
-        String filename = currentHiveQueryId + "-" + dagId + ".log.done";
-        // YARN-6011 provides a webservice to get the logs
-        url = PATH_JOINER.join(timelineServerUri.toString(), "containers", containerId.toString(), "logs",
-          filename);
+    return constructLogUrl(attemptID, containerNodeId, true);
+  }
+
+  private String constructLogUrl(final TezTaskAttemptID attemptID, final NodeId containerNodeId, final boolean isDone) {
+    if (timelineServerUri == null || containerNodeId == null) {
+      return null;
+    }
+    Set<ServiceInstance> instanceSet;
+    try {
+      instanceSet = serviceRegistry.getInstances().getByHost(containerNodeId.getHost());
+    } catch (IOException e) {
+      // Not failing the job due to a failure constructing the log url
+      LOG.warn(
+        "Unable to find instance for yarnNodeId={} to construct the log url. Exception message={}",
+        containerNodeId, e.getMessage());
+      return null;
+    }
+    if (instanceSet != null) {
+      ServiceInstance matchedInstance = null;
+      for (ServiceInstance instance : instanceSet) {
+        if (instance.getRpcPort() == containerNodeId.getPort()) {
+          matchedInstance = instance;
+          break;
+        }
+      }
+      if (matchedInstance != null) {
+        String containerIdString = matchedInstance.getProperties()
+          .get(HiveConf.ConfVars.LLAP_DAEMON_CONTAINER_ID.varname);
+        if (containerIdString != null) {
+          return constructLlapLogUrl(attemptID, containerIdString, isDone, containerNodeId.getHost());
+        }
       }
     }
+    return null;
+  }
+
+  private String constructLlapLogUrl(final TezTaskAttemptID attemptID, final String containerIdString,
+    final boolean isDone, final String nmHost) {
+    String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString();
+    String filename = JOINER.join(currentHiveQueryId, "-", dagId, ".log", (isDone ? ".done" : ""),
+      "?nm.id=", nmHost, ":", nmPort);
+    String url = PATH_JOINER.join(timelineServerUri, "ws", "v1", "applicationhistory", "containers",
+      containerIdString, "logs", filename);
     return url;
   }