You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/11/30 00:13:59 UTC

hive git commit: HIVE-17905 : propagate background LLAP cluster changes to WM (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/master 73ea9f595 -> 4ccea29bd


HIVE-17905 : propagate background LLAP cluster changes to WM (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4ccea29b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4ccea29b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4ccea29b

Branch: refs/heads/master
Commit: 4ccea29bdc9f1f0af3611e3abba1e8af5b898ef8
Parents: 73ea9f5
Author: sergey <se...@apache.org>
Authored: Wed Nov 29 16:09:50 2017 -0800
Committer: sergey <se...@apache.org>
Committed: Wed Nov 29 16:09:50 2017 -0800

----------------------------------------------------------------------
 .../ql/exec/tez/GuaranteedTasksAllocator.java   | 25 ++++++----
 .../ql/exec/tez/QueryAllocationManager.java     |  9 ++++
 .../hive/ql/exec/tez/WorkloadManager.java       |  7 +--
 .../hive/ql/exec/tez/TestWorkloadManager.java   | 52 +++++++++++++++++---
 4 files changed, 73 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/4ccea29b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
index d6d6f07..5f6ab05 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
@@ -17,18 +17,15 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import org.apache.hadoop.hive.ql.exec.tez.AmPluginNode.AmPluginInfo;
-
-import org.apache.hadoop.hive.ql.exec.tez.LlapPluginEndpointClient.UpdateRequestContext;
-
 import com.google.common.annotations.VisibleForTesting;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.AsyncPbRpcProxy.ExecuteRequestCallback;
 import org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto;
 import org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryResponseProto;
+import org.apache.hadoop.hive.ql.exec.tez.AmPluginNode.AmPluginInfo;
+import org.apache.hadoop.hive.ql.exec.tez.LlapPluginEndpointClient.UpdateRequestContext;
 import org.apache.hadoop.hive.ql.optimizer.physical.LlapClusterStateForCompile;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,6 +40,7 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
   private final LlapClusterStateForCompile clusterState;
   private final Thread clusterStateUpdateThread;
   private final LlapPluginEndpointClient amCommunicator;
+  private Runnable clusterChangedCallback;
 
   public GuaranteedTasksAllocator(
       Configuration conf, LlapPluginEndpointClient amCommunicator) {
@@ -50,10 +48,16 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
     this.clusterState = new LlapClusterStateForCompile(conf, CLUSTER_INFO_UPDATE_INTERVAL_MS);
     this.amCommunicator = amCommunicator;
     this.clusterStateUpdateThread = new Thread(new Runnable() {
+      private int lastExecutorCount = -1;
       @Override
       public void run() {
         while (true) {
-          getExecutorCount(true); // Trigger an update if needed.
+          int executorCount = getExecutorCount(true); // Trigger an update if needed.
+          
+          if (executorCount != lastExecutorCount && lastExecutorCount >= 0) {
+            clusterChangedCallback.run();
+          }
+          lastExecutorCount = executorCount;
           try {
             Thread.sleep(CLUSTER_INFO_UPDATE_INTERVAL_MS / 2);
           } catch (InterruptedException e) {
@@ -78,10 +82,6 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
     clusterStateUpdateThread.interrupt(); // Don't wait for the thread.
   }
 
-  public void initClusterInfo() {
-    clusterState.initClusterInfo();
-  }
-
   @VisibleForTesting
   protected int getExecutorCount(boolean allowUpdate) {
     if (allowUpdate && !clusterState.initClusterInfo()) {
@@ -191,4 +191,9 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
       endpointVersion = version;
     }
   }
+
+  @Override
+  public void setClusterChangedCallback(Runnable clusterChangedCallback) {
+    this.clusterChangedCallback = clusterChangedCallback;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/4ccea29b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
index acacfd0..f1da17b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
@@ -19,6 +19,10 @@ package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.util.List;
 
+/**
+ * Represents the mapping from logical resource allocations to queries from WM, to actual physical
+ * allocations performed using some implementation of a scheduler.
+ */
 interface QueryAllocationManager {
   void start();
   void stop();
@@ -30,4 +34,9 @@ interface QueryAllocationManager {
    * @param sessions Sessions to update based on their allocation fraction.
    */
   void updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessions);
+
+  /**
+   * Sets a callback to be invoked on cluster changes relevant to resource allocation.
+   */
+  void setClusterChangedCallback(Runnable clusterChangedCallback);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/4ccea29b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 388a4f4..25a8ff2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -190,6 +190,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     this.conf = conf;
     this.totalQueryParallelism = determineQueryParallelism(plan);
     this.initRpFuture = this.updateResourcePlanAsync(plan);
+    this.allocationManager = qam;
+    this.allocationManager.setClusterChangedCallback(() -> notifyOfClusterStateChange());
+
     this.amComm = amComm;
     if (this.amComm != null) {
       this.amComm.init(conf);
@@ -201,7 +204,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     tezAmPool = new TezSessionPool<>(conf, totalQueryParallelism, true,
       oldSession -> createSession(oldSession == null ? null : oldSession.getConf()));
     restrictedConfig = new RestrictedConfigChecker(conf);
-    allocationManager = qam;
     // Only creates the expiration tracker if expiration is configured.
     expirationTracker = SessionExpirationTracker.create(conf, this);
 
@@ -1294,8 +1296,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
   }
 
-  // TODO: use this
-  public void nofityOfClusterStateChange() {
+  public void notifyOfClusterStateChange() {
     currentLock.lock();
     try {
       current.hasClusterStateChanged = true;

http://git-wip-us.apache.org/repos/asf/hive/blob/4ccea29b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index 4cb9172..bceb31d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -109,10 +109,14 @@ public class TestWorkloadManager {
       isCalled = true;
     }
 
-    void assertWasCalled() {
+    void assertWasCalledAndReset() {
       assertTrue(isCalled);
       isCalled = false;
     }
+
+    @Override
+    public void setClusterChangedCallback(Runnable clusterChangedCallback) {
+    }
   }
 
   public static WMResourcePlan plan() {
@@ -157,6 +161,16 @@ public class TestWorkloadManager {
       super(null, yarnQueue, conf, qam, plan);
     }
 
+    @Override
+    public void notifyOfClusterStateChange() {
+      super.notifyOfClusterStateChange();
+      try {
+        ensureWm();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
     private static WMFullResourcePlan createDummyPlan(int numSessions) {
       WMFullResourcePlan plan = new WMFullResourcePlan(new WMResourcePlan("rp"), 
           Lists.newArrayList(pool("llap", numSessions, 1.0f)));
@@ -257,13 +271,13 @@ public class TestWorkloadManager {
     WmTezSession session = (WmTezSession) wm.getSession(
         null, new MappingInput("user", null), conf);
     assertEquals(1.0, session.getClusterFraction(), EPSILON);
-    qam.assertWasCalled();
+    qam.assertWasCalledAndReset();
     WmTezSession session2 = (WmTezSession) session.reopen(conf, null);
     assertNotSame(session, session2);
     wm.addTestEvent().get();
     assertEquals(session2.toString(), 1.0, session2.getClusterFraction(), EPSILON);
     assertEquals(0.0, session.getClusterFraction(), EPSILON);
-    qam.assertWasCalled();
+    qam.assertWasCalledAndReset();
   }
 
   @Test(timeout = 10000)
@@ -275,23 +289,23 @@ public class TestWorkloadManager {
     wm.start();
     WmTezSession session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf);
     assertEquals(1.0, session.getClusterFraction(), EPSILON);
-    qam.assertWasCalled();
+    qam.assertWasCalledAndReset();
     WmTezSession session2 = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf);
     assertEquals(0.5, session.getClusterFraction(), EPSILON);
     assertEquals(0.5, session2.getClusterFraction(), EPSILON);
-    qam.assertWasCalled();
+    qam.assertWasCalledAndReset();
     assertNotSame(session, session2);
     session.destroy(); // Destroy before returning to the pool.
     assertEquals(1.0, session2.getClusterFraction(), EPSILON);
     assertEquals(0.0, session.getClusterFraction(), EPSILON);
-    qam.assertWasCalled();
+    qam.assertWasCalledAndReset();
 
     // We never lose pool session, so we should still be able to get.
     session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf);
     session.returnToSessionManager();
     assertEquals(1.0, session2.getClusterFraction(), EPSILON);
     assertEquals(0.0, session.getClusterFraction(), EPSILON);
-    qam.assertWasCalled();
+    qam.assertWasCalledAndReset();
   }
 
   @Test(timeout = 10000)
@@ -404,6 +418,30 @@ public class TestWorkloadManager {
   }
 
   @Test(timeout=10000)
+  public void testClusterChange() throws Exception {
+    final HiveConf conf = createConf();
+    MockQam qam = new MockQam();
+    WMFullResourcePlan plan = new WMFullResourcePlan(plan(), Lists.newArrayList(pool("A", 2, 1f)));
+    plan.getPlan().setDefaultPoolPath("A");
+    final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
+    wm.start();
+    WmTezSession session1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf),
+        session2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf);
+    assertEquals(0.5, session1.getClusterFraction(), EPSILON);
+    assertEquals(0.5, session2.getClusterFraction(), EPSILON);
+    qam.assertWasCalledAndReset();
+
+    // If cluster info changes, qam should be called with the same fractions.
+    wm.notifyOfClusterStateChange();
+    assertEquals(0.5, session1.getClusterFraction(), EPSILON);
+    assertEquals(0.5, session2.getClusterFraction(), EPSILON);
+    qam.assertWasCalledAndReset();
+
+    session1.returnToSessionManager();
+    session2.returnToSessionManager();
+  }
+
+  @Test(timeout=10000)
   public void testReuseWithQueueing() throws Exception {
     final HiveConf conf = createConf();
     MockQam qam = new MockQam();