You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/03/20 02:10:51 UTC

hive git commit: HIVE-18968 : LLAP: report guaranteed tasks count in AM registry to check for consistency (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/master 26c0ab6ad -> 68459cf0b


HIVE-18968 : LLAP: report guaranteed tasks count in AM registry to check for consistency (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/68459cf0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/68459cf0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/68459cf0

Branch: refs/heads/master
Commit: 68459cf0bfe67dfe72da9095a1dac6b84ede93b0
Parents: 26c0ab6
Author: sergey <se...@apache.org>
Authored: Mon Mar 19 19:10:42 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Mon Mar 19 19:10:42 2018 -0700

----------------------------------------------------------------------
 .../hive/registry/impl/TezAmInstance.java       |  7 ++++
 .../hive/registry/impl/TezAmRegistryImpl.java   | 17 +++++++--
 .../hive/registry/impl/ZkRegistryBase.java      | 16 ++++++--
 .../tezplugins/LlapTaskSchedulerService.java    | 40 ++++++++++++++++++--
 .../ql/exec/tez/GuaranteedTasksAllocator.java   | 13 +++++--
 .../ql/exec/tez/QueryAllocationManager.java     |  7 +++-
 .../hadoop/hive/ql/exec/tez/TezSessionPool.java | 14 +++++--
 .../hadoop/hive/ql/exec/tez/WmTezSession.java   | 38 +++++++++++++++----
 .../hive/ql/exec/tez/WorkloadManager.java       | 12 +++++-
 .../hive/ql/exec/tez/TestWorkloadManager.java   |  4 ++
 .../server/HS2ActivePassiveHARegistry.java      |  5 ++-
 11 files changed, 143 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
index d09cb24..a862947 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
@@ -16,6 +16,7 @@ package org.apache.hadoop.hive.registry.impl;
 import java.io.IOException;
 
 import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
 import org.apache.hadoop.registry.client.types.AddressTypes;
 import org.apache.hadoop.registry.client.types.Endpoint;
@@ -50,6 +51,12 @@ public class TezAmInstance extends ServiceInstanceBase {
   public String getSessionId() {
     return getProperties().get(TezAmRegistryImpl.AM_SESSION_ID);
   }
+  
+  public int getGuaranteedCount() {
+    String str = getProperties().get(TezAmRegistryImpl.AM_GUARANTEED_COUNT);
+    if (StringUtils.isEmpty(str)) return 0; // Not set in the record; nothing to parse.
+    return Integer.parseInt(str);
+  }
 
   public String getPluginTokenJobId() {
     return getProperties().get(TezAmRegistryImpl.AM_PLUGIN_JOBID);

http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
index ab02cf4..3ff732d 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
@@ -18,6 +18,7 @@ import org.apache.commons.lang3.StringUtils;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Collection;
+
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -35,11 +36,12 @@ public class TezAmRegistryImpl extends ZkRegistryBase<TezAmInstance> {
   static final String IPC_TEZCLIENT = "tez-client";
   static final String IPC_PLUGIN = "llap-plugin";
   static final String AM_SESSION_ID = "am.session.id", AM_PLUGIN_TOKEN = "am.plugin.token",
-      AM_PLUGIN_JOBID = "am.plugin.jobid";
+      AM_PLUGIN_JOBID = "am.plugin.jobid", AM_GUARANTEED_COUNT = "am.guaranteed.count";
   private final static String NAMESPACE_PREFIX = "tez-am-";
   private static final String SASL_LOGIN_CONTEXT_NAME = "TezAmZooKeeperClient";
 
   private final String registryName;
+  private ServiceRecord srv;
 
   public static TezAmRegistryImpl create(Configuration conf, boolean useSecureZk) {
     String amRegistryName = HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME);
@@ -68,8 +70,11 @@ public class TezAmRegistryImpl extends ZkRegistryBase<TezAmInstance> {
   }
 
   public String register(int amPort, int pluginPort, String sessionId,
-      String serializedToken, String jobIdForToken) throws IOException {
-    ServiceRecord srv = new ServiceRecord();
+      String serializedToken, String jobIdForToken, int guaranteedCount) throws IOException {
+    if (srv != null) {
+      throw new UnsupportedOperationException("Already registered with " + srv);
+    }
+    srv = new ServiceRecord();
     Endpoint rpcEndpoint = RegistryTypeUtils.ipcEndpoint(
         IPC_TEZCLIENT, new InetSocketAddress(hostname, amPort));
     srv.addInternalEndpoint(rpcEndpoint);
@@ -83,12 +88,18 @@ public class TezAmRegistryImpl extends ZkRegistryBase<TezAmInstance> {
     boolean hasToken = serializedToken != null;
     srv.set(AM_PLUGIN_TOKEN, hasToken ? serializedToken : "");
     srv.set(AM_PLUGIN_JOBID, jobIdForToken != null ? jobIdForToken : "");
+    srv.set(AM_GUARANTEED_COUNT, Integer.toString(guaranteedCount));
     String uniqueId = registerServiceRecord(srv);
     LOG.info("Registered this AM: rpc: {}, plugin: {}, sessionId: {}, token: {}, znodePath: {}",
         rpcEndpoint, pluginEndpoint, sessionId, hasToken, getRegistrationZnodePath());
     return uniqueId;
   }
 
+  public void updateGuaranteed(int guaranteedCount) throws IOException {
+    srv.set(AM_GUARANTEED_COUNT, Integer.toString(guaranteedCount));
+    updateServiceRecord(srv, false, false);
+  }
+
   public TezAmInstance getInstance(String name) {
     Collection<TezAmInstance> instances = getAllInternal();
     for(TezAmInstance instance : instances) {

http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
index 680d9af..7ca3548 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
@@ -98,7 +98,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
 
   private final Set<ServiceInstanceStateChangeListener<InstanceType>> stateChangeListeners;
 
-  private final boolean doCheckAcls;
+  protected final boolean doCheckAcls;
   // Secure ZK is only set up by the registering service; anyone can read the registrations.
   private final String zkPrincipal, zkKeytab, saslLoginContextName;
   private String userNameFromPrincipal; // Only set when setting up the secure config for ZK.
@@ -286,7 +286,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
       // even under connection or session interruption (will automatically handle retries)
       znode = new PersistentEphemeralNode(zooKeeperClient, Mode.EPHEMERAL_SEQUENTIAL,
           workersPath + "/" + workerNodePrefix, encoder.toBytes(srv));
-
+    
       // start the creation of znodes
       znode.start();
 
@@ -318,7 +318,12 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
     return uniqueId;
   }
 
-  protected final void updateServiceRecord(ServiceRecord srv) throws IOException {
+  protected final void updateServiceRecord(
+     ServiceRecord srv, boolean doCheckAcls, boolean closeOnFailure) throws IOException {
+    if (srv.get(UNIQUE_IDENTIFIER) == null) {
+      srv.set(UNIQUE_IDENTIFIER, UNIQUE_ID.toString());
+    }
+    // waitForInitialCreate must have already been called in registerServiceRecord.
     try {
       znode.setData(encoder.toBytes(srv));
 
@@ -331,11 +336,14 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
       }
     } catch (Exception e) {
       LOG.error("Unable to update znode with new service record", e);
-      CloseableUtils.closeQuietly(znode);
+      if (closeOnFailure) {
+        CloseableUtils.closeQuietly(znode);
+      }
       throw (e instanceof IOException) ? (IOException) e : new IOException(e);
     }
   }
 
+
   final void initializeWithoutRegisteringInternal() throws IOException {
     // Create a znode under the rootNamespace parent for this instance of the server
     try {

http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index d536341..8217964 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -257,6 +257,16 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
   private int totalGuaranteed = 0, unusedGuaranteed = 0;
 
+  /**
+   * An internal version to make sure we don't race and overwrite a newer totalGuaranteed count in
+   * ZK with an older one, without requiring us to make ZK updates under the main writeLock.
+   * This is updated under writeLock, together with totalGuaranteed.
+   */
+  private long totalGuaranteedVersion = Long.MIN_VALUE;
+  private final Object registryUpdateLock = new Object(); // The lock for ZK updates.
+  /** The last totalGuaranteedVersion sent to ZK. Updated under registryUpdateLock. */
+  private long tgVersionSent = Long.MIN_VALUE;
+
   private LlapTaskCommunicator communicator;
   private final int amPort;
   private final String serializedToken, jobIdForToken;
@@ -504,6 +514,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   @VisibleForTesting
   void updateGuaranteedCount(int newTotalGuaranteed) {
     List<TaskInfo> toUpdate = null;
+    long tgVersionForZk;
     writeLock.lock();
     try {
       // TODO: when this code is a little less hot, change most logs to debug.
@@ -514,8 +525,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       // The "procedural" approach requires that we track the ducks traveling on network,
       // concurrent terminations, etc. So, while more precise it's much more complex.
       int delta = newTotalGuaranteed - totalGuaranteed;
-      WM_LOG.info("Received guaranteed tasks " + newTotalGuaranteed
-          + "; the delta to adjust by is " + delta);
+      tgVersionForZk = ++totalGuaranteedVersion;
+      WM_LOG.info("Received guaranteed tasks " + newTotalGuaranteed + " (internal version "
+          + tgVersionForZk + "); the delta to adjust by is " + delta);
       if (delta == 0) return;
       totalGuaranteed = newTotalGuaranteed;
       if (metrics != null) {
@@ -562,6 +574,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     } finally {
       writeLock.unlock();
     }
+    updateGuaranteedInRegistry(tgVersionForZk, newTotalGuaranteed);
     if (toUpdate == null) return;
     WM_LOG.info("Sending updates to " + toUpdate.size() + " tasks");
     for (TaskInfo ti : toUpdate) {
@@ -752,7 +765,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         amRegistry.start();
         int pluginPort = pluginEndpoint != null ? pluginEndpoint.getActualPort() : -1;
         amRegistry.register(amPort, pluginPort, HiveConf.getVar(conf, ConfVars.HIVESESSIONID),
-            serializedToken, jobIdForToken);
+            serializedToken, jobIdForToken, 0);
       }
     } finally {
       writeLock.unlock();
@@ -969,6 +982,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     if (metrics != null) {
       metrics.incrCompletedDagCount();
     }
+    long tgVersionForZk;
     writeLock.lock();
     try {
       dagRunning = false;
@@ -1002,6 +1016,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       }
 
       totalGuaranteed = unusedGuaranteed = 0;
+      tgVersionForZk = ++totalGuaranteedVersion;
       if (metrics != null) {
         metrics.setDagId(null);
         // We remove the tasks above without state checks so just reset all metrics to 0.
@@ -1012,9 +1027,28 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     } finally {
       writeLock.unlock();
     }
+    updateGuaranteedInRegistry(tgVersionForZk, 0);
     // TODO Cleanup pending tasks etc, so that the next dag is not affected.
   }
 
+  private void updateGuaranteedInRegistry(long tgVersionForZk, int newTotalGuaranteed) {
+    if (amRegistry == null) return;
+    synchronized (registryUpdateLock) {
+      // Make sure the updates are not sent to ZK out of order compared to how we apply them in AM.
+      if (tgVersionForZk <= tgVersionSent) return;
+      try {
+        amRegistry.updateGuaranteed(newTotalGuaranteed);
+        tgVersionSent = tgVersionForZk;
+      } catch (IOException ex) {
+        // Ignore for now. HS2 will probably try to send us the count we already have again.
+        // We are assuming here that if we can't talk to ZK we will eventually fail.
+        LOG.error("Failed to update guaranteed count in registry; ignoring", ex);
+      }
+    }
+  }
+
+
+
   @Override
   public void blacklistNode(NodeId nodeId) {
     LOG.info("BlacklistNode not supported");

http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
index 82b38d5..a52928c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
@@ -143,14 +143,19 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
     }
   }
 
-  private void updateSessionAsync(final WmTezSession session, final int intAlloc) {
-    boolean needsUpdate = session.setSendingGuaranteed(intAlloc);
-    if (!needsUpdate) return;
+  @Override
+  public void updateSessionAsync(WmTezSession session) {
+    updateSessionAsync(session, null); // Resend existing value if necessary.
+  }
+
+  private void updateSessionAsync(final WmTezSession session, final Integer intAlloc) {
+    Integer valueToSend = session.setSendingGuaranteed(intAlloc);
+    if (valueToSend == null) return; // Already sent, or a send is already in flight.
     // Note: this assumes that the pattern where the same session object is reset with a different
     //       Tez client is not used. It was used a lot in the past but appears to be gone from most
     //       HS2 session pool paths, and this patch removes the last one (reopen).
     UpdateQueryRequestProto request = UpdateQueryRequestProto
-        .newBuilder().setGuaranteedTaskCount(intAlloc).build();
+        .newBuilder().setGuaranteedTaskCount(valueToSend.intValue()).build();
-    LOG.info("Updating {} with {} guaranteed tasks", session.getSessionId(), intAlloc);
+    LOG.info("Updating {} with {} guaranteed tasks", session.getSessionId(), valueToSend);
     amCommunicator.sendUpdateQuery(request, (AmPluginNode)session, new UpdateCallback(session));
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
index a446902..9885ce7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
@@ -27,7 +27,7 @@ interface QueryAllocationManager {
   void start();
   void stop();
   /**
-   * Updates the session allocations asynchoronously.
+   * Updates the session allocations asynchronously.
    * @param totalMaxAlloc The total maximum fraction of the cluster to allocate. Used to
    *                      avoid various artifacts, esp. with small numbers and double weirdness.
    *                      Null means the total is unknown.
@@ -39,4 +39,9 @@ interface QueryAllocationManager {
    * Sets a callback to be invoked on cluster changes relevant to resource allocation.
    */
   void setClusterChangedCallback(Runnable clusterChangedCallback);
+  
+  /**
+   * Updates the session asynchronously with the existing allocation.
+   */
+  void updateSessionAsync(WmTezSession session);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
index 0962460..89954cb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
@@ -334,10 +334,16 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
     }
 
     @Override
-    public void onUpdate(TezAmInstance serviceInstance, int ephSeqVersion) {
-      // We currently never update the znode once registered.
-      // AM recovery will create a new node when it calls register.
-      LOG.warn("Received an unexpected update for instance={}. Ignoring", serviceInstance);
+    public void onUpdate(TezAmInstance si, int ephSeqVersion) {
+      String sessionId = si.getSessionId();
+      SessionType session = bySessionId.get(sessionId);
+      if (session != null) {
+        LOG.info("AM for " + sessionId + ", v." + ephSeqVersion + " has updated; updating ["
+            + session + "] with an endpoint at " + si.getPluginPort());
+        session.updateFromRegistry(si, ephSeqVersion);
+      } else {
+        LOG.warn("AM for an unknown " + sessionId + " has updated; ignoring");
+      }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
index d4c3ab9..1cf5493 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
@@ -106,6 +106,13 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
 
   @Override
   void updateFromRegistry(TezAmInstance si, int ephSeqVersion) {
+    updateAmEndpointInfo(si, ephSeqVersion);
+    if (si != null) {
+      handleGuaranteedTasksChange(si.getGuaranteedCount());
+    }
+  }
+
+  public void updateAmEndpointInfo(TezAmInstance si, int ephSeqVersion) {
     AmPluginInfo info = si == null ? null : new AmPluginInfo(si.getHost(), si.getPluginPort(),
         si.getPluginToken(), si.getPluginTokenJobId());
     synchronized (amPluginInfoLock) {
@@ -131,6 +138,19 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
       }
     }
   }
+  
+
+  private void handleGuaranteedTasksChange(int guaranteedCount) {
+    boolean doNotify = false;
+    synchronized (actualState) {
+      // A noop if we are in process of sending or if we have the correct value.
+      if (actualState.sending != -1 || actualState.sent == guaranteedCount) return;
+      actualState.sent = guaranteedCount;
+      doNotify = actualState.target != guaranteedCount;
+    }
+    if (!doNotify) return;
+    wmParent.notifyOfInconsistentAllocation(this);
+  }
 
   @Override
   public AmPluginInfo getAmPluginInfo(Ref<Integer> version) {
@@ -161,17 +181,21 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
     return this.clusterFraction;
   }
 
-  boolean setSendingGuaranteed(int intAlloc) {
-    assert intAlloc >= 0;
+  Integer setSendingGuaranteed(Integer intAlloc) {
+    assert intAlloc == null || intAlloc >= 0;
     synchronized (actualState) {
-      actualState.target = intAlloc;
-      if (actualState.sending != -1) return false; // The sender will take care of this.
-      if (actualState.sent == intAlloc) return false; // The value didn't change.
+      if (intAlloc != null) {
+        actualState.target = intAlloc;
+      } else {
+        intAlloc = actualState.target;
+      }
+      if (actualState.sending != -1) return null; // The sender will take care of this.
+      if (actualState.sent == intAlloc) return null; // The value didn't change.
       actualState.sending = intAlloc;
-      return true;
+      return intAlloc;
     }
   }
-
+  
   public String getAllocationState() {
     synchronized (actualState) {
       return "actual/target " + actualState.sent + "/" + actualState.target

http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 00e2c20..f0e620c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -685,7 +685,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       processPoolChangesOnMasterThread(poolName, hasRequeues, syncWork);
     }
 
-
     // 12. Save state for future iterations.
     for (KillQueryContext killCtx : syncWork.toKillQuery.values()) {
       if (killQueryInProgress.put(killCtx.session, killCtx) != null) {
@@ -698,7 +697,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       entry.getValue().endEvent(entry.getKey());
     }
 
-    // 14. Notify tests and global async ops.
+    // 14. Give our final state to UI/API requests if any.
     if (e.dumpStateFuture != null) {
       List<String> result = new ArrayList<>();
       result.add("RESOURCE PLAN " + rpName + "; default pool " + defaultPool);
@@ -708,6 +707,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       e.dumpStateFuture.set(result);
       e.dumpStateFuture = null;
     }
+    
+    // 15. Notify tests and global async ops.
     if (e.testEvent != null) {
       e.testEvent.set(true);
       e.testEvent = null;
@@ -1422,6 +1423,13 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
   }
 
+  public void notifyOfInconsistentAllocation(WmTezSession session) {
+    // We just act as a pass-thru between the session and allocation manager. We don't change the
+    // allocation target (only WM thread can do that); therefore we can do this directly and
+    // actualState-based sync will take care of multiple potential message senders.
+    allocationManager.updateSessionAsync(session);
+  }
+
   public void notifyOfClusterStateChange() {
     currentLock.lock();
     try {

http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index 8d185ba..20a5947 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -110,6 +110,10 @@ public class TestWorkloadManager {
     public void updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessions) {
       isCalled = true;
     }
+    
+    @Override
+    public void updateSessionAsync(WmTezSession session) {
+    }
 
     void assertWasCalledAndReset() {
       assertTrue(isCalled);

http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java
index 819ce19..7c75489 100644
--- a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java
+++ b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java
@@ -156,9 +156,10 @@ public class HS2ActivePassiveHARegistry extends ZkRegistryBase<HiveServer2Instan
     addEndpointToServiceRecord(getNewServiceRecord(), PASSIVE_ENDPOINT);
   }
 
-  private void addEndpointToServiceRecord(final ServiceRecord srv, final String endpointName) throws IOException {
+  private void addEndpointToServiceRecord(
+      final ServiceRecord srv, final String endpointName) throws IOException {
     updateEndpoint(srv, endpointName);
-    updateServiceRecord(srv);
+    updateServiceRecord(srv, doCheckAcls, true);
   }
 
   private void updateEndpoint(final ServiceRecord srv, final String endpointName) {