You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/03/29 19:33:58 UTC
hive git commit: HIVE-19061 : WM needs to output an event for allocation update (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master 67c8ac103 -> e02597af1
HIVE-19061 : WM needs to output an event for allocation update (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e02597af
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e02597af
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e02597af
Branch: refs/heads/master
Commit: e02597af17594f9e6ecdda1e8e2a34e62c99053d
Parents: 67c8ac1
Author: sergey <se...@apache.org>
Authored: Thu Mar 29 12:29:40 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Thu Mar 29 12:29:40 2018 -0700
----------------------------------------------------------------------
.../ql/exec/tez/GuaranteedTasksAllocator.java | 1 +
.../apache/hadoop/hive/ql/exec/tez/WmEvent.java | 4 ++-
.../hadoop/hive/ql/exec/tez/WmTezSession.java | 8 +++--
.../hive/ql/exec/tez/WorkloadManager.java | 31 +++++++++++++++-----
.../hive/ql/exec/tez/TestWorkloadManager.java | 12 ++++----
5 files changed, 40 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/e02597af/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
index d3b4e07..6d7fc25 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
@@ -129,6 +129,7 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
// we'd produce 2-2-2-2-0 as we round 1.6; whereas adding the last delta to the next query
// we'd round 1.6-1.2-1.8-1.4-2.0 and thus give out 2-1-2-1-2, as intended.
// Note that fractions don't have to all be the same like in this example.
+ assert session.hasClusterFraction();
double fraction = session.getClusterFraction();
double allocation = fraction * totalCount + lastDelta;
double roundedAlloc = Math.round(allocation);
http://git-wip-us.apache.org/repos/asf/hive/blob/e02597af/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmEvent.java
index 33341ad..fae68ef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmEvent.java
@@ -31,6 +31,7 @@ public class WmEvent {
private static final Logger LOG = LoggerFactory.getLogger(WmEvent.class);
enum EventType {
GET, // get session
+ UPDATE, // update session allocation
KILL, // kill query
DESTROY, // destroy session
RESTART, // restart session
@@ -51,7 +52,8 @@ public class WmEvent {
WmTezSessionInfo(WmTezSession wmTezSession) {
this.poolName = wmTezSession.getPoolName();
this.sessionId = wmTezSession.getSessionId();
- this.clusterPercent = wmTezSession.getClusterFraction() * 100.0;
+ this.clusterPercent = wmTezSession.hasClusterFraction()
+ ? wmTezSession.getClusterFraction() * 100.0 : 0;
}
public String getPoolName() {
http://git-wip-us.apache.org/repos/asf/hive/blob/e02597af/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
index 1cf5493..fa2b02e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
@@ -38,7 +38,7 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
@JsonProperty("poolName")
private String poolName;
@JsonProperty("clusterFraction")
- private double clusterFraction;
+ private Double clusterFraction;
/**
* The reason to kill an AM. Note that this is for the entire session, not just for a query.
* Once set, this can never be unset because you can only kill the session once.
@@ -174,7 +174,11 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
void clearWm() {
this.poolName = null;
- this.clusterFraction = 0f;
+ this.clusterFraction = null;
+ }
+
+ public boolean hasClusterFraction() {
+ return this.clusterFraction != null;
}
public double getClusterFraction() {
http://git-wip-us.apache.org/repos/asf/hive/blob/e02597af/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 96158fc..65e3c82 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import com.google.common.math.DoubleMath;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -64,6 +65,7 @@ import org.apache.hadoop.hive.metastore.api.WMTrigger;
import org.apache.hadoop.hive.ql.exec.tez.AmPluginNode.AmPluginInfo;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionState.HiveResources;
import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput;
+import org.apache.hadoop.hive.ql.exec.tez.WmEvent.EventType;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.KillQuery;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -731,14 +733,16 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
private void dumpPoolState(PoolState ps, List<String> set) {
StringBuilder sb = new StringBuilder();
- sb.append("POOL ").append(ps.fullName).append(": qp ").append(ps.queryParallelism).append(", %% ")
- .append(ps.finalFraction).append(", sessions: ").append(ps.sessions.size())
- .append(", initializing: ").append(ps.initializingSessions.size()).append(", queued: ").append(ps.queue.size());
+ sb.append("POOL ").append(ps.fullName).append(": qp ").append(ps.queryParallelism)
+ .append(", %% ").append(ps.finalFraction).append(", sessions: ").append(ps.sessions.size())
+ .append(", initializing: ").append(ps.initializingSessions.size()).append(", queued: ")
+ .append(ps.queue.size());
set.add(sb.toString());
sb.setLength(0);
for (WmTezSession session : ps.sessions) {
- sb.append("RUNNING: ").append(session.getClusterFraction()).append(" (")
- .append(session.getAllocationState()).append(") => ").append(session.getSessionId());
+ double cf = session.hasClusterFraction() ? session.getClusterFraction() : 0;
+ sb.append("RUNNING: ").append(cf).append(" (") .append(session.getAllocationState())
+ .append(") => ").append(session.getSessionId());
set.add(sb.toString());
sb.setLength(0);
}
@@ -1797,7 +1801,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
if (totalSessions == 0) return 0;
double allocation = finalFractionRemaining / totalSessions;
for (WmTezSession session : sessions) {
- session.setClusterFraction(allocation);
+ updateSessionAllocationWithEvent(session, allocation);
}
// Do not give out the capacity of the initializing sessions to the running ones;
// we expect init to be fast.
@@ -1806,7 +1810,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
if (sessions.isEmpty()) return 0;
boolean isFirst = true;
for (WmTezSession session : sessions) {
- session.setClusterFraction(isFirst ? finalFractionRemaining : 0);
+ updateSessionAllocationWithEvent(session, isFirst ? finalFractionRemaining : 0);
isFirst = false;
}
return finalFractionRemaining;
@@ -1815,6 +1819,19 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
}
+ private void updateSessionAllocationWithEvent(WmTezSession session, double allocation) {
+ WmEvent event = null;
+ WmContext ctx = session.getWmContext();
+ if (ctx != null && session.hasClusterFraction()
+ && !DoubleMath.fuzzyEquals(session.getClusterFraction(), allocation, 0.0001f)) {
+ event = new WmEvent(EventType.UPDATE);
+ }
+ session.setClusterFraction(allocation);
+ if (event != null) {
+ event.endEvent(session);
+ }
+ }
+
public LinkedList<WmTezSession> getSessions() {
return sessions;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e02597af/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index fb32e90..61bf84c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -314,7 +314,7 @@ public class TestWorkloadManager {
assertNotSame(session, session2);
wm.addTestEvent().get();
assertEquals(session2.toString(), 1.0, session2.getClusterFraction(), EPSILON);
- assertEquals(0.0, session.getClusterFraction(), EPSILON);
+ assertFalse(session.hasClusterFraction());
qam.assertWasCalledAndReset();
}
@@ -335,14 +335,14 @@ public class TestWorkloadManager {
assertNotSame(session, session2);
session.destroy(); // Destroy before returning to the pool.
assertEquals(1.0, session2.getClusterFraction(), EPSILON);
- assertEquals(0.0, session.getClusterFraction(), EPSILON);
+ assertFalse(session.hasClusterFraction());
qam.assertWasCalledAndReset();
// We never lose pool session, so we should still be able to get.
session = (WmTezSession) wm.getSession(null, mappingInput("user"), conf);
session.returnToSessionManager();
assertEquals(1.0, session2.getClusterFraction(), EPSILON);
- assertEquals(0.0, session.getClusterFraction(), EPSILON);
+ assertFalse(session.hasClusterFraction());
qam.assertWasCalledAndReset();
}
@@ -1095,7 +1095,7 @@ public class TestWorkloadManager {
assertEquals(0, allSessionProviders.get("B.x").getSessions().size());
assertEquals(0, allSessionProviders.get("B.y").getSessions().size());
assertEquals(0, allSessionProviders.get("C").getSessions().size());
- assertEquals(0.0f, sessionA1.getClusterFraction(), EPSILON);
+ assertFalse(sessionA1.hasClusterFraction());
assertFalse(allSessionProviders.get("A").getSessions().contains(sessionA1));
}
@@ -1213,7 +1213,7 @@ public class TestWorkloadManager {
assertNotNull(theOnlySession);
theOnlySession.setWaitForAmRegistryFuture(null);
assertNull(oldSession.getPoolName());
- assertEquals(0f, oldSession.getClusterFraction(), EPSILON);
+ assertFalse(oldSession.hasClusterFraction());
pool.returnSession(theOnlySession);
// Make sure we can actually get a session still - parallelism/etc. should not be affected.
WmTezSession result = (WmTezSession) wm.getSession(null, mappingInput("A"), conf);
@@ -1225,7 +1225,7 @@ public class TestWorkloadManager {
private void assertKilledByWm(WmTezSession session) {
assertNull(session.getPoolName());
- assertEquals(0f, session.getClusterFraction(), EPSILON);
+ assertFalse(session.hasClusterFraction());
assertTrue(session.isIrrelevantForWm());
}