You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/03/29 19:33:58 UTC

hive git commit: HIVE-19061 : WM needs to output an event for allocation update (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/master 67c8ac103 -> e02597af1


HIVE-19061 : WM needs to output an event for allocation update (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e02597af
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e02597af
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e02597af

Branch: refs/heads/master
Commit: e02597af17594f9e6ecdda1e8e2a34e62c99053d
Parents: 67c8ac1
Author: sergey <se...@apache.org>
Authored: Thu Mar 29 12:29:40 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Thu Mar 29 12:29:40 2018 -0700

----------------------------------------------------------------------
 .../ql/exec/tez/GuaranteedTasksAllocator.java   |  1 +
 .../apache/hadoop/hive/ql/exec/tez/WmEvent.java |  4 ++-
 .../hadoop/hive/ql/exec/tez/WmTezSession.java   |  8 +++--
 .../hive/ql/exec/tez/WorkloadManager.java       | 31 +++++++++++++++-----
 .../hive/ql/exec/tez/TestWorkloadManager.java   | 12 ++++----
 5 files changed, 40 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e02597af/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
index d3b4e07..6d7fc25 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
@@ -129,6 +129,7 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
         // we'd produce 2-2-2-2-0 as we round 1.6; whereas adding the last delta to the next query
         // we'd round 1.6-1.2-1.8-1.4-2.0 and thus give out 2-1-2-1-2, as intended.
         // Note that fractions don't have to all be the same like in this example.
+        assert session.hasClusterFraction();
         double fraction = session.getClusterFraction();
         double allocation = fraction * totalCount + lastDelta;
         double roundedAlloc = Math.round(allocation);

http://git-wip-us.apache.org/repos/asf/hive/blob/e02597af/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmEvent.java
index 33341ad..fae68ef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmEvent.java
@@ -31,6 +31,7 @@ public class WmEvent {
   private static final Logger LOG = LoggerFactory.getLogger(WmEvent.class);
   enum EventType {
     GET, // get session
+    UPDATE, // update session allocation
     KILL, // kill query
     DESTROY, // destroy session
     RESTART, // restart session
@@ -51,7 +52,8 @@ public class WmEvent {
     WmTezSessionInfo(WmTezSession wmTezSession) {
       this.poolName = wmTezSession.getPoolName();
       this.sessionId = wmTezSession.getSessionId();
-      this.clusterPercent = wmTezSession.getClusterFraction() * 100.0;
+      this.clusterPercent = wmTezSession.hasClusterFraction()
+          ? wmTezSession.getClusterFraction() * 100.0 : 0;
     }
 
     public String getPoolName() {

http://git-wip-us.apache.org/repos/asf/hive/blob/e02597af/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
index 1cf5493..fa2b02e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
@@ -38,7 +38,7 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
   @JsonProperty("poolName")
   private String poolName;
   @JsonProperty("clusterFraction")
-  private double clusterFraction;
+  private Double clusterFraction;
   /**
    * The reason to kill an AM. Note that this is for the entire session, not just for a query.
    * Once set, this can never be unset because you can only kill the session once.
@@ -174,7 +174,11 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
 
   void clearWm() {
     this.poolName = null;
-    this.clusterFraction = 0f;
+    this.clusterFraction = null;
+  }
+
+  public boolean hasClusterFraction() {
+    return this.clusterFraction != null;
   }
 
   public double getClusterFraction() {

http://git-wip-us.apache.org/repos/asf/hive/blob/e02597af/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 96158fc..65e3c82 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.math.DoubleMath;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -64,6 +65,7 @@ import org.apache.hadoop.hive.metastore.api.WMTrigger;
 import org.apache.hadoop.hive.ql.exec.tez.AmPluginNode.AmPluginInfo;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionState.HiveResources;
 import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput;
+import org.apache.hadoop.hive.ql.exec.tez.WmEvent.EventType;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.session.KillQuery;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -731,14 +733,16 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
 
   private void dumpPoolState(PoolState ps, List<String> set) {
     StringBuilder sb = new StringBuilder();
-    sb.append("POOL ").append(ps.fullName).append(": qp ").append(ps.queryParallelism).append(", %% ")
-      .append(ps.finalFraction).append(", sessions: ").append(ps.sessions.size())
-      .append(", initializing: ").append(ps.initializingSessions.size()).append(", queued: ").append(ps.queue.size());
+    sb.append("POOL ").append(ps.fullName).append(": qp ").append(ps.queryParallelism)
+      .append(", %% ").append(ps.finalFraction).append(", sessions: ").append(ps.sessions.size())
+      .append(", initializing: ").append(ps.initializingSessions.size()).append(", queued: ")
+      .append(ps.queue.size());
     set.add(sb.toString());
     sb.setLength(0);
     for (WmTezSession session : ps.sessions) {
-      sb.append("RUNNING: ").append(session.getClusterFraction()).append(" (")
-        .append(session.getAllocationState()).append(") => ").append(session.getSessionId());
+      double cf = session.hasClusterFraction() ? session.getClusterFraction() : 0;
+      sb.append("RUNNING: ").append(cf).append(" (") .append(session.getAllocationState())
+        .append(") => ").append(session.getSessionId());
       set.add(sb.toString());
       sb.setLength(0);
     }
@@ -1797,7 +1801,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
         if (totalSessions == 0) return 0;
         double allocation = finalFractionRemaining / totalSessions;
         for (WmTezSession session : sessions) {
-          session.setClusterFraction(allocation);
+          updateSessionAllocationWithEvent(session, allocation);
         }
         // Do not give out the capacity of the initializing sessions to the running ones;
         // we expect init to be fast.
@@ -1806,7 +1810,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
         if (sessions.isEmpty()) return 0;
         boolean isFirst = true;
         for (WmTezSession session : sessions) {
-          session.setClusterFraction(isFirst ? finalFractionRemaining : 0);
+          updateSessionAllocationWithEvent(session, isFirst ? finalFractionRemaining : 0);
           isFirst = false;
         }
         return finalFractionRemaining;
@@ -1815,6 +1819,19 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       }
     }
 
+    private void updateSessionAllocationWithEvent(WmTezSession session, double allocation) {
+      WmEvent event = null;
+      WmContext ctx = session.getWmContext();
+      if (ctx != null && session.hasClusterFraction()
+          && !DoubleMath.fuzzyEquals(session.getClusterFraction(), allocation, 0.0001f)) {
+        event = new WmEvent(EventType.UPDATE);
+      }
+      session.setClusterFraction(allocation);
+      if (event != null) {
+        event.endEvent(session);
+      }
+    }
+
     public LinkedList<WmTezSession> getSessions() {
       return sessions;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/e02597af/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index fb32e90..61bf84c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -314,7 +314,7 @@ public class TestWorkloadManager {
     assertNotSame(session, session2);
     wm.addTestEvent().get();
     assertEquals(session2.toString(), 1.0, session2.getClusterFraction(), EPSILON);
-    assertEquals(0.0, session.getClusterFraction(), EPSILON);
+    assertFalse(session.hasClusterFraction());
     qam.assertWasCalledAndReset();
   }
 
@@ -335,14 +335,14 @@ public class TestWorkloadManager {
     assertNotSame(session, session2);
     session.destroy(); // Destroy before returning to the pool.
     assertEquals(1.0, session2.getClusterFraction(), EPSILON);
-    assertEquals(0.0, session.getClusterFraction(), EPSILON);
+    assertFalse(session.hasClusterFraction());
     qam.assertWasCalledAndReset();
 
     // We never lose pool session, so we should still be able to get.
     session = (WmTezSession) wm.getSession(null, mappingInput("user"), conf);
     session.returnToSessionManager();
     assertEquals(1.0, session2.getClusterFraction(), EPSILON);
-    assertEquals(0.0, session.getClusterFraction(), EPSILON);
+    assertFalse(session.hasClusterFraction());
     qam.assertWasCalledAndReset();
   }
 
@@ -1095,7 +1095,7 @@ public class TestWorkloadManager {
     assertEquals(0, allSessionProviders.get("B.x").getSessions().size());
     assertEquals(0, allSessionProviders.get("B.y").getSessions().size());
     assertEquals(0, allSessionProviders.get("C").getSessions().size());
-    assertEquals(0.0f, sessionA1.getClusterFraction(), EPSILON);
+    assertFalse(sessionA1.hasClusterFraction());
     assertFalse(allSessionProviders.get("A").getSessions().contains(sessionA1));
   }
 
@@ -1213,7 +1213,7 @@ public class TestWorkloadManager {
     assertNotNull(theOnlySession);
     theOnlySession.setWaitForAmRegistryFuture(null);
     assertNull(oldSession.getPoolName());
-    assertEquals(0f, oldSession.getClusterFraction(), EPSILON);
+    assertFalse(oldSession.hasClusterFraction());
     pool.returnSession(theOnlySession);
     // Make sure we can actually get a session still - parallelism/etc. should not be affected.
     WmTezSession result = (WmTezSession) wm.getSession(null, mappingInput("A"), conf);
@@ -1225,7 +1225,7 @@ public class TestWorkloadManager {
 
   private void assertKilledByWm(WmTezSession session) {
     assertNull(session.getPoolName());
-    assertEquals(0f, session.getClusterFraction(), EPSILON);
+    assertFalse(session.hasClusterFraction());
     assertTrue(session.isIrrelevantForWm());
   }