You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2017/02/21 22:26:08 UTC
hive git commit: HIVE-15971: LLAP: logs urls should use daemon container id instead of fake container id
Repository: hive
Updated Branches:
refs/heads/master de532b1f9 -> d5bb76cf2
HIVE-15971: LLAP: logs urls should use daemon container id instead of fake container id
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d5bb76cf
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d5bb76cf
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d5bb76cf
Branch: refs/heads/master
Commit: d5bb76cf2da3934d1de6b3087ac4bfafa2b2cb6f
Parents: de532b1
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Tue Feb 21 14:25:47 2017 -0800
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Tue Feb 21 14:25:47 2017 -0800
----------------------------------------------------------------------
.../llap/registry/impl/LlapRegistryService.java | 13 +--
.../llap/tezplugins/LlapTaskCommunicator.java | 91 +++++++++++---------
2 files changed, 57 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/d5bb76cf/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
index 5a94db9..610c0a5 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener;
import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.slf4j.Logger;
@@ -57,17 +58,17 @@ public class LlapRegistryService extends AbstractService {
String hosts = HiveConf.getTrimmedVar(conf, HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
Preconditions.checkNotNull(hosts, ConfVars.LLAP_DAEMON_SERVICE_HOSTS.toString() + " must be defined");
LlapRegistryService registry;
- // TODO: this is not going to work with multiple users.
if (hosts.startsWith("@")) {
// Caching instances only in case of the YARN registry. Each host based list will get its own copy.
- String name = hosts.substring(1);
- if (yarnRegistries.containsKey(name) && yarnRegistries.get(name).isInState(STATE.STARTED)) {
- registry = yarnRegistries.get(name);
- } else {
+ String appName = hosts.substring(1);
+ String userName = HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_USER, RegistryUtils.currentUser());
+ String key = appName + "-" + userName;
+ registry = yarnRegistries.get(key);
+ if (registry == null || !registry.isInState(STATE.STARTED)) {
registry = new LlapRegistryService(false);
registry.init(conf);
registry.start();
- yarnRegistries.put(name, registry);
+ yarnRegistries.put(key, registry);
}
} else {
registry = new LlapRegistryService(false);
http://git-wip-us.apache.org/repos/asf/hive/blob/d5bb76cf/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index 3aae7a4..e593b33 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -14,12 +14,12 @@
package org.apache.hadoop.hive.llap.tezplugins;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray;
import java.io.IOException;
-import java.net.URI;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Map;
@@ -75,7 +75,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.security.JobTokenSecretManager;
@@ -119,7 +119,8 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
private final Token<LlapTokenIdentifier> token;
private final String user;
private String amHost;
- private URI timelineServerUri;
+ private String timelineServerUri;
+ private int nmPort;
// These two structures track the list of known nodes, and the list of nodes which are sending in keep-alive heartbeats.
// Primarily for debugging purposes a.t.m, since there's some unexplained TASK_TIMEOUTS which are currently being observed.
@@ -149,7 +150,6 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
Preconditions.checkState((token != null) == UserGroupInformation.isSecurityEnabled());
// Not closing this at the moment at shutdown, since this could be a shared instance.
- // TODO: this is unused.
serviceRegistry = LlapRegistryService.getClient(conf);
umbilical = new LlapTaskUmbilicalProtocolImpl(getUmbilical());
@@ -191,18 +191,10 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
+ "fileCleanupDelay=" + deleteDelayOnDagComplete
+ ", numCommunicatorThreads=" + numThreads);
this.communicator.init(conf);
- if (YarnConfiguration.useHttps(conf)) {
- timelineServerUri = URI
- .create(JOINER.join("https://", conf.get(
- YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS),
- RESOURCE_URI_STR));
- } else {
- timelineServerUri = URI.create(JOINER.join("http://", conf.get(
- YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS),
- RESOURCE_URI_STR));
- }
+ String scheme = WebAppUtils.getHttpSchemePrefix(conf);
+ String ahsUrl = WebAppUtils.getAHSWebAppURLWithoutScheme(conf);
+ this.timelineServerUri = WebAppUtils.getURLWithScheme(scheme, ahsUrl);
+ this.nmPort = Integer.valueOf(WebAppUtils.getNMWebAppURLWithoutScheme(conf).split(":")[1]);
}
@Override
@@ -540,37 +532,54 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
@Override
public String getInProgressLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) {
- String url = "";
- if (timelineServerUri != null && containerNodeId != null) {
- LlapNodeId llapNodeId = LlapNodeId.getInstance(containerNodeId.getHost(), containerNodeId.getPort());
- BiMap<ContainerId, TezTaskAttemptID> biMap = entityTracker.getContainerAttemptMapForNode(llapNodeId);
- ContainerId containerId = biMap.inverse().get(attemptID);
- if (containerId != null) {
- String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString();
- String filename = currentHiveQueryId + "-" + dagId + ".log";
- // YARN-6011 provides a webservice to get the logs
- url = PATH_JOINER.join(timelineServerUri.toString(), "containers", containerId.toString(), "logs",
- filename);
- }
- }
- return url;
+ return constructLogUrl(attemptID, containerNodeId, false);
}
@Override
public String getCompletedLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) {
- String url = "";
- if (timelineServerUri != null && containerNodeId != null) {
- LlapNodeId llapNodeId = LlapNodeId.getInstance(containerNodeId.getHost(), containerNodeId.getPort());
- BiMap<ContainerId, TezTaskAttemptID> biMap = entityTracker.getContainerAttemptMapForNode(llapNodeId);
- ContainerId containerId = biMap.inverse().get(attemptID);
- if (containerId != null) {
- String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString();
- String filename = currentHiveQueryId + "-" + dagId + ".log.done";
- // YARN-6011 provides a webservice to get the logs
- url = PATH_JOINER.join(timelineServerUri.toString(), "containers", containerId.toString(), "logs",
- filename);
+ return constructLogUrl(attemptID, containerNodeId, true);
+ }
+
+ private String constructLogUrl(final TezTaskAttemptID attemptID, final NodeId containerNodeId, final boolean isDone) {
+ if (timelineServerUri == null || containerNodeId == null) {
+ return null;
+ }
+ Set<ServiceInstance> instanceSet;
+ try {
+ instanceSet = serviceRegistry.getInstances().getByHost(containerNodeId.getHost());
+ } catch (IOException e) {
+ // Not failing the job due to a failure constructing the log url
+ LOG.warn(
+ "Unable to find instance for yarnNodeId={} to construct the log url. Exception message={}",
+ containerNodeId, e.getMessage());
+ return null;
+ }
+ if (instanceSet != null) {
+ ServiceInstance matchedInstance = null;
+ for (ServiceInstance instance : instanceSet) {
+ if (instance.getRpcPort() == containerNodeId.getPort()) {
+ matchedInstance = instance;
+ break;
+ }
+ }
+ if (matchedInstance != null) {
+ String containerIdString = matchedInstance.getProperties()
+ .get(HiveConf.ConfVars.LLAP_DAEMON_CONTAINER_ID.varname);
+ if (containerIdString != null) {
+ return constructLlapLogUrl(attemptID, containerIdString, isDone, containerNodeId.getHost());
+ }
}
}
+ return null;
+ }
+
+ private String constructLlapLogUrl(final TezTaskAttemptID attemptID, final String containerIdString,
+ final boolean isDone, final String nmHost) {
+ String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString();
+ String filename = JOINER.join(currentHiveQueryId, "-", dagId, ".log", (isDone ? ".done" : ""),
+ "?nm.id=", nmHost, ":", nmPort);
+ String url = PATH_JOINER.join(timelineServerUri, "ws", "v1", "applicationhistory", "containers",
+ containerIdString, "logs", filename);
return url;
}