You are viewing a plain-text version of this content. The canonical link for it (a hyperlink on the original page) was not preserved in this plain-text copy.
Posted to commits@hive.apache.org by se...@apache.org on 2018/03/16 18:11:10 UTC
hive git commit: HIVE-18962 : add WM task state to Tez AM heartbeat
(Sergey Shelukhin, reviewed by Jason Dere)
Repository: hive
Updated Branches:
refs/heads/master 21c6a5407 -> 6f38c9f1b
HIVE-18962 : add WM task state to Tez AM heartbeat (Sergey Shelukhin, reviewed by Jason Dere)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6f38c9f1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6f38c9f1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6f38c9f1
Branch: refs/heads/master
Commit: 6f38c9f1bd3658f538450d5ba8208bd2e2104f9f
Parents: 21c6a54
Author: sergey <se...@apache.org>
Authored: Fri Mar 16 11:06:27 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Fri Mar 16 11:06:27 2018 -0700
----------------------------------------------------------------------
.../ext/LlapTaskUmbilicalExternalClient.java | 5 +-
.../protocol/LlapTaskUmbilicalProtocol.java | 11 ++-
.../hive/llap/daemon/impl/AMReporter.java | 84 ++++++++++++--------
.../llap/daemon/impl/TaskRunnerCallable.java | 12 ++-
.../daemon/impl/comparator/TestAMReporter.java | 7 +-
.../llap/tezplugins/LlapTaskCommunicator.java | 31 +++++---
.../tezplugins/LlapTaskSchedulerService.java | 37 +++++++++
.../TestLlapTaskSchedulerService.java | 30 +++++++
8 files changed, 166 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/6f38c9f1/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index d97b156..945474f 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -513,11 +513,12 @@ public class LlapTaskUmbilicalExternalClient implements Closeable {
}
@Override
- public void nodeHeartbeat(
- Text hostname, Text uniqueId, int port, TezAttemptArray aw) throws IOException {
+ public void nodeHeartbeat(Text hostname, Text uniqueId, int port, TezAttemptArray aw,
+ BooleanArray guaranteed) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Node heartbeat from " + hostname + ":" + port + ", " + uniqueId);
}
+ // External client currently cannot use guaranteed.
updateHeartbeatInfo(hostname.toString(), uniqueId.toString(), port, aw);
// No need to propagate to this to the responder
}
http://git-wip-us.apache.org/repos/asf/hive/blob/6f38c9f1/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java b/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
index a2dca1b..14c8b50 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
@@ -15,6 +15,7 @@
package org.apache.hadoop.hive.llap.protocol;
import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
import java.io.IOException;
@@ -37,6 +38,12 @@ public interface LlapTaskUmbilicalProtocol extends VersionedProtocol {
}
}
+ public class BooleanArray extends ArrayWritable {
+ public BooleanArray() {
+ super(BooleanWritable.class);
+ }
+ }
+
public static final long versionID = 1L;
// From Tez. Eventually changes over to the LLAP protocol and ProtocolBuffers
@@ -44,8 +51,8 @@ public interface LlapTaskUmbilicalProtocol extends VersionedProtocol {
public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
throws IOException, TezException;
- public void nodeHeartbeat(
- Text hostname, Text uniqueId, int port, TezAttemptArray aw) throws IOException;
+ public void nodeHeartbeat(Text hostname, Text uniqueId, int port,
+ TezAttemptArray aw, BooleanArray guaranteed) throws IOException;
public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException;
http://git-wip-us.apache.org/repos/asf/hive/blob/6f38c9f1/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index b784360..088a5f3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -14,18 +14,14 @@
package org.apache.hadoop.hive.llap.daemon.impl;
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.BooleanArray;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray;
-
import java.util.ArrayList;
-
import java.util.List;
-
import java.util.HashSet;
-
import java.util.Set;
-
import javax.net.SocketFactory;
import java.io.IOException;
@@ -34,6 +30,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
@@ -58,6 +55,7 @@ import org.apache.hadoop.hive.llap.DaemonId;
import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
+import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
@@ -194,15 +192,14 @@ public class AMReporter extends AbstractService {
}
}
- public void registerTask(String amLocation, int port, String umbilicalUser,
+ public AMNodeInfo registerTask(String amLocation, int port, String umbilicalUser,
Token<JobTokenIdentifier> jobToken, QueryIdentifier queryIdentifier,
- TezTaskAttemptID attemptId) {
+ TezTaskAttemptID attemptId, boolean isGuaranteed) {
if (LOG.isTraceEnabled()) {
LOG.trace(
"Registering for heartbeat: {}, queryIdentifier={}, attemptId={}",
(amLocation + ":" + port), queryIdentifier, attemptId);
}
- AMNodeInfo amNodeInfo;
// Since we don't have an explicit AM end signal yet - we're going to create
// and discard AMNodeInfo instances per query.
@@ -213,7 +210,7 @@ public class AMReporter extends AbstractService {
amNodeInfoPerQuery = new HashMap<>();
knownAppMasters.put(queryIdentifier, amNodeInfoPerQuery);
}
- amNodeInfo = amNodeInfoPerQuery.get(amNodeId);
+ AMNodeInfo amNodeInfo = amNodeInfoPerQuery.get(amNodeId);
if (amNodeInfo == null) {
amNodeInfo = new AMNodeInfo(amNodeId, umbilicalUser, jobToken, queryIdentifier, retryPolicy,
retryTimeout, socketFactory, conf);
@@ -227,7 +224,8 @@ public class AMReporter extends AbstractService {
// A single queueLookupCallable is added here. We have to make sure one instance stays
// in the queue till the query completes.
}
- amNodeInfo.addTaskAttempt(attemptId);
+ amNodeInfo.addTaskAttempt(attemptId, isGuaranteed);
+ return amNodeInfo;
}
}
@@ -398,18 +396,22 @@ public class AMReporter extends AbstractService {
if (LOG.isTraceEnabled()) {
LOG.trace("Attempting to heartbeat to AM: " + amNodeInfo);
}
- List<TezTaskAttemptID> tasks = amNodeInfo.getTasksSnapshot();
- if (tasks.isEmpty()) {
+ TaskSnapshot tasks = amNodeInfo.getTasksSnapshot();
+ if (tasks.attempts.isEmpty()) {
return null;
}
try {
if (LOG.isTraceEnabled()) {
LOG.trace("NodeHeartbeat to: " + amNodeInfo);
}
+ // TODO: if there are more fields perhaps there should be an array of class.
TezAttemptArray aw = new TezAttemptArray();
- aw.set(tasks.toArray(new TezTaskAttemptID[tasks.size()]));
+ aw.set(tasks.attempts.toArray(new TezTaskAttemptID[tasks.attempts.size()]));
+ BooleanArray guaranteed = new BooleanArray();
+ guaranteed.set(tasks.guaranteed.toArray(new BooleanWritable[tasks.guaranteed.size()]));
+
amNodeInfo.getUmbilical().nodeHeartbeat(new Text(nodeId.getHostname()),
- new Text(daemonId.getUniqueNodeIdInCluster()), nodeId.getPort(), aw);
+ new Text(daemonId.getUniqueNodeIdInCluster()), nodeId.getPort(), aw, guaranteed);
} catch (IOException e) {
QueryIdentifier currentQueryIdentifier = amNodeInfo.getQueryIdentifier();
amNodeInfo.setAmFailed(true);
@@ -455,7 +457,7 @@ public class AMReporter extends AbstractService {
protected class AMNodeInfo implements Delayed {
// Serves as lock for itself.
- private final Set<TezTaskAttemptID> tasks = new HashSet<>();
+ private final ConcurrentHashMap<TezTaskAttemptID, Boolean> tasks = new ConcurrentHashMap<>();
private final String umbilicalUser;
private final Token<JobTokenIdentifier> jobToken;
private final Configuration conf;
@@ -501,21 +503,25 @@ public class AMReporter extends AbstractService {
umbilical = null;
}
- int addTaskAttempt(TezTaskAttemptID attemptId) {
- synchronized (tasks) {
- if (!tasks.add(attemptId)) {
- throw new RuntimeException(attemptId + " was already registered");
- }
- return tasks.size();
+ void addTaskAttempt(TezTaskAttemptID attemptId, boolean isGuaranteed) {
+ Boolean oldVal = tasks.putIfAbsent(attemptId, isGuaranteed);
+ if (oldVal != null) {
+ throw new RuntimeException(attemptId + " was already registered");
}
}
- int removeTaskAttempt(TezTaskAttemptID attemptId) {
- synchronized (tasks) {
- if (!tasks.remove(attemptId)) {
- throw new RuntimeException(attemptId + " was not registered and couldn't be removed");
- }
- return tasks.size();
+ void updateTaskAttempt(TezTaskAttemptID attemptId, boolean isGuaranteed) {
+ Boolean oldVal = tasks.replace(attemptId, isGuaranteed);
+ if (oldVal == null) {
+ LOG.warn("Task " + attemptId + " is no longer registered");
+ tasks.remove(attemptId);
+ }
+ }
+
+ void removeTaskAttempt(TezTaskAttemptID attemptId) {
+ Boolean oldVal = tasks.remove(attemptId);
+ if (oldVal == null) {
+ throw new RuntimeException(attemptId + " was not registered and couldn't be removed");
}
}
@@ -535,10 +541,16 @@ public class AMReporter extends AbstractService {
return isDone.get();
}
- List<TezTaskAttemptID> getTasksSnapshot() {
- List<TezTaskAttemptID> result = new ArrayList<>();
- synchronized (tasks) {
- result.addAll(tasks);
+ /**
+ * @return A snapshot of the tasks running at this daemon from this AM.
+ * Doesn't have to be consistent between multiple tasks; whether some task makes it into
+ * a given heartbeat when it's about to be started/about to finish is a timing issue anyway.
+ */
+ TaskSnapshot getTasksSnapshot() {
+ TaskSnapshot result = new TaskSnapshot(tasks.size());
+ for (Map.Entry<TezTaskAttemptID, Boolean> e : tasks.entrySet()) {
+ result.attempts.add(e.getKey());
+ result.guaranteed.add(new BooleanWritable(e.getValue()));
}
return result;
}
@@ -579,4 +591,14 @@ public class AMReporter extends AbstractService {
}
}
}
+
+
+ private static final class TaskSnapshot {
+ public TaskSnapshot(int count) {
+ attempts = new ArrayList<>(count);
+ guaranteed = new ArrayList<>(count);
+ }
+ public final List<TezTaskAttemptID> attempts;
+ public final List<BooleanWritable> guaranteed;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/6f38c9f1/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 75d8577..b484a13 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener;
+import org.apache.hadoop.hive.llap.daemon.impl.AMReporter.AMNodeInfo;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
@@ -130,6 +131,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
private final SocketFactory socketFactory;
private boolean isGuaranteed;
private WmFragmentCounters wmCounters;
+ private final AMNodeInfo amNodeInfo;
@VisibleForTesting
public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo fragmentInfo,
@@ -156,8 +158,11 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
this.amReporter = amReporter;
// Register with the AMReporter when the callable is setup. Unregister once it starts running.
if (amReporter != null && jobToken != null) {
- this.amReporter.registerTask(request.getAmHost(), request.getAmPort(),
- vertex.getTokenIdentifier(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier(), attemptId);
+ this.amNodeInfo = amReporter.registerTask(request.getAmHost(), request.getAmPort(),
+ vertex.getTokenIdentifier(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier(),
+ attemptId, isGuaranteed);
+ } else {
+ this.amNodeInfo = null;
}
this.metrics = metrics;
this.requestId = taskSpec.getTaskAttemptID().toString();
@@ -611,6 +616,9 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
public void setIsGuaranteed(boolean isGuaranteed) {
this.isGuaranteed = isGuaranteed;
+ if (amNodeInfo != null) {
+ amNodeInfo.updateTaskAttempt(taskSpec.getTaskAttemptID(), isGuaranteed);
+ }
if (wmCounters != null) {
wmCounters.changeGuaranteed(isGuaranteed);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/6f38c9f1/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestAMReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestAMReporter.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestAMReporter.java
index bde3546..19f8048 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestAMReporter.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestAMReporter.java
@@ -69,9 +69,9 @@ public class TestAMReporter {
String umbilicalUser = "user";
QueryIdentifier queryId = new QueryIdentifier("app", 0);
amReporter.registerTask(am1Location, am1Port, umbilicalUser, null, queryId,
- mock(TezTaskAttemptID.class));
+ mock(TezTaskAttemptID.class), false);
amReporter.registerTask(am2Location, am2Port, umbilicalUser, null, queryId,
- mock(TezTaskAttemptID.class));
+ mock(TezTaskAttemptID.class), false);
Thread.currentThread().sleep(2000);
// verify both am get node heartbeat
@@ -97,7 +97,8 @@ public class TestAMReporter {
return null;
}
}).when(umbilical).nodeHeartbeat(any(Text.class), any(Text.class), anyInt(),
- any(LlapTaskUmbilicalProtocol.TezAttemptArray.class));
+ any(LlapTaskUmbilicalProtocol.TezAttemptArray.class),
+ any(LlapTaskUmbilicalProtocol.BooleanArray.class));
return umbilical;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/6f38c9f1/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index a4bd4df..5d4ce22 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -15,15 +15,15 @@
package org.apache.hadoop.hive.llap.tezplugins;
import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService.NodeInfo;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
import org.apache.hadoop.io.Writable;
-
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.BooleanArray;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -41,6 +41,7 @@ import com.google.common.collect.HashBiMap;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -66,6 +67,7 @@ import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.hive.llap.tez.Converters;
import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
import org.apache.hadoop.hive.llap.tezplugins.helpers.SourceStateTracker;
+import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolSignature;
@@ -705,16 +707,21 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
private final AtomicLong nodeNotFoundLogTime = new AtomicLong(0);
- void nodePinged(String hostname, String uniqueId, int port, TezAttemptArray tasks) {
+ void nodePinged(String hostname, String uniqueId, int port,
+ TezAttemptArray tasks, BooleanArray guaranteed) {
// TODO: do we ever need the port? we could just do away with nodeId altogether.
LlapNodeId nodeId = LlapNodeId.getInstance(hostname, port);
registerPingingNode(nodeId);
BiMap<ContainerId, TezTaskAttemptID> biMap =
entityTracker.getContainerAttemptMapForNode(nodeId);
if (biMap != null) {
- HashSet<TezTaskAttemptID> attempts = new HashSet<>();
- for (Writable w : tasks.get()) {
- attempts.add((TezTaskAttemptID)w);
+ HashMap<TezTaskAttemptID, Boolean> attempts = new HashMap<>();
+ for (int i = 0; i < tasks.get().length; ++i) {
+ boolean isGuaranteed = false;
+ if (guaranteed != null) {
+ isGuaranteed = ((BooleanWritable)guaranteed.get()[i]).get();
+ }
+ attempts.put((TezTaskAttemptID)tasks.get()[i], isGuaranteed);
}
String error = "";
synchronized (biMap) {
@@ -729,8 +736,10 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
// However, the next heartbeat(s) should get the value eventually and mark task as alive.
// Also, we prefer a missed heartbeat over a stuck query in case of discrepancy in ET.
if (taskNodeId != null && taskNodeId.equals(uniqueId)) {
- if (attempts.contains(attemptId)) {
- getContext().taskAlive(entry.getValue());
+ Boolean isGuaranteed = attempts.get(attemptId);
+ if (isGuaranteed != null) {
+ getContext().taskAlive(attemptId);
+ scheduler.taskInfoUpdated(attemptId, isGuaranteed.booleanValue());
} else {
error += (attemptId + ", ");
}
@@ -835,12 +844,12 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
}
@Override
- public void nodeHeartbeat(
- Text hostname, Text uniqueId, int port, TezAttemptArray aw) throws IOException {
+ public void nodeHeartbeat(Text hostname, Text uniqueId, int port,
+ TezAttemptArray aw, BooleanArray guaranteed) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Received heartbeat from [" + hostname + ":" + port +" (" + uniqueId +")]");
}
- nodePinged(hostname.toString(), uniqueId.toString(), port, aw);
+ nodePinged(hostname.toString(), uniqueId.toString(), port, aw, guaranteed);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/6f38c9f1/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 6ddecca..5ab1964 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -3012,5 +3012,42 @@ public class LlapTaskSchedulerService extends TaskScheduler {
return unusedGuaranteed;
}
+ /**
+ * A direct call from communicator to scheduler to propagate data that cannot be passed via Tez.
+ */
+ public void taskInfoUpdated(TezTaskAttemptID attemptId, boolean isGuaranteed) {
+ TaskInfo ti = null;
+ writeLock.lock();
+ try {
+ ti = tasksById.get(attemptId);
+ if (ti == null) {
+ WM_LOG.warn("Unknown task from heartbeat " + attemptId);
+ return;
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ boolean newState = false;
+ synchronized (ti) {
+ if (ti.isPendingUpdate) return; // A pending update is not done.
+ if (ti.isGuaranteed == null) return; // The task has terminated, out of date heartbeat.
+ if (ti.lastSetGuaranteed != null && ti.lastSetGuaranteed == isGuaranteed) {
+ return; // The heartbeat is consistent with what we have.
+ }
+ ti.lastSetGuaranteed = isGuaranteed;
+ if (isGuaranteed == ti.isGuaranteed) return; // Already consistent. Can happen w/null lSG.
+
+ // There could be races here, e.g. heartbeat delivered us the old value just after we have
+ // received a successful confirmation from the API, so we are about to overwrite the latter.
+ // We could solve this by adding a version or smth like that; or by ignoring discrepancies
+ // unless we have previously received an update error for this task; however, the only effect
+ // right now are a few cheap redundant update calls; let's just do the simple thing.
+ newState = ti.isGuaranteed;
+ setUpdateStartedUnderTiLock(ti);
+ } // End of synchronized (ti)
+ WM_LOG.info("Sending an update based on inconsistent state from heartbeat for "
+ + attemptId + ", " + newState);
+ sendUpdateMessageAsync(ti, newState);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/6f38c9f1/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
index 90b31e4..4600755 100644
--- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
+++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
@@ -552,6 +552,36 @@ public class TestLlapTaskSchedulerService {
}
@Test(timeout = 10000)
+ public void testHeartbeatInconsistency() throws IOException, InterruptedException {
+ final TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper();
+ // Guaranteed flag is inconsistent based on heartbeat - another message should be send.
+ try {
+ Priority highPri = Priority.newInstance(1);
+ TezTaskAttemptID task1 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
+ tsWrapper.ts.updateGuaranteedCount(0);
+ tsWrapper.controlScheduler(true);
+ tsWrapper.allocateTask(task1, null, highPri, new Object());
+ tsWrapper.awaitTotalTaskAllocations(1);
+ TaskInfo ti1 = tsWrapper.ts.getTaskInfo(task1);
+ assertFalse(ti1.isGuaranteed());
+
+ // Heartbeat indicates task has a duck - this must be reverted.
+ tsWrapper.ts.taskInfoUpdated(task1, true);
+ tsWrapper.ts.waitForMessagesSent(1);
+ assertTrue(ti1.getLastSetGuaranteed());
+ assertTrue(ti1.isUpdateInProgress());
+ assertFalse(ti1.isGuaranteed());
+ tsWrapper.ts.handleUpdateResult(ti1, true);
+ assertFalse(ti1.getLastSetGuaranteed());
+
+ tsWrapper.deallocateTask(task1, true, TaskAttemptEndReason.CONTAINER_EXITED);
+ assertEquals(0, tsWrapper.ts.getUnusedGuaranteedCount());
+ } finally {
+ tsWrapper.shutdown();
+ }
+ }
+
+ @Test(timeout = 10000)
public void testSimpleNoLocalityAllocation() throws IOException, InterruptedException {
TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper();