You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/11/02 18:40:50 UTC
[1/3] hive git commit: HIVE-17841 : implement applying the resource
plan (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master c5a9673a0 -> 77b99e4c9
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index 258a865..94f42dd 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -23,27 +23,28 @@ import static org.junit.Assert.*;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.*;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.common.collect.Lists;
-import java.io.IOException;
import java.lang.Thread.State;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpHivePool;
import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpResourcePlan;
import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMapping;
import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMappingType;
import org.apache.tez.dag.api.TezConfiguration;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TestWorkloadManager {
+ @SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(TestWorkloadManager.class);
private final class GetSessionRunnable implements Runnable {
@@ -91,7 +92,7 @@ public class TestWorkloadManager {
}
@Override
- public void updateSessionsAsync(double totalMaxAlloc, List<WmTezSession> sessions) {
+ public void updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessions) {
isCalled = true;
}
@@ -105,12 +106,12 @@ public class TestWorkloadManager {
public WorkloadManagerForTest(String yarnQueue, HiveConf conf, int numSessions,
QueryAllocationManager qam) {
- super(yarnQueue, conf, qam, null, createDummyPlan(numSessions));
+ super(yarnQueue, conf, qam, createDummyPlan(numSessions));
}
public WorkloadManagerForTest(String yarnQueue, HiveConf conf,
QueryAllocationManager qam, TmpResourcePlan plan) {
- super(yarnQueue, conf, qam, null, plan);
+ super(yarnQueue, conf, qam, plan);
}
private static TmpResourcePlan createDummyPlan(int numSessions) {
@@ -120,13 +121,42 @@ public class TestWorkloadManager {
}
@Override
- protected WmTezSession createSessionObject(String sessionId) {
- return new SampleTezSessionState(sessionId, this, new HiveConf(getConf()));
+ protected WmTezSession createSessionObject(String sessionId, HiveConf conf) {
+ conf = conf == null ? new HiveConf(getConf()) : conf;
+ return new SampleTezSessionState(sessionId, this, conf);
+ }
+
+ @Override
+ public TezSessionState getSession(
+ TezSessionState session, String userName, HiveConf conf) throws Exception {
+ // We want to wait for the iteration to finish and set the cluster fraction.
+ TezSessionState state = super.getSession(session, userName, conf);
+ ensureWm();
+ return state;
}
@Override
- protected boolean ensureAmIsRegistered(WmTezSession session) throws Exception {
- return true;
+ public void destroy(TezSessionState session) throws Exception {
+ super.destroy(session);
+ ensureWm();
+ }
+
+ private void ensureWm() throws InterruptedException, ExecutionException {
+ addTestEvent().get(); // Wait for the events to be processed.
+ }
+
+ @Override
+ public void returnAfterUse(TezSessionPoolSession session) throws Exception {
+ super.returnAfterUse(session);
+ ensureWm();
+ }
+
+ @Override
+ public TezSessionState reopen(
+ TezSessionState session, Configuration conf, String[] additionalFiles) throws Exception {
+ session = super.reopen(session, conf, additionalFiles);
+ ensureWm();
+ return session;
}
}
@@ -185,7 +215,8 @@ public class TestWorkloadManager {
qam.assertWasCalled();
WmTezSession session2 = (WmTezSession) session.reopen(conf, null);
assertNotSame(session, session2);
- assertEquals(1.0, session2.getClusterFraction(), EPSILON);
+ wm.addTestEvent().get();
+ assertEquals(session2.toString(), 1.0, session2.getClusterFraction(), EPSILON);
assertEquals(0.0, session.getClusterFraction(), EPSILON);
qam.assertWasCalled();
}
@@ -273,7 +304,7 @@ public class TestWorkloadManager {
Thread t1 = new Thread(new GetSessionRunnable(sessionA3, wm, error, conf, cdl, "A")),
t2 = new Thread(new GetSessionRunnable(sessionA4, wm, error, conf, null, "A"));
- waitForThreadToBlockOnQueue(cdl, t1);
+ waitForThreadToBlock(cdl, t1);
t2.start();
assertNull(sessionA3.get());
assertNull(sessionA4.get());
@@ -327,7 +358,7 @@ public class TestWorkloadManager {
CountDownLatch cdl = new CountDownLatch(1), cdl2 = new CountDownLatch(1);
Thread t1 = new Thread(new GetSessionRunnable(session3, wm, error, conf, cdl, null), "t1"),
t2 = new Thread(new GetSessionRunnable(session4, wm, error, conf, cdl2, null), "t2");
- waitForThreadToBlockOnQueue(cdl, t1);
+ waitForThreadToBlock(cdl, t1);
assertNull(session3.get());
checkError(error);
t2.start();
@@ -351,7 +382,7 @@ public class TestWorkloadManager {
session4.get().returnToSessionManager();
}
- private void waitForThreadToBlockOnQueue(CountDownLatch cdl, Thread t1) throws InterruptedException {
+ private void waitForThreadToBlock(CountDownLatch cdl, Thread t1) throws InterruptedException {
t1.start();
cdl.await();
// Wait for t1 to block, just be sure. Not ideal...
@@ -392,6 +423,308 @@ public class TestWorkloadManager {
sessionA2.returnToSessionManager();
}
+ @Test(timeout=10000)
+ public void testApplyPlanUserMapping() throws Exception {
+ final HiveConf conf = createConf();
+ MockQam qam = new MockQam();
+ TmpResourcePlan plan = new TmpResourcePlan(Lists.newArrayList(
+ new TmpHivePool("A", null, 1, 0.5f), new TmpHivePool("B", null, 1, 0.5f)),
+ Lists.newArrayList(create("U", "A")));
+ final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
+ wm.start();
+
+ // One session will be running, the other will be queued in "A"
+ WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, "U", conf);
+ assertEquals("A", sessionA1.getPoolName());
+ assertEquals(0.5f, sessionA1.getClusterFraction(), EPSILON);
+ final AtomicReference<WmTezSession> sessionA2 = new AtomicReference<>();
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ final CountDownLatch cdl = new CountDownLatch(1);
+ Thread t1 = new Thread(new GetSessionRunnable(sessionA2, wm, error, conf, cdl, "U"));
+ waitForThreadToBlock(cdl, t1);
+ assertNull(sessionA2.get());
+ checkError(error);
+
+ // Now change the resource plan - change the mapping for the user.
+ plan = new TmpResourcePlan(Lists.newArrayList(
+ new TmpHivePool("A", null, 1, 0.6f), new TmpHivePool("B", null, 1, 0.4f)),
+ Lists.newArrayList(create("U", "B")));
+ wm.updateResourcePlanAsync(plan);
+
+ // The session will go to B with the new mapping; check it.
+ t1.join();
+ checkError(error);
+ assertNotNull(sessionA2.get());
+ assertEquals("B", sessionA2.get().getPoolName());
+ assertEquals(0.4f, sessionA2.get().getClusterFraction(), EPSILON);
+ // The new session will also go to B now.
+ sessionA2.get().returnToSessionManager();
+ WmTezSession sessionB1 = (WmTezSession) wm.getSession(null, "U", conf);
+ assertEquals("B", sessionB1.getPoolName());
+ assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON);
+ sessionA1.returnToSessionManager();
+ sessionB1.returnToSessionManager();
+ }
+
+
+ @Test(timeout=10000)
+ public void testApplyPlanQpChanges() throws Exception {
+ final HiveConf conf = createConf();
+ MockQam qam = new MockQam();
+ TmpResourcePlan plan = new TmpResourcePlan(Lists.newArrayList(
+ new TmpHivePool("A", null, 1, 0.35f), new TmpHivePool("B", null, 2, 0.15f),
+ new TmpHivePool("C", null, 2, 0.3f), new TmpHivePool("D", null, 1, 0.3f)),
+ Lists.newArrayList(create("A", "A"), create("B", "B"),
+ create("C", "C"), create("D", "D")));
+ final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
+ wm.start();
+
+ // A: 1/1 running, 1 queued; B: 2/2 running, C: 1/2 running, D: 1/1 running, 1 queued.
+ // Total: 5/6 running.
+ WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, "A", conf),
+ sessionB1 = (WmTezSession) wm.getSession(null, "B", conf),
+ sessionB2 = (WmTezSession) wm.getSession(null, "B", conf),
+ sessionC1 = (WmTezSession) wm.getSession(null, "C", conf),
+ sessionD1 = (WmTezSession) wm.getSession(null, "D", conf);
+ final AtomicReference<WmTezSession> sessionA2 = new AtomicReference<>(),
+ sessionD2 = new AtomicReference<>();
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ final CountDownLatch cdl1 = new CountDownLatch(1), cdl2 = new CountDownLatch(1);
+ Thread t1 = new Thread(new GetSessionRunnable(sessionA2, wm, error, conf, cdl1, "A")),
+ t2 = new Thread(new GetSessionRunnable(sessionD2, wm, error, conf, cdl2, "D"));
+ waitForThreadToBlock(cdl1, t1);
+ waitForThreadToBlock(cdl2, t2);
+ checkError(error);
+ assertEquals(0.3f, sessionC1.getClusterFraction(), EPSILON);
+ assertEquals(0.3f, sessionD1.getClusterFraction(), EPSILON);
+
+ // Change the resource plan - resize B and C down, D up, and remove A, remapping its users to B.
+ // Everything will be killed in A and B, C won't change, D will start one more query from
+ // the queue, and the query queued in A will be re-queued in B and started.
+ // The fractions will also all change.
+ // Total: 4/4 running.
+ plan = new TmpResourcePlan(Lists.newArrayList(new TmpHivePool("B", null, 1, 0.3f),
+ new TmpHivePool("C", null, 1, 0.2f), new TmpHivePool("D", null, 2, 0.5f)),
+ Lists.newArrayList(create("A", "B"), create("B", "B"),
+ create("C", "C"), create("D", "D")));
+ wm.updateResourcePlanAsync(plan);
+ wm.addTestEvent().get();
+
+ t1.join();
+ t2.join();
+ checkError(error);
+ assertNotNull(sessionA2.get());
+ assertNotNull(sessionD2.get());
+ assertEquals("D", sessionD2.get().getPoolName());
+ assertEquals("B", sessionA2.get().getPoolName());
+ assertEquals("C", sessionC1.getPoolName());
+ assertEquals(0.3f, sessionA2.get().getClusterFraction(), EPSILON);
+ assertEquals(0.2f, sessionC1.getClusterFraction(), EPSILON);
+ assertEquals(0.25f, sessionD1.getClusterFraction(), EPSILON);
+
+ assertKilledByWm(sessionA1);
+ assertKilledByWm(sessionB1);
+ assertKilledByWm(sessionB2);
+
+ // Wait for another iteration to make sure event gets processed for D2 to receive allocation.
+ sessionA2.get().returnToSessionManager();
+ assertEquals(0.25f, sessionD2.get().getClusterFraction(), EPSILON);
+ sessionD2.get().returnToSessionManager();
+ sessionC1.returnToSessionManager();
+ sessionD1.returnToSessionManager();
+
+ // Try to "return" stuff that was killed from "under" us. Should be a no-op.
+ sessionA1.returnToSessionManager();
+ sessionB1.returnToSessionManager();
+ sessionB2.returnToSessionManager();
+ assertEquals(4, wm.getTezAmPool().getCurrentSize());
+ }
+
+ @Test(timeout=10000)
+ public void testAmPoolInteractions() throws Exception {
+ final HiveConf conf = createConf();
+ MockQam qam = new MockQam();
+ TmpResourcePlan plan = new TmpResourcePlan(Lists.newArrayList(
+ new TmpHivePool("A", null, 1, 1.0f)), Lists.newArrayList(create("A", "A")));
+ final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
+ wm.start();
+ // Take away the only session, as if it was expiring.
+ TezSessionPool<WmTezSession> pool = wm.getTezAmPool();
+ WmTezSession oob = pool.getSession();
+
+ final AtomicReference<WmTezSession> sessionA1 = new AtomicReference<>();
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ final CountDownLatch cdl1 = new CountDownLatch(1);
+ Thread t1 = new Thread(new GetSessionRunnable(sessionA1, wm, error, conf, cdl1, "A"));
+ waitForThreadToBlock(cdl1, t1);
+ checkError(error);
+ // Replacing it directly in the pool should unblock get.
+ pool.replaceSession(oob, false, null);
+ t1.join();
+ assertNotNull(sessionA1.get());
+ assertEquals("A", sessionA1.get().getPoolName());
+
+ // Increase qp, check that the pool grows.
+ plan = new TmpResourcePlan(Lists.newArrayList(
+ new TmpHivePool("A", null, 4, 1.0f)), Lists.newArrayList(create("A", "A")));
+ wm.updateResourcePlanAsync(plan);
+ WmTezSession oob2 = pool.getSession(),
+ oob3 = pool.getSession(),
+ oob4 = pool.getSession();
+ pool.returnSession(oob2);
+ assertEquals(1, pool.getCurrentSize());
+
+ // Decrease qp, check that the pool shrinks incl. killing the unused and returned sessions.
+ plan = new TmpResourcePlan(Lists.newArrayList(
+ new TmpHivePool("A", null, 1, 1.0f)), Lists.newArrayList(create("A", "A")));
+ wm.updateResourcePlanAsync(plan);
+ wm.addTestEvent().get();
+ assertEquals(0, pool.getCurrentSize());
+ sessionA1.get().returnToSessionManager();
+ pool.returnSession(oob3);
+ assertEquals(0, pool.getCurrentSize());
+ pool.returnSession(oob4);
+ assertEquals(1, pool.getCurrentSize());
+
+ // Decrease, then increase qp - sessions should not be killed on return.
+ plan = new TmpResourcePlan(Lists.newArrayList(
+ new TmpHivePool("A", null, 2, 1.0f)), Lists.newArrayList(create("A", "A")));
+ wm.updateResourcePlanAsync(plan);
+ oob2 = pool.getSession();
+ oob3 = pool.getSession();
+ assertEquals(0, pool.getCurrentSize());
+ plan = new TmpResourcePlan(Lists.newArrayList(
+ new TmpHivePool("A", null, 1, 1.0f)), Lists.newArrayList(create("A", "A")));
+ wm.updateResourcePlanAsync(plan);
+ plan = new TmpResourcePlan(Lists.newArrayList(
+ new TmpHivePool("A", null, 2, 1.0f)), Lists.newArrayList(create("A", "A")));
+ wm.updateResourcePlanAsync(plan);
+ wm.addTestEvent().get();
+ assertEquals(0, pool.getCurrentSize());
+ pool.returnSession(oob3);
+ pool.returnSession(oob4);
+ assertEquals(2, pool.getCurrentSize());
+ }
+
+ @Test(timeout=10000)
+ public void testAsyncSessionInitFailures() throws Exception {
+ final HiveConf conf = createConf();
+ MockQam qam = new MockQam();
+ TmpResourcePlan plan = new TmpResourcePlan(Lists.newArrayList(
+ new TmpHivePool("A", null, 1, 1.0f)), Lists.newArrayList(create("A", "A")));
+ final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
+ wm.start();
+
+ // Make sure session init gets stuck in init.
+ TezSessionPool<WmTezSession> pool = wm.getTezAmPool();
+ SampleTezSessionState theOnlySession = (SampleTezSessionState) pool.getSession();
+ SettableFuture<Boolean> blockedWait = SettableFuture.create();
+ theOnlySession.setWaitForAmRegistryFuture(blockedWait);
+ pool.returnSession(theOnlySession);
+ assertEquals(1, pool.getCurrentSize());
+
+ final AtomicReference<WmTezSession> sessionA = new AtomicReference<>();
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ CountDownLatch cdl = new CountDownLatch(1);
+ Thread t1 = new Thread(new GetSessionRunnable(sessionA, wm, error, conf, cdl, "A"));
+ waitForThreadToBlock(cdl, t1);
+ checkError(error);
+ wm.addTestEvent().get();
+ // The session is taken out of the pool, but is waiting for registration.
+ assertEquals(0, pool.getCurrentSize());
+
+ // Change the resource plan, so that the session gets killed.
+ plan = new TmpResourcePlan(Lists.newArrayList(
+ new TmpHivePool("B", null, 1, 1.0f)), Lists.newArrayList(create("A", "B")));
+ wm.updateResourcePlanAsync(plan);
+ wm.addTestEvent().get();
+ blockedWait.set(true); // Meanwhile, the init succeeds!
+ t1.join();
+ try {
+ sessionA.get();
+ fail("Expected an error but got " + sessionA.get());
+ } catch (Throwable t) {
+ // Expected.
+ }
+ try {
+ // The get-session call should also fail.
+ checkError(error);
+ fail("Expected an error");
+ } catch (Exception ex) {
+ // Expected.
+ }
+ error.set(null);
+ theOnlySession = validatePoolAfterCleanup(theOnlySession, conf, wm, pool, "B");
+
+ // Initialization fails, no resource plan change.
+ SettableFuture<Boolean> failedWait = SettableFuture.create();
+ failedWait.setException(new Exception("foo"));
+ theOnlySession.setWaitForAmRegistryFuture(failedWait);
+ try {
+ TezSessionState r = wm.getSession(null, "A", conf);
+ fail("Expected an error but got " + r);
+ } catch (Exception ex) {
+ // Expected.
+ }
+ theOnlySession = validatePoolAfterCleanup(theOnlySession, conf, wm, pool, "B");
+
+ // Init fails, but the session is also killed by WM before that.
+ failedWait = SettableFuture.create();
+ theOnlySession.setWaitForAmRegistryFuture(failedWait);
+ sessionA.set(null);
+ cdl = new CountDownLatch(1);
+ t1 = new Thread(new GetSessionRunnable(sessionA, wm, error, conf, cdl, "A"));
+ waitForThreadToBlock(cdl, t1);
+ wm.addTestEvent().get();
+ // The session is taken out of the pool, but is waiting for registration.
+ assertEquals(0, pool.getCurrentSize());
+
+ plan = new TmpResourcePlan(Lists.newArrayList(
+ new TmpHivePool("A", null, 1, 1.0f)), Lists.newArrayList(create("A", "A")));
+ wm.updateResourcePlanAsync(plan);
+ wm.addTestEvent().get();
+ failedWait.setException(new Exception("moo")); // Meanwhile, the init fails.
+ t1.join();
+ try {
+ sessionA.get();
+ fail("Expected an error but got " + sessionA.get());
+ } catch (Throwable t) {
+ // Expected.
+ }
+ try {
+ // The get-session call should also fail.
+ checkError(error);
+ fail("Expected an error");
+ } catch (Exception ex) {
+ // Expected.
+ }
+ validatePoolAfterCleanup(theOnlySession, conf, wm, pool, "A");
+ }
+
+ private SampleTezSessionState validatePoolAfterCleanup(
+ SampleTezSessionState oldSession, HiveConf conf, WorkloadManager wm,
+ TezSessionPool<WmTezSession> pool, String sessionPoolName) throws Exception {
+ // Make sure the cleanup doesn't leave the pool without a session.
+ SampleTezSessionState theOnlySession = (SampleTezSessionState) pool.getSession();
+ assertNotNull(theOnlySession);
+ theOnlySession.setWaitForAmRegistryFuture(null);
+ assertNull(oldSession.getPoolName());
+ assertEquals(0f, oldSession.getClusterFraction(), EPSILON);
+ pool.returnSession(theOnlySession);
+ // Make sure we can actually get a session still - parallelism/etc. should not be affected.
+ WmTezSession result = (WmTezSession) wm.getSession(null, "A", conf);
+ assertEquals(sessionPoolName, result.getPoolName());
+ assertEquals(1f, result.getClusterFraction(), EPSILON);
+ result.returnToSessionManager();
+ return theOnlySession;
+ }
+
+ private void assertKilledByWm(WmTezSession session) {
+ assertNull(session.getPoolName());
+ assertEquals(0f, session.getClusterFraction(), EPSILON);
+ assertTrue(session.isIrrelevantForWm());
+ }
+
private void checkError(final AtomicReference<Throwable> error) throws Exception {
Throwable t = error.get();
if (t == null) return;
[2/3] hive git commit: HIVE-17841 : implement applying the resource
plan (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 35e5710..b0c6d58 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -17,20 +17,34 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Semaphore;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -38,12 +52,6 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
import org.apache.hadoop.hive.ql.wm.Trigger;
import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.common.security.JobTokenIdentifier;
-import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,13 +59,14 @@ import org.slf4j.LoggerFactory;
/** Workload management entry point for HS2. */
public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValidator
- implements TezSessionPoolSession.Manager, SessionExpirationTracker.RestartImpl {
+ implements TezSessionPoolSession.Manager, SessionExpirationTracker.RestartImpl {
private static final Logger LOG = LoggerFactory.getLogger(WorkloadManager.class);
// TODO: this is a temporary setting that will go away, so it's not in HiveConf.
public static final String TEST_WM_CONFIG = "hive.test.workload.management";
+ private static final char POOL_SEPARATOR = '/';
private final HiveConf conf;
- private final TezSessionPool<WmTezSession> sessions;
+ private final TezSessionPool<WmTezSession> tezAmPool;
private final SessionExpirationTracker expirationTracker;
private final RestrictedConfigChecker restrictedConfig;
private final QueryAllocationManager allocationManager;
@@ -69,56 +78,50 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
new IdentityHashMap<>();
private final int amRegistryTimeoutMs;
-
- /** Sessions given out (i.e. between get... and return... calls), separated by Hive pool. */
- private final ReentrantReadWriteLock poolsLock = new ReentrantReadWriteLock();
- private final Map<String, PoolState> pools = new HashMap<>();
+ // Note: pools can only be modified by the master thread.
+ private HashMap<String, PoolState> pools;
// Used to make sure that waiting getSessions don't block update.
- private int internalPoolsVersion;
private UserPoolMapping userPoolMapping;
+ private int totalQueryParallelism;
+ // We index the get requests to make sure there are no ordering artifacts when we requeue.
+ private final AtomicLong getRequestVersion = new AtomicLong(Long.MIN_VALUE);
private SessionTriggerProvider sessionTriggerProvider;
private TriggerActionHandler triggerActionHandler;
private TriggerValidatorRunnable triggerValidatorRunnable;
- public static class PoolState {
- // Add stuff here as WM is implemented.
- private final Object lock = new Object();
- private final List<WmTezSession> sessions = new ArrayList<>();
- private final Semaphore sessionsClaimed;
+ // Note: we could use RW lock to allow concurrent calls for different sessions, however all
+ // those calls do is add elements to lists and maps; and we'd need to sync those separately
+ // plus have an object to notify because RW lock does not support conditions
+ // in any sensible way. So, for now the lock is going to be epic.
+ private final ReentrantLock currentLock = new ReentrantLock();
+ private final Condition hasChangesCondition = currentLock.newCondition();
+ // The processing thread will switch between these two objects.
+ private final EventState one = new EventState(), two = new EventState();
+ private boolean hasChanges = false;
+ private EventState current = one;
- private final String fullName;
- private final double finalFraction;
- private double finalFractionRemaining;
- private final int queryParallelism;
- private List<Trigger> triggers = new ArrayList<>();
-
- public PoolState(String fullName, int queryParallelism, double fraction) {
- this.fullName = fullName;
- this.queryParallelism = queryParallelism;
- // A fair semaphore to ensure correct queue order.
- this.sessionsClaimed = new Semaphore(queryParallelism, true);
- this.finalFraction = this.finalFractionRemaining = fraction;
- }
+ /** The master thread that processes the events from EventState. */
+ @VisibleForTesting
+ protected final Thread wmThread;
+ /** Used by the master thread to offload calls blocking on something other than fast locks. */
+ private final ExecutorService workPool;
+ /** Used to schedule timeouts for some async operations. */
+ private final ScheduledExecutorService timeoutPool;
+ private final WmThreadSyncWork syncWork = new WmThreadSyncWork();
+ @SuppressWarnings("rawtypes")
+ private final FutureCallback FATAL_ERROR_CALLBACK = new FutureCallback() {
@Override
- public String toString() {
- return "[" + fullName + ", query parallelism " + queryParallelism
- + ", fraction of the cluster " + finalFraction + ", fraction used by child pools "
- + (finalFraction - finalFractionRemaining) + ", active sessions " + sessions.size()
- + "]";
- }
-
- @VisibleForTesting
- // will change in HIVE-17809
- public void setTriggers(final List<Trigger> triggers) {
- this.triggers = triggers;
+ public void onSuccess(Object result) {
}
- public List<Trigger> getTriggers() {
- return triggers;
+ @Override
+ public void onFailure(Throwable t) {
+ // TODO: shut down HS2?
+ LOG.error("Workload management fatal error", t);
}
- }
+ };
// TODO: this is temporary before HiveServerEnvironment is merged.
private static volatile WorkloadManager INSTANCE;
@@ -135,71 +138,86 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
/** Called once, when HS2 initializes. */
public static WorkloadManager create(String yarnQueue, HiveConf conf, TmpResourcePlan plan) {
assert INSTANCE == null;
- Token<JobTokenIdentifier> amsToken = createAmsToken();
// We could derive the expected number of AMs to pass in.
- LlapPluginEndpointClient amComm = new LlapPluginEndpointClientImpl(conf, amsToken, -1);
+ LlapPluginEndpointClient amComm = new LlapPluginEndpointClientImpl(conf, null, -1);
QueryAllocationManager qam = new GuaranteedTasksAllocator(conf, amComm);
- return (INSTANCE = new WorkloadManager(yarnQueue, conf, qam, amsToken, plan));
- }
-
- private static Token<JobTokenIdentifier> createAmsToken() {
- if (!UserGroupInformation.isSecurityEnabled()) return null;
- // This application ID is completely bogus.
- ApplicationId id = ApplicationId.newInstance(
- System.nanoTime(), (int)(System.nanoTime() % 100000));
- JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(id.toString()));
- JobTokenSecretManager jobTokenManager = new JobTokenSecretManager();
- Token<JobTokenIdentifier> sessionToken = new Token<>(identifier, jobTokenManager);
- sessionToken.setService(identifier.getJobId());
- return sessionToken;
+ return (INSTANCE = new WorkloadManager(yarnQueue, conf, qam, plan));
}
@VisibleForTesting
WorkloadManager(String yarnQueue, HiveConf conf,
- QueryAllocationManager qam, Token<JobTokenIdentifier> amsToken, TmpResourcePlan plan) {
+ QueryAllocationManager qam, TmpResourcePlan plan) {
this.yarnQueue = yarnQueue;
this.conf = conf;
- int numSessions = initializeHivePools(plan);
- LOG.info("Initializing with " + numSessions + " total query parallelism");
+ this.totalQueryParallelism = applyInitialResourcePlan(plan);
+ LOG.info("Initializing with " + totalQueryParallelism + " total query parallelism");
this.amRegistryTimeoutMs = (int)HiveConf.getTimeVar(
conf, ConfVars.HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT, TimeUnit.MILLISECONDS);
- sessions = new TezSessionPool<>(conf, numSessions, true);
+ tezAmPool = new TezSessionPool<>(conf, totalQueryParallelism, true,
+ new TezSessionPool.SessionObjectFactory<WmTezSession>() {
+ @Override
+ public WmTezSession create(WmTezSession oldSession) {
+ return createSession(oldSession == null ? null : oldSession.getConf());
+ }
+ });
restrictedConfig = new RestrictedConfigChecker(conf);
allocationManager = qam;
// Only creates the expiration tracker if expiration is configured.
expirationTracker = SessionExpirationTracker.create(conf, this);
- for (int i = 0; i < numSessions; i++) {
- sessions.addInitialSession(createSession());
- }
+ ThreadFactory workerFactory = new ThreadFactory() {
+ private final AtomicInteger threadNumber = new AtomicInteger(-1);
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, "Workload management worker " + threadNumber.incrementAndGet());
+ t.setDaemon(true);
+ return t;
+ }
+ };
+ workPool = Executors.newFixedThreadPool(HiveConf.getIntVar(conf,
+ ConfVars.HIVE_SERVER2_TEZ_WM_WORKER_THREADS), workerFactory);
+ ThreadFactory timeoutFactory = new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, "Workload management timeout thread");
+ t.setDaemon(true);
+ return t;
+ }
+ };
+ timeoutPool = Executors.newScheduledThreadPool(1, timeoutFactory);
+
+ wmThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ runWmThread();
+ }
+ }, "Workload management master");
+ wmThread.setDaemon(true);
// TODO: add support for per pool action handler and triggers fetcher (+atomic update to active triggers)
sessionTriggerProvider = new SessionTriggerProvider();
triggerActionHandler = new TriggerViolationActionHandler();
- triggerValidatorRunnable = new TriggerValidatorRunnable(getSessionTriggerProvider(), getTriggerActionHandler());
+ triggerValidatorRunnable = new TriggerValidatorRunnable(
+ getSessionTriggerProvider(), getTriggerActionHandler());
startTriggerValidator(conf);
}
- private int initializeHivePools(TmpResourcePlan plan) {
- poolsLock.writeLock().lock();
- try {
- // FIXME: Add Triggers from metastore to poolstate
- // Note: we assume here that plan has been validated beforehand, so we don't verify
- // that fractions or query parallelism add up.
- int totalQueryParallelism = 0;
- // Use recursion to update parents more conveniently; we don't expect a big tree.
- for (TmpHivePool pool : plan.getRootPools()) {
- totalQueryParallelism += addHivePool(pool, null);
- }
- this.userPoolMapping = new UserPoolMapping(plan.getMappings(), pools.keySet());
- internalPoolsVersion = 0; // Initializing for the first time.
- return totalQueryParallelism;
- } finally {
- poolsLock.writeLock().unlock();
+ // TODO: remove and let the thread handle it via normal ways?
+ private int applyInitialResourcePlan(TmpResourcePlan plan) {
+ int totalQueryParallelism = 0;
+ // Note: we assume here that plan has been validated beforehand, so we don't verify
+ // that fractions or query parallelism add up.
+ this.userPoolMapping = new UserPoolMapping(plan.getMappings());
+ assert pools == null;
+ pools = new HashMap<>();
+ // Use recursion to update parents more conveniently; we don't expect a big tree.
+ for (TmpHivePool pool : plan.getRootPools()) {
+ totalQueryParallelism += addInitialHivePool(pool, null);
}
+ return totalQueryParallelism;
}
- private final static char POOL_SEPARATOR = '/';
- private int addHivePool(TmpHivePool pool, PoolState parent) {
+ // TODO: remove and let the thread handle it via normal ways?
+ private int addInitialHivePool(TmpHivePool pool, PoolState parent) {
String fullName = pool.getName();
int totalQueryParallelism = pool.getQueryParallelism();
double fraction = pool.getResourceFraction();
@@ -211,178 +229,804 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
PoolState state = new PoolState(fullName, totalQueryParallelism, fraction);
if (pool.getChildren() != null) {
for (TmpHivePool child : pool.getChildren()) {
- totalQueryParallelism += addHivePool(child, state);
+ totalQueryParallelism += addInitialHivePool(child, state);
}
}
- LOG.info("Adding Hive pool: " + state);
+ state.setTriggers(pool.triggers);
+ LOG.info("Adding Hive pool: " + state + " with triggers " + pool.triggers);
pools.put(fullName, state);
return totalQueryParallelism;
}
- public TezSessionState getSession(
- TezSessionState session, String userName, HiveConf conf) throws Exception {
- validateConfig(conf);
- WmTezSession result = checkSessionForReuse(session);
- boolean hasAcquired = false;
- String poolName = null;
- while (!hasAcquired) { // This loop handles concurrent plan updates while we are waiting.
- poolName = userPoolMapping.mapSessionToPoolName(userName);
- if (poolName == null) {
- throw new HiveException("Cannot find any pool mapping for user " + userName);
- }
- int internalVersion = -1;
- Semaphore sessionsClaimed = null;
- poolsLock.readLock().lock();
+ public void start() throws Exception {
+ tezAmPool.start();
+ if (expirationTracker != null) {
+ expirationTracker.start();
+ }
+ allocationManager.start();
+ wmThread.start();
+ }
+
+ public void stop() throws Exception {
+ List<TezSessionPoolSession> sessionsToClose = null;
+ synchronized (openSessions) {
+ sessionsToClose = new ArrayList<TezSessionPoolSession>(openSessions.keySet());
+ }
+ for (TezSessionState sessionState : sessionsToClose) {
+ sessionState.close(false);
+ }
+ if (expirationTracker != null) {
+ expirationTracker.stop();
+ }
+ allocationManager.stop();
+ if (wmThread != null) {
+ wmThread.interrupt();
+ }
+ workPool.shutdownNow();
+ timeoutPool.shutdownNow();
+
+ INSTANCE = null;
+ }
+
+ /** Represents a single iteration of work for the master thread. */
+ private final static class EventState {
+ private final Set<WmTezSession> toReturn = Sets.newIdentityHashSet(),
+ toDestroy = Sets.newIdentityHashSet(), updateErrors = Sets.newIdentityHashSet();
+ private final LinkedList<SessionInitContext> initResults = new LinkedList<>();
+ private final IdentityHashMap<WmTezSession, SettableFuture<WmTezSession>> toReopen =
+ new IdentityHashMap<>();
+ private final LinkedList<GetRequest> getRequests = new LinkedList<>();
+ private final IdentityHashMap<WmTezSession, GetRequest> toReuse = new IdentityHashMap<>();
+ private TmpResourcePlan resourcePlanToApply = null;
+ private boolean hasClusterStateChanged = false;
+ private SettableFuture<Boolean> testEvent, applyRpFuture;
+ }
+
+ /**
+ * The work delegated from the master thread that doesn't have an async implementation
+ * (mostly opening and closing the sessions).
+ */
+ private final static class WmThreadSyncWork {
+ private LinkedList<WmTezSession> toRestartInUse = new LinkedList<>(),
+ toDestroyNoRestart = new LinkedList<>();
+ }
+
+ private void runWmThread() {
+ while (true) {
+ EventState currentEvents = null;
+ currentLock.lock();
try {
- PoolState pool = pools.get(poolName);
- if (pool == null) throw new AssertionError("Pool " + poolName + " not found.");
- // No need to take the pool lock, semaphore is final.
- sessionsClaimed = pool.sessionsClaimed;
- internalVersion = internalPoolsVersion;
- } finally {
- poolsLock.readLock().unlock();
- }
- // One cannot simply reuse the session if there are other queries waiting; to maintain
- // fairness, we'll try to take the semaphore instantly, and if that fails we'll return
- // this session back to the pool and potentially give the user a new session later.
- if (result != null) {
- // Handle the special case; the pool may be exactly at capacity w/o queue. In that
- // case, we still should be able to reuse.
- boolean isFromTheSamePool = false;
- String oldPoolName = result.getPoolName();
- if (poolName.equals(oldPoolName)) {
- sessionsClaimed.release();
- isFromTheSamePool = true;
- }
- // Note: we call timed acquire because untimed one ignores fairness.
- hasAcquired = sessionsClaimed.tryAcquire(1, TimeUnit.MILLISECONDS);
- if (hasAcquired) {
- poolsLock.readLock().lock();
- boolean doUnlock = true;
+ while (!hasChanges) {
try {
- if (internalVersion == internalPoolsVersion) {
- if (!isFromTheSamePool) {
- // Free up the usage in the old pool. TODO: ideally not under lock; not critical.
- redistributePoolAllocations(oldPoolName, null, result, true);
- }
- doUnlock = false; // Do not unlock; see below.
- break;
- }
- } finally {
- if (doUnlock) {
- poolsLock.readLock().unlock();
- }
+ hasChangesCondition.await(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.warn("WM thread was interrupted and will now exit");
+ return;
}
- hasAcquired = false;
}
- // Note: we are short-circuiting session::returnToSessionManager to supply the flag
- returnAfterUse(result, !isFromTheSamePool);
- result = null;
+ hasChanges = false;
+ currentEvents = current;
+ current = (currentEvents == one) ? two : one;
+ } finally {
+ currentLock.unlock();
}
- // We don't expect frequent updates, so check every second.
- while (!(hasAcquired = (hasAcquired || sessionsClaimed.tryAcquire(1, TimeUnit.SECONDS)))) {
- poolsLock.readLock().lock();
- try {
- if (internalVersion != internalPoolsVersion) break;
- } finally {
- poolsLock.readLock().unlock();
+ try {
+ LOG.debug("Processing current events");
+ processCurrentEvents(currentEvents, syncWork);
+ scheduleWork(syncWork);
+ } catch (InterruptedException ex) {
+ LOG.warn("WM thread was interrupted and will now exit");
+ return;
+ } catch (Exception | AssertionError ex) {
+ LOG.error("WM thread encountered an error but will attempt to continue", ex);
+ if (currentEvents.testEvent != null) {
+ currentEvents.testEvent.setException(ex);
+ currentEvents.testEvent = null;
}
+ if (currentEvents.applyRpFuture != null) {
+ currentEvents.applyRpFuture.setException(ex);
+ currentEvents.applyRpFuture = null;
+ }
+ // TODO: we either have to kill HS2 or, as the non-actor model would implicitly,
+ // hope for the best and continue on other threads. Do the latter for now.
+ continue;
}
- if (!hasAcquired) continue;
- // Keep it simple for now - everything between acquiring the semaphore and adding the session
- // to the pool state is done under read lock, blocking pool updates. It's possible to make
- // it more granular if needed. The only potentially lengthy operation is waiting for an
- // expired session to be restarted in the session pool.
- poolsLock.readLock().lock();
- if (internalVersion == internalPoolsVersion) break;
- poolsLock.readLock().unlock();
- hasAcquired = false;
- }
- // We are holding the lock from the end of the loop.
- try {
- assert hasAcquired;
- while (true) {
- // TODO: ideally, we'd need to implement tryGet and deal with the valid wait from a session
- // restarting somehow, as opposed to the invalid case of a session missing from the
- // pool due to some bug. Keep a "restarting" counter in the pool?
- boolean isFromTheSamePool = false;
- if (result == null) {
- result = sessions.getSession();
- } else {
- // If we are just reusing the session from the same pool, do not adjust allocations.
- isFromTheSamePool = poolName.equals(result.getPoolName());
+ }
+ }
+
+ private void scheduleWork(WmThreadSyncWork context) {
+ // Do the work that cannot be done via async calls.
+
+ // 1. Restart pool sessions.
+ for (final WmTezSession toRestart : context.toRestartInUse) {
+ LOG.debug("Replacing " + toRestart + " with a new session");
+ workPool.submit(() -> {
+ try {
+ // Note: sessions in toRestart are always in use, so they cannot expire in parallel.
+ tezAmPool.replaceSession(toRestart, false, null);
+ } catch (Exception ex) {
+ LOG.error("Failed to restart an old session; ignoring " + ex.getMessage());
}
- result.setQueueName(yarnQueue);
- result.setPoolName(poolName);
- if (!ensureAmIsRegistered(result)) continue; // Try another.
- if (!isFromTheSamePool) {
- redistributePoolAllocations(poolName, result, null, false);
+ });
+ }
+ context.toRestartInUse.clear();
+ // 2. Destroy the sessions that we don't need anymore.
+ for (final WmTezSession toDestroy : context.toDestroyNoRestart) {
+ LOG.debug("Closing " + toDestroy + " without restart");
+ workPool.submit(() -> {
+ try {
+ toDestroy.close(false);
+ } catch (Exception ex) {
+ LOG.error("Failed to close an old session; ignoring " + ex.getMessage());
}
- return result;
+ });
+ }
+ context.toDestroyNoRestart.clear();
+ }
+
+ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throws Exception {
+ // The order of processing is as follows. We'd reclaim or kill all the sessions that we can
+ // reclaim from various user actions and errors, then apply the new plan if any,
+ // then give out all we can give out (restart, get and reopen callers) and rebalance the
+ // resource allocations in all the affected pools.
+ // For every session, we'd check all the concurrent things happening to it.
+
+ // TODO: also account for Tez-internal session restarts;
+ // AM reg info changes; add notifications, ignore errors, and update alloc.
+ HashSet<String> poolsToRedistribute = new HashSet<>();
+
+ // 0. Handle initialization results.
+ for (SessionInitContext sw : e.initResults) {
+ handleInitResultOnMasterThread(sw, syncWork, poolsToRedistribute);
+ }
+ e.initResults.clear();
+
+ // 1. Handle sessions that are being destroyed by users. Destroy implies return.
+ for (WmTezSession sessionToDestroy : e.toDestroy) {
+ if (e.toReturn.remove(sessionToDestroy)) {
+ LOG.warn("The session was both destroyed and returned by the user; destroying");
}
- } finally {
- poolsLock.readLock().unlock();
+ LOG.debug("Destroying {}", sessionToDestroy);
+ Boolean shouldReturn = handleReturnedInUseSessionOnMasterThread(
+ e, sessionToDestroy, poolsToRedistribute);
+ if (shouldReturn == null || shouldReturn) {
+ // Restart if this session is still relevant, even if there's an internal error.
+ syncWork.toRestartInUse.add(sessionToDestroy);
+ }
+ }
+ e.toDestroy.clear();
+
+ // 2. Now handle actual returns. Sessions may be returned to the pool or may trigger expires.
+ for (WmTezSession sessionToReturn: e.toReturn) {
+ LOG.debug("Returning {}", sessionToReturn);
+ Boolean shouldReturn = handleReturnedInUseSessionOnMasterThread(
+ e, sessionToReturn, poolsToRedistribute);
+ if (shouldReturn == null) {
+ // Restart if there's an internal error.
+ syncWork.toRestartInUse.add(sessionToReturn);
+ continue;
+ }
+ if (!shouldReturn) continue;
+ boolean wasReturned = tezAmPool.returnSessionAsync(sessionToReturn);
+ if (!wasReturned) {
+ syncWork.toDestroyNoRestart.add(sessionToReturn);
+ }
+ }
+ e.toReturn.clear();
+
+ // 3. Reopen is essentially just destroy + get a new session for a session in use.
+ for (Map.Entry<WmTezSession, SettableFuture<WmTezSession>> entry : e.toReopen.entrySet()) {
+ LOG.debug("Reopening {}", entry.getKey());
+ handeReopenRequestOnMasterThread(
+ e, entry.getKey(), entry.getValue(), poolsToRedistribute, syncWork);
+ }
+ e.toReopen.clear();
+
+ // 4. All the sessions in use with a failed update that were not destroyed or returned now die.
+ for (WmTezSession sessionWithUpdateError : e.updateErrors) {
+ LOG.debug("Update failed for {}", sessionWithUpdateError);
+ handleUpdateErrorOnMasterThread(sessionWithUpdateError, e, syncWork, poolsToRedistribute);
+ }
+ e.updateErrors.clear();
+
+ // 5. Now apply a resource plan if any. This is expected to be pretty rare.
+ boolean hasRequeues = false;
+ if (e.resourcePlanToApply != null) {
+ LOG.debug("Applying new resource plan");
+ int getReqCount = e.getRequests.size();
+ applyNewResourcePlanOnMasterThread(e, syncWork, poolsToRedistribute);
+ hasRequeues = getReqCount != e.getRequests.size();
+ }
+ e.resourcePlanToApply = null;
+
+ // 6. Handle all the get/reuse requests. We won't actually give out anything here, but merely
+ // map all the requests and place them in an appropriate order in pool queues. The only
+ // exception is the reuse without queue contention; can be granted immediately. If we can't
+ // reuse the session immediately, we will convert the reuse to a normal get, because we
+ // want query level fairness, and don't want the get in queue to hold up a session.
+ GetRequest req;
+ while ((req = e.getRequests.pollFirst()) != null) {
+ LOG.debug("Processing a new get request from " + req.userName);
+ queueGetRequestOnMasterThread(req, poolsToRedistribute, syncWork);
+ }
+ e.toReuse.clear();
+
+ // 7. If there was a cluster state change, make sure we redistribute all the pools.
+ if (e.hasClusterStateChanged) {
+ LOG.debug("Processing a cluster state change");
+ poolsToRedistribute.addAll(pools.keySet());
+ e.hasClusterStateChanged = false;
+ }
+
+ // 8. Finally, for all the pools that have changes, promote queued queries and rebalance.
+ for (String poolName : poolsToRedistribute) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing changes for pool " + poolName + ": " + pools.get(poolName));
+ }
+ processPoolChangesOnMasterThread(poolName, syncWork, hasRequeues);
+ }
+
+ // 9. Notify tests and global async ops.
+ if (e.testEvent != null) {
+ e.testEvent.set(true);
+ e.testEvent = null;
+ }
+ if (e.applyRpFuture != null) {
+ e.applyRpFuture.set(true);
+ e.applyRpFuture = null;
}
}
- @VisibleForTesting
- protected boolean ensureAmIsRegistered(WmTezSession session) throws Exception {
- // Make sure AM is ready to use and registered with AM registry.
+ // ========= Master thread methods
+
+ private void handleInitResultOnMasterThread(
+ SessionInitContext sw, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
+ // For the failures, the users have been notified, we just need to clean up. There's no
+ // session here (or it's unused), so no conflicts are possible. We just remove it.
+ // For successes, the user has also been notified, so various requests are also possible;
+ // however, to start, we'd just put the session into the sessions list and go from there.
+ WmTezSession session = null;
+ sw.lock.lock();
try {
- session.waitForAmPluginInfo(amRegistryTimeoutMs);
- } catch (TimeoutException ex) {
- LOG.error("Timed out waiting for AM registry information for " + session.getSessionId());
- session.destroy();
+ if (sw.state == SessionInitState.CANCELED) {
+ // We have processed this on the previous run, after it has already queued the message.
+ return;
+ }
+ assert sw.state == SessionInitState.DONE;
+ session = sw.session;
+ sw.session = null;
+ } finally {
+ sw.lock.unlock();
+ }
+ LOG.debug("Processing " + ((session == null) ? "failed" : "successful")
+ + " initialization result for pool " + sw.poolName);
+ // We could not have removed the pool for this session, or we would have CANCELED the init.
+ PoolState pool = pools.get(sw.poolName);
+ if (pool == null || !pool.initializingSessions.remove(sw)) {
+ // Query parallelism accounting might be inconsistent at this point.
+ LOG.error("Cannot remove initializing session from the pool "
+ + sw.poolName + " - internal error");
+ }
+ poolsToRedistribute.add(sw.poolName);
+ if (session != null) {
+ if (pool != null) {
+ pool.sessions.add(session);
+ } else {
+ LOG.error("Cannot add new session to the pool "
+ + sw.poolName + " because it was removed unexpectedly - internal error " + session);
+ syncWork.toRestartInUse.add(session);
+ }
+ }
+ }
+
+ private Boolean handleReturnedInUseSessionOnMasterThread(
+ EventState e, WmTezSession session, HashSet<String> poolsToRedistribute) {
+ // This handles the common logic for destroy and return - everything except
+ // the invalid combination of destroy and return themselves, as well as the actual
+ // statement that destroys or returns it.
+ if (e.updateErrors.remove(session)) {
+ LOG.debug("Ignoring an update error for a session being destroyed or returned");
+ }
+ SettableFuture<WmTezSession> future = e.toReopen.remove(session);
+ if (future != null) {
+ future.setException(new AssertionError("Invalid reopen attempt"));
+ }
+ GetRequest reuseRequest = e.toReuse.remove(session);
+ if (reuseRequest != null) {
+ reuseRequest.future.setException(new AssertionError("Invalid reuse attempt"));
+ }
+ return checkAndRemoveSessionFromItsPool(session, poolsToRedistribute);
+ }
+
+ private void handeReopenRequestOnMasterThread(EventState e, WmTezSession session,
+ SettableFuture<WmTezSession> future, HashSet<String> poolsToRedistribute,
+ WmThreadSyncWork syncWork) throws Exception {
+ if (e.updateErrors.remove(session)) {
+ LOG.debug("Ignoring an update error for a session being reopened");
+ }
+ GetRequest reuseRequest = e.toReuse.remove(session);
+ if (reuseRequest != null) {
+ reuseRequest.future.setException(new AssertionError("Invalid reuse attempt"));
+ }
+ // In order to expedite things in a general case, we are not actually going to reopen
+ // anything. Instead, we will try to give out an existing session from the pool, and restart
+ // the problematic one in background.
+ String poolName = session.getPoolName();
+ Boolean isRemoved = checkAndRemoveSessionFromItsPool(session, poolsToRedistribute);
+ // If we fail to remove, it's probably an internal error. We'd try to handle it the same way
+ // as above - by restarting the session. We'd fail the caller to avoid exceeding parallelism.
+ if (isRemoved == null) {
+ future.setException(new RuntimeException("Reopen failed due to an internal error"));
+ syncWork.toRestartInUse.add(session);
+ return;
+ } else if (!isRemoved) {
+ future.setException(new RuntimeException("WM killed this session during reopen: "
+ + session.getReasonForKill()));
+ return; // No longer relevant for WM - bail.
+ }
+ // If pool didn't exist, checkAndRemoveSessionFromItsPool would have returned null.
+ PoolState pool = pools.get(poolName);
+ SessionInitContext sw = new SessionInitContext(future, poolName);
+ // We have just removed the session from the same pool, so don't check concurrency here.
+ pool.initializingSessions.add(sw);
+ ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync();
+ Futures.addCallback(getFuture, sw);
+ syncWork.toRestartInUse.add(session);
+ }
+
+ private void handleUpdateErrorOnMasterThread(WmTezSession sessionWithUpdateError,
+ EventState e, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
+ GetRequest reuseRequest = e.toReuse.remove(sessionWithUpdateError);
+ if (reuseRequest != null) {
+ // This session is bad, so don't allow reuse; just convert it to normal get.
+ reuseRequest.sessionToReuse = null;
+ }
+ // TODO: we should communicate this to the user more explicitly (use kill query API, or
+ // add an option for bg kill checking to TezTask/monitor?).
+ // We are assuming the update-error AM is bad and just try to kill it.
+ Boolean isRemoved = checkAndRemoveSessionFromItsPool(sessionWithUpdateError, poolsToRedistribute);
+ if (isRemoved != null && !isRemoved) {
+ // An update error for some session that was actually already killed by us.
+ return;
+ }
+ // Regardless of whether it was removed successfully or the removal failed, restart it.
+ // Since we just restart this from under the user, mark it so we handle it properly when
+ // the user tries to actually use this session and fails, proceeding to return/destroy it.
+ // TODO: propagate this error to TezJobMonitor somehow, after we add the use of KillQuery.
+ sessionWithUpdateError.setIsIrrelevantForWm("Failed to update resource allocation");
+ syncWork.toRestartInUse.add(sessionWithUpdateError);
+ }
+
+ private void applyNewResourcePlanOnMasterThread(
+ EventState e, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
+ int totalQueryParallelism = 0;
+ // FIXME: Add Triggers from metastore to poolstate
+ // Note: we assume here that plan has been validated beforehand, so we don't verify
+ // that fractions or query parallelism add up, etc.
+ this.userPoolMapping = new UserPoolMapping(e.resourcePlanToApply.getMappings());
+ HashMap<String, PoolState> oldPools = pools;
+ pools = new HashMap<>();
+ // Use recursion to update parents more conveniently; we don't expect a big tree.
+ for (TmpHivePool pool : e.resourcePlanToApply.getRootPools()) {
+ totalQueryParallelism += addHivePool(
+ pool, null, oldPools, syncWork.toRestartInUse, poolsToRedistribute, e);
+ }
+ if (oldPools != null && !oldPools.isEmpty()) {
+ // Looks like some pools were removed; insert queued queries into the front of get reqs.
+ for (PoolState oldPool : oldPools.values()) {
+ oldPool.destroy(syncWork.toRestartInUse, e.getRequests, e.toReuse);
+ }
+ }
+
+ LOG.info("Updating with " + totalQueryParallelism + " total query parallelism");
+ int deltaSessions = totalQueryParallelism - this.totalQueryParallelism;
+ this.totalQueryParallelism = totalQueryParallelism;
+ if (deltaSessions == 0) return; // Nothing to do.
+ if (deltaSessions < 0) {
+ // First, see if we have unused sessions that we were planning to restart; get rid of those.
+ int toTransfer = Math.min(-deltaSessions, syncWork.toRestartInUse.size());
+ for (int i = 0; i < toTransfer; ++i) {
+ syncWork.toDestroyNoRestart.add(syncWork.toRestartInUse.pollFirst());
+ }
+ deltaSessions += toTransfer;
+ }
+ if (deltaSessions != 0) {
+ failOnFutureFailure(tezAmPool.resizeAsync(
+ deltaSessions, syncWork.toDestroyNoRestart));
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void failOnFutureFailure(ListenableFuture<?> future) {
+ Futures.addCallback(future, FATAL_ERROR_CALLBACK);
+ }
+
+ private void queueGetRequestOnMasterThread(
+ GetRequest req, HashSet<String> poolsToRedistribute, WmThreadSyncWork syncWork) {
+ String poolName = userPoolMapping.mapSessionToPoolName(req.userName);
+ if (poolName == null) {
+ req.future.setException(new HiveException(
+ "Cannot find any pool mapping for user " + req.userName));
+ returnSessionOnFailedReuse(req, syncWork, poolsToRedistribute);
+ return;
+ }
+ PoolState pool = pools.get(poolName);
+ if (pool == null) {
+ req.future.setException(new AssertionError(poolName + " not found (internal error)."));
+ returnSessionOnFailedReuse(req, syncWork, poolsToRedistribute);
+ return;
+ }
+
+ PoolState oldPool = null;
+ if (req.sessionToReuse != null) {
+ // Given that we are trying to reuse, this session MUST be in some pool.sessions.
+ // Kills that could have removed it must have cleared sessionToReuse.
+ String oldPoolName = req.sessionToReuse.getPoolName();
+ oldPool = pools.get(oldPoolName);
+ Boolean isRemoved = checkAndRemoveSessionFromItsPool(req.sessionToReuse, poolsToRedistribute);
+ if (isRemoved == null || !isRemoved) {
+ // This is probably an internal error... abandon the reuse attempt.
+ returnSessionOnFailedReuse(req, syncWork, null);
+ req.sessionToReuse = null;
+ } else if (pool.getTotalActiveSessions() + pool.queue.size() >= pool.queryParallelism) {
+ // One cannot simply reuse the session if there are other queries waiting; to maintain
+ // fairness, we'll try to take a query slot instantly, and if that fails we'll return
+ // this session back to the pool and give the user a new session later.
+ returnSessionOnFailedReuse(req, syncWork, null);
+ req.sessionToReuse = null;
+ }
+ }
+
+ if (req.sessionToReuse != null) {
+ // If we can immediately reuse a session, there's nothing to wait for - just return.
+ req.sessionToReuse.setPoolName(poolName);
+ req.sessionToReuse.setQueueName(yarnQueue);
+ pool.sessions.add(req.sessionToReuse);
+ if (pool != oldPool) {
+ poolsToRedistribute.add(poolName);
+ }
+ req.future.set(req.sessionToReuse);
+ return;
+ }
+ // Otherwise, queue the request and make sure we update this pool.
+ pool.queue.addLast(req);
+ poolsToRedistribute.add(poolName);
+ }
+
+
+ private void processPoolChangesOnMasterThread(
+ String poolName, WmThreadSyncWork context, boolean hasRequeues) throws Exception {
+ PoolState pool = pools.get(poolName);
+ if (pool == null) return; // Might be from before the new resource plan.
+
+ // 1. First, start the queries from the queue.
+ int queriesToStart = Math.min(pool.queue.size(),
+ pool.queryParallelism - pool.getTotalActiveSessions());
+ if (queriesToStart > 0) {
+ LOG.debug("Starting {} queries in pool {}", queriesToStart, pool);
+ }
+ if (hasRequeues) {
+ // Sort the queue - we may have put items here out of order.
+ Collections.sort(pool.queue, GetRequest.ORDER_COMPARATOR);
+ }
+ for (int i = 0; i < queriesToStart; ++i) {
+ GetRequest queueReq = pool.queue.pollFirst();
+ assert queueReq.sessionToReuse == null;
+ // Note that in theory, we are guaranteed to have a session waiting for us here, but
+ // the expiration, failures, etc. may cause one to be missing pending restart.
+ // See SessionInitContext javadoc.
+ SessionInitContext sw = new SessionInitContext(queueReq.future, poolName);
+ ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync();
+ Futures.addCallback(getFuture, sw);
+ // It is possible that all the async methods returned on the same thread because the
+ // session with registry data and stuff was available in the pool.
+ // If this happens, we'll take the session out here and "cancel" the init so we skip
+ // processing the message that the successful init has queued for us.
+ boolean isDone = sw.extractSessionAndCancelIfDone(pool.sessions);
+ if (!isDone) {
+ pool.initializingSessions.add(sw);
+ }
+ // The user has already been notified of completion by SessionInitContext.
+ }
+
+ // 2. Then, update pool allocations.
+ double totalAlloc = pool.updateAllocationPercentages();
+ // We are calling this here because we expect the method to be completely async. We also don't
+ // want this call itself to go on a thread because we want the percent-to-physics conversion
+ // logic to be consistent between all the separate calls in one master thread processing round.
+ // Note: If allocation manager does not have cluster state, it won't update anything. When the
+ // cluster state changes, it will notify us, and we'd update the queries again.
+ allocationManager.updateSessionsAsync(totalAlloc, pool.sessions);
+
+ // 3. Update triggers for this pool.
+ // TODO: need to merge with per-pool enforcement, it will only work for one pool for now.
+ if (sessionTriggerProvider != null) {
+ sessionTriggerProvider.setOpenSessions(
+ Collections.<TezSessionState>unmodifiableList(pool.sessions));
+ sessionTriggerProvider.setActiveTriggers(Collections.unmodifiableList(pool.triggers));
+ }
+ }
+
+ private void returnSessionOnFailedReuse(
+ GetRequest req, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
+ if (req.sessionToReuse == null) return;
+ if (poolsToRedistribute != null) {
+ Boolean isRemoved = checkAndRemoveSessionFromItsPool(req.sessionToReuse, poolsToRedistribute);
+ // The session cannot have been killed; this happens after all the kills in the current
+ // iteration, so we would have cleared sessionToReuse when killing this.
+ assert isRemoved == null || isRemoved;
+ }
+ if (!tezAmPool.returnSessionAsync(req.sessionToReuse)) {
+ syncWork.toDestroyNoRestart.add(req.sessionToReuse);
+ }
+ req.sessionToReuse = null;
+ }
+
+ private int addHivePool(TmpHivePool pool, PoolState parent,
+ HashMap<String, PoolState> oldPools, List<WmTezSession> toKill,
+ HashSet<String> poolsToRedistribute, EventState e) {
+ String fullName = pool.getName();
+ int totalQueryParallelism = pool.getQueryParallelism();
+ double fraction = pool.getResourceFraction();
+ if (parent != null) {
+ fullName = parent.fullName + POOL_SEPARATOR + fullName;
+ fraction = parent.finalFraction * pool.getResourceFraction();
+ parent.finalFractionRemaining -= fraction;
+ }
+ PoolState state = oldPools == null ? null : oldPools.remove(fullName);
+ if (state == null) {
+ state = new PoolState(fullName, totalQueryParallelism, fraction);
+ } else {
+ // This will also take care of the queries if query parallelism changed.
+ state.update(totalQueryParallelism, fraction, toKill, e);
+ poolsToRedistribute.add(fullName);
+ }
+ state.setTriggers(pool.triggers);
+
+ if (pool.getChildren() != null) {
+ for (TmpHivePool child : pool.getChildren()) {
+ totalQueryParallelism += addHivePool(
+ child, state, oldPools, toKill, poolsToRedistribute, e);
+ }
+ }
+ LOG.info("Adding Hive pool: " + state + " with triggers " + pool.triggers);
+ pools.put(fullName, state);
+ return totalQueryParallelism;
+ }
+
+
+ /**
+ * Checks if the session is still relevant for WM and if yes, removes it from its pool.
+ * @return true if the session was removed; false if the session was already processed by WM
+ * thread (so we are dealing with an outdated request); null if the session should be
+ * in WM but wasn't found in the requisite pool (internal error?).
+ */
+ private Boolean checkAndRemoveSessionFromItsPool(
+ WmTezSession session, HashSet<String> poolsToRedistribute) {
+ // It is possible for some request to be queued after a main thread has decided to kill this
+ // session; on the next iteration, we'd be processing that request with an irrelevant session.
+ if (session.isIrrelevantForWm()) {
return false;
}
- return true;
+ // If we did not kill this session we expect everything to be present.
+ String poolName = session.getPoolName();
+ session.clearWm();
+ if (poolName != null) {
+ poolsToRedistribute.add(poolName);
+ PoolState pool = pools.get(poolName);
+ if (pool != null && pool.sessions.remove(session)) return true;
+ }
+ LOG.error("Session was not in the pool (internal error) " + poolName + ": " + session);
+ return null;
}
- private void redistributePoolAllocations(
- String poolName, WmTezSession sessionToAdd, WmTezSession sessionToRemove,
- boolean releaseParallelism) {
- List<WmTezSession> sessionsToUpdate = null;
- double totalAlloc = 0;
- assert sessionToAdd == null || poolName.equals(sessionToAdd.getPoolName());
- assert sessionToRemove == null || poolName.equals(sessionToRemove.getPoolName());
- poolsLock.readLock().lock();
- boolean hasRemoveFailed = false;
+ // ===== EVENT METHODS
+
+ public Future<Boolean> updateResourcePlanAsync(TmpResourcePlan plan) {
+ SettableFuture<Boolean> applyRpFuture = SettableFuture.create();
+ currentLock.lock();
try {
- PoolState pool = pools.get(poolName);
- synchronized (pool.lock) {
- // This should be a 2nd order fn but it's too much pain in Java for one LOC.
- if (sessionToAdd != null) {
- pool.sessions.add(sessionToAdd);
- }
- if (sessionToRemove != null) {
- // TODO: this assumes that the update process will take the write lock, and make
- // everything right w.r.t. semaphores, pool names and other stuff, since we might
- // be releasing a different semaphore from the one we acquired if it's across
- // the update. If the magic in the update is weak, this may become more involved.
- if (!pool.sessions.remove(sessionToRemove)) {
- LOG.error("Session " + sessionToRemove + " could not be removed from the pool");
- if (releaseParallelism) {
- hasRemoveFailed = true;
- }
- } else if (releaseParallelism) {
- pool.sessionsClaimed.release();
- }
- sessionToRemove.setClusterFraction(0);
- }
- totalAlloc = updatePoolAllocations(pool.sessions, pool.finalFractionRemaining);
- sessionsToUpdate = new ArrayList<>(pool.sessions);
+ // TODO: if there's versioning/etc., it will come in here. For now we rely on external
+ // locking or ordering of calls. This should potentially return a Future for that.
+ if (current.resourcePlanToApply != null) {
+ LOG.warn("Several resource plans are being applied at the same time; using the latest");
+ current.applyRpFuture.setException(
+ new HiveException("Another plan was applied in parallel"));
+ }
+ current.resourcePlanToApply = plan;
+ current.applyRpFuture = applyRpFuture;
+ notifyWmThreadUnderLock();
+ } finally {
+ currentLock.unlock();
+ }
+ return applyRpFuture;
+ }
+
+ /**
+  * A single pending request for a session, queued for the WM thread by getSession.
+  * Immutable except for {@link #sessionToReuse}, which may be reset to null when the
+  * session that was offered for reuse is removed from its pool.
+  */
+ private final static class GetRequest {
+   /** Sorts requests by ascending {@link #order}. */
+   public static final Comparator<GetRequest> ORDER_COMPARATOR = new Comparator<GetRequest>() {
+     @Override
+     public int compare(GetRequest left, GetRequest right) {
+       return Long.compare(left.order, right.order);
+     }
+   };
+
+   /** Sequence number assigned by the caller when the request is created. */
+   private final long order;
+   private final String userName;
+   /** Completed with the session (or a failure) once the request has been handled. */
+   private final SettableFuture<WmTezSession> future;
+   /** The session the caller wants to keep using, if any. */
+   private WmTezSession sessionToReuse;
+
+   private GetRequest(String userName, SettableFuture<WmTezSession> future,
+       WmTezSession sessionToReuse, long order) {
+     this.order = order;
+     this.userName = userName;
+     this.future = future;
+     this.sessionToReuse = sessionToReuse;
+   }
+
+   @Override
+   public String toString() {
+     StringBuilder text = new StringBuilder();
+     text.append("[#").append(order);
+     text.append(", ").append(userName);
+     text.append(", reuse ").append(sessionToReuse);
+     return text.append("]").toString();
+   }
+ }
+
+ public TezSessionState getSession(
+ TezSessionState session, String userName, HiveConf conf) throws Exception {
+ // Note: not actually used for pool sessions; verify some things like doAs are not set.
+ validateConfig(conf);
+ SettableFuture<WmTezSession> future = SettableFuture.create();
+ WmTezSession wmSession = checkSessionForReuse(session);
+ GetRequest req = new GetRequest(
+ userName, future, wmSession, getRequestVersion.incrementAndGet());
+ currentLock.lock();
+ try {
+ current.getRequests.add(req);
+ if (req.sessionToReuse != null) {
+ // Note: we assume reuse is only possible for the same user and config.
+ current.toReuse.put(wmSession, req);
}
+ notifyWmThreadUnderLock();
} finally {
- poolsLock.readLock().unlock();
+ currentLock.unlock();
}
- allocationManager.updateSessionsAsync(totalAlloc, sessionsToUpdate);
- updateSessionsTriggers();
- if (hasRemoveFailed) {
- throw new AssertionError("Cannot remove the session from the pool and release "
- + "the query slot; HS2 may fail to accept queries");
+ return future.get();
+ }
+
+ @Override
+ public void destroy(TezSessionState session) throws Exception {
+ WmTezSession wmTezSession = ensureOwnedSession(session);
+ resetGlobalTezSession(wmTezSession);
+ currentLock.lock();
+ try {
+ current.toDestroy.add(wmTezSession);
+ notifyWmThreadUnderLock();
+ } finally {
+ currentLock.unlock();
}
}
+ /**
+  * Clears the Tez session reference held by the current SessionState, but only if that
+  * reference is the given session.
+  */
+ private void resetGlobalTezSession(WmTezSession wmTezSession) {
+   // This has to be done synchronously to avoid the caller getting this session again.
+   // Ideally we'd get rid of this thread-local nonsense.
+   SessionState threadState = SessionState.get();
+   if (threadState == null || threadState.getTezSession() != wmTezSession) {
+     return; // Nothing is attached, or a different session is attached.
+   }
+   threadState.setTezSession(null);
+ }
+
+ /**
+  * Queues the session to be returned and signals the WM thread. The session is detached
+  * from the current SessionState before it is queued.
+  */
+ @Override
+ public void returnAfterUse(TezSessionPoolSession session) throws Exception {
+   final WmTezSession ownedSession = ensureOwnedSession(session);
+   resetGlobalTezSession(ownedSession);
+   currentLock.lock();
+   try {
+     // The actual return is performed when the queued event is processed.
+     current.toReturn.add(ownedSession);
+     notifyWmThreadUnderLock();
+   } finally {
+     currentLock.unlock();
+   }
+ }
+
+ // TODO: use this
+ /** Records that the cluster state has changed and signals the WM thread. */
+ public void nofityOfClusterStateChange() {
+ currentLock.lock();
+ try {
+ current.hasClusterStateChanged = true;
+ notifyWmThreadUnderLock();
+ } finally {
+ currentLock.unlock();
+ }
+ }
+
+ /** Queues the given session as an update error and signals the WM thread. */
+ public void addUpdateError(WmTezSession wmTezSession) {
+ currentLock.lock();
+ try {
+ current.updateErrors.add(wmTezSession);
+ notifyWmThreadUnderLock();
+ } finally {
+ currentLock.unlock();
+ }
+ }
+
+ /**
+  * Adds a test event that's processed at the end of WM iteration.
+  * This allows tests to wait for an iteration to finish without messing with the threading
+  * logic (that is prone to races if we e.g. remember the state before and wait for it to change,
+  * self-deadlocking when triggering things explicitly and calling a blocking API, and hanging
+  * forever if we wait for "another iteration"). If addTestEvent is called after all the other
+  * calls of interest, it is guaranteed that the events from those calls will be processed
+  * fully when the future is triggered.
+  * Note that only one test event is kept: a new call replaces a previously registered
+  * event that has not been picked up yet.
+  *
+  * @return the future that is triggered when the test event is processed.
+  */
+ @VisibleForTesting
+ Future<Boolean> addTestEvent() {
+   SettableFuture<Boolean> testEvent = SettableFuture.create();
+   currentLock.lock();
+   try {
+     current.testEvent = testEvent;
+     notifyWmThreadUnderLock();
+   } finally {
+     currentLock.unlock();
+   }
+   return testEvent;
+ }
+
+ /** Queues the result of an async session initialization and signals the WM thread. */
+ public void notifyInitializationCompleted(SessionInitContext initCtx) {
+ currentLock.lock();
+ try {
+ current.initResults.add(initCtx);
+ notifyWmThreadUnderLock();
+ } finally {
+ currentLock.unlock();
+ }
+ }
+
+
+ /**
+  * Reopens a session owned by this manager. When additional files are supplied, the
+  * session is reopened in place and the same object is returned; otherwise the reopen is
+  * queued for the WM thread and this call blocks until the resulting future completes.
+  */
+ @Override
+ public TezSessionState reopen(TezSessionState session, Configuration conf,
+     String[] additionalFiles) throws Exception {
+   WmTezSession wmTezSession = ensureOwnedSession(session);
+   // NOTE: sessionConf is not used further in this method; it is kept as is.
+   HiveConf sessionConf = wmTezSession.getConf();
+   if (sessionConf == null) {
+     LOG.warn("Session configuration is null for " + wmTezSession);
+     sessionConf = new HiveConf(conf, WorkloadManager.class);
+   }
+   // TODO: ideally, we should handle reopen the same way no matter what. However, the cases
+   //       with additional files will have to wait until HIVE-17827 is fixed, because it's
+   //       difficult to determine how the additionalFiles are to be propagated/reused between
+   //       two sessions. Once the update logic is encapsulated in the session we can remove this.
+   boolean hasAdditionalFiles = additionalFiles != null && additionalFiles.length > 0;
+   if (hasAdditionalFiles) {
+     TezSessionPoolManager.reopenInternal(session, additionalFiles);
+     return session;
+   }
+
+   SettableFuture<WmTezSession> reopened = SettableFuture.create();
+   currentLock.lock();
+   try {
+     if (current.toReopen.containsKey(wmTezSession)) {
+       throw new AssertionError("The session is being reopened more than once " + session);
+     }
+     current.toReopen.put(wmTezSession, reopened);
+     notifyWmThreadUnderLock();
+   } finally {
+     currentLock.unlock();
+   }
+   // Block the caller until the queued reopen has been handled.
+   return reopened.get();
+ }
+
+ /** Replaces an expired session in the AM pool, inline on the calling thread. */
+ @Override
+ public void closeAndReopenExpiredSession(TezSessionPoolSession session) throws Exception {
+   // By definition, this session is not in use and can no longer be in use, so it only
+   // affects the session pool. We can handle this inline.
+   WmTezSession expiredSession = ensureOwnedSession(session);
+   tezAmPool.replaceSession(expiredSession, false, null);
+ }
+
+ // ======= VARIOUS UTILITY METHODS
+
+ /**
+  * Marks that there are pending changes and signals the waiters on hasChangesCondition.
+  * Does nothing if changes are already marked as pending. As the name says, the callers
+  * invoke this while holding the lock.
+  */
+ private void notifyWmThreadUnderLock() {
+   if (!hasChanges) {
+     hasChanges = true;
+     hasChangesCondition.signalAll();
+   }
+ }
+
private WmTezSession checkSessionForReuse(TezSessionState session) throws Exception {
if (session == null) return null;
WmTezSession result = null;
@@ -391,7 +1035,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
if (result.isOwnedBy(this)) {
return result;
}
- // TODO: this should never happen, at least for now. Throw?
+ // This should never happen, at least for now. Throw?
LOG.warn("Attempting to reuse a session not belonging to us: " + result);
result.returnToSessionManager();
return null;
@@ -405,15 +1049,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
return null;
}
- private double updatePoolAllocations(List<WmTezSession> sessions, double totalFraction) {
- // TODO: real implementation involving in-the-pool policy interface, etc.
- double allocation = totalFraction / sessions.size();
- for (WmTezSession session : sessions) {
- session.setClusterFraction(allocation);
- }
- return totalFraction;
- }
-
private void validateConfig(HiveConf conf) throws HiveException {
String queueName = conf.get(TezConfiguration.TEZ_QUEUE_NAME);
if ((queueName != null) && !queueName.isEmpty()) {
@@ -429,69 +1064,18 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
}
- public void start() throws Exception {
- sessions.startInitialSessions();
- if (expirationTracker != null) {
- expirationTracker.start();
- }
- allocationManager.start();
- }
-
- public void stop() throws Exception {
- List<TezSessionPoolSession> sessionsToClose = null;
- synchronized (openSessions) {
- sessionsToClose = new ArrayList<>(openSessions.keySet());
- }
-
- for (TezSessionPoolSession sessionState : sessionsToClose) {
- sessionState.close(false);
- }
-
- if (expirationTracker != null) {
- expirationTracker.stop();
- }
- allocationManager.stop();
-
- INSTANCE = null;
- }
-
- private WmTezSession createSession() {
- WmTezSession session = createSessionObject(TezSessionState.makeSessionId());
+ private WmTezSession createSession(HiveConf conf) {
+ WmTezSession session = createSessionObject(TezSessionState.makeSessionId(), conf);
session.setQueueName(yarnQueue);
session.setDefault();
- LOG.info("Created new interactive session " + session.getSessionId());
+ LOG.info("Created new interactive session object " + session.getSessionId());
return session;
}
@VisibleForTesting
- protected WmTezSession createSessionObject(String sessionId) {
- return new WmTezSession(sessionId, this, expirationTracker, new HiveConf(conf));
- }
-
- @Override
- public void returnAfterUse(TezSessionPoolSession session) throws Exception {
- returnAfterUse(session, true);
- }
-
- private void returnAfterUse(
- TezSessionPoolSession session, boolean releaseParallelism) throws Exception {
- boolean isInterrupted = Thread.interrupted();
- try {
- WmTezSession wmSession = ensureOwnedSession(session);
- redistributePoolAllocations(wmSession.getPoolName(), null, wmSession, releaseParallelism);
- sessions.returnSession((WmTezSession) session);
- } finally {
- // Reset the interrupt status.
- if (isInterrupted) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- /** Closes a running (expired) pool session and reopens it. */
- @Override
- public void closeAndReopenPoolSession(TezSessionPoolSession oldSession) throws Exception {
- sessions.replaceSession(ensureOwnedSession(oldSession), createSession(), false, null, null);
+ protected WmTezSession createSessionObject(String sessionId, HiveConf conf) {
+ conf = (conf == null) ? new HiveConf(this.conf) : conf;
+ return new WmTezSession(sessionId, this, expirationTracker, conf);
}
private WmTezSession ensureOwnedSession(TezSessionState oldSession) {
@@ -506,9 +1090,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
@Override
public void registerOpenSession(TezSessionPoolSession session) {
synchronized (openSessions) {
- openSessions.put(session, null);
+ openSessions.put(session, true);
}
- updateSessionsTriggers();
}
/** Called by TezSessionPoolSession when closed. */
@@ -517,20 +1100,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
synchronized (openSessions) {
openSessions.remove(session);
}
- updateSessionsTriggers();
- }
-
- private void updateSessionsTriggers() {
- if (sessionTriggerProvider != null) {
- List<TezSessionState> openSessions = new ArrayList<>();
- List<Trigger> activeTriggers = new ArrayList<>();
- for (PoolState poolState : pools.values()) {
- activeTriggers.addAll(poolState.getTriggers());
- openSessions.addAll(poolState.sessions);
- }
- sessionTriggerProvider.setOpenSessions(Collections.unmodifiableList(openSessions));
- sessionTriggerProvider.setActiveTriggers(Collections.unmodifiableList(activeTriggers));
- }
}
@VisibleForTesting
@@ -538,41 +1107,25 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
return expirationTracker;
}
- @Override
- public TezSessionState reopen(TezSessionState session, Configuration conf,
- String[] additionalFiles) throws Exception {
- WmTezSession oldSession = ensureOwnedSession(session), newSession = createSession();
- newSession.setPoolName(oldSession.getPoolName());
- HiveConf sessionConf = session.getConf();
- if (sessionConf == null) {
- LOG.warn("Session configuration is null for " + session);
- // default queue name when the initial session was created
- sessionConf = new HiveConf(conf, WorkloadManager.class);
- }
- sessions.replaceSession(oldSession, newSession, true, additionalFiles, sessionConf);
- // We are going to immediately give this session out, so ensure AM registry.
- if (!ensureAmIsRegistered(newSession)) {
- throw new Exception("Session is not usable after reopen");
- }
- // Do not release the parallelism - we are just replacing the session in the same pool.
- redistributePoolAllocations(oldSession.getPoolName(), newSession, oldSession, false);
- return newSession;
+ @VisibleForTesting
+ int getNumSessions() {
+ return tezAmPool.getInitialSize();
}
- @Override
- public void destroy(TezSessionState session) throws Exception {
- LOG.warn("Closing a pool session because of retry failure.");
- // We never want to lose pool sessions. Replace it instead; al trigger duck redistribution.
- WmTezSession wmSession = ensureOwnedSession(session);
- closeAndReopenPoolSession(wmSession);
- redistributePoolAllocations(wmSession.getPoolName(), null, wmSession, true);
+ protected final HiveConf getConf() {
+ return conf;
}
- @VisibleForTesting
- int getNumSessions() {
- return sessions.getInitialSize();
+ public List<String> getTriggerCounterNames() {
+ List<Trigger> activeTriggers = sessionTriggerProvider.getActiveTriggers();
+ List<String> counterNames = new ArrayList<>();
+ for (Trigger trigger : activeTriggers) {
+ counterNames.add(trigger.getExpression().getCounterLimit().getName());
+ }
+ return counterNames;
}
+
@Override
SessionTriggerProvider getSessionTriggerProvider() {
return sessionTriggerProvider;
@@ -588,38 +1141,319 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
return triggerValidatorRunnable;
}
- @VisibleForTesting
- public Map<String, PoolState> getPools() {
- return pools;
+ /**
+  * State of a single pool.
+  * Unless otherwise specified, the members are only modified by the master thread.
+  */
+ private static class PoolState {
+   // Add stuff here as WM is implemented.
+   /** Sessions for which async initialization is still tracked by this pool. */
+   private final LinkedList<SessionInitContext> initializingSessions = new LinkedList<>();
+   // Note: the list is expected to be a few items; if it's longer we may want an IHM.
+   private final LinkedList<WmTezSession> sessions = new LinkedList<>();
+   /** Get requests waiting in this pool. */
+   private final LinkedList<GetRequest> queue = new LinkedList<>();
+
+   private final String fullName;
+   private double finalFraction;
+   private double finalFractionRemaining;
+   private int queryParallelism = -1;
+   private List<Trigger> triggers = new ArrayList<>();
+
+   public PoolState(String fullName, int queryParallelism, double fraction) {
+     this.fullName = fullName;
+     // Note: toKill and the event state are passed as null here; at construction time there
+     // are no sessions and no queued requests, so update() normally does not touch them.
+     update(queryParallelism, fraction, null, null);
+   }
+
+   /** @return the number of running sessions plus the sessions still being initialized. */
+   public int getTotalActiveSessions() {
+     return sessions.size() + initializingSessions.size();
+   }
+
+   /**
+    * Applies new settings to the pool.
+    * @param queryParallelism The new query parallelism.
+    * @param fraction The new cluster fraction; also resets the remaining fraction.
+    * @param toKill The list that receives the sessions removed from the pool, if any.
+    * @param e The event state; queued get requests are moved to the front of its
+    *          getRequests, and its toReuse map is updated for the removed sessions.
+    */
+   public void update(int queryParallelism, double fraction,
+       List<WmTezSession> toKill, EventState e) {
+     this.finalFraction = this.finalFractionRemaining = fraction;
+     this.queryParallelism = queryParallelism;
+     // TODO: two possible improvements
+     //       1) Right now we kill all the queries here; we could just kill -qpDelta.
+     //       2) After the queries are killed queued queries would take their place.
+     //          If we could somehow restart queries we could instead put them at the front
+     //          of the queue (esp. in conjunction with (1)) and rerun them.
+     if (queryParallelism < getTotalActiveSessions()) {
+       extractAllSessionsToKill("The query pool was resized by administrator", e.toReuse, toKill);
+     }
+     // We will requeue, and not kill, the queries that are not running yet.
+     // Insert them all before the get requests from this iteration.
+     GetRequest req;
+     while ((req = queue.pollLast()) != null) {
+       e.getRequests.addFirst(req);
+     }
+   }
+
+   /**
+    * Removes everything from the pool: the sessions are extracted into toKill, and the
+    * queued get requests are moved to the front of globalQueue.
+    */
+   public void destroy(List<WmTezSession> toKill, LinkedList<GetRequest> globalQueue,
+       IdentityHashMap<WmTezSession, GetRequest> toReuse) {
+     extractAllSessionsToKill("The query pool was removed by administrator", toReuse, toKill);
+     // All the pending get requests should just be requeued elsewhere.
+     // Note that we never queue session reuse so sessionToReuse would be null.
+     globalQueue.addAll(0, queue);
+     queue.clear();
+   }
+
+   /**
+    * Splits the remaining fraction of the pool equally between the running and the
+    * initializing sessions, and sets the resulting share on every running session.
+    * @return the fraction given out to the running sessions; 0 if the pool has no running
+    *         or initializing sessions.
+    */
+   public double updateAllocationPercentages() {
+     // TODO: real implementation involving in-the-pool policy interface, etc.
+     int totalSessions = getTotalActiveSessions();
+     if (totalSessions == 0) {
+       // Nothing to allocate to; dividing by zero below would produce Infinity or NaN
+       // and the method would return NaN.
+       return 0;
+     }
+     double allocation = finalFractionRemaining / totalSessions;
+     for (WmTezSession session : sessions) {
+       session.setClusterFraction(allocation);
+     }
+     // Do not give out the capacity of the initializing sessions to the running ones;
+     // we expect init to be fast.
+     return finalFractionRemaining - allocation * initializingSessions.size();
+   }
+
+   @Override
+   public String toString() {
+     return "[" + fullName + ", query parallelism " + queryParallelism
+         + ", fraction of the cluster " + finalFraction + ", fraction used by child pools "
+         + (finalFraction - finalFractionRemaining) + ", active sessions " + sessions.size()
+         + ", initializing sessions " + initializingSessions.size() + "]";
+   }
+
+   /**
+    * Empties both session lists. Running sessions are always added to toKill; initializing
+    * ones are canceled and added to toKill only if their initialization was already done.
+    */
+   private void extractAllSessionsToKill(String killReason,
+       IdentityHashMap<WmTezSession, GetRequest> toReuse, List<WmTezSession> toKill) {
+     for (WmTezSession sessionToKill : sessions) {
+       resetRemovedSession(sessionToKill, killReason, toReuse);
+       toKill.add(sessionToKill);
+     }
+     sessions.clear();
+     for (SessionInitContext initCtx : initializingSessions) {
+       // It is possible that the background init thread has finished in parallel, queued
+       // the message for us but also returned the session to the user.
+       WmTezSession sessionToKill = initCtx.cancelAndExtractSessionIfDone(killReason);
+       if (sessionToKill == null) {
+         continue; // Async op in progress; the callback will take care of this.
+       }
+       resetRemovedSession(sessionToKill, killReason, toReuse);
+       toKill.add(sessionToKill);
+     }
+     initializingSessions.clear();
+   }
+
+   /**
+    * Marks the session as no longer relevant for WM, and, if a pending get request wanted
+    * to reuse this session, clears the reuse from that request.
+    */
+   private void resetRemovedSession(WmTezSession sessionToKill, String killReason,
+       IdentityHashMap<WmTezSession, GetRequest> toReuse) {
+     assert killReason != null;
+     sessionToKill.setIsIrrelevantForWm(killReason);
+     sessionToKill.clearWm();
+     GetRequest req = toReuse.remove(sessionToKill);
+     if (req != null) {
+       req.sessionToReuse = null;
+     }
+   }
+
+   @VisibleForTesting
+   // will change in HIVE-17809
+   public void setTriggers(final List<Trigger> triggers) {
+     this.triggers = triggers;
+   }
+
+   public List<Trigger> getTriggers() {
+     return triggers;
+   }
+ }
- protected final HiveConf getConf() {
- return conf;
+
+ private enum SessionInitState {
+ GETTING, // We are getting a session from TezSessionPool
+ WAITING_FOR_REGISTRY, // We have the session but it doesn't have registry info yet.
+ DONE, // We have the session with registry info, or we have failed.
+ CANCELED // The master thread has CANCELED this and will never look at it again.
}
- public List<String> getTriggerCounterNames() {
- List<Trigger> activeTriggers = sessionTriggerProvider.getActiveTriggers();
- List<String> counterNames = new ArrayList<>();
- for (Trigger trigger : activeTriggers) {
- counterNames.add(trigger.getExpression().getCounterLimit().getName());
+ /**
+ * The class that serves as a synchronization point, and future callback,
+ * for async session initialization, as well as parallel cancellation.
+ */
+ private final class SessionInitContext implements FutureCallback<WmTezSession> {
+ private final String poolName;
+
+ private final ReentrantLock lock = new ReentrantLock();
+ private WmTezSession session;
+ private SettableFuture<WmTezSession> future;
+ private SessionInitState state;
+ private String cancelReason;
+
+ /**
+  * Creates the context in the GETTING state.
+  * @param future The future that is completed with the result of the initialization.
+  * @param poolName The pool name to set on the session once it is received.
+  */
+ public SessionInitContext(SettableFuture<WmTezSession> future, String poolName) {
+   this.poolName = poolName;
+   this.future = future;
+   this.state = SessionInitState.GETTING;
+ }
+
+ /**
+  * Success callback for both async steps of the initialization. In the GETTING state it
+  * receives the session from the AM pool and starts the wait for the AM registry, with
+  * this object as the callback again. In the WAITING_FOR_REGISTRY state it completes the
+  * user future and reports to the master thread. In the CANCELED state it fails the user
+  * future and returns the session to the AM pool. Any other state is an error.
+  * The state is examined and changed under the lock; the actions are taken outside of it.
+  */
+ @Override
+ public void onSuccess(WmTezSession session) {
+   SessionInitState oldState;
+   SettableFuture<WmTezSession> future = null;
+   lock.lock();
+   try {
+     oldState = state;
+     switch (oldState) {
+     case GETTING: {
+       LOG.debug("Received a session from AM pool {}", session);
+       assert this.state == SessionInitState.GETTING;
+       session.setPoolName(poolName);
+       session.setQueueName(yarnQueue);
+       this.session = session;
+       this.state = SessionInitState.WAITING_FOR_REGISTRY;
+       break;
+     }
+     case WAITING_FOR_REGISTRY: {
+       assert this.session != null;
+       this.state = SessionInitState.DONE;
+       assert session == this.session;
+       future = this.future;
+       this.future = null;
+       break;
+     }
+     case CANCELED: {
+       future = this.future;
+       this.session = null;
+       this.future = null;
+       break;
+     }
+     default: {
+       future = this.future;
+       this.future = null;
+       break;
+     }
+     }
+   } finally {
+     lock.unlock();
+   }
+   switch (oldState) {
+   case GETTING: {
+     ListenableFuture<WmTezSession> waitFuture = session.waitForAmRegistryAsync(
+         amRegistryTimeoutMs, timeoutPool);
+     Futures.addCallback(waitFuture, this);
+     break;
+   }
+   case WAITING_FOR_REGISTRY: {
+     // Notify the master thread and the user.
+     future.set(session);
+     notifyInitializationCompleted(this);
+     break;
+   }
+   case CANCELED: {
+     // Return session to the pool; we can do it directly here.
+     future.setException(new HiveException(
+         "The query was killed by workload management: " + cancelReason));
+     session.setPoolName(null);
+     session.setClusterFraction(0f);
+     tezAmPool.returnSession(session);
+     break;
+   }
+   default: {
+     // Report the state that was observed under the lock; the field may have changed since.
+     AssertionError error = new AssertionError("Unexpected state " + oldState);
+     // The future is cleared whenever the state becomes DONE, so it can be null here;
+     // do not let an NPE mask the real error.
+     if (future != null) {
+       future.setException(error);
+     }
+     throw error;
+   }
+   }
+ }
+
+ /**
+  * Failure callback for the async steps of the initialization. Fails the user future,
+  * reports the failure to the main thread unless the operation has been canceled, and
+  * restarts the session in the AM pool if one has already been received.
+  */
+ @Override
+ public void onFailure(Throwable t) {
+   SettableFuture<WmTezSession> userFuture;
+   WmTezSession failedSession;
+   boolean isCanceled;
+   lock.lock();
+   try {
+     // Take over the future and the session; a canceled context keeps its state.
+     isCanceled = (state == SessionInitState.CANCELED);
+     failedSession = this.session;
+     userFuture = this.future;
+     this.future = null;
+     this.session = null;
+     if (!isCanceled) {
+       this.state = SessionInitState.DONE;
+     }
+   } finally {
+     lock.unlock();
+   }
+   userFuture.setException(t);
+   if (!isCanceled) {
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("Queueing the initialization failure with " + failedSession);
+     }
+     notifyInitializationCompleted(this); // Report failure to the main thread.
+   }
+   if (failedSession == null) {
+     return; // We have failed before receiving a session; nothing to restart.
+   }
+   failedSession.clearWm();
+   // We can just restart the session if we have received one.
+   try {
+     tezAmPool.replaceSession(failedSession, false, null);
+   } catch (Exception e) {
+     LOG.error("Failed to restart a failed session", e);
+   }
+ }
+
+ /**
+  * Cancels the async operation regardless of its state, and records the reason.
+  * @return the session if the operation was already done (null if it was done because of
+  *         a failure); null in all the other states.
+  */
+ public WmTezSession cancelAndExtractSessionIfDone(String cancelReason) {
+   lock.lock();
+   try {
+     final SessionInitState previousState = this.state;
+     this.state = SessionInitState.CANCELED;
+     this.cancelReason = cancelReason;
+     if (previousState == SessionInitState.DONE) {
+       WmTezSession doneSession = this.session;
+       this.session = null;
+       return doneSession;
+     }
+     // In the states where a background operation is in progress, wait for the callback.
+     // Also, ignore any duplicate calls; also don't kill failed ones - handled elsewhere.
+     if (previousState == SessionInitState.CANCELED) {
+       LOG.warn("Duplicate call to extract " + session);
+     }
+     return null;
+   } finally {
+     lock.unlock();
+   }
+ }
+
+ /**
+  * Extracts the session and cancels the operation, both only if the operation is done.
+  * @param results The list to which the session is added, if there is one.
+  * @return false if the operation is not in the DONE state; true otherwise.
+  */
+ public boolean extractSessionAndCancelIfDone(List<WmTezSession> results) {
+   lock.lock();
+   try {
+     if (state != SessionInitState.DONE) {
+       return false;
+     }
+     state = SessionInitState.CANCELED;
+     WmTezSession doneSession = session;
+     if (doneSession != null) {
+       results.add(doneSession);
+     } // Otherwise we have failed; the callback has taken care of the failure.
+     session = null;
+     return true;
+   } finally {
+     lock.unlock();
+   }
+ }
- return counterNames;
}
+
// TODO: temporary until real WM schema is created.
public static class TmpHivePool {
private final String name;
private final List<TmpHivePool> children;
private final int queryParallelism;
private final double resourceFraction;
+ private final List<Trigger> triggers;
public TmpHivePool(String name,
List<TmpHivePool> children, int queryParallelism, double resourceFraction) {
+ this(name, children, queryParallelism, resourceFraction, null);
+ }
+
+ public TmpHivePool(String name,
+ List<TmpHivePool> children, int queryParallelism, double resourceFraction,
+ List<Trigger> triggers) {
this.name = name;
this.children = children;
this.queryParallelism = queryParallelism;
this.resourceFraction = resourceFraction;
+ this.triggers = triggers;
}
public String getName() {
@@ -679,4 +1513,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
return mappings;
}
}
+
+ /** Exposes the AM session pool used by this manager; intended for tests. */
+ @VisibleForTesting
+ TezSessionPool<WmTezSession> getTezAmPool() {
+ return tezAmPool;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
index 209cf57..2623a0e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
@@ -92,23 +92,18 @@ public class LlapClusterStateForCompile {
return numExecutorsPerNode;
}
- public boolean initClusterInfo() {
- return initClusterInfo(true);
- }
-
- private boolean isUpdateNeeded(boolean allowUpdate) {
+ private boolean isUpdateNeeded() {
Long lastUpdateLocal = lastClusterUpdateNs;
if (lastUpdateLocal == null) return true;
- if (!allowUpdate) return false;
long elapsed = System.nanoTime() - lastUpdateLocal;
return (elapsed >= updateIntervalNs);
}
- public boolean initClusterInfo(boolean allowUpdate) {
- if (!isUpdateNeeded(allowUpdate)) return true;
+ public boolean initClusterInfo() {
+ if (!isUpdateNeeded()) return true;
synchronized (updateInfoLock) {
// At this point, no one will take the write lock and update, so we can do the last check.
- if (!isUpdateNeeded(allowUpdate)) return true;
+ if (!isUpdateNeeded()) return true;
if (svc == null) {
try {
svc = LlapRegistryService.getClient(conf);
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
index 59efd43..5248454 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
@@ -19,14 +19,17 @@
package org.apache.hadoop.hive.ql.exec.tez;
-import java.util.Collection;
-import org.apache.hadoop.fs.Path;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
-import java.net.URISyntaxException;
-
+import java.util.Collection;
+import java.util.concurrent.ScheduledExecutorService;
import javax.security.auth.login.LoginException;
-
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.UserGroupInformation;
@@ -45,6 +48,7 @@ public class SampleTezSessionState extends WmTezSession {
private final HiveConf hiveConf;
private String user;
private boolean doAsEnabled;
+ private ListenableFuture<Boolean> waitForAmRegFuture;
public SampleTezSessionState(
String sessionId, TezSessionPoolSession.Manager parent, HiveConf conf) {
@@ -52,6 +56,13 @@ public class SampleTezSessionState extends WmTezSession {
? ((TezSessionPoolManager)parent).getExpirationTracker() : null, conf);
this.sessionId = sessionId;
this.hiveConf = conf;
+ waitForAmRegFuture = createDefaultWaitForAmRegistryFuture();
+ }
+
+ private SettableFuture<Boolean> createDefaultWaitForAmRegistryFuture() {
+ SettableFuture<Boolean> noWait = SettableFuture.create();
+ noWait.set(true); // By default, do not wait.
+ return noWait;
}
@Override
@@ -106,4 +117,27 @@ public class SampleTezSessionState extends WmTezSession {
public boolean getDoAsEnabled() {
return this.doAsEnabled;
}
+
+ /**
+  * Test implementation of the AM registry wait: the returned future completes with this
+  * session when the configured wait future succeeds, and fails with the same error when
+  * it fails. The timeout arguments are not used.
+  */
+ @Override
+ public SettableFuture<WmTezSession> waitForAmRegistryAsync(
+     int timeoutMs, ScheduledExecutorService timeoutPool) {
+   final SampleTezSessionState self = this;
+   final SettableFuture<WmTezSession> registered = SettableFuture.create();
+   FutureCallback<Boolean> relay = new FutureCallback<Boolean>() {
+     @Override
+     public void onSuccess(Boolean result) {
+       registered.set(self);
+     }
+
+     @Override
+     public void onFailure(Throwable t) {
+       registered.setException(t);
+     }
+   };
+   Futures.addCallback(waitForAmRegFuture, relay);
+   return registered;
+ }
+
+ /**
+  * Sets the future that waitForAmRegistryAsync waits on; passing null restores the
+  * default, already-completed future.
+  */
+ public void setWaitForAmRegistryFuture(ListenableFuture<Boolean> future) {
+   if (future == null) {
+     waitForAmRegFuture = createDefaultWaitForAmRegistryFuture();
+   } else {
+     waitForAmRegFuture = future;
+   }
+ }
}
[3/3] hive git commit: HIVE-17841 : implement applying the resource
plan (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Posted by se...@apache.org.
HIVE-17841 : implement applying the resource plan (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/77b99e4c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/77b99e4c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/77b99e4c
Branch: refs/heads/master
Commit: 77b99e4c9e39b9905ff98a002ca35f7edd9b7b30
Parents: c5a9673
Author: sergey <se...@apache.org>
Authored: Thu Nov 2 11:30:38 2017 -0700
Committer: sergey <se...@apache.org>
Committed: Thu Nov 2 11:30:38 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 3 +
.../hive/jdbc/TestTriggersWorkloadManager.java | 12 +-
.../hadoop/hive/ql/exec/tez/AmPluginNode.java | 2 +-
.../ql/exec/tez/GuaranteedTasksAllocator.java | 29 +-
.../exec/tez/LlapPluginEndpointClientImpl.java | 2 +-
.../ql/exec/tez/QueryAllocationManager.java | 3 +-
.../ql/exec/tez/SessionExpirationTracker.java | 16 +-
.../hadoop/hive/ql/exec/tez/TezSessionPool.java | 371 ++++-
.../hive/ql/exec/tez/TezSessionPoolManager.java | 74 +-
.../hive/ql/exec/tez/TezSessionPoolSession.java | 26 +-
.../hive/ql/exec/tez/TezSessionState.java | 2 +-
.../hive/ql/exec/tez/UserPoolMapping.java | 16 +-
.../hadoop/hive/ql/exec/tez/WmTezSession.java | 109 +-
.../hive/ql/exec/tez/WorkloadManager.java | 1549 ++++++++++++++----
.../physical/LlapClusterStateForCompile.java | 13 +-
.../hive/ql/exec/tez/SampleTezSessionState.java | 44 +-
.../hive/ql/exec/tez/TestWorkloadManager.java | 365 ++++-
17 files changed, 2093 insertions(+), 543 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a3c853a..b65bdab 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2419,6 +2419,9 @@ public class HiveConf extends Configuration {
HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE("hive.server2.tez.interactive.queue", "",
"A single YARN queues to use for Hive Interactive sessions. When this is specified,\n" +
"workload management is enabled and used for these sessions."),
+ HIVE_SERVER2_TEZ_WM_WORKER_THREADS("hive.server2.tez.wm.worker.threads", 4,
+ "Number of worker threads to use to perform the synchronous operations with Tez\n" +
+ "sessions for workload management (e.g. opening, closing, etc.)"),
HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT("hive.server2.tez.wm.am.registry.timeout", "30s",
new TimeValidator(TimeUnit.SECONDS),
"The timeout for AM registry registration, after which (on attempting to use the\n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
index fdb660a..0ec7e85 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
@@ -16,17 +16,21 @@
package org.apache.hive.jdbc;
+import com.google.common.collect.Lists;
import java.io.File;
import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpHivePool;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpResourcePlan;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMapping;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMappingType;
import org.apache.hadoop.hive.ql.wm.Trigger;
import org.apache.hive.jdbc.miniHS2.MiniHS2;
import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
@@ -70,6 +74,10 @@ public class TestTriggersWorkloadManager extends TestTriggersTezSessionPoolManag
@Override
protected void setupTriggers(final List<Trigger> triggers) throws Exception {
WorkloadManager wm = WorkloadManager.getInstance();
- wm.getPools().get("llap").setTriggers(triggers);
+ TmpResourcePlan rp = new TmpResourcePlan(Lists.newArrayList(new TmpHivePool(
+ "llap", null, 1, 1.0f, triggers)), Lists.newArrayList(
+ new TmpUserMapping(TmpUserMappingType.DEFAULT, "", "llap", 1)));
+ wm.updateResourcePlanAsync(rp).get(10, TimeUnit.SECONDS);
}
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java
index 35d380c..e4a21cb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java
@@ -39,5 +39,5 @@ public interface AmPluginNode {
}
}
- AmPluginInfo waitForAmPluginInfo(int timeoutMs) throws InterruptedException, TimeoutException;
+ AmPluginInfo getAmPluginInfo() throws InterruptedException, TimeoutException;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
index d978a25..53dd698 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
@@ -64,6 +64,8 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
@Override
public void start() {
+ // Try to get cluster information once, to avoid immediate cluster-update event in WM.
+ clusterState.initClusterInfo();
clusterStateUpdateThread.start();
}
@@ -72,9 +74,13 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
clusterStateUpdateThread.interrupt(); // Don't wait for the thread.
}
+ public void initClusterInfo() {
+ clusterState.initClusterInfo();
+ }
+
@VisibleForTesting
protected int getExecutorCount(boolean allowUpdate) {
- if (!clusterState.initClusterInfo(allowUpdate)) {
+ if (allowUpdate && !clusterState.initClusterInfo()) {
LOG.warn("Failed to get LLAP cluster information for "
+ HiveConf.getTrimmedVar(this.conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS)
+ "; we may rely on outdated cluster status");
@@ -92,15 +98,18 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
}
@Override
- public void updateSessionsAsync(double totalMaxAlloc, List<WmTezSession> sessionsToUpdate) {
- // Do not make a remote call unless we have no information at all.
+ public void updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessionsToUpdate) {
+ // Do not make a remote call under any circumstances - this is supposed to be async.
int totalCount = getExecutorCount(false);
- int totalToDistribute = (int)Math.round(totalCount * totalMaxAlloc);
+ int totalToDistribute = -1;
+ if (totalMaxAlloc != null) {
+ totalToDistribute = (int)Math.round(totalCount * totalMaxAlloc);
+ }
double lastDelta = 0;
for (int i = 0; i < sessionsToUpdate.size(); ++i) {
WmTezSession session = sessionsToUpdate.get(i);
int intAlloc = -1;
- if (i + 1 == sessionsToUpdate.size()) {
+ if (i + 1 == sessionsToUpdate.size() && totalToDistribute >= 0) {
intAlloc = totalToDistribute;
// We rely on the caller to supply a reasonable total; we could log a warning
// if this doesn't match the allocation of the last session beyond some threshold.
@@ -119,10 +128,12 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
intAlloc = (int)roundedAlloc;
}
// Make sure we don't give out more than allowed due to double/rounding artifacts.
- if (intAlloc > totalToDistribute) {
- intAlloc = totalToDistribute;
+ if (totalToDistribute >= 0) {
+ if (intAlloc > totalToDistribute) {
+ intAlloc = totalToDistribute;
+ }
+ totalToDistribute -= intAlloc;
}
- totalToDistribute -= intAlloc;
// This will only send update if it's necessary.
updateSessionAsync(session, intAlloc);
}
@@ -162,7 +173,7 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
// RPC already handles retries, so we will just try to kill the session here.
// This will cause the current query to fail. We could instead keep retrying.
try {
- session.destroy();
+ session.handleUpdateError();
} catch (Exception e) {
LOG.error("Failed to kill the session " + session);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java
index 45c3e38..4f373162 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java
@@ -114,7 +114,7 @@ public class LlapPluginEndpointClientImpl extends
private void ensureInfo() throws InterruptedException, TimeoutException {
if (info != null) return;
- info = node.waitForAmPluginInfo(0); // Don't wait - should already be initialized.
+ info = node.getAmPluginInfo(); // Don't wait - should already be initialized.
if (info == null) {
throw new AssertionError("A request was created without AM plugin info for " + node);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
index a326db3..acacfd0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
@@ -26,7 +26,8 @@ interface QueryAllocationManager {
* Updates the session allocations asynchoronously.
* @param totalMaxAlloc The total maximum fraction of the cluster to allocate. Used to
* avoid various artifacts, esp. with small numbers and double weirdness.
+ * Null means the total is unknown.
* @param sessions Sessions to update based on their allocation fraction.
*/
- void updateSessionsAsync(double totalMaxAlloc, List<WmTezSession> sessions);
+ void updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessions);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java
index da93a3a..77ff484 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java
@@ -48,7 +48,7 @@ class SessionExpirationTracker {
private volatile SessionState initSessionState;
interface RestartImpl {
- void closeAndReopenPoolSession(TezSessionPoolSession session) throws Exception;
+ void closeAndReopenExpiredSession(TezSessionPoolSession session) throws Exception;
}
public static SessionExpirationTracker create(HiveConf conf, RestartImpl restartImpl) {
@@ -114,7 +114,7 @@ class SessionExpirationTracker {
TezSessionPoolSession next = restartQueue.take();
LOG.info("Restarting the expired session [" + next + "]");
try {
- sessionRestartImpl.closeAndReopenPoolSession(next);
+ sessionRestartImpl.closeAndReopenExpiredSession(next);
} catch (InterruptedException ie) {
throw ie;
} catch (Exception e) {
@@ -225,12 +225,12 @@ class SessionExpirationTracker {
expirationQueue.remove(session);
}
+ public void closeAndRestartExpiredSessionAsync(TezSessionPoolSession session) {
+ restartQueue.add(session);
+ }
+
public void closeAndRestartExpiredSession(
- TezSessionPoolSession session, boolean isAsync) throws Exception {
- if (isAsync) {
- restartQueue.add(session);
- } else {
- sessionRestartImpl.closeAndReopenPoolSession(session);
- }
+ TezSessionPoolSession session) throws Exception {
+ sessionRestartImpl.closeAndReopenExpiredSession(session);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
index b67c933..9c2b3ea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
@@ -17,17 +17,25 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
-
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashSet;
-import java.util.Queue;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Set;
-import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -45,35 +53,48 @@ import org.slf4j.LoggerFactory;
class TezSessionPool<SessionType extends TezSessionPoolSession> {
private static final Logger LOG = LoggerFactory.getLogger(TezSessionPool.class);
- /** A queue for initial sessions that have not been started yet. */
- private final Queue<SessionType> initialSessions =
- new ConcurrentLinkedQueue<SessionType>();
+ public interface SessionObjectFactory<SessionType> {
+ SessionType create(SessionType oldSession);
+ }
private final HiveConf initConf;
- private int initialSize;
+ private int initialSize = 0; // For testing only.
+ private final SessionObjectFactory<SessionType> sessionObjFactory;
- // TODO: eventually, this will need to support resize. That would probably require replacement
- // with a RW lock, a semaphore and linked list.
- private BlockingDeque<SessionType> defaultQueuePool;
+ private final ReentrantLock poolLock = new ReentrantLock(true);
+ private final Condition notEmpty = poolLock.newCondition();
+ private final LinkedList<SessionType> pool = new LinkedList<>();
+ private final LinkedList<SettableFuture<SessionType>> asyncRequests = new LinkedList<>();
+ /**
+ * The number of sessions that need to be started or killed because of the resize calls on
+ * the pool. When increasing the size, we set this to a positive number and start new sessions
+ * on background threads, gradually bringing it back to 0.
+ * When decreasing the size, we try to kill as many existing sessions as we can; if that is
+ * not enough because the sessions are in use or being restarted, we kill them as they are
+ * re-added to the pool.
+ * Repeated calls to resize adjust the delta to ensure correctness between different resizes.
+ */
+ private final AtomicInteger deltaRemaining = new AtomicInteger();
private final String amRegistryName;
private final TezAmRegistryImpl amRegistry;
private final ConcurrentHashMap<String, SessionType> bySessionId =
new ConcurrentHashMap<>();
+ // Preserved at initialization time to have a session to use during resize.
+ // TODO: rather, Tez sessions should not depend on SessionState.
+ private SessionState parentSessionState;
-
- TezSessionPool(HiveConf initConf, int numSessionsTotal, boolean useAmRegistryIfPresent) {
+ TezSessionPool(HiveConf initConf, int numSessionsTotal, boolean useAmRegistryIfPresent,
+ SessionObjectFactory<SessionType> sessionFactory) {
this.initConf = initConf;
- assert numSessionsTotal > 0;
- defaultQueuePool = new LinkedBlockingDeque<>(numSessionsTotal);
+ this.initialSize = numSessionsTotal;
this.amRegistry = useAmRegistryIfPresent ? TezAmRegistryImpl.create(initConf, true) : null;
this.amRegistryName = amRegistry == null ? null : amRegistry.getRegistryName();
+ this.sessionObjFactory = sessionFactory;
}
- void startInitialSessions() throws Exception {
- initialSize = initialSessions.size();
- if (initialSessions.isEmpty()) return;
+ void start() throws Exception {
if (amRegistry != null) {
amRegistry.start();
amRegistry.initializeWithoutRegistering();
@@ -82,82 +103,145 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
amRegistry.populateCache(true);
}
- int threadCount = Math.min(initialSessions.size(),
+ int threadCount = Math.min(initialSize,
HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS));
Preconditions.checkArgument(threadCount > 0);
if (threadCount == 1) {
- while (true) {
- SessionType session = initialSessions.poll();
+ for (int i = 0; i < initialSize; ++i) {
+ SessionType session = sessionObjFactory.create(null);
if (session == null) break;
startInitialSession(session);
}
} else {
- final SessionState parentSessionState = SessionState.get();
- // The runnable has no mutable state, so each thread can run the same thing.
- final AtomicReference<Exception> firstError = new AtomicReference<>(null);
- Runnable runnable = new Runnable() {
- public void run() {
- if (parentSessionState != null) {
- SessionState.setCurrentSessionState(parentSessionState);
- }
- while (true) {
- SessionType session = initialSessions.poll();
- if (session == null) break;
- if (firstError.get() != null) break; // Best-effort.
- try {
- startInitialSession(session);
- } catch (Exception e) {
- if (!firstError.compareAndSet(null, e)) {
- LOG.error("Failed to start session; ignoring due to previous error", e);
- }
- break;
- }
- }
+ final AtomicInteger remaining = new AtomicInteger(initialSize);
+ this.parentSessionState = SessionState.get();
+ @SuppressWarnings("unchecked")
+ FutureTask<Boolean>[] threadTasks = new FutureTask[threadCount];
+ for (int i = threadTasks.length - 1; i >= 0; --i) {
+ threadTasks[i] = new FutureTask<Boolean>(new CreateSessionsRunnable(remaining));
+ if (i == 0) {
+ // Start is blocking, so run one of the tasks on the main thread.
+ threadTasks[i].run();
+ } else {
+ new Thread(threadTasks[i], "Tez session init " + i).start();
}
- };
- Thread[] threads = new Thread[threadCount - 1];
- for (int i = 0; i < threads.length; ++i) {
- threads[i] = new Thread(runnable, "Tez session init " + i);
- threads[i].start();
- }
- runnable.run();
- for (int i = 0; i < threads.length; ++i) {
- threads[i].join();
}
- Exception ex = firstError.get();
- if (ex != null) {
- throw ex;
+ for (int i = 0; i < threadTasks.length; ++i) {
+ threadTasks[i].get();
}
}
}
- void addInitialSession(SessionType session) {
- initialSessions.add(session);
- }
-
SessionType getSession() throws Exception {
while (true) {
- SessionType result = defaultQueuePool.take();
- if (result.tryUse()) return result;
+ SessionType result = null;
+ poolLock.lock();
+ try {
+ while ((result = pool.poll()) == null) {
+ notEmpty.await(100, TimeUnit.MILLISECONDS);
+ }
+ } finally {
+ poolLock.unlock();
+ }
+ if (result.tryUse(false)) return result;
LOG.info("Couldn't use a session [" + result + "]; attempting another one");
}
}
- void returnSession(SessionType session) throws Exception {
+ ListenableFuture<SessionType> getSessionAsync() throws Exception {
+ SettableFuture<SessionType> future = SettableFuture.create();
+ poolLock.lock();
+ try {
+ // Try to get the session quickly.
+ while (true) {
+ SessionType result = pool.poll();
+ if (result == null) break;
+ if (!result.tryUse(false)) continue;
+ future.set(result);
+ return future;
+ }
+ // The pool is empty; queue the request.
+ asyncRequests.add(future);
+ return future;
+ } finally {
+ poolLock.unlock();
+ }
+ }
+
+ void returnSession(SessionType session) {
+ returnSessionInternal(session, false);
+ }
+
+ boolean returnSessionAsync(SessionType session) {
+ return returnSessionInternal(session, true);
+ }
+
+ private boolean returnSessionInternal(SessionType session, boolean isAsync) {
// Make sure that if the session is returned to the pool, it doesn't live in the global.
SessionState sessionState = SessionState.get();
if (sessionState != null) {
sessionState.setTezSession(null);
}
- if (session.stopUsing()) {
- defaultQueuePool.putFirst(session);
+ if (!session.stopUsing()) return true; // The session will be restarted and return to us.
+ boolean canPutBack = putSessionBack(session, true);
+ if (canPutBack) return true;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closing an unneeded returned session " + session);
+ }
+
+ if (isAsync) return false; // The caller is responsible for destroying the session.
+ try {
+ session.close(false);
+ } catch (Exception ex) {
+ LOG.error("Failed to close " + session, ex);
+ }
+ return true;
+ }
+
+ /**
+ * Puts session back into the pool.
+ * @return true if the session has been put back; false if it's not needed and should be killed.
+ */
+ private boolean putSessionBack(SessionType session, boolean isFirst) {
+ SettableFuture<SessionType> future = null;
+ poolLock.lock();
+ try {
+ // See if we need to kill some sessions because the pool was resized down while
+ // a bunch of sessions were outstanding. See also deltaRemaining javadoc.
+ while (true) {
+ int remainingToKill = -deltaRemaining.get();
+ if (remainingToKill <= 0) break; // No need to kill anything.
+ if (deltaRemaining.compareAndSet(-remainingToKill, -remainingToKill + 1)) {
+ return false;
+ }
+ }
+ // If there are async requests, satisfy them first.
+ if (!asyncRequests.isEmpty() && session.tryUse(false)) {
+ future = asyncRequests.poll();
+ }
+ if (future == null) {
+ // Put session into the pool.
+ if (isFirst) {
+ pool.addFirst(session);
+ } else {
+ pool.addLast(session);
+ }
+ notEmpty.signalAll();
+ }
+ } finally {
+ poolLock.unlock();
}
+ if (future != null) {
+ future.set(session);
+ }
+ return true;
}
- void replaceSession(SessionType oldSession, SessionType newSession,
- boolean keepTmpDir, String[] additionalFilesArray, HiveConf conf) throws Exception {
+ void replaceSession(SessionType oldSession, boolean keepTmpDir,
+ String[] additionalFilesArray) throws Exception {
// Retain the stuff from the old session.
// Re-setting the queue config is an old hack that we may remove in future.
+ SessionType newSession = sessionObjFactory.create(oldSession);
Path scratchDir = oldSession.getTezScratchDir();
String queueName = oldSession.getQueueName();
Set<String> additionalFiles = null;
@@ -171,12 +255,16 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
}
try {
oldSession.close(keepTmpDir);
- boolean wasRemoved = defaultQueuePool.remove(oldSession);
- if (!wasRemoved) {
- LOG.error("Old session was closed but it was not in the pool", oldSession);
+ } finally {
+ poolLock.lock();
+ try {
+ // The expiring session may or may not be in the pool.
+ pool.remove(oldSession);
+ } finally {
+ poolLock.unlock();
}
+
bySessionId.remove(oldSession.getSessionId());
- } finally {
// There's some bogus code that can modify the queue name. Force-set it for pool sessions.
// TODO: this might only be applicable to TezSessionPoolManager; try moving it there?
newSession.getConf().set(TezConfiguration.TEZ_QUEUE_NAME, queueName);
@@ -184,18 +272,36 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
// registry again just in case. TODO: maybe we should enforce that.
configureAmRegistry(newSession);
newSession.open(additionalFiles, scratchDir);
- defaultQueuePool.put(newSession);
+ if (!putSessionBack(newSession, false)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closing an unneeded session " + newSession
+ + "; trying to replace " + oldSession);
+ }
+ try {
+ newSession.close(false);
+ } catch (Exception ex) {
+ LOG.error("Failed to close an unneeded session", ex);
+ }
+ }
}
}
+
private void startInitialSession(SessionType session) throws Exception {
- boolean isUsable = session.tryUse();
+ boolean isUsable = session.tryUse(true);
if (!isUsable) throw new IOException(session + " is not usable at pool startup");
session.getConf().set(TezConfiguration.TEZ_QUEUE_NAME, session.getQueueName());
configureAmRegistry(session);
session.open();
if (session.stopUsing()) {
- defaultQueuePool.put(session);
+ if (!putSessionBack(session, false)) {
+ LOG.warn("Couldn't add a session during initialization");
+ try {
+ session.close(false);
+ } catch (Exception ex) {
+ LOG.error("Failed to close an unneeded session", ex);
+ }
+ }
}
}
@@ -230,7 +336,7 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
@Override
public void onUpdate(TezAmInstance serviceInstance) {
// Presumably we'd get those later if AM updates its stuff.
- LOG.warn("Received an unexpected update for instance={}. Ignoring", serviceInstance);
+ LOG.info("Received an unexpected update for instance={}. Ignoring", serviceInstance);
}
@Override
@@ -239,11 +345,126 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
// For now, we don't take any action. In future, we might restore the session based
// on this and get rid of the logic outside of the pool that replaces/reopens/etc.
LOG.warn("AM for " + sessionId + " has disappeared from the registry");
+ // TODO: this might race if AM for the same session is restarted internally by Tez.
+ // It is possible to receive the create before remove and remove the wrong one.
+ // We need some identity in the value to make sure that doesn't happen.
bySessionId.remove(sessionId);
}
}
+ @VisibleForTesting
int getInitialSize() {
return initialSize;
}
+
+ /**
+ * Resizes the pool asynchronously.
+ * @param delta The number of sessions to add (if positive) or remove (if negative).
+ * @param toClose An output list that receives newly-unneeded sessions for the caller to close.
+ */
+ public ListenableFuture<?> resizeAsync(int delta, List<SessionType> toClose) {
+ if (delta == 0) return createDummyFuture();
+ poolLock.lock();
+ try {
+ if (delta < 0) {
+ return resizeDownInternal(-delta, toClose);
+ } else {
+ return resizeUpInternal(delta);
+ }
+ } finally {
+ poolLock.unlock();
+ }
+ }
+
+ private ListenableFuture<?> resizeUpInternal(int delta) {
+ // 1) Cancel the kills if any, to avoid killing the returned sessions.
+ // Also sets the count for the async initialization.
+ int oldVal;
+ do {
+ oldVal = deltaRemaining.get();
+ } while (!deltaRemaining.compareAndSet(oldVal, oldVal + delta));
+ int toStart = oldVal + delta;
+ if (toStart <= 0) return createDummyFuture();
+ LOG.info("Resizing the pool; adding " + toStart + " sessions");
+
+ // 2) If we need to create some extra sessions, we'd do it just like startup does.
+ int threadCount = Math.max(1, Math.min(toStart,
+ HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS)));
+ List<ListenableFutureTask<Boolean>> threadTasks = new ArrayList<>(threadCount);
+ // This is an async method, so always launch threads, even for a single task.
+ for (int i = 0; i < threadCount; ++i) {
+ ListenableFutureTask<Boolean> task = ListenableFutureTask.create(
+ new CreateSessionsRunnable(deltaRemaining));
+ new Thread(task, "Tez pool resize " + i).start();
+ threadTasks.add(task);
+ }
+ return Futures.allAsList(threadTasks);
+ }
+
+ private ListenableFuture<Boolean> resizeDownInternal(int delta, List<SessionType> toClose) {
+ // 1) Cancel the previous expansion, if any.
+ while (true) {
+ int expansionCount = deltaRemaining.get();
+ if (expansionCount <= 0) break;
+ int expansionCancelled = Math.min(expansionCount, delta);
+ if (deltaRemaining.compareAndSet(expansionCount, expansionCount - expansionCancelled)) {
+ delta -= expansionCancelled;
+ break;
+ }
+ }
+ // 2) Drain unused sessions; the close() is sync so delegate to the caller.
+ while (delta > 0) {
+ SessionType session = pool.poll();
+ if (session == null) break;
+ if (!session.tryUse(true)) continue;
+ toClose.add(session);
+ --delta;
+ }
+ // 3) If too many sessions are outstanding (e.g. due to expiration restarts - should
+ // not happen with in-use sessions because WM already kills the extras), we will kill
+ // them as they come back from restarts.
+ if (delta > 0) {
+ int oldVal;
+ do {
+ oldVal = deltaRemaining.get();
+ } while (!deltaRemaining.compareAndSet(oldVal, oldVal - delta));
+ }
+ return createDummyFuture();
+ }
+
+ private ListenableFuture<Boolean> createDummyFuture() {
+ SettableFuture<Boolean> f = SettableFuture.create();
+ f.set(true);
+ return f;
+ }
+
+ private final class CreateSessionsRunnable implements Callable<Boolean> {
+ private final AtomicInteger remaining;
+
+ private CreateSessionsRunnable(AtomicInteger remaining) {
+ this.remaining = remaining;
+ }
+
+ public Boolean call() throws Exception {
+ if (parentSessionState != null) {
+ SessionState.setCurrentSessionState(parentSessionState);
+ }
+ while (true) {
+ int oldVal = remaining.get();
+ if (oldVal <= 0) return true;
+ if (!remaining.compareAndSet(oldVal, oldVal - 1)) continue;
+ startInitialSession(sessionObjFactory.create(null));
+ }
+ }
+ }
+
+ @VisibleForTesting
+ int getCurrentSize() {
+ poolLock.tryLock();
+ try {
+ return pool.size();
+ } finally {
+ poolLock.unlock();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 9b4714f..15543d6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -18,6 +18,11 @@
package org.apache.hadoop.hive.ql.exec.tez;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import javax.security.auth.login.LoginException;
+import org.apache.tez.dag.api.TezException;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -102,7 +107,7 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
public void startPool() throws Exception {
if (defaultSessionPool != null) {
- defaultSessionPool.startInitialSessions();
+ defaultSessionPool.start();
}
if (expirationTracker != null) {
expirationTracker.start();
@@ -110,7 +115,7 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
}
public void setupPool(HiveConf conf) throws Exception {
- String[] defaultQueueList = HiveConf.getTrimmedStringsVar(
+ final String[] defaultQueueList = HiveConf.getTrimmedStringsVar(
conf, HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES);
this.initConf = conf;
int emptyNames = 0; // We don't create sessions for empty entries.
@@ -123,7 +128,37 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
int numSessionsTotal = numSessions * (defaultQueueList.length - emptyNames);
if (numSessionsTotal > 0) {
boolean enableAmRegistry = false;
- defaultSessionPool = new TezSessionPool<>(initConf, numSessionsTotal, enableAmRegistry);
+ defaultSessionPool = new TezSessionPool<>(initConf, numSessionsTotal, enableAmRegistry,
+ new TezSessionPool.SessionObjectFactory<TezSessionPoolSession>() {
+ int queueIx = 0;
+
+ @Override
+ public TezSessionPoolSession create(TezSessionPoolSession oldSession) {
+ if (oldSession != null) {
+ return createAndInitSession(
+ oldSession.getQueueName(), oldSession.isDefault(), oldSession.getConf());
+ }
+ // We never resize the pool, so assume this is initialization.
+ // If that changes, we might have to make the factory interface more complicated.
+ /*
+ * In a single-threaded init case, with this the ordering of sessions in the queue will be
+ * (with 2 sessions 3 queues) s1q1, s1q2, s1q3, s2q1, s2q2, s2q3 thereby ensuring uniform
+ * distribution of the sessions across queues at least to begin with. Then as sessions get
+ * freed up, the list may change this ordering.
+ * In a multi threaded init case it's a free for all.
+ */
+ int localQueueIx;
+ synchronized (defaultQueueList) {
+ localQueueIx = queueIx;
+ ++queueIx;
+ if (queueIx == defaultQueueList.length) {
+ queueIx = 0;
+ }
+ }
+ HiveConf sessionConf = new HiveConf(initConf);
+ return createAndInitSession(defaultQueueList[localQueueIx], true, sessionConf);
+ }
+ });
}
numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES);
@@ -155,24 +190,6 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
if (!hasInitialSessions) {
return;
}
-
-
- /*
- * In a single-threaded init case, with this the ordering of sessions in the queue will be
- * (with 2 sessions 3 queues) s1q1, s1q2, s1q3, s2q1, s2q2, s2q3 there by ensuring uniform
- * distribution of the sessions across queues at least to begin with. Then as sessions get
- * freed up, the list may change this ordering.
- * In a multi threaded init case it's a free for all.
- */
- for (int i = 0; i < numSessions; i++) {
- for (String queueName : defaultQueueList) {
- if (queueName.isEmpty()) {
- continue;
- }
- HiveConf sessionConf = new HiveConf(initConf);
- defaultSessionPool.addInitialSession(createAndInitSession(queueName, true, sessionConf));
- }
- }
}
// TODO Create and init session sets up queue, isDefault - but does not initialize the configuration
@@ -438,7 +455,14 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
&& sessionConf.get(TezConfiguration.TEZ_QUEUE_NAME) == null) {
sessionConf.set(TezConfiguration.TEZ_QUEUE_NAME, sessionState.getQueueName());
}
+ reopenInternal(sessionState, additionalFiles);
+ return sessionState;
+ }
+
+ static void reopenInternal(
+ TezSessionState sessionState, String[] additionalFiles) throws Exception {
Set<String> oldAdditionalFiles = sessionState.getAdditionalFilesNotFromConf();
+ // TODO: implies the session files and the array are the same if not null; why? very brittle
if ((oldAdditionalFiles == null || oldAdditionalFiles.isEmpty())
&& (additionalFiles != null)) {
oldAdditionalFiles = new HashSet<>();
@@ -449,11 +473,11 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
// TODO: close basically resets the object to a bunch of nulls.
// We should ideally not reuse the object because it's pointless and error-prone.
sessionState.close(true);
- // TODO: should we reuse scratchDir too?
+ // Note: scratchdir is reused implicitly because the sessionId is the same.
sessionState.open(oldAdditionalFiles, null);
- return sessionState;
}
+
public void closeNonDefaultSessions(boolean keepTmpDir) throws Exception {
List<TezSessionPoolSession> sessionsToClose = null;
synchronized (openSessions) {
@@ -467,14 +491,14 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
/** Closes a running (expired) pool session and reopens it. */
@Override
- public void closeAndReopenPoolSession(TezSessionPoolSession oldSession) throws Exception {
+ public void closeAndReopenExpiredSession(TezSessionPoolSession oldSession) throws Exception {
String queueName = oldSession.getQueueName();
if (queueName == null) {
LOG.warn("Pool session has a null queue: " + oldSession);
}
TezSessionPoolSession newSession = createAndInitSession(
queueName, oldSession.isDefault(), oldSession.getConf());
- defaultSessionPool.replaceSession(oldSession, newSession, false, null, null);
+ defaultSessionPool.replaceSession(oldSession, false, null);
}
/** Called by TezSessionPoolSession when opened. */
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
index 6135223..6887d7a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
@@ -18,6 +18,12 @@
package org.apache.hadoop.hive.ql.exec.tez;
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.apache.hadoop.hive.registry.impl.TezAmInstance;
+
+import org.apache.hadoop.conf.Configuration;
+
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Collection;
@@ -139,28 +145,28 @@ class TezSessionPoolSession extends TezSessionState {
* Tries to use this session. When the session is in use, it will not expire.
* @return true if the session can be used; false if it has already expired.
*/
- public boolean tryUse() throws Exception {
+ public boolean tryUse(boolean ignoreExpiration) {
while (true) {
int oldValue = sessionState.get();
if (oldValue == STATE_IN_USE) throw new AssertionError(this + " is already in use");
if (oldValue == STATE_EXPIRED) return false;
- int finalState = shouldExpire() ? STATE_EXPIRED : STATE_IN_USE;
+ int finalState = (!ignoreExpiration && shouldExpire()) ? STATE_EXPIRED : STATE_IN_USE;
if (sessionState.compareAndSet(STATE_NONE, finalState)) {
if (finalState == STATE_IN_USE) return true;
// Restart asynchronously, don't block the caller.
- expirationTracker.closeAndRestartExpiredSession(this, true);
+ expirationTracker.closeAndRestartExpiredSessionAsync(this);
return false;
}
}
}
- boolean stopUsing() throws Exception {
+ boolean stopUsing() {
int finalState = shouldExpire() ? STATE_EXPIRED : STATE_NONE;
if (!sessionState.compareAndSet(STATE_IN_USE, finalState)) {
throw new AssertionError("Unexpected state change; currently " + sessionState.get());
}
if (finalState == STATE_NONE) return true;
- expirationTracker.closeAndRestartExpiredSession(this, true);
+ expirationTracker.closeAndRestartExpiredSessionAsync(this);
return false;
}
@@ -176,13 +182,17 @@ class TezSessionPoolSession extends TezSessionState {
while (true) {
if (sessionState.get() != STATE_NONE) return true; // returnAfterUse will take care of this
if (sessionState.compareAndSet(STATE_NONE, STATE_EXPIRED)) {
- expirationTracker.closeAndRestartExpiredSession(this, isAsync);
+ if (isAsync) {
+ expirationTracker.closeAndRestartExpiredSessionAsync(this);
+ } else {
+ expirationTracker.closeAndRestartExpiredSession(this);
+ }
return true;
}
}
}
- private boolean shouldExpire() {
+ private final boolean shouldExpire() {
return expirationNs != null && (System.nanoTime() - expirationNs) >= 0;
}
@@ -209,4 +219,4 @@ class TezSessionPoolSession extends TezSessionState {
void updateFromRegistry(TezAmInstance si) {
// Nothing to do.
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 1448168..541ee73 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -629,7 +629,7 @@ public class TezSessionState {
}
}
- public void cleanupScratchDir () throws IOException {
+ protected final void cleanupScratchDir () throws IOException {
FileSystem fs = tezScratchDir.getFileSystem(conf);
fs.delete(tezScratchDir, true);
tezScratchDir = null;
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
index 81d6b85..96dc7d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
@@ -41,13 +41,12 @@ class UserPoolMapping {
private final String defaultPoolName;
// TODO: add other types as needed
- public UserPoolMapping(List<TmpUserMapping> mappings, Set<String> poolNames) {
+ public UserPoolMapping(List<TmpUserMapping> mappings) {
String defaultPoolName = null;
for (TmpUserMapping mapping : mappings) {
switch (mapping.getType()) {
case USER: {
- String poolName = getValidPoolName(poolNames, mapping);
- Mapping val = new Mapping(poolName, mapping.getPriority());
+ Mapping val = new Mapping(mapping.getPoolName(), mapping.getPriority());
Mapping oldValue = userMappings.put(mapping.getName(), val);
if (oldValue != null) {
throw new AssertionError("Duplicate mapping for user " + mapping.getName() + "; "
@@ -56,7 +55,7 @@ class UserPoolMapping {
break;
}
case DEFAULT: {
- String poolName = getValidPoolName(poolNames, mapping);
+ String poolName = mapping.getPoolName();
if (defaultPoolName != null) {
throw new AssertionError("Duplicate default mapping; "
+ defaultPoolName + " and " + poolName);
@@ -80,13 +79,4 @@ class UserPoolMapping {
if (userMapping != null) return userMapping.fullPoolName;
return defaultPoolName;
}
-
- private static String getValidPoolName(Set<String> poolNames, TmpUserMapping mapping) {
- String poolName = mapping.getPoolName();
- // Should we really be validating here? The plan should be validated before applying.
- if (!poolNames.contains(mapping.getPoolName())) {
- throw new AssertionError("Invalid pool in the mapping " + poolName);
- }
- return poolName;
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
index 00501ee..0dd1433 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
@@ -18,18 +18,30 @@
package org.apache.hadoop.hive.ql.exec.tez;
-import java.util.concurrent.TimeoutException;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.registry.impl.TezAmInstance;
public class WmTezSession extends TezSessionPoolSession implements AmPluginNode {
private String poolName;
private double clusterFraction;
+ private String killReason = null;
private final Object amPluginInfoLock = new Object();
private AmPluginInfo amPluginInfo = null;
+ private SettableFuture<WmTezSession> amRegistryFuture = null;
+ private ScheduledFuture<?> timeoutTimer = null;
+ private final WorkloadManager wmParent;
/** The actual state of the guaranteed task, and the update state, for the session. */
// Note: hypothetically, a generic WM-aware-session should not know about guaranteed tasks.
@@ -41,34 +53,60 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
}
private final ActualWmState actualState = new ActualWmState();
- public WmTezSession(String sessionId, Manager parent,
+ public WmTezSession(String sessionId, WorkloadManager parent,
SessionExpirationTracker expiration, HiveConf conf) {
super(sessionId, parent, expiration, conf);
+ wmParent = parent;
}
- @Override
- public AmPluginInfo waitForAmPluginInfo(int timeoutMs)
- throws InterruptedException, TimeoutException {
+ @VisibleForTesting
+ WmTezSession(String sessionId, Manager testParent,
+ SessionExpirationTracker expiration, HiveConf conf) {
+ super(sessionId, testParent, expiration, conf);
+ wmParent = null;
+ }
+
+ public ListenableFuture<WmTezSession> waitForAmRegistryAsync(
+ int timeoutMs, ScheduledExecutorService timeoutPool) {
+ SettableFuture<WmTezSession> future = SettableFuture.create();
synchronized (amPluginInfoLock) {
- if (amPluginInfo == null) {
- amPluginInfoLock.wait(timeoutMs);
- if (amPluginInfo == null) {
- throw new TimeoutException("No plugin information for " + getSessionId());
- }
+ if (amPluginInfo != null) {
+ future.set(this);
+ return future;
}
- return amPluginInfo;
+ if (amRegistryFuture != null) {
+ // We don't need this for now, so do not support it.
+ future.setException(new RuntimeException("Multiple waits are not suported"));
+ return future;
+ }
+ amRegistryFuture = future;
+ if (timeoutMs <= 0) return future;
+ // TODO: replace with withTimeout after we get the relevant guava upgrade.
+ this.timeoutTimer = timeoutPool.schedule(
+ new TimeoutRunnable(), timeoutMs, TimeUnit.MILLISECONDS);
}
+ return future;
}
+
@Override
void updateFromRegistry(TezAmInstance si) {
+ AmPluginInfo info = new AmPluginInfo(si.getHost(), si.getPluginPort(),
+ si.getPluginToken(), si.getPluginTokenJobId());
synchronized (amPluginInfoLock) {
- this.amPluginInfo = new AmPluginInfo(si.getHost(), si.getPluginPort(),
- si.getPluginToken(), si.getPluginTokenJobId());
- amPluginInfoLock.notifyAll();
+ this.amPluginInfo = info;
+ if (amRegistryFuture != null) {
+ amRegistryFuture.set(this);
+ amRegistryFuture = null;
+ }
+ if (timeoutTimer != null) {
+ timeoutTimer.cancel(true);
+ timeoutTimer = null;
+ }
}
}
+ @Override
public AmPluginInfo getAmPluginInfo() {
return amPluginInfo; // Only has final fields, no artifacts from the absence of sync.
}
@@ -85,6 +123,11 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
this.clusterFraction = fraction;
}
+ void clearWm() {
+ this.poolName = null;
+ this.clusterFraction = 0f;
+ }
+
double getClusterFraction() {
return this.clusterFraction;
}
@@ -118,4 +161,42 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
return (actualState.sent == actualState.target);
}
}
+
+ public void handleUpdateError() {
+ wmParent.addUpdateError(this);
+ }
+
+ @Override
+ public int hashCode() {
+ return System.identityHashCode(this);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj == this;
+ }
+
+ boolean isIrrelevantForWm() {
+ return killReason != null;
+ }
+
+ String getReasonForKill() {
+ return killReason;
+ }
+
+ void setIsIrrelevantForWm(String killReason) {
+ this.killReason = killReason;
+ }
+
+ private final class TimeoutRunnable implements Runnable {
+ @Override
+ public void run() {
+ synchronized (amPluginInfoLock) {
+ timeoutTimer = null;
+ if (amRegistryFuture == null || amRegistryFuture.isDone()) return;
+ amRegistryFuture.cancel(true);
+ amRegistryFuture = null;
+ }
+ }
+ }
}