You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/02/02 02:03:03 UTC
[04/50] [abbrv] hive git commit: HIVE-15722. LLAP: Avoid marking a
query as complete if the AMReporter runs into an error. (Siddharth Seth,
reviewed by Sergey Shelukhin, Prasanth Jayachandran)
HIVE-15722. LLAP: Avoid marking a query as complete if the AMReporter runs into an error. (Siddharth Seth, reviewed by Sergey Shelukhin, Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/aabe83db
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/aabe83db
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/aabe83db
Branch: refs/heads/hive-14535
Commit: aabe83dbf303ac4891d41ced72916c72b46ae072
Parents: 888e5d7
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Jan 27 11:52:52 2017 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Jan 27 11:52:52 2017 -0800
----------------------------------------------------------------------
.../llap/daemon/impl/ContainerRunnerImpl.java | 17 +++++-------
.../hive/llap/daemon/impl/QueryTracker.java | 29 ++++++++++++++++----
2 files changed, 31 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/aabe83db/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 6d7d4de..8c33fa2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -55,7 +55,6 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.Terminate
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
import org.apache.hadoop.hive.llap.security.LlapSignerImpl;
-import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.hive.llap.tez.Converters;
import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
import org.apache.hadoop.io.DataInputBuffer;
@@ -183,14 +182,16 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
SignableVertexSpec vertex = extractVertexSpec(request, tokenInfo);
TezEvent initialEvent = extractInitialEvent(request, tokenInfo);
- if (LOG.isInfoEnabled()) {
- LOG.info("Queueing container for execution: " + stringifySubmitRequest(request, vertex));
- }
- QueryIdentifierProto qIdProto = vertex.getQueryIdentifier();
TezTaskAttemptID attemptId =
Converters.createTaskAttemptId(vertex.getQueryIdentifier(), vertex.getVertexIndex(),
request.getFragmentNumber(), request.getAttemptNumber());
String fragmentIdString = attemptId.toString();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Queueing container for execution: fragemendId={}, {}",
+ fragmentIdString, stringifySubmitRequest(request, vertex));
+ }
+ QueryIdentifierProto qIdProto = vertex.getQueryIdentifier();
+
HistoryLogger.logFragmentStart(qIdProto.getApplicationIdString(), request.getContainerIdString(),
localAddress.get().getHostName(), vertex.getDagName(), qIdProto.getDagIndex(),
vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber());
@@ -478,11 +479,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
public void queryFailed(QueryIdentifier queryIdentifier) {
LOG.info("Processing query failed notification for {}", queryIdentifier);
List<QueryFragmentInfo> knownFragments;
- try {
- knownFragments = queryTracker.queryComplete(queryIdentifier, -1, true);
- } catch (IOException e) {
- throw new RuntimeException(e); // Should never happen here, no permission check.
- }
+ knownFragments = queryTracker.getRegisteredFragments(queryIdentifier);
LOG.info("DBG: Pending fragment count for failed query {} = {}", queryIdentifier,
knownFragments.size());
for (QueryFragmentInfo fragmentInfo : knownFragments) {
http://git-wip-us.apache.org/repos/asf/hive/blob/aabe83db/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index a7d7981..9eaddd2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.llap.log.Log4jQueryCompleteMarker;
import org.apache.hadoop.hive.llap.log.LogHelpers;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.MDC;
import org.apache.logging.slf4j.Log4jMarker;
import org.apache.tez.common.CallableWithNdc;
@@ -141,14 +142,17 @@ public class QueryTracker extends AbstractService {
String fragmentIdString, LlapTokenInfo tokenInfo) throws IOException {
ReadWriteLock dagLock = getDagLock(queryIdentifier);
+ // Note: This is a readLock to prevent a race with queryComplete. Operations
+ // and mutations within this lock need to be on concurrent structures.
dagLock.readLock().lock();
try {
if (completedDagMap.contains(queryIdentifier)) {
// Cleanup the dag lock here, since it may have been created after the query completed
dagSpecificLocks.remove(queryIdentifier);
- throw new RuntimeException(
- "Dag " + dagName + " already complete. Rejecting fragment ["
- + vertexName + ", " + fragmentNumber + ", " + attemptNumber + "]");
+ String message = "Dag " + dagName + " already complete. Rejecting fragment ["
+ + vertexName + ", " + fragmentNumber + ", " + attemptNumber + "]";
+ LOG.info(message);
+ throw new RuntimeException(message);
}
// TODO: for now, we get the secure username out of UGI... after signing, we can take it
// out of the request provided that it's signed.
@@ -211,6 +215,22 @@ public class QueryTracker extends AbstractService {
}
}
+ List<QueryFragmentInfo> getRegisteredFragments(QueryIdentifier queryIdentifier) {
+ ReadWriteLock dagLock = getDagLock(queryIdentifier);
+ dagLock.readLock().lock();
+ try {
+ QueryInfo queryInfo = queryInfoMap.get(queryIdentifier);
+ if (queryInfo == null) {
+ // Race with queryComplete
+ LOG.warn("Unknown query: Returning an empty list of fragments");
+ return Collections.emptyList();
+ }
+ return queryInfo.getRegisteredFragments();
+ } finally {
+ dagLock.readLock().unlock();
+ }
+ }
+
/**
* Register completion for a query
* @param queryIdentifier
@@ -231,8 +251,7 @@ public class QueryTracker extends AbstractService {
deleteDelay);
queryInfoMap.remove(queryIdentifier);
if (queryInfo == null) {
- // One case where this happens is when a query is killed via an explicit signal, and then
- // another message is received from teh AMHeartbeater.
+ // Should not happen.
LOG.warn("Ignoring query complete for unknown dag: {}", queryIdentifier);
return Collections.emptyList();
}