You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/03/20 02:10:51 UTC
hive git commit: HIVE-18968 : LLAP: report guaranteed tasks count in AM registry to check for consistency (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master 26c0ab6ad -> 68459cf0b
HIVE-18968 : LLAP: report guaranteed tasks count in AM registry to check for consistency (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/68459cf0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/68459cf0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/68459cf0
Branch: refs/heads/master
Commit: 68459cf0bfe67dfe72da9095a1dac6b84ede93b0
Parents: 26c0ab6
Author: sergey <se...@apache.org>
Authored: Mon Mar 19 19:10:42 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Mon Mar 19 19:10:42 2018 -0700
----------------------------------------------------------------------
.../hive/registry/impl/TezAmInstance.java | 7 ++++
.../hive/registry/impl/TezAmRegistryImpl.java | 17 +++++++--
.../hive/registry/impl/ZkRegistryBase.java | 16 ++++++--
.../tezplugins/LlapTaskSchedulerService.java | 40 ++++++++++++++++++--
.../ql/exec/tez/GuaranteedTasksAllocator.java | 13 +++++--
.../ql/exec/tez/QueryAllocationManager.java | 7 +++-
.../hadoop/hive/ql/exec/tez/TezSessionPool.java | 14 +++++--
.../hadoop/hive/ql/exec/tez/WmTezSession.java | 38 +++++++++++++++----
.../hive/ql/exec/tez/WorkloadManager.java | 12 +++++-
.../hive/ql/exec/tez/TestWorkloadManager.java | 4 ++
.../server/HS2ActivePassiveHARegistry.java | 5 ++-
11 files changed, 143 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
index d09cb24..a862947 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
@@ -16,6 +16,7 @@ package org.apache.hadoop.hive.registry.impl;
import java.io.IOException;
import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.types.AddressTypes;
import org.apache.hadoop.registry.client.types.Endpoint;
@@ -50,6 +51,12 @@ public class TezAmInstance extends ServiceInstanceBase {
public String getSessionId() {
return getProperties().get(TezAmRegistryImpl.AM_SESSION_ID);
}
+
+ /**
+ * Returns the guaranteed task count this AM last published to the registry.
+ * @return the published count, or 0 if the property is absent or empty.
+ * @throws NumberFormatException if the published value is not a valid int.
+ */
+ public int getGuaranteedCount() {
+ String str = getProperties().get(TezAmRegistryImpl.AM_GUARANTEED_COUNT);
+ // Only a missing/empty value defaults to 0; a present value must be parsed.
+ if (StringUtils.isEmpty(str)) return 0;
+ return Integer.parseInt(str);
+ }
public String getPluginTokenJobId() {
return getProperties().get(TezAmRegistryImpl.AM_PLUGIN_JOBID);
http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
index ab02cf4..3ff732d 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
@@ -18,6 +18,7 @@ import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
+
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -35,11 +36,12 @@ public class TezAmRegistryImpl extends ZkRegistryBase<TezAmInstance> {
static final String IPC_TEZCLIENT = "tez-client";
static final String IPC_PLUGIN = "llap-plugin";
static final String AM_SESSION_ID = "am.session.id", AM_PLUGIN_TOKEN = "am.plugin.token",
- AM_PLUGIN_JOBID = "am.plugin.jobid";
+ AM_PLUGIN_JOBID = "am.plugin.jobid", AM_GUARANTEED_COUNT = "am.guaranteed.count";
private final static String NAMESPACE_PREFIX = "tez-am-";
private static final String SASL_LOGIN_CONTEXT_NAME = "TezAmZooKeeperClient";
private final String registryName;
+ private ServiceRecord srv;
public static TezAmRegistryImpl create(Configuration conf, boolean useSecureZk) {
String amRegistryName = HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME);
@@ -68,8 +70,11 @@ public class TezAmRegistryImpl extends ZkRegistryBase<TezAmInstance> {
}
public String register(int amPort, int pluginPort, String sessionId,
- String serializedToken, String jobIdForToken) throws IOException {
- ServiceRecord srv = new ServiceRecord();
+ String serializedToken, String jobIdForToken, int guaranteedCount) throws IOException {
+ if (srv != null) {
+ throw new UnsupportedOperationException("Already registered with " + srv);
+ }
+ srv = new ServiceRecord();
Endpoint rpcEndpoint = RegistryTypeUtils.ipcEndpoint(
IPC_TEZCLIENT, new InetSocketAddress(hostname, amPort));
srv.addInternalEndpoint(rpcEndpoint);
@@ -83,12 +88,18 @@ public class TezAmRegistryImpl extends ZkRegistryBase<TezAmInstance> {
boolean hasToken = serializedToken != null;
srv.set(AM_PLUGIN_TOKEN, hasToken ? serializedToken : "");
srv.set(AM_PLUGIN_JOBID, jobIdForToken != null ? jobIdForToken : "");
+ srv.set(AM_GUARANTEED_COUNT, Integer.toString(guaranteedCount));
String uniqueId = registerServiceRecord(srv);
LOG.info("Registered this AM: rpc: {}, plugin: {}, sessionId: {}, token: {}, znodePath: {}",
rpcEndpoint, pluginEndpoint, sessionId, hasToken, getRegistrationZnodePath());
return uniqueId;
}
+ /**
+ * Re-publishes this AM's service record with a new guaranteed task count.
+ * @param guaranteedCount the count to publish.
+ * @throws IOException if this AM has not been registered yet, or if the ZK update fails.
+ */
+ public void updateGuaranteed(int guaranteedCount) throws IOException {
+ // srv is only created by register(); fail with a checked exception the callers already
+ // handle instead of an NPE.
+ if (srv == null) {
+ throw new IOException("Cannot update guaranteed count to " + guaranteedCount
+ + "; this AM is not registered");
+ }
+ srv.set(AM_GUARANTEED_COUNT, Integer.toString(guaranteedCount));
+ // doCheckAcls=false, closeOnFailure=false: a failed update leaves the znode open.
+ updateServiceRecord(srv, false, false);
+ }
+
public TezAmInstance getInstance(String name) {
Collection<TezAmInstance> instances = getAllInternal();
for(TezAmInstance instance : instances) {
http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
index 680d9af..7ca3548 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
@@ -98,7 +98,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
private final Set<ServiceInstanceStateChangeListener<InstanceType>> stateChangeListeners;
- private final boolean doCheckAcls;
+ protected final boolean doCheckAcls;
// Secure ZK is only set up by the registering service; anyone can read the registrations.
private final String zkPrincipal, zkKeytab, saslLoginContextName;
private String userNameFromPrincipal; // Only set when setting up the secure config for ZK.
@@ -286,7 +286,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
// even under connection or session interruption (will automatically handle retries)
znode = new PersistentEphemeralNode(zooKeeperClient, Mode.EPHEMERAL_SEQUENTIAL,
workersPath + "/" + workerNodePrefix, encoder.toBytes(srv));
-
+
// start the creation of znodes
znode.start();
@@ -318,7 +318,12 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
return uniqueId;
}
- protected final void updateServiceRecord(ServiceRecord srv) throws IOException {
+ protected final void updateServiceRecord(
+ ServiceRecord srv, boolean doCheckAcls, boolean closeOnFailure) throws IOException {
+ if (srv.get(UNIQUE_IDENTIFIER) == null) {
+ srv.set(UNIQUE_IDENTIFIER, UNIQUE_ID.toString());
+ }
+ // waitForInitialCreate must have already been called in registerServiceRecord.
try {
znode.setData(encoder.toBytes(srv));
@@ -331,11 +336,14 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
}
} catch (Exception e) {
LOG.error("Unable to update znode with new service record", e);
- CloseableUtils.closeQuietly(znode);
+ if (closeOnFailure) {
+ CloseableUtils.closeQuietly(znode);
+ }
throw (e instanceof IOException) ? (IOException) e : new IOException(e);
}
}
+
final void initializeWithoutRegisteringInternal() throws IOException {
// Create a znode under the rootNamespace parent for this instance of the server
try {
http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index d536341..8217964 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -257,6 +257,16 @@ public class LlapTaskSchedulerService extends TaskScheduler {
private int totalGuaranteed = 0, unusedGuaranteed = 0;
+ /**
+ * An internal version to make sure we don't race and overwrite a newer totalGuaranteed count in
+ * ZK with an older one, without requiring us to make ZK updates under the main writeLock.
+ * This is updated under writeLock, together with totalGuaranteed.
+ */
+ private long totalGuaranteedVersion = Long.MIN_VALUE;
+ private final Object registryUpdateLock = new Object(); // The lock for ZK updates.
+ /** The last totalGuaranteedVersion sent to ZK. Updated under registryUpdateLock. */
+ private long tgVersionSent = Long.MIN_VALUE;
+
private LlapTaskCommunicator communicator;
private final int amPort;
private final String serializedToken, jobIdForToken;
@@ -504,6 +514,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
@VisibleForTesting
void updateGuaranteedCount(int newTotalGuaranteed) {
List<TaskInfo> toUpdate = null;
+ long tgVersionForZk;
writeLock.lock();
try {
// TODO: when this code is a little less hot, change most logs to debug.
@@ -514,8 +525,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
// The "procedural" approach requires that we track the ducks traveling on network,
// concurrent terminations, etc. So, while more precise it's much more complex.
int delta = newTotalGuaranteed - totalGuaranteed;
- WM_LOG.info("Received guaranteed tasks " + newTotalGuaranteed
- + "; the delta to adjust by is " + delta);
+ tgVersionForZk = ++totalGuaranteedVersion;
+ WM_LOG.info("Received guaranteed tasks " + newTotalGuaranteed + " (internal version "
+ + tgVersionForZk + "); the delta to adjust by is " + delta);
if (delta == 0) return;
totalGuaranteed = newTotalGuaranteed;
if (metrics != null) {
@@ -562,6 +574,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
} finally {
writeLock.unlock();
}
+ updateGuaranteedInRegistry(tgVersionForZk, newTotalGuaranteed);
if (toUpdate == null) return;
WM_LOG.info("Sending updates to " + toUpdate.size() + " tasks");
for (TaskInfo ti : toUpdate) {
@@ -752,7 +765,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
amRegistry.start();
int pluginPort = pluginEndpoint != null ? pluginEndpoint.getActualPort() : -1;
amRegistry.register(amPort, pluginPort, HiveConf.getVar(conf, ConfVars.HIVESESSIONID),
- serializedToken, jobIdForToken);
+ serializedToken, jobIdForToken, 0);
}
} finally {
writeLock.unlock();
@@ -969,6 +982,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
if (metrics != null) {
metrics.incrCompletedDagCount();
}
+ long tgVersionForZk;
writeLock.lock();
try {
dagRunning = false;
@@ -1002,6 +1016,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
totalGuaranteed = unusedGuaranteed = 0;
+ tgVersionForZk = ++totalGuaranteedVersion;
if (metrics != null) {
metrics.setDagId(null);
// We remove the tasks above without state checks so just reset all metrics to 0.
@@ -1012,9 +1027,28 @@ public class LlapTaskSchedulerService extends TaskScheduler {
} finally {
writeLock.unlock();
}
+ updateGuaranteedInRegistry(tgVersionForZk, 0);
// TODO Cleanup pending tasks etc, so that the next dag is not affected.
}
+ /**
+ * Publishes the AM's total guaranteed task count to the AM registry. No-op without a registry.
+ * Called outside writeLock; tgVersionForZk was assigned under writeLock together with the
+ * count, so updates reaching this method out of order are dropped and an older count never
+ * overwrites a newer one. Registry failures are logged and ignored.
+ * @param tgVersionForZk the internal version of newTotalGuaranteed.
+ * @param newTotalGuaranteed the total guaranteed count to publish.
+ */
+ private void updateGuaranteedInRegistry(long tgVersionForZk, int newTotalGuaranteed) {
+ if (amRegistry == null) return;
+ synchronized (registryUpdateLock) {
+ // Make sure the updates are not sent to ZK out of order compared to how we apply them in AM.
+ if (tgVersionForZk <= tgVersionSent) return;
+ try {
+ amRegistry.updateGuaranteed(newTotalGuaranteed);
+ tgVersionSent = tgVersionForZk;
+ } catch (IOException ex) {
+ // Ignore for now. HS2 will probably try to send us the count we already have again.
+ // We are assuming here that if we can't talk to ZK we will eventually fail.
+ LOG.error("Failed to update guaranteed count in registry; ignoring", ex);
+ }
+ }
+ }
+
+
+
@Override
public void blacklistNode(NodeId nodeId) {
LOG.info("BlacklistNode not supported");
http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
index 82b38d5..a52928c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
@@ -143,14 +143,19 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
}
}
- private void updateSessionAsync(final WmTezSession session, final int intAlloc) {
- boolean needsUpdate = session.setSendingGuaranteed(intAlloc);
- if (!needsUpdate) return;
+ /**
+ * Re-sends the session's existing target guaranteed count to its AM, unless a send is
+ * already in flight or the target equals the last value sent.
+ */
+ @Override
+ public void updateSessionAsync(WmTezSession session) {
+ updateSessionAsync(session, null); // Resend existing value if necessary.
+ }
+
+ /**
+ * Sends a guaranteed task count to the session's AM if WmTezSession.setSendingGuaranteed
+ * says a send is needed.
+ * @param session the session whose AM is updated.
+ * @param intAlloc the new target count, or null to (re)send the session's current target.
+ */
+ private void updateSessionAsync(final WmTezSession session, final Integer intAlloc) {
+ Integer valueToSend = session.setSendingGuaranteed(intAlloc);
+ if (valueToSend == null) return;
// Note: this assumes that the pattern where the same session object is reset with a different
// Tez client is not used. It was used a lot in the past but appears to be gone from most
// HS2 session pool paths, and this patch removes the last one (reopen).
UpdateQueryRequestProto request = UpdateQueryRequestProto
- .newBuilder().setGuaranteedTaskCount(intAlloc).build();
+ .newBuilder().setGuaranteedTaskCount(valueToSend.intValue()).build();
- LOG.info("Updating {} with {} guaranteed tasks", session.getSessionId(), intAlloc);
+ // Log the value actually sent; intAlloc is null when resending the existing target.
+ LOG.info("Updating {} with {} guaranteed tasks", session.getSessionId(), valueToSend);
amCommunicator.sendUpdateQuery(request, (AmPluginNode)session, new UpdateCallback(session));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
index a446902..9885ce7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
@@ -27,7 +27,7 @@ interface QueryAllocationManager {
void start();
void stop();
/**
- * Updates the session allocations asynchoronously.
+ * Updates the session allocations asynchronously.
* @param totalMaxAlloc The total maximum fraction of the cluster to allocate. Used to
* avoid various artifacts, esp. with small numbers and double weirdness.
* Null means the total is unknown.
@@ -39,4 +39,9 @@ interface QueryAllocationManager {
* Sets a callback to be invoked on cluster changes relevant to resource allocation.
*/
void setClusterChangedCallback(Runnable clusterChangedCallback);
+
+ /**
+ * Updates the session asynchronously with the existing allocation.
+ */
+ void updateSessionAsync(WmTezSession session);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
index 0962460..89954cb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
@@ -334,10 +334,16 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
}
@Override
- public void onUpdate(TezAmInstance serviceInstance, int ephSeqVersion) {
- // We currently never update the znode once registered.
- // AM recovery will create a new node when it calls register.
- LOG.warn("Received an unexpected update for instance={}. Ignoring", serviceInstance);
+ public void onUpdate(TezAmInstance si, int ephSeqVersion) {
+ // The AM now re-publishes its znode (e.g. via TezAmRegistryImpl.updateGuaranteed); forward
+ // the new registry state to the matching session, if this pool knows about it.
+ String sessionId = si.getSessionId();
+ SessionType session = bySessionId.get(sessionId);
+ if (session != null) {
+ LOG.info("AM for " + sessionId + ", v." + ephSeqVersion + " has updated; updating ["
+ + session + "] with an endpoint at " + si.getPluginPort());
+ session.updateFromRegistry(si, ephSeqVersion);
+ } else {
+ LOG.warn("AM for an unknown " + sessionId + " has updated; ignoring");
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
index d4c3ab9..1cf5493 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
@@ -106,6 +106,13 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
@Override
void updateFromRegistry(TezAmInstance si, int ephSeqVersion) {
+ // Refresh the AM plugin endpoint info, then reconcile the guaranteed count the AM reports.
+ // A null instance skips the guaranteed-count check.
+ updateAmEndpointInfo(si, ephSeqVersion);
+ if (si != null) {
+ handleGuaranteedTasksChange(si.getGuaranteedCount());
+ }
+ }
+
+ public void updateAmEndpointInfo(TezAmInstance si, int ephSeqVersion) {
AmPluginInfo info = si == null ? null : new AmPluginInfo(si.getHost(), si.getPluginPort(),
si.getPluginToken(), si.getPluginTokenJobId());
synchronized (amPluginInfoLock) {
@@ -131,6 +138,19 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
}
}
}
+
+
+ /**
+ * Reconciles the guaranteed count reported by the AM (via the registry) with our view.
+ * If no send is in flight and the reported count differs from what we recorded as sent,
+ * the reported count becomes "sent"; if it also differs from the target, WM is asked to
+ * resend the target. The WM call is made outside the actualState lock.
+ */
+ private void handleGuaranteedTasksChange(int guaranteedCount) {
+ boolean doNotify = false;
+ synchronized (actualState) {
+ // A noop if we are in process of sending or if we have the correct value.
+ if (actualState.sending != -1 || actualState.sent == guaranteedCount) return;
+ actualState.sent = guaranteedCount;
+ doNotify = actualState.target != guaranteedCount;
+ }
+ if (!doNotify) return;
+ wmParent.notifyOfInconsistentAllocation(this);
+ }
@Override
public AmPluginInfo getAmPluginInfo(Ref<Integer> version) {
@@ -161,17 +181,21 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
return this.clusterFraction;
}
- boolean setSendingGuaranteed(int intAlloc) {
- assert intAlloc >= 0;
+ /**
+ * Records a new target count (or, when intAlloc is null, reuses the current target) and,
+ * if a send is needed, marks that value as being sent.
+ * @param intAlloc the new non-negative target, or null to keep the existing target.
+ * @return the value the caller must send, or null if a send is already in flight or the
+ * value equals the last one sent.
+ */
+ Integer setSendingGuaranteed(Integer intAlloc) {
+ assert intAlloc == null || intAlloc >= 0;
synchronized (actualState) {
- actualState.target = intAlloc;
- if (actualState.sending != -1) return false; // The sender will take care of this.
- if (actualState.sent == intAlloc) return false; // The value didn't change.
+ if (intAlloc != null) {
+ actualState.target = intAlloc;
+ } else {
+ intAlloc = actualState.target;
+ }
+ if (actualState.sending != -1) return null; // The sender will take care of this.
+ if (actualState.sent == intAlloc) return null; // The value didn't change.
actualState.sending = intAlloc;
- return true;
+ return intAlloc;
}
}
-
+
public String getAllocationState() {
synchronized (actualState) {
return "actual/target " + actualState.sent + "/" + actualState.target
http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 00e2c20..f0e620c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -685,7 +685,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
processPoolChangesOnMasterThread(poolName, hasRequeues, syncWork);
}
-
// 12. Save state for future iterations.
for (KillQueryContext killCtx : syncWork.toKillQuery.values()) {
if (killQueryInProgress.put(killCtx.session, killCtx) != null) {
@@ -698,7 +697,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
entry.getValue().endEvent(entry.getKey());
}
- // 14. Notify tests and global async ops.
+ // 14. Give our final state to UI/API requests if any.
if (e.dumpStateFuture != null) {
List<String> result = new ArrayList<>();
result.add("RESOURCE PLAN " + rpName + "; default pool " + defaultPool);
@@ -708,6 +707,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
e.dumpStateFuture.set(result);
e.dumpStateFuture = null;
}
+
+ // 15. Notify tests and global async ops.
if (e.testEvent != null) {
e.testEvent.set(true);
e.testEvent = null;
@@ -1422,6 +1423,13 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
}
+ /**
+ * Called by a session when the AM-reported guaranteed count disagrees with its target;
+ * asks the allocation manager to resend the session's existing target.
+ */
+ public void notifyOfInconsistentAllocation(WmTezSession session) {
+ // We just act as a pass-thru between the session and allocation manager. We don't change the
+ // allocation target (only WM thread can do that); therefore we can do this directly and
+ // actualState-based sync will take care of multiple potential message senders.
+ allocationManager.updateSessionAsync(session);
+ }
+
public void notifyOfClusterStateChange() {
currentLock.lock();
try {
http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index 8d185ba..20a5947 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -110,6 +110,10 @@ public class TestWorkloadManager {
public void updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessions) {
isCalled = true;
}
+
+ @Override
+ public void updateSessionAsync(WmTezSession session) {
+ // Intentionally a no-op in this test allocation manager; presumably single-session
+ // resends are not exercised by these tests (TODO confirm).
+ }
void assertWasCalledAndReset() {
assertTrue(isCalled);
http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java
index 819ce19..7c75489 100644
--- a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java
+++ b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java
@@ -156,9 +156,10 @@ public class HS2ActivePassiveHARegistry extends ZkRegistryBase<HiveServer2Instan
addEndpointToServiceRecord(getNewServiceRecord(), PASSIVE_ENDPOINT);
}
- private void addEndpointToServiceRecord(final ServiceRecord srv, final String endpointName) throws IOException {
+ private void addEndpointToServiceRecord(
+ final ServiceRecord srv, final String endpointName) throws IOException {
updateEndpoint(srv, endpointName);
- updateServiceRecord(srv);
+ // closeOnFailure=true preserves the previous behavior of closing the znode on a failed update.
+ updateServiceRecord(srv, doCheckAcls, true);
}
private void updateEndpoint(final ServiceRecord srv, final String endpointName) {