You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ss...@apache.org on 2017/01/10 07:54:57 UTC

hive git commit: HIVE-15562. LLAP TaskExecutorService race can lead to some fragments being permanently lost. (Siddharth Seth, reviewed by Sergey Shelukhin)

Repository: hive
Updated Branches:
  refs/heads/master b4ac43db4 -> 1749d7045


HIVE-15562. LLAP TaskExecutorService race can lead to some fragments being permanently lost. (Siddharth Seth, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1749d704
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1749d704
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1749d704

Branch: refs/heads/master
Commit: 1749d70455bd6f8d010dd18566dac5774487c753
Parents: b4ac43d
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Jan 9 23:54:25 2017 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Jan 9 23:54:25 2017 -0800

----------------------------------------------------------------------
 .../impl/LlapZookeeperRegistryImpl.java         |  4 +-
 .../impl/EvictingPriorityBlockingQueue.java     | 16 ++++-
 .../llap/daemon/impl/PriorityBlockingDeque.java |  4 +-
 .../hadoop/hive/llap/daemon/impl/QueryInfo.java |  7 ++-
 .../hive/llap/daemon/impl/QueryTracker.java     |  2 +
 .../llap/daemon/impl/TaskExecutorService.java   | 61 +++++++++++++++-----
 .../llap/tezplugins/LlapTaskCommunicator.java   | 24 +++++---
 7 files changed, 91 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1749d704/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index 8d9a98a..49ab59a 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -410,8 +410,8 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
     public DynamicServiceInstance(ServiceRecord srv) throws IOException {
       this.srv = srv;
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Working with ServiceRecord: {}", srv);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Working with ServiceRecord: {}", srv);
       }
 
       final Endpoint shuffle = srv.getInternalEndpoint(IPC_SHUFFLE);

http://git-wip-us.apache.org/repos/asf/hive/blob/1749d704/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
index 4ea3b0b..adc86ea 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
@@ -19,6 +19,9 @@ package org.apache.hadoop.hive.llap.daemon.impl;
 
 import java.util.Comparator;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Bounded priority queue that evicts the last element based on priority order specified
  * through comparator. Elements that are added to the queue are sorted based on the specified
@@ -27,6 +30,10 @@ import java.util.Comparator;
  * returned back. If the queue is not full, new element will be added to queue and null is returned.
  */
 public class EvictingPriorityBlockingQueue<E> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(EvictingPriorityBlockingQueue.class);
+
   private final PriorityBlockingDeque<E> deque;
   private final Comparator<E> comparator;
 
@@ -42,7 +49,14 @@ public class EvictingPriorityBlockingQueue<E> {
       E last = deque.peekLast();
       if (comparator.compare(e, last) < 0) {
         deque.removeLast();
-        deque.offer(e);
+        if (!deque.offer(e)) {
+          LOG.error(
+              "Failed to insert element into queue with capacity available. size={}, element={}",
+              size(), e);
+          throw new RuntimeException(
+              "Failed to insert element into queue with capacity available. size=" +
+                  size());
+        }
         return last;
       }
       return e;

http://git-wip-us.apache.org/repos/asf/hive/blob/1749d704/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java
index db2ab16..e27efa5 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java
@@ -107,7 +107,7 @@ public class PriorityBlockingDeque<E>
     }
 
     list.add(insertionPoint, e);
-    //        Collections.sort(list, comparator);
+    // Inserted in sort order. Hence no explicit sort.
     notEmpty.signal();
 
     return true;
@@ -178,6 +178,7 @@ public class PriorityBlockingDeque<E>
   /**
    * @throws NullPointerException {@inheritDoc}
    */
