You are viewing a plain-text version of this content. The hyperlink to the canonical version is not preserved in this rendering.
Posted to commits@hive.apache.org by se...@apache.org on 2017/02/02 02:03:03 UTC

[04/50] [abbrv] hive git commit: HIVE-15722. LLAP: Avoid marking a query as complete if the AMReporter runs into an error. (Siddharth Seth, reviewed by Sergey Shelukhin, Prasanth Jayachandran)

HIVE-15722. LLAP: Avoid marking a query as complete if the AMReporter runs into an error. (Siddharth Seth, reviewed by Sergey Shelukhin, Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/aabe83db
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/aabe83db
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/aabe83db

Branch: refs/heads/hive-14535
Commit: aabe83dbf303ac4891d41ced72916c72b46ae072
Parents: 888e5d7
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Jan 27 11:52:52 2017 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Jan 27 11:52:52 2017 -0800

----------------------------------------------------------------------
 .../llap/daemon/impl/ContainerRunnerImpl.java   | 17 +++++-------
 .../hive/llap/daemon/impl/QueryTracker.java     | 29 ++++++++++++++++----
 2 files changed, 31 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/aabe83db/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 6d7d4de..8c33fa2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -55,7 +55,6 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.Terminate
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.hive.llap.security.LlapSignerImpl;
-import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
 import org.apache.hadoop.hive.llap.tez.Converters;
 import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -183,14 +182,16 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
     SignableVertexSpec vertex = extractVertexSpec(request, tokenInfo);
     TezEvent initialEvent = extractInitialEvent(request, tokenInfo);
 
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Queueing container for execution: " + stringifySubmitRequest(request, vertex));
-    }
-    QueryIdentifierProto qIdProto = vertex.getQueryIdentifier();
     TezTaskAttemptID attemptId =
         Converters.createTaskAttemptId(vertex.getQueryIdentifier(), vertex.getVertexIndex(),
             request.getFragmentNumber(), request.getAttemptNumber());
     String fragmentIdString = attemptId.toString();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Queueing container for execution: fragemendId={}, {}",
+          fragmentIdString, stringifySubmitRequest(request, vertex));
+    }
+    QueryIdentifierProto qIdProto = vertex.getQueryIdentifier();
+
     HistoryLogger.logFragmentStart(qIdProto.getApplicationIdString(), request.getContainerIdString(),
         localAddress.get().getHostName(), vertex.getDagName(), qIdProto.getDagIndex(),
         vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber());
@@ -478,11 +479,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
   public void queryFailed(QueryIdentifier queryIdentifier) {
     LOG.info("Processing query failed notification for {}", queryIdentifier);
     List<QueryFragmentInfo> knownFragments;
-    try {
-      knownFragments = queryTracker.queryComplete(queryIdentifier, -1, true);
-    } catch (IOException e) {
-      throw new RuntimeException(e); // Should never happen here, no permission check.
-    }
+    knownFragments = queryTracker.getRegisteredFragments(queryIdentifier);
     LOG.info("DBG: Pending fragment count for failed query {} = {}", queryIdentifier,
         knownFragments.size());
     for (QueryFragmentInfo fragmentInfo : knownFragments) {

http://git-wip-us.apache.org/repos/asf/hive/blob/aabe83db/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index a7d7981..9eaddd2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.llap.log.Log4jQueryCompleteMarker;
 import org.apache.hadoop.hive.llap.log.LogHelpers;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.MDC;
 import org.apache.logging.slf4j.Log4jMarker;
 import org.apache.tez.common.CallableWithNdc;
@@ -141,14 +142,17 @@ public class QueryTracker extends AbstractService {
       String fragmentIdString, LlapTokenInfo tokenInfo) throws IOException {
 
     ReadWriteLock dagLock = getDagLock(queryIdentifier);
+    // Note: This is a readLock to prevent a race with queryComplete. Operations
+    // and mutations within this lock need to be on concurrent structures.
     dagLock.readLock().lock();
     try {
       if (completedDagMap.contains(queryIdentifier)) {
         // Cleanup the dag lock here, since it may have been created after the query completed
         dagSpecificLocks.remove(queryIdentifier);
-        throw new RuntimeException(
-            "Dag " + dagName + " already complete. Rejecting fragment ["
-                + vertexName + ", " + fragmentNumber + ", " + attemptNumber + "]");
+        String message = "Dag " + dagName + " already complete. Rejecting fragment ["
+            + vertexName + ", " + fragmentNumber + ", " + attemptNumber + "]";
+        LOG.info(message);
+        throw new RuntimeException(message);
       }
       // TODO: for now, we get the secure username out of UGI... after signing, we can take it
       //       out of the request provided that it's signed.
@@ -211,6 +215,22 @@ public class QueryTracker extends AbstractService {
     }
   }
 
+  List<QueryFragmentInfo> getRegisteredFragments(QueryIdentifier queryIdentifier) {
+    ReadWriteLock dagLock = getDagLock(queryIdentifier);
+    dagLock.readLock().lock();
+    try {
+      QueryInfo queryInfo = queryInfoMap.get(queryIdentifier);
+      if (queryInfo == null) {
+        // Race with queryComplete, which removes the entry from queryInfoMap.
+        LOG.warn("Unknown query {}: Returning an empty list of fragments", queryIdentifier);
+        return Collections.emptyList();
+      }
+      return queryInfo.getRegisteredFragments();
+    } finally {
+      dagLock.readLock().unlock();
+    }
+  }
+
   /**
    * Register completion for a query
    * @param queryIdentifier
@@ -231,8 +251,7 @@ public class QueryTracker extends AbstractService {
           deleteDelay);
       queryInfoMap.remove(queryIdentifier);
       if (queryInfo == null) {
-        // One case where this happens is when a query is killed via an explicit signal, and then
-        // another message is received from teh AMHeartbeater.
+        // Should not happen.
         LOG.warn("Ignoring query complete for unknown dag: {}", queryIdentifier);
         return Collections.emptyList();
       }