You are viewing a plain-text version of this content. (The link to the canonical version, originally attached to the word "here", is not preserved in this plain-text copy.)
Posted to commits@hive.apache.org by se...@apache.org on 2017/11/30 00:13:59 UTC
hive git commit: HIVE-17905 : propagate background LLAP cluster changes to WM (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master 73ea9f595 -> 4ccea29bd
HIVE-17905 : propagate background LLAP cluster changes to WM (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4ccea29b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4ccea29b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4ccea29b
Branch: refs/heads/master
Commit: 4ccea29bdc9f1f0af3611e3abba1e8af5b898ef8
Parents: 73ea9f5
Author: sergey <se...@apache.org>
Authored: Wed Nov 29 16:09:50 2017 -0800
Committer: sergey <se...@apache.org>
Committed: Wed Nov 29 16:09:50 2017 -0800
----------------------------------------------------------------------
.../ql/exec/tez/GuaranteedTasksAllocator.java | 25 ++++++----
.../ql/exec/tez/QueryAllocationManager.java | 9 ++++
.../hive/ql/exec/tez/WorkloadManager.java | 7 +--
.../hive/ql/exec/tez/TestWorkloadManager.java | 52 +++++++++++++++++---
4 files changed, 73 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/4ccea29b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
index d6d6f07..5f6ab05 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
@@ -17,18 +17,15 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
-import org.apache.hadoop.hive.ql.exec.tez.AmPluginNode.AmPluginInfo;
-
-import org.apache.hadoop.hive.ql.exec.tez.LlapPluginEndpointClient.UpdateRequestContext;
-
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.AsyncPbRpcProxy.ExecuteRequestCallback;
import org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto;
import org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryResponseProto;
+import org.apache.hadoop.hive.ql.exec.tez.AmPluginNode.AmPluginInfo;
+import org.apache.hadoop.hive.ql.exec.tez.LlapPluginEndpointClient.UpdateRequestContext;
import org.apache.hadoop.hive.ql.optimizer.physical.LlapClusterStateForCompile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +40,7 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
private final LlapClusterStateForCompile clusterState;
private final Thread clusterStateUpdateThread;
private final LlapPluginEndpointClient amCommunicator;
+ private Runnable clusterChangedCallback;
public GuaranteedTasksAllocator(
Configuration conf, LlapPluginEndpointClient amCommunicator) {
@@ -50,10 +48,16 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
this.clusterState = new LlapClusterStateForCompile(conf, CLUSTER_INFO_UPDATE_INTERVAL_MS);
this.amCommunicator = amCommunicator;
this.clusterStateUpdateThread = new Thread(new Runnable() {
+ private int lastExecutorCount = -1;
@Override
public void run() {
while (true) {
- getExecutorCount(true); // Trigger an update if needed.
+ int executorCount = getExecutorCount(true); // Trigger an update if needed.
+
+ if (executorCount != lastExecutorCount && lastExecutorCount >= 0) {
+ clusterChangedCallback.run();
+ }
+ lastExecutorCount = executorCount;
try {
Thread.sleep(CLUSTER_INFO_UPDATE_INTERVAL_MS / 2);
} catch (InterruptedException e) {
@@ -78,10 +82,6 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
clusterStateUpdateThread.interrupt(); // Don't wait for the thread.
}
- public void initClusterInfo() {
- clusterState.initClusterInfo();
- }
-
@VisibleForTesting
protected int getExecutorCount(boolean allowUpdate) {
if (allowUpdate && !clusterState.initClusterInfo()) {
@@ -191,4 +191,9 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
endpointVersion = version;
}
}
+
+ @Override
+ public void setClusterChangedCallback(Runnable clusterChangedCallback) {
+ this.clusterChangedCallback = clusterChangedCallback;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/4ccea29b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
index acacfd0..f1da17b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
@@ -19,6 +19,10 @@ package org.apache.hadoop.hive.ql.exec.tez;
import java.util.List;
+/**
+ * Represents the mapping from logical resource allocations to queries from WM, to actual physical
+ * allocations performed using some implementation of a scheduler.
+ */
interface QueryAllocationManager {
void start();
void stop();
@@ -30,4 +34,9 @@ interface QueryAllocationManager {
* @param sessions Sessions to update based on their allocation fraction.
*/
void updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessions);
+
+ /**
+ * Sets a callback to be invoked on cluster changes relevant to resource allocation.
+ */
+ void setClusterChangedCallback(Runnable clusterChangedCallback);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/4ccea29b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 388a4f4..25a8ff2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -190,6 +190,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
this.conf = conf;
this.totalQueryParallelism = determineQueryParallelism(plan);
this.initRpFuture = this.updateResourcePlanAsync(plan);
+ this.allocationManager = qam;
+ this.allocationManager.setClusterChangedCallback(() -> notifyOfClusterStateChange());
+
this.amComm = amComm;
if (this.amComm != null) {
this.amComm.init(conf);
@@ -201,7 +204,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
tezAmPool = new TezSessionPool<>(conf, totalQueryParallelism, true,
oldSession -> createSession(oldSession == null ? null : oldSession.getConf()));
restrictedConfig = new RestrictedConfigChecker(conf);
- allocationManager = qam;
// Only creates the expiration tracker if expiration is configured.
expirationTracker = SessionExpirationTracker.create(conf, this);
@@ -1294,8 +1296,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
}
- // TODO: use this
- public void nofityOfClusterStateChange() {
+ public void notifyOfClusterStateChange() {
currentLock.lock();
try {
current.hasClusterStateChanged = true;
http://git-wip-us.apache.org/repos/asf/hive/blob/4ccea29b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index 4cb9172..bceb31d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -109,10 +109,14 @@ public class TestWorkloadManager {
isCalled = true;
}
- void assertWasCalled() {
+ void assertWasCalledAndReset() {
assertTrue(isCalled);
isCalled = false;
}
+
+ @Override
+ public void setClusterChangedCallback(Runnable clusterChangedCallback) {
+ }
}
public static WMResourcePlan plan() {
@@ -157,6 +161,16 @@ public class TestWorkloadManager {
super(null, yarnQueue, conf, qam, plan);
}
+ @Override
+ public void notifyOfClusterStateChange() {
+ super.notifyOfClusterStateChange();
+ try {
+ ensureWm();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private static WMFullResourcePlan createDummyPlan(int numSessions) {
WMFullResourcePlan plan = new WMFullResourcePlan(new WMResourcePlan("rp"),
Lists.newArrayList(pool("llap", numSessions, 1.0f)));
@@ -257,13 +271,13 @@ public class TestWorkloadManager {
WmTezSession session = (WmTezSession) wm.getSession(
null, new MappingInput("user", null), conf);
assertEquals(1.0, session.getClusterFraction(), EPSILON);
- qam.assertWasCalled();
+ qam.assertWasCalledAndReset();
WmTezSession session2 = (WmTezSession) session.reopen(conf, null);
assertNotSame(session, session2);
wm.addTestEvent().get();
assertEquals(session2.toString(), 1.0, session2.getClusterFraction(), EPSILON);
assertEquals(0.0, session.getClusterFraction(), EPSILON);
- qam.assertWasCalled();
+ qam.assertWasCalledAndReset();
}
@Test(timeout = 10000)
@@ -275,23 +289,23 @@ public class TestWorkloadManager {
wm.start();
WmTezSession session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf);
assertEquals(1.0, session.getClusterFraction(), EPSILON);
- qam.assertWasCalled();
+ qam.assertWasCalledAndReset();
WmTezSession session2 = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf);
assertEquals(0.5, session.getClusterFraction(), EPSILON);
assertEquals(0.5, session2.getClusterFraction(), EPSILON);
- qam.assertWasCalled();
+ qam.assertWasCalledAndReset();
assertNotSame(session, session2);
session.destroy(); // Destroy before returning to the pool.
assertEquals(1.0, session2.getClusterFraction(), EPSILON);
assertEquals(0.0, session.getClusterFraction(), EPSILON);
- qam.assertWasCalled();
+ qam.assertWasCalledAndReset();
// We never lose pool session, so we should still be able to get.
session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf);
session.returnToSessionManager();
assertEquals(1.0, session2.getClusterFraction(), EPSILON);
assertEquals(0.0, session.getClusterFraction(), EPSILON);
- qam.assertWasCalled();
+ qam.assertWasCalledAndReset();
}
@Test(timeout = 10000)
@@ -404,6 +418,30 @@ public class TestWorkloadManager {
}
@Test(timeout=10000)
+ public void testClusterChange() throws Exception {
+ final HiveConf conf = createConf();
+ MockQam qam = new MockQam();
+ WMFullResourcePlan plan = new WMFullResourcePlan(plan(), Lists.newArrayList(pool("A", 2, 1f)));
+ plan.getPlan().setDefaultPoolPath("A");
+ final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
+ wm.start();
+ WmTezSession session1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf),
+ session2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf);
+ assertEquals(0.5, session1.getClusterFraction(), EPSILON);
+ assertEquals(0.5, session2.getClusterFraction(), EPSILON);
+ qam.assertWasCalledAndReset();
+
+ // If cluster info changes, qam should be called with the same fractions.
+ wm.notifyOfClusterStateChange();
+ assertEquals(0.5, session1.getClusterFraction(), EPSILON);
+ assertEquals(0.5, session2.getClusterFraction(), EPSILON);
+ qam.assertWasCalledAndReset();
+
+ session1.returnToSessionManager();
+ session2.returnToSessionManager();
+ }
+
+ @Test(timeout=10000)
public void testReuseWithQueueing() throws Exception {
final HiveConf conf = createConf();
MockQam qam = new MockQam();