You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ss...@apache.org on 2017/01/10 07:54:57 UTC
hive git commit: HIVE-15562. LLAP TaskExecutorService race can lead
to some fragments being permanently lost. (Siddharth Seth,
reviewed by Sergey Shelukhin)
Repository: hive
Updated Branches:
refs/heads/master b4ac43db4 -> 1749d7045
HIVE-15562. LLAP TaskExecutorService race can lead to some fragments being permanently lost. (Siddharth Seth, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1749d704
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1749d704
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1749d704
Branch: refs/heads/master
Commit: 1749d70455bd6f8d010dd18566dac5774487c753
Parents: b4ac43d
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Jan 9 23:54:25 2017 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Jan 9 23:54:25 2017 -0800
----------------------------------------------------------------------
.../impl/LlapZookeeperRegistryImpl.java | 4 +-
.../impl/EvictingPriorityBlockingQueue.java | 16 ++++-
.../llap/daemon/impl/PriorityBlockingDeque.java | 4 +-
.../hadoop/hive/llap/daemon/impl/QueryInfo.java | 7 ++-
.../hive/llap/daemon/impl/QueryTracker.java | 2 +
.../llap/daemon/impl/TaskExecutorService.java | 61 +++++++++++++++-----
.../llap/tezplugins/LlapTaskCommunicator.java | 24 +++++---
7 files changed, 91 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/1749d704/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index 8d9a98a..49ab59a 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -410,8 +410,8 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
public DynamicServiceInstance(ServiceRecord srv) throws IOException {
this.srv = srv;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Working with ServiceRecord: {}", srv);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Working with ServiceRecord: {}", srv);
}
final Endpoint shuffle = srv.getInternalEndpoint(IPC_SHUFFLE);
http://git-wip-us.apache.org/repos/asf/hive/blob/1749d704/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
index 4ea3b0b..adc86ea 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
@@ -19,6 +19,9 @@ package org.apache.hadoop.hive.llap.daemon.impl;
import java.util.Comparator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Bounded priority queue that evicts the last element based on priority order specified
* through comparator. Elements that are added to the queue are sorted based on the specified
@@ -27,6 +30,10 @@ import java.util.Comparator;
* returned back. If the queue is not full, new element will be added to queue and null is returned.
*/
public class EvictingPriorityBlockingQueue<E> {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(EvictingPriorityBlockingQueue.class);
+
private final PriorityBlockingDeque<E> deque;
private final Comparator<E> comparator;
@@ -42,7 +49,14 @@ public class EvictingPriorityBlockingQueue<E> {
E last = deque.peekLast();
if (comparator.compare(e, last) < 0) {
deque.removeLast();
- deque.offer(e);
+ if (!deque.offer(e)) {
+ LOG.error(
+ "Failed to insert element into queue with capacity available. size={}, element={}",
+ size(), e);
+ throw new RuntimeException(
+ "Failed to insert element into queue with capacity available. size=" +
+ size());
+ }
return last;
}
return e;
http://git-wip-us.apache.org/repos/asf/hive/blob/1749d704/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java
index db2ab16..e27efa5 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java
@@ -107,7 +107,7 @@ public class PriorityBlockingDeque<E>
}
list.add(insertionPoint, e);
- // Collections.sort(list, comparator);
+ // Inserted in sort order, hence no explicit sort.
notEmpty.signal();
return true;
@@ -178,6 +178,7 @@ public class PriorityBlockingDeque<E>
/**
* @throws NullPointerException {@inheritDoc}
*/
+ @Override
public boolean offerLast(E e) {
if (e == null) throw new NullPointerException();
lock.lock();
@@ -450,6 +451,7 @@ public class PriorityBlockingDeque<E>
/**
* @throws NullPointerException if the specified element is null
*/
+ @Override
public boolean offer(E e) {
return offerLast(e);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1749d704/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index 5c7d4ef..1080d3e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -203,11 +203,12 @@ public class QueryInfo {
sourceToEntity.put(source, entityInfo);
}
- if (lastFinishableState != fragmentInfo.canFinish()) {
+ if (lastFinishableState == fragmentInfo.canFinish()) {
+ // State has not changed.
+ return true;
+ } else {
entityInfo.setLastFinishableState(fragmentInfo.canFinish());
return false;
- } else {
- return true;
}
} finally {
lock.unlock();
http://git-wip-us.apache.org/repos/asf/hive/blob/1749d704/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index f094039..a7d7981 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -231,6 +231,8 @@ public class QueryTracker extends AbstractService {
deleteDelay);
queryInfoMap.remove(queryIdentifier);
if (queryInfo == null) {
+ // One case where this happens is when a query is killed via an explicit signal, and then
+ // another message is received from the AMHeartbeater.
LOG.warn("Ignoring query complete for unknown dag: {}", queryIdentifier);
return Collections.emptyList();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1749d704/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index cae2591..58863af 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -346,11 +346,12 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
// The wait queue should be able to fit at least (waitQueue + currentFreeExecutor slots)
canFinish = taskWrapper.getTaskRunnerCallable().canFinish();
evictedTask = waitQueue.offer(taskWrapper);
+ // Finishable state is checked on the task, via an explicit query to the TaskRunnerCallable
// null evicted task means offer accepted
// evictedTask is not equal taskWrapper means current task is accepted and it evicted
// some other task
- if (evictedTask == null || evictedTask != taskWrapper) {
+ if (evictedTask == null || !evictedTask.equals(taskWrapper)) {
knownTasks.put(taskWrapper.getRequestId(), taskWrapper);
taskWrapper.setIsInWaitQueue(true);
if (isDebugEnabled) {
@@ -379,6 +380,18 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
}
return result;
}
+
+ // Register for notifications inside the lock. This avoids races with unregisterForNotifications,
+ // which can run in a different submission thread, i.e. it prevents the registration for this
+ // task from running after some other submission has already evicted it.
+ boolean stateChanged = !taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish);
+ if (stateChanged) {
+ if (isDebugEnabled) {
+ LOG.debug("Finishable state of {} updated to {} during registration for state updates",
+ taskWrapper.getRequestId(), !canFinish);
+ }
+ finishableStateUpdated(taskWrapper, !canFinish);
+ }
}
// At this point, the task has been added into the queue. It may have caused an eviction for
@@ -387,27 +400,25 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
// This registration has to be done after knownTasks has been populated.
// Register for state change notifications so that the waitQueue can be re-ordered correctly
// if the fragment moves in or out of the finishable state.
- boolean stateChanged = taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish);
- if (stateChanged) {
- if (isDebugEnabled) {
- LOG.debug("Finishable state of {} updated to {} during registration for state updates",
- taskWrapper.getRequestId(), !canFinish);
- }
- finishableStateUpdated(taskWrapper, !canFinish);
- }
if (isDebugEnabled) {
LOG.debug("Wait Queue: {}", waitQueue);
}
+
if (evictedTask != null) {
- knownTasks.remove(evictedTask.getRequestId());
- evictedTask.maybeUnregisterForFinishedStateNotifications();
- evictedTask.setIsInWaitQueue(false);
- evictedTask.getTaskRunnerCallable().killTask();
if (isInfoEnabled) {
LOG.info("{} evicted from wait queue in favor of {} because of lower priority",
evictedTask.getRequestId(), task.getRequestId());
}
+ try {
+ knownTasks.remove(evictedTask.getRequestId());
+ evictedTask.maybeUnregisterForFinishedStateNotifications();
+ evictedTask.setIsInWaitQueue(false);
+ } finally {
+ // This deals with tasks from a different submission, and may cause the kill
+ // to go out before the previous submission has completed. That case is handled in the AM.
+ evictedTask.getTaskRunnerCallable().killTask();
+ }
if (metrics != null) {
metrics.incrTotalEvictedFromWaitQueue();
}
@@ -769,6 +780,7 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
return taskRunnerCallable.getFragmentInfo()
.registerForFinishableStateUpdates(this, currentFinishableState);
} else {
+ // State has not changed / already registered for notifications.
return true;
}
}
@@ -830,6 +842,29 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
taskRunnerCallable.getRequestId(), finishableState);
taskExecutorService.finishableStateUpdated(this, finishableState);
}
+
+
+ // TaskWrapper is used in data structures, as well as for ordering via Comparators
+ // in the waitQueue. Equality is based on the request id rather than object identity.
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ TaskWrapper that = (TaskWrapper) o;
+
+ return taskRunnerCallable.getRequestId()
+ .equals(that.taskRunnerCallable.getRequestId());
+ }
+
+ @Override
+ public int hashCode() {
+ return taskRunnerCallable.getRequestId().hashCode();
+ }
}
private static class ExecutorThreadFactory implements ThreadFactory {
http://git-wip-us.apache.org/repos/asf/hive/blob/1749d704/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index c692581..1de4bfe 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -752,18 +752,28 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
public void registerTaskSubmittedToNode(
TezTaskAttemptID taskAttemptID, String uniqueNodeId) {
- String prev = uniqueNodeMap.putIfAbsent(taskAttemptID, uniqueNodeId);
- if (prev != null) {
- LOG.warn("Replaced the unique node mapping for task from " + prev + " to " + uniqueNodeId);
+ synchronized (attemptToNodeMap) {
+ if (attemptToNodeMap.containsKey(taskAttemptID)) {
+ // Register only if the attempt is known. This handles the case where an unregister
+ // call arrives before the register call.
+ String prev = uniqueNodeMap.putIfAbsent(taskAttemptID, uniqueNodeId);
+ if (prev != null) {
+ LOG.warn("Replaced the unique node mapping for task from " + prev +
+ " to " + uniqueNodeId);
+ }
+ }
}
}
void unregisterTaskAttempt(TezTaskAttemptID attemptId) {
uniqueNodeMap.remove(attemptId);
- LlapNodeId llapNodeId = attemptToNodeMap.remove(attemptId);
- if (llapNodeId == null) {
- // Possible since either container / task can be unregistered.
- return;
+ LlapNodeId llapNodeId;
+ synchronized (attemptToNodeMap) {
+ llapNodeId = attemptToNodeMap.remove(attemptId);
+ if (llapNodeId == null) {
+ // Possible since either container / task can be unregistered.
+ return;
+ }
}
BiMap<ContainerId, TezTaskAttemptID> bMap = nodeMap.get(llapNodeId);