+  @Override
   public boolean offerLast(E e) {
     if (e == null) throw new NullPointerException();
     lock.lock();
@@ -450,6 +451,7 @@ public class PriorityBlockingDeque<E>
   /**
    * @throws NullPointerException if the specified element is null
    */
+  @Override
   public boolean offer(E e) {
     return offerLast(e);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/1749d704/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index 5c7d4ef..1080d3e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -203,11 +203,12 @@ public class QueryInfo {
           sourceToEntity.put(source, entityInfo);
         }
 
-        if (lastFinishableState != fragmentInfo.canFinish()) {
+        if (lastFinishableState == fragmentInfo.canFinish()) {
+          // State has not changed.
+          return true;
+        } else {
           entityInfo.setLastFinishableState(fragmentInfo.canFinish());
           return false;
-        } else {
-          return true;
         }
       } finally {
         lock.unlock();

http://git-wip-us.apache.org/repos/asf/hive/blob/1749d704/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index f094039..a7d7981 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -231,6 +231,8 @@ public class QueryTracker extends AbstractService {
           deleteDelay);
       queryInfoMap.remove(queryIdentifier);
       if (queryInfo == null) {
+        // One case where this happens is when a query is killed via an explicit signal, and then
+        // another message is received from the AMHeartbeater.
         LOG.warn("Ignoring query complete for unknown dag: {}", queryIdentifier);
         return Collections.emptyList();
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/1749d704/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index cae2591..58863af 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -346,11 +346,12 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
       // The wait queue should be able to fit at least (waitQueue + currentFreeExecutor slots)
       canFinish = taskWrapper.getTaskRunnerCallable().canFinish();
       evictedTask = waitQueue.offer(taskWrapper);
+      // Finishable state is checked on the task, via an explicit query to the TaskRunnerCallable
 
       // null evicted task means offer accepted
       // evictedTask is not equal taskWrapper means current task is accepted and it evicted
       // some other task
-      if (evictedTask == null || evictedTask != taskWrapper) {
+      if (evictedTask == null || !evictedTask.equals(taskWrapper)) {
         knownTasks.put(taskWrapper.getRequestId(), taskWrapper);
         taskWrapper.setIsInWaitQueue(true);
         if (isDebugEnabled) {
@@ -379,6 +380,18 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
         }
         return result;
       }
+
+      // Register for notifications inside the lock. This avoids races with unregisterForNotifications,
+      // which happens in a different submission thread, i.e. it prevents registration for this task
+      // from running after some other submission has already evicted it.
+      boolean stateChanged = !taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish);
+      if (stateChanged) {
+        if (isDebugEnabled) {
+          LOG.debug("Finishable state of {} updated to {} during registration for state updates",
+              taskWrapper.getRequestId(), !canFinish);
+        }
+        finishableStateUpdated(taskWrapper, !canFinish);
+      }
     }
 
     // At this point, the task has been added into the queue. It may have caused an eviction for
@@ -387,27 +400,25 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
     // This registration has to be done after knownTasks has been populated.
     // Register for state change notifications so that the waitQueue can be re-ordered correctly
     // if the fragment moves in or out of the finishable state.
-    boolean stateChanged = taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish);
-    if (stateChanged) {
-      if (isDebugEnabled) {
-        LOG.debug("Finishable state of {} updated to {} during registration for state updates",
-            taskWrapper.getRequestId(), !canFinish);
-      }
-      finishableStateUpdated(taskWrapper, !canFinish);
-    }
 
     if (isDebugEnabled) {
       LOG.debug("Wait Queue: {}", waitQueue);
     }
+
     if (evictedTask != null) {
-      knownTasks.remove(evictedTask.getRequestId());
-      evictedTask.maybeUnregisterForFinishedStateNotifications();
-      evictedTask.setIsInWaitQueue(false);
-      evictedTask.getTaskRunnerCallable().killTask();
       if (isInfoEnabled) {
         LOG.info("{} evicted from wait queue in favor of {} because of lower priority",
             evictedTask.getRequestId(), task.getRequestId());
       }
+      try {
+        knownTasks.remove(evictedTask.getRequestId());
+        evictedTask.maybeUnregisterForFinishedStateNotifications();
+        evictedTask.setIsInWaitQueue(false);
+      } finally {
+        // This deals with tasks from a different submission, and may cause the kill
+        // to go out before the previous submission has completed. Handled in the AM.
+        evictedTask.getTaskRunnerCallable().killTask();
+      }
       if (metrics != null) {
         metrics.incrTotalEvictedFromWaitQueue();
       }
@@ -769,6 +780,7 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
         return taskRunnerCallable.getFragmentInfo()
             .registerForFinishableStateUpdates(this, currentFinishableState);
       } else {
+        // State has not changed / already registered for notifications.
         return true;
       }
     }
@@ -830,6 +842,29 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
           taskRunnerCallable.getRequestId(), finishableState);
       taskExecutorService.finishableStateUpdated(this, finishableState);
     }
+
+
+    // TaskWrapper is used in data structures, as well as for ordering using Comparators
+    // in the waitQueue. Compare by request id rather than by object identity.
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      TaskWrapper that = (TaskWrapper) o;
+
+      return taskRunnerCallable.getRequestId()
+          .equals(that.taskRunnerCallable.getRequestId());
+    }
+
+    @Override
+    public int hashCode() {
+      return taskRunnerCallable.getRequestId().hashCode();
+    }
   }
 
   private static class ExecutorThreadFactory implements ThreadFactory {

http://git-wip-us.apache.org/repos/asf/hive/blob/1749d704/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index c692581..1de4bfe 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -752,18 +752,28 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
 
     public void registerTaskSubmittedToNode(
         TezTaskAttemptID taskAttemptID, String uniqueNodeId) {
-      String prev = uniqueNodeMap.putIfAbsent(taskAttemptID, uniqueNodeId);
-      if (prev != null) {
-        LOG.warn("Replaced the unique node mapping for task from " + prev + " to " + uniqueNodeId);
+      synchronized (attemptToNodeMap) {
+        if (attemptToNodeMap.containsKey(taskAttemptID)) {
+          // Register only if the attempt is known, in case an unregister call
+          // came in before the register call.
+          String prev = uniqueNodeMap.putIfAbsent(taskAttemptID, uniqueNodeId);
+          if (prev != null) {
+            LOG.warn("Replaced the unique node mapping for task from " + prev +
+                " to " + uniqueNodeId);
+          }
+        }
       }
     }
 
     void unregisterTaskAttempt(TezTaskAttemptID attemptId) {
       uniqueNodeMap.remove(attemptId);
-      LlapNodeId llapNodeId = attemptToNodeMap.remove(attemptId);
-      if (llapNodeId == null) {
-        // Possible since either container / task can be unregistered.
-        return;
+      LlapNodeId llapNodeId;
+      synchronized (attemptToNodeMap) {
+        llapNodeId = attemptToNodeMap.remove(attemptId);
+        if (llapNodeId == null) {
+          // Possible since either container / task can be unregistered.
+          return;
+        }
       }
 
       BiMap<ContainerId, TezTaskAttemptID> bMap = nodeMap.get(llapNodeId);