Posted to commits@hive.apache.org by se...@apache.org on 2018/03/16 18:11:10 UTC

hive git commit: HIVE-18962 : add WM task state to Tez AM heartbeat (Sergey Shelukhin, reviewed by Jason Dere)

Repository: hive
Updated Branches:
  refs/heads/master 21c6a5407 -> 6f38c9f1b


HIVE-18962 : add WM task state to Tez AM heartbeat (Sergey Shelukhin, reviewed by Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6f38c9f1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6f38c9f1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6f38c9f1

Branch: refs/heads/master
Commit: 6f38c9f1bd3658f538450d5ba8208bd2e2104f9f
Parents: 21c6a54
Author: sergey <se...@apache.org>
Authored: Fri Mar 16 11:06:27 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Fri Mar 16 11:06:27 2018 -0700

----------------------------------------------------------------------
 .../ext/LlapTaskUmbilicalExternalClient.java    |  5 +-
 .../protocol/LlapTaskUmbilicalProtocol.java     | 11 ++-
 .../hive/llap/daemon/impl/AMReporter.java       | 84 ++++++++++++--------
 .../llap/daemon/impl/TaskRunnerCallable.java    | 12 ++-
 .../daemon/impl/comparator/TestAMReporter.java  |  7 +-
 .../llap/tezplugins/LlapTaskCommunicator.java   | 31 +++++---
 .../tezplugins/LlapTaskSchedulerService.java    | 37 +++++++++
 .../TestLlapTaskSchedulerService.java           | 30 +++++++
 8 files changed, 166 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/6f38c9f1/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index d97b156..945474f 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -513,11 +513,12 @@ public class LlapTaskUmbilicalExternalClient implements Closeable {
     }
 
     @Override
-    public void nodeHeartbeat(
-        Text hostname, Text uniqueId, int port, TezAttemptArray aw) throws IOException {
+    public void nodeHeartbeat(Text hostname, Text uniqueId, int port, TezAttemptArray aw,
+        BooleanArray guaranteed) throws IOException {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Node heartbeat from " + hostname + ":" + port + ", " + uniqueId);
       }
+      // External client currently cannot use guaranteed.
       updateHeartbeatInfo(hostname.toString(), uniqueId.toString(), port, aw);
       // No need to propagate to this to the responder
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/6f38c9f1/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java b/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
index a2dca1b..14c8b50 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
@@ -15,6 +15,7 @@
 package org.apache.hadoop.hive.llap.protocol;
 
 import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
 
 import java.io.IOException;
 
@@ -37,6 +38,12 @@ public interface LlapTaskUmbilicalProtocol extends VersionedProtocol {
     }
   }
 
+  public class BooleanArray extends ArrayWritable {
+    public BooleanArray() {
+      super(BooleanWritable.class);
+    }
+  }
+
   public static final long versionID = 1L;
 
   // From Tez. Eventually changes over to the LLAP protocol and ProtocolBuffers
@@ -44,8 +51,8 @@ public interface LlapTaskUmbilicalProtocol extends VersionedProtocol {
   public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
       throws IOException, TezException;
 
-  public void nodeHeartbeat(
-      Text hostname, Text uniqueId, int port, TezAttemptArray aw) throws IOException;
+  public void nodeHeartbeat(Text hostname, Text uniqueId, int port,
+      TezAttemptArray aw, BooleanArray guaranteed) throws IOException;
 
   public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/6f38c9f1/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index b784360..088a5f3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -14,18 +14,14 @@
 
 package org.apache.hadoop.hive.llap.daemon.impl;
 
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.BooleanArray;
 import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray;
 
-
 import java.util.ArrayList;
-
 import java.util.List;
-
 import java.util.HashSet;
-
 import java.util.Set;
 
-
 import javax.net.SocketFactory;
 
 import java.io.IOException;
@@ -34,6 +30,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.ExecutorService;
@@ -58,6 +55,7 @@ import org.apache.hadoop.hive.llap.DaemonId;
 import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
 import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
+import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
@@ -194,15 +192,14 @@ public class AMReporter extends AbstractService {
     }
   }
 
-  public void registerTask(String amLocation, int port, String umbilicalUser,
+  public AMNodeInfo registerTask(String amLocation, int port, String umbilicalUser,
       Token<JobTokenIdentifier> jobToken, QueryIdentifier queryIdentifier,
-      TezTaskAttemptID attemptId) {
+      TezTaskAttemptID attemptId, boolean isGuaranteed) {
     if (LOG.isTraceEnabled()) {
       LOG.trace(
           "Registering for heartbeat: {}, queryIdentifier={}, attemptId={}",
           (amLocation + ":" + port), queryIdentifier, attemptId);
     }
-    AMNodeInfo amNodeInfo;
 
     // Since we don't have an explicit AM end signal yet - we're going to create
     // and discard AMNodeInfo instances per query.
@@ -213,7 +210,7 @@ public class AMReporter extends AbstractService {
         amNodeInfoPerQuery = new HashMap<>();
         knownAppMasters.put(queryIdentifier, amNodeInfoPerQuery);
       }
-      amNodeInfo = amNodeInfoPerQuery.get(amNodeId);
+      AMNodeInfo amNodeInfo = amNodeInfoPerQuery.get(amNodeId);
       if (amNodeInfo == null) {
         amNodeInfo = new AMNodeInfo(amNodeId, umbilicalUser, jobToken, queryIdentifier, retryPolicy,
           retryTimeout, socketFactory, conf);
@@ -227,7 +224,8 @@ public class AMReporter extends AbstractService {
         // A single queueLookupCallable is added here. We have to make sure one instance stays
         // in the queue till the query completes.
       }
-      amNodeInfo.addTaskAttempt(attemptId);
+      amNodeInfo.addTaskAttempt(attemptId, isGuaranteed);
+      return amNodeInfo;
     }
   }
 
@@ -398,18 +396,22 @@ public class AMReporter extends AbstractService {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Attempting to heartbeat to AM: " + amNodeInfo);
       }
-      List<TezTaskAttemptID> tasks = amNodeInfo.getTasksSnapshot();
-      if (tasks.isEmpty()) {
+      TaskSnapshot tasks = amNodeInfo.getTasksSnapshot();
+      if (tasks.attempts.isEmpty()) {
         return null;
       }
       try {
         if (LOG.isTraceEnabled()) {
           LOG.trace("NodeHeartbeat to: " + amNodeInfo);
         }
+        // TODO: if there are more fields perhaps there should be an array of class.
         TezAttemptArray aw = new TezAttemptArray();
-        aw.set(tasks.toArray(new TezTaskAttemptID[tasks.size()]));
+        aw.set(tasks.attempts.toArray(new TezTaskAttemptID[tasks.attempts.size()]));
+        BooleanArray guaranteed = new BooleanArray();
+        guaranteed.set(tasks.guaranteed.toArray(new BooleanWritable[tasks.guaranteed.size()]));
+
         amNodeInfo.getUmbilical().nodeHeartbeat(new Text(nodeId.getHostname()),
-            new Text(daemonId.getUniqueNodeIdInCluster()), nodeId.getPort(), aw);
+            new Text(daemonId.getUniqueNodeIdInCluster()), nodeId.getPort(), aw, guaranteed);
       } catch (IOException e) {
         QueryIdentifier currentQueryIdentifier = amNodeInfo.getQueryIdentifier();
         amNodeInfo.setAmFailed(true);
@@ -455,7 +457,7 @@ public class AMReporter extends AbstractService {
 
   protected class AMNodeInfo implements Delayed {
     // Serves as lock for itself.
-    private final Set<TezTaskAttemptID> tasks = new HashSet<>();
+    private final ConcurrentHashMap<TezTaskAttemptID, Boolean> tasks = new ConcurrentHashMap<>();
     private final String umbilicalUser;
     private final Token<JobTokenIdentifier> jobToken;
     private final Configuration conf;
@@ -501,21 +503,25 @@ public class AMReporter extends AbstractService {
       umbilical = null;
     }
 
-    int addTaskAttempt(TezTaskAttemptID attemptId) {
-      synchronized (tasks) {
-        if (!tasks.add(attemptId)) {
-          throw new RuntimeException(attemptId + " was already registered");
-        }
-        return tasks.size();
+    void addTaskAttempt(TezTaskAttemptID attemptId, boolean isGuaranteed) {
+      Boolean oldVal = tasks.putIfAbsent(attemptId, isGuaranteed);
+      if (oldVal != null) {
+        throw new RuntimeException(attemptId + " was already registered");
       }
     }
 
-    int removeTaskAttempt(TezTaskAttemptID attemptId) {
-      synchronized (tasks) {
-        if (!tasks.remove(attemptId)) {
-          throw new RuntimeException(attemptId + " was not registered and couldn't be removed");
-        }
-        return tasks.size();
+    void updateTaskAttempt(TezTaskAttemptID attemptId, boolean isGuaranteed) {
+      Boolean oldVal = tasks.replace(attemptId, isGuaranteed);
+      if (oldVal == null) {
+        LOG.warn("Task " + attemptId + " is no longer registered");
+        tasks.remove(attemptId);
+      }
+    }
+
+    void removeTaskAttempt(TezTaskAttemptID attemptId) {
+      Boolean oldVal = tasks.remove(attemptId);
+      if (oldVal == null) {
+        throw new RuntimeException(attemptId + " was not registered and couldn't be removed");
       }
     }
 
@@ -535,10 +541,16 @@ public class AMReporter extends AbstractService {
       return isDone.get();
     }
 
-    List<TezTaskAttemptID> getTasksSnapshot() {
-      List<TezTaskAttemptID> result = new ArrayList<>();
-      synchronized (tasks) {
-        result.addAll(tasks);
+    /**
+     * @return A snapshot of the tasks running at this daemon from this AM.
+     * Doesn't have to be consistent between multiple tasks; whether some task makes it into
+     * a given heartbeat when it's about to be started/about to finish is a timing issue anyway.
+     */
+    TaskSnapshot getTasksSnapshot() {
+      TaskSnapshot result = new TaskSnapshot(tasks.size());
+      for (Map.Entry<TezTaskAttemptID, Boolean> e : tasks.entrySet()) {
+        result.attempts.add(e.getKey());
+        result.guaranteed.add(new BooleanWritable(e.getValue()));
       }
       return result;
     }
@@ -579,4 +591,14 @@ public class AMReporter extends AbstractService {
       }
     }
   }
+
+
+  private static final class TaskSnapshot {
+    public TaskSnapshot(int count) {
+      attempts = new ArrayList<>(count);
+      guaranteed = new ArrayList<>(count);
+    }
+    public final List<TezTaskAttemptID> attempts;
+    public final List<BooleanWritable> guaranteed;
+  }
 }
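
The AMReporter change above replaces the synchronized HashSet of attempts with a ConcurrentHashMap keyed by attempt ID, holding the guaranteed flag as the value, and snapshots it into the two parallel lists of TaskSnapshot before each heartbeat. A minimal sketch of that snapshot pattern follows, with String keys standing in for TezTaskAttemptID (names are illustrative, not the Hive source): iteration over a ConcurrentHashMap is weakly consistent, so a concurrently added or removed task may miss one heartbeat, which the javadoc above explicitly accepts.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class SnapshotSketch {
  static final class Snapshot {
    final List<String> attempts = new ArrayList<>();
    final List<Boolean> guaranteed = new ArrayList<>();
  }

  static Snapshot snapshot(ConcurrentHashMap<String, Boolean> tasks) {
    Snapshot s = new Snapshot();
    for (Map.Entry<String, Boolean> e : tasks.entrySet()) {
      s.attempts.add(e.getKey());     // the same entry feeds both lists,
      s.guaranteed.add(e.getValue()); // keeping index i aligned across them
    }
    return s;
  }
}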

http://git-wip-us.apache.org/repos/asf/hive/blob/6f38c9f1/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 75d8577..b484a13 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
 import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
 import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
 import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener;
+import org.apache.hadoop.hive.llap.daemon.impl.AMReporter.AMNodeInfo;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
@@ -130,6 +131,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
   private final SocketFactory socketFactory;
   private boolean isGuaranteed;
   private WmFragmentCounters wmCounters;
+  private final AMNodeInfo amNodeInfo;
 
   @VisibleForTesting
   public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo fragmentInfo,
@@ -156,8 +158,11 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
     this.amReporter = amReporter;
     // Register with the AMReporter when the callable is setup. Unregister once it starts running.
     if (amReporter != null && jobToken != null) {
-      this.amReporter.registerTask(request.getAmHost(), request.getAmPort(),
-          vertex.getTokenIdentifier(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier(), attemptId);
+      this.amNodeInfo = amReporter.registerTask(request.getAmHost(), request.getAmPort(),
+          vertex.getTokenIdentifier(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier(),
+          attemptId, isGuaranteed);
+    } else {
+      this.amNodeInfo = null;
     }
     this.metrics = metrics;
     this.requestId = taskSpec.getTaskAttemptID().toString();
@@ -611,6 +616,9 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
 
   public void setIsGuaranteed(boolean isGuaranteed) {
     this.isGuaranteed = isGuaranteed;
+    if (amNodeInfo != null) {
+      amNodeInfo.updateTaskAttempt(taskSpec.getTaskAttemptID(), isGuaranteed);
+    }
     if (wmCounters != null) {
       wmCounters.changeGuaranteed(isGuaranteed);
     }
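
registerTask() now returns the per-AM AMNodeInfo, and setIsGuaranteed() pushes later state changes through that handle so the next node heartbeat reflects them without any extra RPC. A rough sketch of that handle pattern under assumed names (ReporterHandle and HandlePatternSketch are hypothetical, not Hive classes):

final class HandlePatternSketch {
  interface ReporterHandle {
    void updateTaskAttempt(String attemptId, boolean isGuaranteed);
  }

  private final ReporterHandle amNodeInfo; // null when no reporter is configured
  private final String attemptId;
  private boolean isGuaranteed;

  HandlePatternSketch(ReporterHandle handleOrNull, String attemptId, boolean isGuaranteed) {
    this.amNodeInfo = handleOrNull;
    this.attemptId = attemptId;
    this.isGuaranteed = isGuaranteed;
  }

  void setIsGuaranteed(boolean isGuaranteed) {
    this.isGuaranteed = isGuaranteed;
    if (amNodeInfo != null) {
      // The reporter picks this up on its next heartbeat to the AM.
      amNodeInfo.updateTaskAttempt(attemptId, isGuaranteed);
    }
  }
}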

http://git-wip-us.apache.org/repos/asf/hive/blob/6f38c9f1/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestAMReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestAMReporter.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestAMReporter.java
index bde3546..19f8048 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestAMReporter.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestAMReporter.java
@@ -69,9 +69,9 @@ public class TestAMReporter {
     String umbilicalUser = "user";
     QueryIdentifier queryId = new QueryIdentifier("app", 0);
     amReporter.registerTask(am1Location, am1Port, umbilicalUser, null, queryId,
-      mock(TezTaskAttemptID.class));
+      mock(TezTaskAttemptID.class), false);
     amReporter.registerTask(am2Location, am2Port, umbilicalUser, null, queryId,
-      mock(TezTaskAttemptID.class));
+      mock(TezTaskAttemptID.class), false);
 
     Thread.currentThread().sleep(2000);
     // verify both am get node heartbeat
@@ -97,7 +97,8 @@ public class TestAMReporter {
           return null;
         }
       }).when(umbilical).nodeHeartbeat(any(Text.class), any(Text.class), anyInt(),
-        any(LlapTaskUmbilicalProtocol.TezAttemptArray.class));
+        any(LlapTaskUmbilicalProtocol.TezAttemptArray.class),
+        any(LlapTaskUmbilicalProtocol.BooleanArray.class));
       return umbilical;
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/6f38c9f1/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index a4bd4df..5d4ce22 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -15,15 +15,15 @@
 package org.apache.hadoop.hive.llap.tezplugins;
 
 import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService.NodeInfo;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
 import org.apache.hadoop.io.Writable;
-
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.BooleanArray;
 import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -41,6 +41,7 @@ import com.google.common.collect.HashBiMap;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
 import com.google.protobuf.ServiceException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -66,6 +67,7 @@ import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
 import org.apache.hadoop.hive.llap.tez.Converters;
 import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
 import org.apache.hadoop.hive.llap.tezplugins.helpers.SourceStateTracker;
+import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtocolSignature;
@@ -705,16 +707,21 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
 
   private final AtomicLong nodeNotFoundLogTime = new AtomicLong(0);
 
-  void nodePinged(String hostname, String uniqueId, int port, TezAttemptArray tasks) {
+  void nodePinged(String hostname, String uniqueId, int port,
+      TezAttemptArray tasks, BooleanArray guaranteed) {
     // TODO: do we ever need the port? we could just do away with nodeId altogether.
     LlapNodeId nodeId = LlapNodeId.getInstance(hostname, port);
     registerPingingNode(nodeId);
     BiMap<ContainerId, TezTaskAttemptID> biMap =
         entityTracker.getContainerAttemptMapForNode(nodeId);
     if (biMap != null) {
-      HashSet<TezTaskAttemptID> attempts = new HashSet<>();
-      for (Writable w : tasks.get()) {
-        attempts.add((TezTaskAttemptID)w);
+      HashMap<TezTaskAttemptID, Boolean> attempts = new HashMap<>();
+      for (int i = 0; i < tasks.get().length; ++i) {
+        boolean isGuaranteed = false;
+        if (guaranteed != null) {
+          isGuaranteed = ((BooleanWritable)guaranteed.get()[i]).get();
+        }
+        attempts.put((TezTaskAttemptID)tasks.get()[i], isGuaranteed);
       }
       String error = "";
       synchronized (biMap) {
@@ -729,8 +736,10 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
           // However, the next heartbeat(s) should get the value eventually and mark task as alive.
           // Also, we prefer a missed heartbeat over a stuck query in case of discrepancy in ET.
           if (taskNodeId != null && taskNodeId.equals(uniqueId)) {
-            if (attempts.contains(attemptId)) {
-              getContext().taskAlive(entry.getValue());
+            Boolean isGuaranteed = attempts.get(attemptId);
+            if (isGuaranteed != null) {
+              getContext().taskAlive(attemptId);
+              scheduler.taskInfoUpdated(attemptId, isGuaranteed.booleanValue());
             } else {
               error += (attemptId + ", ");
             }
@@ -835,12 +844,12 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     }
 
     @Override
-    public void nodeHeartbeat(
-        Text hostname, Text uniqueId, int port, TezAttemptArray aw) throws IOException {
+    public void nodeHeartbeat(Text hostname, Text uniqueId, int port,
+        TezAttemptArray aw, BooleanArray guaranteed) throws IOException {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Received heartbeat from [" + hostname + ":" + port +" (" + uniqueId +")]");
       }
-      nodePinged(hostname.toString(), uniqueId.toString(), port, aw);
+      nodePinged(hostname.toString(), uniqueId.toString(), port, aw, guaranteed);
     }
 
     @Override
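
On the AM side, nodePinged() now zips the attempt array with the optional guaranteed array into a map, and a single get() replaces the previous contains() check: a null value means the daemon did not report the attempt, a non-null value carries its guaranteed state. A small sketch of that receiving logic with illustrative names (HeartbeatReceiverSketch and its methods are not part of the patch); a missing guaranteed array from an older daemon defaults every attempt to non-guaranteed, as in the code above.

import java.util.HashMap;
import java.util.Map;

final class HeartbeatReceiverSketch {
  // Zip the two parallel arrays; index i of both describes the same attempt.
  static Map<String, Boolean> zip(String[] attempts, boolean[] guaranteed) {
    Map<String, Boolean> byAttempt = new HashMap<>();
    for (int i = 0; i < attempts.length; i++) {
      boolean isGuaranteed = guaranteed != null && guaranteed[i];
      byAttempt.put(attempts[i], isGuaranteed);
    }
    return byAttempt;
  }

  // Returns whether the attempt was reported at all; when it was, the caller
  // would mark it alive and forward the guaranteed flag to the scheduler.
  static boolean handle(Map<String, Boolean> byAttempt, String attemptId) {
    Boolean isGuaranteed = byAttempt.get(attemptId);
    if (isGuaranteed == null) {
      return false; // treated like a missed heartbeat for this attempt
    }
    // Here the real code calls taskAlive(attemptId) and
    // scheduler.taskInfoUpdated(attemptId, isGuaranteed).
    return true;
  }
}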

http://git-wip-us.apache.org/repos/asf/hive/blob/6f38c9f1/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 6ddecca..5ab1964 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -3012,5 +3012,42 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     return unusedGuaranteed;
   }
 
+  /**
+   * A direct call from communicator to scheduler to propagate data that cannot be passed via Tez.
+   */
+  public void taskInfoUpdated(TezTaskAttemptID attemptId, boolean isGuaranteed) {
+    TaskInfo ti = null;
+    writeLock.lock();
+    try {
+      ti = tasksById.get(attemptId);
+      if (ti == null) {
+        WM_LOG.warn("Unknown task from heartbeat " + attemptId);
+        return;
+      }
+    } finally {
+      writeLock.unlock();
+    }
 
+    boolean newState = false;
+    synchronized (ti) {
+      if (ti.isPendingUpdate) return; // A pending update is not done.
+      if (ti.isGuaranteed == null) return; // The task has terminated, out of date heartbeat.
+      if (ti.lastSetGuaranteed != null && ti.lastSetGuaranteed == isGuaranteed) {
+        return; // The heartbeat is consistent with what we have.
+      }
+      ti.lastSetGuaranteed = isGuaranteed;
+      if (isGuaranteed == ti.isGuaranteed) return; // Already consistent. Can happen w/null lSG.
+
+      // There could be races here, e.g. heartbeat delivered us the old value just after we have
+      // received a successful confirmation from the API, so we are about to overwrite the latter.
+      // We could solve this by adding a version or smth like that; or by ignoring discrepancies
+      // unless we have previously received an update error for this task; however, the only effect
+      // right now are a few cheap redundant update calls; let's just do the simple thing.
+      newState = ti.isGuaranteed;
+      setUpdateStartedUnderTiLock(ti);
+    } // End of synchronized (ti)
+    WM_LOG.info("Sending an update based on inconsistent state from heartbeat for "
+        + attemptId + ", " + newState);
+    sendUpdateMessageAsync(ti, newState);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/6f38c9f1/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
index 90b31e4..4600755 100644
--- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
+++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
@@ -552,6 +552,36 @@ public class TestLlapTaskSchedulerService {
   }
 
   @Test(timeout = 10000)
+  public void testHeartbeatInconsistency() throws IOException, InterruptedException {
+    final TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper();
+    // Guaranteed flag is inconsistent based on heartbeat - another message should be sent.
+    try {
+      Priority highPri = Priority.newInstance(1);
+      TezTaskAttemptID task1 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
+      tsWrapper.ts.updateGuaranteedCount(0);
+      tsWrapper.controlScheduler(true);
+      tsWrapper.allocateTask(task1, null, highPri, new Object());
+      tsWrapper.awaitTotalTaskAllocations(1);
+      TaskInfo ti1 = tsWrapper.ts.getTaskInfo(task1);
+      assertFalse(ti1.isGuaranteed());
+
+      // Heartbeat indicates task has a duck - this must be reverted.
+      tsWrapper.ts.taskInfoUpdated(task1, true);
+      tsWrapper.ts.waitForMessagesSent(1);
+      assertTrue(ti1.getLastSetGuaranteed());
+      assertTrue(ti1.isUpdateInProgress());
+      assertFalse(ti1.isGuaranteed());
+      tsWrapper.ts.handleUpdateResult(ti1, true);
+      assertFalse(ti1.getLastSetGuaranteed());
+
+      tsWrapper.deallocateTask(task1, true, TaskAttemptEndReason.CONTAINER_EXITED);
+      assertEquals(0, tsWrapper.ts.getUnusedGuaranteedCount());
+    } finally {
+      tsWrapper.shutdown();
+    }
+  }
+
+  @Test(timeout = 10000)
   public void testSimpleNoLocalityAllocation() throws IOException, InterruptedException {
     TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper();