You are viewing a plain text version of this content. The canonical link for it is: http://git-wip-us.apache.org/repos/asf/hive/commit/2869eca2
Posted to commits@hive.apache.org by pr...@apache.org on 2017/02/28 03:32:32 UTC
hive git commit: HIVE-15958: LLAP: IPC connections are not being
reused for umbilical protocol (Prasanth Jayachandran reviewed by Siddharth
Seth)
Repository: hive
Updated Branches:
refs/heads/master ec4036bb6 -> 2869eca25
HIVE-15958: LLAP: IPC connections are not being reused for umbilical protocol (Prasanth Jayachandran reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2869eca2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2869eca2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2869eca2
Branch: refs/heads/master
Commit: 2869eca251ae7bfe6c0ef1c7cf2fc2647ed98f53
Parents: ec4036b
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Mon Feb 27 19:32:23 2017 -0800
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Mon Feb 27 19:32:23 2017 -0800
----------------------------------------------------------------------
.../hive/llap/daemon/impl/AMReporter.java | 77 ++++++++++++--------
.../llap/daemon/impl/ContainerRunnerImpl.java | 21 ++++--
.../hadoop/hive/llap/daemon/impl/QueryInfo.java | 17 +++--
.../hive/llap/daemon/impl/QueryTracker.java | 20 ++---
.../daemon/impl/TaskExecutorTestHelpers.java | 2 +-
5 files changed, 82 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2869eca2/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index a30f8b9..65f7232 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -16,7 +16,6 @@ package org.apache.hadoop.hive.llap.daemon.impl;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray;
-import org.apache.hadoop.io.ArrayWritable;
import java.util.ArrayList;
@@ -26,7 +25,6 @@ import java.util.HashSet;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import javax.net.SocketFactory;
@@ -44,7 +42,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.util.concurrent.FutureCallback;
@@ -127,9 +124,9 @@ public class AMReporter extends AbstractService {
this.conf = conf;
this.daemonId = daemonId;
if (maxThreads < numExecutors) {
- maxThreads = numExecutors;
LOG.warn("maxThreads={} is less than numExecutors={}. Setting maxThreads=numExecutors",
- maxThreads, numExecutors);
+ maxThreads, numExecutors);
+ maxThreads = numExecutors;
}
ExecutorService rawExecutor =
new ThreadPoolExecutor(numExecutors, maxThreads,
@@ -241,12 +238,15 @@ public class AMReporter extends AbstractService {
public void taskKilled(String amLocation, int port, String user, Token<JobTokenIdentifier> jobToken,
final QueryIdentifier queryIdentifier, final TezTaskAttemptID taskAttemptId) {
- // Not re-using the connection for the AM heartbeat - which may or may not be open by this point.
- // knownAppMasters is used for sending heartbeats for queued tasks. Killed messages use a new connection.
LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
- AMNodeInfo amNodeInfo =
- new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory,
- conf);
+ AMNodeInfo amNodeInfo;
+ synchronized (knownAppMasters) {
+ amNodeInfo = knownAppMasters.get(amNodeId);
+ if (amNodeInfo == null) {
+ amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory,
+ conf);
+ }
+ }
// Even if the service hasn't started up. It's OK to make this invocation since this will
// only happen after the AtomicReference address has been populated. Not adding an additional check.
@@ -266,6 +266,20 @@ public class AMReporter extends AbstractService {
});
}
+ public void queryComplete(LlapNodeId llapNodeId) {
+ if (llapNodeId != null) {
+ synchronized (knownAppMasters) {
+ AMNodeInfo amNodeInfo = knownAppMasters.remove(llapNodeId);
+ // TODO: not stopping umbilical explicitly as some taskKill requests may get scheduled during queryComplete
+ // which will be using the umbilical. HIVE-16021 should fix this, until then leave umbilical open and wait for
+ // it to be closed after max idle timeout (10s default)
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Query complete received. Removed {}.", amNodeInfo);
+ }
+ }
+ }
+ }
+
private class QueueLookupCallable extends CallableWithNdc<Void> {
@Override
@@ -273,7 +287,7 @@ public class AMReporter extends AbstractService {
while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
try {
final AMNodeInfo amNodeInfo = pendingHeartbeatQueeu.take();
- if (amNodeInfo.getTaskCount() == 0 || amNodeInfo.hasAmFailed()) {
+ if (amNodeInfo.hasAmFailed()) {
synchronized (knownAppMasters) {
if (LOG.isDebugEnabled()) {
LOG.debug(
@@ -283,28 +297,29 @@ public class AMReporter extends AbstractService {
}
knownAppMasters.remove(amNodeInfo.amNodeId);
}
- amNodeInfo.stopUmbilical();
} else {
- // Add back to the queue for the next heartbeat, and schedule the actual heartbeat
- long next = System.currentTimeMillis() + heartbeatInterval;
- amNodeInfo.setNextHeartbeatTime(next);
- pendingHeartbeatQueeu.add(amNodeInfo);
- ListenableFuture<Void> future = executor.submit(new AMHeartbeatCallable(amNodeInfo));
- Futures.addCallback(future, new FutureCallback<Void>() {
- @Override
- public void onSuccess(Void result) {
- // Nothing to do.
- }
-
- @Override
- public void onFailure(Throwable t) {
- QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier();
- amNodeInfo.setAmFailed(true);
- LOG.warn("Heartbeat failed to AM {}. Killing all other tasks for the query={}",
+ if (amNodeInfo.getTaskCount() > 0) {
+ // Add back to the queue for the next heartbeat, and schedule the actual heartbeat
+ long next = System.currentTimeMillis() + heartbeatInterval;
+ amNodeInfo.setNextHeartbeatTime(next);
+ pendingHeartbeatQueeu.add(amNodeInfo);
+ ListenableFuture<Void> future = executor.submit(new AMHeartbeatCallable(amNodeInfo));
+ Futures.addCallback(future, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void result) {
+ // Nothing to do.
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier();
+ amNodeInfo.setAmFailed(true);
+ LOG.warn("Heartbeat failed to AM {}. Killing all other tasks for the query={}",
amNodeInfo.amNodeId, currentQueryIdentifier, t);
- queryFailedHandler.queryFailed(currentQueryIdentifier);
- }
- });
+ queryFailedHandler.queryFailed(currentQueryIdentifier);
+ }
+ });
+ }
}
} catch (InterruptedException e) {
if (isShutdown.get()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/2869eca2/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index fac9730..af8f5b0 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.common.UgiFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.DaemonId;
+import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.hive.llap.NotTezEventHelper;
import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
@@ -242,12 +243,12 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
+ LlapNodeId amNodeId = LlapNodeId.getInstance(request.getAmHost(), request.getAmPort());
QueryFragmentInfo fragmentInfo = queryTracker.registerFragment(
queryIdentifier, qIdProto.getApplicationIdString(), dagId,
vertex.getDagName(), vertex.getHiveQueryId(), dagIdentifier,
vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber(),
- vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, request.getAmHost(),
- request.getAmPort());
+ vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, amNodeId);
String[] localDirs = fragmentInfo.getLocalDirs();
Preconditions.checkNotNull(localDirs);
@@ -390,14 +391,18 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
new QueryIdentifier(request.getQueryIdentifier().getApplicationIdString(),
request.getQueryIdentifier().getDagIndex());
LOG.info("Processing queryComplete notification for {}", queryIdentifier);
- List<QueryFragmentInfo> knownFragments = queryTracker.queryComplete(
- queryIdentifier, request.getDeleteDelay(), false);
- LOG.info("DBG: Pending fragment count for completed query {} = {}", queryIdentifier,
+ QueryInfo queryInfo = queryTracker.queryComplete(queryIdentifier, request.getDeleteDelay(), false);
+ if (queryInfo != null) {
+ List<QueryFragmentInfo> knownFragments = queryInfo.getRegisteredFragments();
+ LOG.info("DBG: Pending fragment count for completed query {} = {}", queryIdentifier,
knownFragments.size());
- for (QueryFragmentInfo fragmentInfo : knownFragments) {
- LOG.info("Issuing killFragment for completed query {} {}", queryIdentifier,
+ for (QueryFragmentInfo fragmentInfo : knownFragments) {
+ LOG.info("Issuing killFragment for completed query {} {}", queryIdentifier,
fragmentInfo.getFragmentIdentifierString());
- executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
+ executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
+ }
+ LlapNodeId amNodeId = queryInfo.getAmNodeId();
+ amReporter.queryComplete(amNodeId);
}
return QueryCompleteResponseProto.getDefaultInstance();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2869eca2/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index eaa3e7e..088f07c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -35,6 +35,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
@@ -55,6 +56,7 @@ public class QueryInfo {
private final String[] localDirsBase;
private final FileSystem localFs;
private String[] localDirs;
+ private final LlapNodeId amNodeId;
// Map of states for different vertices.
private final Set<QueryFragmentInfo> knownFragments =
@@ -67,11 +69,11 @@ public class QueryInfo {
private final AtomicReference<UserGroupInformation> umbilicalUgi;
public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dagIdString,
- String dagName, String hiveQueryIdString,
- int dagIdentifier, String user,
- ConcurrentMap<String, SourceStateProto> sourceStateMap,
- String[] localDirsBase, FileSystem localFs, String tokenUserName,
- String tokenAppId) {
+ String dagName, String hiveQueryIdString,
+ int dagIdentifier, String user,
+ ConcurrentMap<String, SourceStateProto> sourceStateMap,
+ String[] localDirsBase, FileSystem localFs, String tokenUserName,
+ String tokenAppId, final LlapNodeId amNodeId) {
this.queryIdentifier = queryIdentifier;
this.appIdString = appIdString;
this.dagIdString = dagIdString;
@@ -85,6 +87,7 @@ public class QueryInfo {
this.tokenUserName = tokenUserName;
this.appId = tokenAppId;
this.umbilicalUgi = new AtomicReference<>();
+ this.amNodeId = amNodeId;
}
public QueryIdentifier getQueryIdentifier() {
@@ -115,6 +118,10 @@ public class QueryInfo {
return sourceStateMap;
}
+ public LlapNodeId getAmNodeId() {
+ return amNodeId;
+ }
+
public QueryFragmentInfo registerFragment(String vertexName, int fragmentNumber,
int attemptNumber, SignableVertexSpec vertexSpec, String fragmentIdString) {
QueryFragmentInfo fragmentInfo = new QueryFragmentInfo(
http://git-wip-us.apache.org/repos/asf/hive/blob/2869eca2/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 5cf3a38..7e646c5 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.hive.llap.log.Log4jQueryCompleteMarker;
import org.apache.hadoop.hive.llap.log.LogHelpers;
import org.apache.hadoop.security.UserGroupInformation;
@@ -69,8 +70,6 @@ public class QueryTracker extends AbstractService {
private final ConcurrentHashMap<QueryIdentifier, QueryInfo> queryInfoMap = new ConcurrentHashMap<>();
-
-
private final String[] localDirsBase;
private final FileSystem localFs;
private final String clusterId;
@@ -137,9 +136,10 @@ public class QueryTracker extends AbstractService {
* Register a new fragment for a specific query
*/
QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString, String dagIdString,
- String dagName, String hiveQueryIdString, int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber,
- String user, SignableVertexSpec vertex, Token<JobTokenIdentifier> appToken,
- String fragmentIdString, LlapTokenInfo tokenInfo, String amHost, int amPort) throws IOException {
+ String dagName, String hiveQueryIdString, int dagIdentifier, String vertexName, int fragmentNumber,
+ int attemptNumber,
+ String user, SignableVertexSpec vertex, Token<JobTokenIdentifier> appToken,
+ String fragmentIdString, LlapTokenInfo tokenInfo, final LlapNodeId amNodeId) throws IOException {
ReadWriteLock dagLock = getDagLock(queryIdentifier);
// Note: This is a readLock to prevent a race with queryComplete. Operations
@@ -169,13 +169,13 @@ public class QueryTracker extends AbstractService {
new QueryInfo(queryIdentifier, appIdString, dagIdString, dagName, hiveQueryIdString,
dagIdentifier, user,
getSourceCompletionMap(queryIdentifier), localDirsBase, localFs,
- tokenInfo.userName, tokenInfo.appId);
+ tokenInfo.userName, tokenInfo.appId, amNodeId);
QueryInfo old = queryInfoMap.putIfAbsent(queryIdentifier, queryInfo);
if (old != null) {
queryInfo = old;
} else {
// Ensure the UGI is setup once.
- queryInfo.setupUmbilicalUgi(vertex.getTokenIdentifier(), appToken, amHost, amPort);
+ queryInfo.setupUmbilicalUgi(vertex.getTokenIdentifier(), appToken, amNodeId.getHostname(), amNodeId.getPort());
isExistingQueryInfo = false;
}
}
@@ -238,7 +238,7 @@ public class QueryTracker extends AbstractService {
* @param queryIdentifier
* @param deleteDelay
*/
- List<QueryFragmentInfo> queryComplete(QueryIdentifier queryIdentifier, long deleteDelay,
+ QueryInfo queryComplete(QueryIdentifier queryIdentifier, long deleteDelay,
boolean isInternal) throws IOException {
if (deleteDelay == -1) {
deleteDelay = defaultDeleteDelaySeconds;
@@ -255,7 +255,7 @@ public class QueryTracker extends AbstractService {
if (queryInfo == null) {
// Should not happen.
LOG.warn("Ignoring query complete for unknown dag: {}", queryIdentifier);
- return Collections.emptyList();
+ return null;
}
String[] localDirs = queryInfo.getLocalDirsNoCreate();
if (localDirs != null) {
@@ -292,7 +292,7 @@ public class QueryTracker extends AbstractService {
if (savedQueryId != null) {
ObjectCacheFactory.removeLlapQueryCache(savedQueryId);
}
- return queryInfo.getRegisteredFragments();
+ return queryInfo;
} finally {
dagLock.writeLock().unlock();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2869eca2/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index 71feb33..259e383 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -93,7 +93,7 @@ public class TaskExecutorTestHelpers {
new QueryInfo(queryIdentifier, "fake_app_id_string", "fake_dag_id_string", "fake_dag_name",
"fakeHiveQueryId", 1, "fakeUser",
new ConcurrentHashMap<String, LlapDaemonProtocolProtos.SourceStateProto>(),
- new String[0], null, "fakeUser", null);
+ new String[0], null, "fakeUser", null, null);
return queryInfo;
}