You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/11/02 18:40:52 UTC
[3/3] hive git commit: HIVE-17841 : implement applying the resource
plan (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
HIVE-17841 : implement applying the resource plan (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/77b99e4c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/77b99e4c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/77b99e4c
Branch: refs/heads/master
Commit: 77b99e4c9e39b9905ff98a002ca35f7edd9b7b30
Parents: c5a9673
Author: sergey <se...@apache.org>
Authored: Thu Nov 2 11:30:38 2017 -0700
Committer: sergey <se...@apache.org>
Committed: Thu Nov 2 11:30:38 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 3 +
.../hive/jdbc/TestTriggersWorkloadManager.java | 12 +-
.../hadoop/hive/ql/exec/tez/AmPluginNode.java | 2 +-
.../ql/exec/tez/GuaranteedTasksAllocator.java | 29 +-
.../exec/tez/LlapPluginEndpointClientImpl.java | 2 +-
.../ql/exec/tez/QueryAllocationManager.java | 3 +-
.../ql/exec/tez/SessionExpirationTracker.java | 16 +-
.../hadoop/hive/ql/exec/tez/TezSessionPool.java | 371 ++++-
.../hive/ql/exec/tez/TezSessionPoolManager.java | 74 +-
.../hive/ql/exec/tez/TezSessionPoolSession.java | 26 +-
.../hive/ql/exec/tez/TezSessionState.java | 2 +-
.../hive/ql/exec/tez/UserPoolMapping.java | 16 +-
.../hadoop/hive/ql/exec/tez/WmTezSession.java | 109 +-
.../hive/ql/exec/tez/WorkloadManager.java | 1549 ++++++++++++++----
.../physical/LlapClusterStateForCompile.java | 13 +-
.../hive/ql/exec/tez/SampleTezSessionState.java | 44 +-
.../hive/ql/exec/tez/TestWorkloadManager.java | 365 ++++-
17 files changed, 2093 insertions(+), 543 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a3c853a..b65bdab 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2419,6 +2419,9 @@ public class HiveConf extends Configuration {
HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE("hive.server2.tez.interactive.queue", "",
"A single YARN queues to use for Hive Interactive sessions. When this is specified,\n" +
"workload management is enabled and used for these sessions."),
+ HIVE_SERVER2_TEZ_WM_WORKER_THREADS("hive.server2.tez.wm.worker.threads", 4,
+ "Number of worker threads to use to perform the synchronous operations with Tez\n" +
+ "sessions for workload management (e.g. opening, closing, etc.)"),
HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT("hive.server2.tez.wm.am.registry.timeout", "30s",
new TimeValidator(TimeUnit.SECONDS),
"The timeout for AM registry registration, after which (on attempting to use the\n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
index fdb660a..0ec7e85 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
@@ -16,17 +16,21 @@
package org.apache.hive.jdbc;
+import com.google.common.collect.Lists;
import java.io.File;
import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpHivePool;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpResourcePlan;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMapping;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMappingType;
import org.apache.hadoop.hive.ql.wm.Trigger;
import org.apache.hive.jdbc.miniHS2.MiniHS2;
import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
@@ -70,6 +74,10 @@ public class TestTriggersWorkloadManager extends TestTriggersTezSessionPoolManag
@Override
protected void setupTriggers(final List<Trigger> triggers) throws Exception {
WorkloadManager wm = WorkloadManager.getInstance();
- wm.getPools().get("llap").setTriggers(triggers);
+ TmpResourcePlan rp = new TmpResourcePlan(Lists.newArrayList(new TmpHivePool(
+ "llap", null, 1, 1.0f, triggers)), Lists.newArrayList(
+ new TmpUserMapping(TmpUserMappingType.DEFAULT, "", "llap", 1)));
+ wm.updateResourcePlanAsync(rp).get(10, TimeUnit.SECONDS);
}
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java
index 35d380c..e4a21cb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java
@@ -39,5 +39,5 @@ public interface AmPluginNode {
}
}
- AmPluginInfo waitForAmPluginInfo(int timeoutMs) throws InterruptedException, TimeoutException;
+ AmPluginInfo getAmPluginInfo() throws InterruptedException, TimeoutException;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
index d978a25..53dd698 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
@@ -64,6 +64,8 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
@Override
public void start() {
+ // Try to get cluster information once, to avoid immediate cluster-update event in WM.
+ clusterState.initClusterInfo();
clusterStateUpdateThread.start();
}
@@ -72,9 +74,13 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
clusterStateUpdateThread.interrupt(); // Don't wait for the thread.
}
+ public void initClusterInfo() {
+ clusterState.initClusterInfo();
+ }
+
@VisibleForTesting
protected int getExecutorCount(boolean allowUpdate) {
- if (!clusterState.initClusterInfo(allowUpdate)) {
+ if (allowUpdate && !clusterState.initClusterInfo()) {
LOG.warn("Failed to get LLAP cluster information for "
+ HiveConf.getTrimmedVar(this.conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS)
+ "; we may rely on outdated cluster status");
@@ -92,15 +98,18 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
}
@Override
- public void updateSessionsAsync(double totalMaxAlloc, List<WmTezSession> sessionsToUpdate) {
- // Do not make a remote call unless we have no information at all.
+ public void updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessionsToUpdate) {
+ // Do not make a remote call under any circumstances - this is supposed to be async.
int totalCount = getExecutorCount(false);
- int totalToDistribute = (int)Math.round(totalCount * totalMaxAlloc);
+ int totalToDistribute = -1;
+ if (totalMaxAlloc != null) {
+ totalToDistribute = (int)Math.round(totalCount * totalMaxAlloc);
+ }
double lastDelta = 0;
for (int i = 0; i < sessionsToUpdate.size(); ++i) {
WmTezSession session = sessionsToUpdate.get(i);
int intAlloc = -1;
- if (i + 1 == sessionsToUpdate.size()) {
+ if (i + 1 == sessionsToUpdate.size() && totalToDistribute >= 0) {
intAlloc = totalToDistribute;
// We rely on the caller to supply a reasonable total; we could log a warning
// if this doesn't match the allocation of the last session beyond some threshold.
@@ -119,10 +128,12 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
intAlloc = (int)roundedAlloc;
}
// Make sure we don't give out more than allowed due to double/rounding artifacts.
- if (intAlloc > totalToDistribute) {
- intAlloc = totalToDistribute;
+ if (totalToDistribute >= 0) {
+ if (intAlloc > totalToDistribute) {
+ intAlloc = totalToDistribute;
+ }
+ totalToDistribute -= intAlloc;
}
- totalToDistribute -= intAlloc;
// This will only send update if it's necessary.
updateSessionAsync(session, intAlloc);
}
@@ -162,7 +173,7 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
// RPC already handles retries, so we will just try to kill the session here.
// This will cause the current query to fail. We could instead keep retrying.
try {
- session.destroy();
+ session.handleUpdateError();
} catch (Exception e) {
LOG.error("Failed to kill the session " + session);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java
index 45c3e38..4f373162 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java
@@ -114,7 +114,7 @@ public class LlapPluginEndpointClientImpl extends
private void ensureInfo() throws InterruptedException, TimeoutException {
if (info != null) return;
- info = node.waitForAmPluginInfo(0); // Don't wait - should already be initialized.
+ info = node.getAmPluginInfo(); // Don't wait - should already be initialized.
if (info == null) {
throw new AssertionError("A request was created without AM plugin info for " + node);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
index a326db3..acacfd0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
@@ -26,7 +26,8 @@ interface QueryAllocationManager {
* Updates the session allocations asynchoronously.
* @param totalMaxAlloc The total maximum fraction of the cluster to allocate. Used to
* avoid various artifacts, esp. with small numbers and double weirdness.
+ * Null means the total is unknown.
* @param sessions Sessions to update based on their allocation fraction.
*/
- void updateSessionsAsync(double totalMaxAlloc, List<WmTezSession> sessions);
+ void updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessions);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java
index da93a3a..77ff484 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java
@@ -48,7 +48,7 @@ class SessionExpirationTracker {
private volatile SessionState initSessionState;
interface RestartImpl {
- void closeAndReopenPoolSession(TezSessionPoolSession session) throws Exception;
+ void closeAndReopenExpiredSession(TezSessionPoolSession session) throws Exception;
}
public static SessionExpirationTracker create(HiveConf conf, RestartImpl restartImpl) {
@@ -114,7 +114,7 @@ class SessionExpirationTracker {
TezSessionPoolSession next = restartQueue.take();
LOG.info("Restarting the expired session [" + next + "]");
try {
- sessionRestartImpl.closeAndReopenPoolSession(next);
+ sessionRestartImpl.closeAndReopenExpiredSession(next);
} catch (InterruptedException ie) {
throw ie;
} catch (Exception e) {
@@ -225,12 +225,12 @@ class SessionExpirationTracker {
expirationQueue.remove(session);
}
+ public void closeAndRestartExpiredSessionAsync(TezSessionPoolSession session) {
+ restartQueue.add(session);
+ }
+
public void closeAndRestartExpiredSession(
- TezSessionPoolSession session, boolean isAsync) throws Exception {
- if (isAsync) {
- restartQueue.add(session);
- } else {
- sessionRestartImpl.closeAndReopenPoolSession(session);
- }
+ TezSessionPoolSession session) throws Exception {
+ sessionRestartImpl.closeAndReopenExpiredSession(session);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
index b67c933..9c2b3ea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
@@ -17,17 +17,25 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
-
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashSet;
-import java.util.Queue;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Set;
-import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -45,35 +53,48 @@ import org.slf4j.LoggerFactory;
class TezSessionPool<SessionType extends TezSessionPoolSession> {
private static final Logger LOG = LoggerFactory.getLogger(TezSessionPool.class);
- /** A queue for initial sessions that have not been started yet. */
- private final Queue<SessionType> initialSessions =
- new ConcurrentLinkedQueue<SessionType>();
+ public interface SessionObjectFactory<SessionType> {
+ SessionType create(SessionType oldSession);
+ }
private final HiveConf initConf;
- private int initialSize;
+ private int initialSize = 0; // For testing only.
+ private final SessionObjectFactory<SessionType> sessionObjFactory;
- // TODO: eventually, this will need to support resize. That would probably require replacement
- // with a RW lock, a semaphore and linked list.
- private BlockingDeque<SessionType> defaultQueuePool;
+ private final ReentrantLock poolLock = new ReentrantLock(true);
+ private final Condition notEmpty = poolLock.newCondition();
+ private final LinkedList<SessionType> pool = new LinkedList<>();
+ private final LinkedList<SettableFuture<SessionType>> asyncRequests = new LinkedList<>();
+ /**
+ * The number of sessions that need to be started or killed because of the resize calls on
+ * the pool. When increasing the size, we set this to a positive number and start new sessions
+ * on background threads, gradually bringing it back to 0.
+ * When decreasing the size, we try to kill as many existing sessions as we can; if that is
+ * not enough because the sessions are in use or being restarted, we kill them as they are
+ * re-added to the pool.
+ * Repeated calls to resize adjust the delta to ensure correctness between different resizes.
+ */
+ private final AtomicInteger deltaRemaining = new AtomicInteger();
private final String amRegistryName;
private final TezAmRegistryImpl amRegistry;
private final ConcurrentHashMap<String, SessionType> bySessionId =
new ConcurrentHashMap<>();
+ // Preserved at initialization time to have a session to use during resize.
+ // TODO: rather, Tez sessions should not depend on SessionState.
+ private SessionState parentSessionState;
-
- TezSessionPool(HiveConf initConf, int numSessionsTotal, boolean useAmRegistryIfPresent) {
+ TezSessionPool(HiveConf initConf, int numSessionsTotal, boolean useAmRegistryIfPresent,
+ SessionObjectFactory<SessionType> sessionFactory) {
this.initConf = initConf;
- assert numSessionsTotal > 0;
- defaultQueuePool = new LinkedBlockingDeque<>(numSessionsTotal);
+ this.initialSize = numSessionsTotal;
this.amRegistry = useAmRegistryIfPresent ? TezAmRegistryImpl.create(initConf, true) : null;
this.amRegistryName = amRegistry == null ? null : amRegistry.getRegistryName();
+ this.sessionObjFactory = sessionFactory;
}
- void startInitialSessions() throws Exception {
- initialSize = initialSessions.size();
- if (initialSessions.isEmpty()) return;
+ void start() throws Exception {
if (amRegistry != null) {
amRegistry.start();
amRegistry.initializeWithoutRegistering();
@@ -82,82 +103,145 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
amRegistry.populateCache(true);
}
- int threadCount = Math.min(initialSessions.size(),
+ int threadCount = Math.min(initialSize,
HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS));
Preconditions.checkArgument(threadCount > 0);
if (threadCount == 1) {
- while (true) {
- SessionType session = initialSessions.poll();
+ for (int i = 0; i < initialSize; ++i) {
+ SessionType session = sessionObjFactory.create(null);
if (session == null) break;
startInitialSession(session);
}
} else {
- final SessionState parentSessionState = SessionState.get();
- // The runnable has no mutable state, so each thread can run the same thing.
- final AtomicReference<Exception> firstError = new AtomicReference<>(null);
- Runnable runnable = new Runnable() {
- public void run() {
- if (parentSessionState != null) {
- SessionState.setCurrentSessionState(parentSessionState);
- }
- while (true) {
- SessionType session = initialSessions.poll();
- if (session == null) break;
- if (firstError.get() != null) break; // Best-effort.
- try {
- startInitialSession(session);
- } catch (Exception e) {
- if (!firstError.compareAndSet(null, e)) {
- LOG.error("Failed to start session; ignoring due to previous error", e);
- }
- break;
- }
- }
+ final AtomicInteger remaining = new AtomicInteger(initialSize);
+ this.parentSessionState = SessionState.get();
+ @SuppressWarnings("unchecked")
+ FutureTask<Boolean>[] threadTasks = new FutureTask[threadCount];
+ for (int i = threadTasks.length - 1; i >= 0; --i) {
+ threadTasks[i] = new FutureTask<Boolean>(new CreateSessionsRunnable(remaining));
+ if (i == 0) {
+ // Start is blocking, so run one of the tasks on the main thread.
+ threadTasks[i].run();
+ } else {
+ new Thread(threadTasks[i], "Tez session init " + i).start();
}
- };
- Thread[] threads = new Thread[threadCount - 1];
- for (int i = 0; i < threads.length; ++i) {
- threads[i] = new Thread(runnable, "Tez session init " + i);
- threads[i].start();
- }
- runnable.run();
- for (int i = 0; i < threads.length; ++i) {
- threads[i].join();
}
- Exception ex = firstError.get();
- if (ex != null) {
- throw ex;
+ for (int i = 0; i < threadTasks.length; ++i) {
+ threadTasks[i].get();
}
}
}
- void addInitialSession(SessionType session) {
- initialSessions.add(session);
- }
-
SessionType getSession() throws Exception {
while (true) {
- SessionType result = defaultQueuePool.take();
- if (result.tryUse()) return result;
+ SessionType result = null;
+ poolLock.lock();
+ try {
+ while ((result = pool.poll()) == null) {
+ notEmpty.await(100, TimeUnit.MILLISECONDS);
+ }
+ } finally {
+ poolLock.unlock();
+ }
+ if (result.tryUse(false)) return result;
LOG.info("Couldn't use a session [" + result + "]; attempting another one");
}
}
- void returnSession(SessionType session) throws Exception {
+ ListenableFuture<SessionType> getSessionAsync() throws Exception {
+ SettableFuture<SessionType> future = SettableFuture.create();
+ poolLock.lock();
+ try {
+ // Try to get the session quickly.
+ while (true) {
+ SessionType result = pool.poll();
+ if (result == null) break;
+ if (!result.tryUse(false)) continue;
+ future.set(result);
+ return future;
+ }
+ // The pool is empty; queue the request.
+ asyncRequests.add(future);
+ return future;
+ } finally {
+ poolLock.unlock();
+ }
+ }
+
+ void returnSession(SessionType session) {
+ returnSessionInternal(session, false);
+ }
+
+ boolean returnSessionAsync(SessionType session) {
+ return returnSessionInternal(session, true);
+ }
+
+ private boolean returnSessionInternal(SessionType session, boolean isAsync) {
// Make sure that if the session is returned to the pool, it doesn't live in the global.
SessionState sessionState = SessionState.get();
if (sessionState != null) {
sessionState.setTezSession(null);
}
- if (session.stopUsing()) {
- defaultQueuePool.putFirst(session);
+ if (!session.stopUsing()) return true; // The session will be restarted and return to us.
+ boolean canPutBack = putSessionBack(session, true);
+ if (canPutBack) return true;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closing an unneeded returned session " + session);
+ }
+
+ if (isAsync) return false; // The caller is responsible for destroying the session.
+ try {
+ session.close(false);
+ } catch (Exception ex) {
+ LOG.error("Failed to close " + session, ex);
+ }
+ return true;
+ }
+
+ /**
+ * Puts session back into the pool.
+ * @return true if the session has been put back; false if it's not needed and should be killed.
+ */
+ private boolean putSessionBack(SessionType session, boolean isFirst) {
+ SettableFuture<SessionType> future = null;
+ poolLock.lock();
+ try {
+ // See if we need to kill some sessions because the pool was resized down while
+ // a bunch of sessions were outstanding. See also deltaRemaining javadoc.
+ while (true) {
+ int remainingToKill = -deltaRemaining.get();
+ if (remainingToKill <= 0) break; // No need to kill anything.
+ if (deltaRemaining.compareAndSet(-remainingToKill, -remainingToKill + 1)) {
+ return false;
+ }
+ }
+ // If there are async requests, satisfy them first.
+ if (!asyncRequests.isEmpty() && session.tryUse(false)) {
+ future = asyncRequests.poll();
+ }
+ if (future == null) {
+ // Put session into the pool.
+ if (isFirst) {
+ pool.addFirst(session);
+ } else {
+ pool.addLast(session);
+ }
+ notEmpty.signalAll();
+ }
+ } finally {
+ poolLock.unlock();
}
+ if (future != null) {
+ future.set(session);
+ }
+ return true;
}
- void replaceSession(SessionType oldSession, SessionType newSession,
- boolean keepTmpDir, String[] additionalFilesArray, HiveConf conf) throws Exception {
+ void replaceSession(SessionType oldSession, boolean keepTmpDir,
+ String[] additionalFilesArray) throws Exception {
// Retain the stuff from the old session.
// Re-setting the queue config is an old hack that we may remove in future.
+ SessionType newSession = sessionObjFactory.create(oldSession);
Path scratchDir = oldSession.getTezScratchDir();
String queueName = oldSession.getQueueName();
Set<String> additionalFiles = null;
@@ -171,12 +255,16 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
}
try {
oldSession.close(keepTmpDir);
- boolean wasRemoved = defaultQueuePool.remove(oldSession);
- if (!wasRemoved) {
- LOG.error("Old session was closed but it was not in the pool", oldSession);
+ } finally {
+ poolLock.lock();
+ try {
+ // The expiring session may or may not be in the pool.
+ pool.remove(oldSession);
+ } finally {
+ poolLock.unlock();
}
+
bySessionId.remove(oldSession.getSessionId());
- } finally {
// There's some bogus code that can modify the queue name. Force-set it for pool sessions.
// TODO: this might only be applicable to TezSessionPoolManager; try moving it there?
newSession.getConf().set(TezConfiguration.TEZ_QUEUE_NAME, queueName);
@@ -184,18 +272,36 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
// registry again just in case. TODO: maybe we should enforce that.
configureAmRegistry(newSession);
newSession.open(additionalFiles, scratchDir);
- defaultQueuePool.put(newSession);
+ if (!putSessionBack(newSession, false)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closing an unneeded session " + newSession
+ + "; trying to replace " + oldSession);
+ }
+ try {
+ newSession.close(false);
+ } catch (Exception ex) {
+ LOG.error("Failed to close an unneeded session", ex);
+ }
+ }
}
}
+
private void startInitialSession(SessionType session) throws Exception {
- boolean isUsable = session.tryUse();
+ boolean isUsable = session.tryUse(true);
if (!isUsable) throw new IOException(session + " is not usable at pool startup");
session.getConf().set(TezConfiguration.TEZ_QUEUE_NAME, session.getQueueName());
configureAmRegistry(session);
session.open();
if (session.stopUsing()) {
- defaultQueuePool.put(session);
+ if (!putSessionBack(session, false)) {
+ LOG.warn("Couldn't add a session during initialization");
+ try {
+ session.close(false);
+ } catch (Exception ex) {
+ LOG.error("Failed to close an unneeded session", ex);
+ }
+ }
}
}
@@ -230,7 +336,7 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
@Override
public void onUpdate(TezAmInstance serviceInstance) {
// Presumably we'd get those later if AM updates its stuff.
- LOG.warn("Received an unexpected update for instance={}. Ignoring", serviceInstance);
+ LOG.info("Received an unexpected update for instance={}. Ignoring", serviceInstance);
}
@Override
@@ -239,11 +345,126 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
// For now, we don't take any action. In future, we might restore the session based
// on this and get rid of the logic outside of the pool that replaces/reopens/etc.
LOG.warn("AM for " + sessionId + " has disappeared from the registry");
+ // TODO: this might race if AM for the same session is restarted internally by Tez.
+ // It is possible to receive the create before remove and remove the wrong one.
+ // We need some identity in the value to make sure that doesn't happen.
bySessionId.remove(sessionId);
}
}
+ @VisibleForTesting
int getInitialSize() {
return initialSize;
}
+
+ /**
+ * Resizes the pool asynchronously.
+ * @param delta The number of sessions to add (if positive) or remove (if negative).
+ * @param toClose An output list that receives newly-unneeded sessions, to be closed by the caller.
+ */
+ public ListenableFuture<?> resizeAsync(int delta, List<SessionType> toClose) {
+ if (delta == 0) return createDummyFuture();
+ poolLock.lock();
+ try {
+ if (delta < 0) {
+ return resizeDownInternal(-delta, toClose);
+ } else {
+ return resizeUpInternal(delta);
+ }
+ } finally {
+ poolLock.unlock();
+ }
+ }
+
+ private ListenableFuture<?> resizeUpInternal(int delta) {
+ // 1) Cancel the kills if any, to avoid killing the returned sessions.
+ // Also sets the count for the async initialization.
+ int oldVal;
+ do {
+ oldVal = deltaRemaining.get();
+ } while (!deltaRemaining.compareAndSet(oldVal, oldVal + delta));
+ int toStart = oldVal + delta;
+ if (toStart <= 0) return createDummyFuture();
+ LOG.info("Resizing the pool; adding " + toStart + " sessions");
+
+ // 2) If we need to create some extra sessions, we'd do it just like startup does.
+ int threadCount = Math.max(1, Math.min(toStart,
+ HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS)));
+ List<ListenableFutureTask<Boolean>> threadTasks = new ArrayList<>(threadCount);
+ // This is an async method, so always launch threads, even for a single task.
+ for (int i = 0; i < threadCount; ++i) {
+ ListenableFutureTask<Boolean> task = ListenableFutureTask.create(
+ new CreateSessionsRunnable(deltaRemaining));
+ new Thread(task, "Tez pool resize " + i).start();
+ threadTasks.add(task);
+ }
+ return Futures.allAsList(threadTasks);
+ }
+
+ private ListenableFuture<Boolean> resizeDownInternal(int delta, List<SessionType> toClose) {
+ // 1) Cancel the previous expansion, if any.
+ while (true) {
+ int expansionCount = deltaRemaining.get();
+ if (expansionCount <= 0) break;
+ int expansionCancelled = Math.min(expansionCount, delta);
+ if (deltaRemaining.compareAndSet(expansionCount, expansionCount - expansionCancelled)) {
+ delta -= expansionCancelled;
+ break;
+ }
+ }
+ // 2) Drain unused sessions; the close() is sync so delegate to the caller.
+ while (delta > 0) {
+ SessionType session = pool.poll();
+ if (session == null) break;
+ if (!session.tryUse(true)) continue;
+ toClose.add(session);
+ --delta;
+ }
+ // 3) If too many sessions are outstanding (e.g. due to expiration restarts - should
+ // not happen with in-use sessions because WM already kills the extras), we will kill
+ // them as they come back from restarts.
+ if (delta > 0) {
+ int oldVal;
+ do {
+ oldVal = deltaRemaining.get();
+ } while (!deltaRemaining.compareAndSet(oldVal, oldVal - delta));
+ }
+ return createDummyFuture();
+ }
+
+ private ListenableFuture<Boolean> createDummyFuture() {
+ SettableFuture<Boolean> f = SettableFuture.create();
+ f.set(true);
+ return f;
+ }
+
+ private final class CreateSessionsRunnable implements Callable<Boolean> {
+ private final AtomicInteger remaining;
+
+ private CreateSessionsRunnable(AtomicInteger remaining) {
+ this.remaining = remaining;
+ }
+
+ public Boolean call() throws Exception {
+ if (parentSessionState != null) {
+ SessionState.setCurrentSessionState(parentSessionState);
+ }
+ while (true) {
+ int oldVal = remaining.get();
+ if (oldVal <= 0) return true;
+ if (!remaining.compareAndSet(oldVal, oldVal - 1)) continue;
+ startInitialSession(sessionObjFactory.create(null));
+ }
+ }
+ }
+
+ @VisibleForTesting
+ int getCurrentSize() {
+ poolLock.tryLock();
+ try {
+ return pool.size();
+ } finally {
+ poolLock.unlock();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 9b4714f..15543d6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -18,6 +18,11 @@
package org.apache.hadoop.hive.ql.exec.tez;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import javax.security.auth.login.LoginException;
+import org.apache.tez.dag.api.TezException;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -102,7 +107,7 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
public void startPool() throws Exception {
if (defaultSessionPool != null) {
- defaultSessionPool.startInitialSessions();
+ defaultSessionPool.start();
}
if (expirationTracker != null) {
expirationTracker.start();
@@ -110,7 +115,7 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
}
public void setupPool(HiveConf conf) throws Exception {
- String[] defaultQueueList = HiveConf.getTrimmedStringsVar(
+ final String[] defaultQueueList = HiveConf.getTrimmedStringsVar(
conf, HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES);
this.initConf = conf;
int emptyNames = 0; // We don't create sessions for empty entries.
@@ -123,7 +128,37 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
int numSessionsTotal = numSessions * (defaultQueueList.length - emptyNames);
if (numSessionsTotal > 0) {
boolean enableAmRegistry = false;
- defaultSessionPool = new TezSessionPool<>(initConf, numSessionsTotal, enableAmRegistry);
+ defaultSessionPool = new TezSessionPool<>(initConf, numSessionsTotal, enableAmRegistry,
+ new TezSessionPool.SessionObjectFactory<TezSessionPoolSession>() {
+ int queueIx = 0;
+
+ /**
+  * Creates a session object for the default pool.
+  *
+  * @param oldSession the session being replaced, or null when the pool is being initialized
+  * @return a new session; when replacing, it reuses the old session's queue name, default
+  *         flag and configuration; otherwise it is a default session on the next non-empty
+  *         queue in round-robin order
+  */
+ @Override
+ public TezSessionPoolSession create(TezSessionPoolSession oldSession) {
+   if (oldSession != null) {
+     return createAndInitSession(
+         oldSession.getQueueName(), oldSession.isDefault(), oldSession.getConf());
+   }
+   // We never resize the pool, so assume this is initialization.
+   // If that changes, we might have to make the factory interface more complicated.
+   /*
+    * In the single-threaded init case, this makes the ordering of sessions in the queue
+    * (with 2 sessions and 3 queues) s1q1, s1q2, s1q3, s2q1, s2q2, s2q3, thereby ensuring a
+    * uniform distribution of the sessions across queues, at least to begin with. As sessions
+    * get freed up, this ordering may change.
+    * In the multi-threaded init case, it's a free-for-all.
+    */
+   int localQueueIx;
+   synchronized (defaultQueueList) {
+     // We don't create sessions for empty entries (the pool size excludes them), so step over
+     // them. The scan is bounded by the array length so it terminates even if all are empty.
+     for (int skipped = 0;
+         skipped < defaultQueueList.length && defaultQueueList[queueIx].isEmpty();
+         ++skipped) {
+       queueIx = (queueIx + 1) % defaultQueueList.length;
+     }
+     localQueueIx = queueIx;
+     queueIx = (queueIx + 1) % defaultQueueList.length;
+   }
+   HiveConf sessionConf = new HiveConf(initConf);
+   return createAndInitSession(defaultQueueList[localQueueIx], true, sessionConf);
+ }
+ });
}
numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES);
@@ -155,24 +190,6 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
if (!hasInitialSessions) {
return;
}
-
-
- /*
- * In a single-threaded init case, with this the ordering of sessions in the queue will be
- * (with 2 sessions 3 queues) s1q1, s1q2, s1q3, s2q1, s2q2, s2q3 there by ensuring uniform
- * distribution of the sessions across queues at least to begin with. Then as sessions get
- * freed up, the list may change this ordering.
- * In a multi threaded init case it's a free for all.
- */
- for (int i = 0; i < numSessions; i++) {
- for (String queueName : defaultQueueList) {
- if (queueName.isEmpty()) {
- continue;
- }
- HiveConf sessionConf = new HiveConf(initConf);
- defaultSessionPool.addInitialSession(createAndInitSession(queueName, true, sessionConf));
- }
- }
}
// TODO Create and init session sets up queue, isDefault - but does not initialize the configuration
@@ -438,7 +455,14 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
&& sessionConf.get(TezConfiguration.TEZ_QUEUE_NAME) == null) {
sessionConf.set(TezConfiguration.TEZ_QUEUE_NAME, sessionState.getQueueName());
}
+ reopenInternal(sessionState, additionalFiles);
+ return sessionState;
+ }
+
+ static void reopenInternal(
+ TezSessionState sessionState, String[] additionalFiles) throws Exception {
Set<String> oldAdditionalFiles = sessionState.getAdditionalFilesNotFromConf();
+ // TODO: implies the session files and the array are the same if not null; why? very brittle
if ((oldAdditionalFiles == null || oldAdditionalFiles.isEmpty())
&& (additionalFiles != null)) {
oldAdditionalFiles = new HashSet<>();
@@ -449,11 +473,11 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
// TODO: close basically resets the object to a bunch of nulls.
// We should ideally not reuse the object because it's pointless and error-prone.
sessionState.close(true);
- // TODO: should we reuse scratchDir too?
+ // Note: scratchdir is reused implicitly because the sessionId is the same.
sessionState.open(oldAdditionalFiles, null);
- return sessionState;
}
+
public void closeNonDefaultSessions(boolean keepTmpDir) throws Exception {
List<TezSessionPoolSession> sessionsToClose = null;
synchronized (openSessions) {
@@ -467,14 +491,14 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
/** Closes a running (expired) pool session and reopens it. */
@Override
- public void closeAndReopenPoolSession(TezSessionPoolSession oldSession) throws Exception {
+ public void closeAndReopenExpiredSession(TezSessionPoolSession oldSession) throws Exception {
String queueName = oldSession.getQueueName();
if (queueName == null) {
LOG.warn("Pool session has a null queue: " + oldSession);
}
TezSessionPoolSession newSession = createAndInitSession(
queueName, oldSession.isDefault(), oldSession.getConf());
- defaultSessionPool.replaceSession(oldSession, newSession, false, null, null);
+ defaultSessionPool.replaceSession(oldSession, false, null);
}
/** Called by TezSessionPoolSession when opened. */
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
index 6135223..6887d7a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
@@ -18,6 +18,12 @@
package org.apache.hadoop.hive.ql.exec.tez;
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.apache.hadoop.hive.registry.impl.TezAmInstance;
+
+import org.apache.hadoop.conf.Configuration;
+
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Collection;
@@ -139,28 +145,28 @@ class TezSessionPoolSession extends TezSessionState {
* Tries to use this session. When the session is in use, it will not expire.
* @return true if the session can be used; false if it has already expired.
*/
- public boolean tryUse() throws Exception {
+ public boolean tryUse(boolean ignoreExpiration) {
while (true) {
int oldValue = sessionState.get();
if (oldValue == STATE_IN_USE) throw new AssertionError(this + " is already in use");
if (oldValue == STATE_EXPIRED) return false;
- int finalState = shouldExpire() ? STATE_EXPIRED : STATE_IN_USE;
+ int finalState = (!ignoreExpiration && shouldExpire()) ? STATE_EXPIRED : STATE_IN_USE;
if (sessionState.compareAndSet(STATE_NONE, finalState)) {
if (finalState == STATE_IN_USE) return true;
// Restart asynchronously, don't block the caller.
- expirationTracker.closeAndRestartExpiredSession(this, true);
+ expirationTracker.closeAndRestartExpiredSessionAsync(this);
return false;
}
}
}
- boolean stopUsing() throws Exception {
+ boolean stopUsing() {
int finalState = shouldExpire() ? STATE_EXPIRED : STATE_NONE;
if (!sessionState.compareAndSet(STATE_IN_USE, finalState)) {
throw new AssertionError("Unexpected state change; currently " + sessionState.get());
}
if (finalState == STATE_NONE) return true;
- expirationTracker.closeAndRestartExpiredSession(this, true);
+ expirationTracker.closeAndRestartExpiredSessionAsync(this);
return false;
}
@@ -176,13 +182,17 @@ class TezSessionPoolSession extends TezSessionState {
while (true) {
if (sessionState.get() != STATE_NONE) return true; // returnAfterUse will take care of this
if (sessionState.compareAndSet(STATE_NONE, STATE_EXPIRED)) {
- expirationTracker.closeAndRestartExpiredSession(this, isAsync);
+ if (isAsync) {
+ expirationTracker.closeAndRestartExpiredSessionAsync(this);
+ } else {
+ expirationTracker.closeAndRestartExpiredSession(this);
+ }
return true;
}
}
}
- private boolean shouldExpire() {
+ private final boolean shouldExpire() {
return expirationNs != null && (System.nanoTime() - expirationNs) >= 0;
}
@@ -209,4 +219,4 @@ class TezSessionPoolSession extends TezSessionState {
void updateFromRegistry(TezAmInstance si) {
// Nothing to do.
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 1448168..541ee73 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -629,7 +629,7 @@ public class TezSessionState {
}
}
- public void cleanupScratchDir () throws IOException {
+ protected final void cleanupScratchDir () throws IOException {
FileSystem fs = tezScratchDir.getFileSystem(conf);
fs.delete(tezScratchDir, true);
tezScratchDir = null;
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
index 81d6b85..96dc7d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
@@ -41,13 +41,12 @@ class UserPoolMapping {
private final String defaultPoolName;
// TODO: add other types as needed
- public UserPoolMapping(List<TmpUserMapping> mappings, Set<String> poolNames) {
+ public UserPoolMapping(List<TmpUserMapping> mappings) {
String defaultPoolName = null;
for (TmpUserMapping mapping : mappings) {
switch (mapping.getType()) {
case USER: {
- String poolName = getValidPoolName(poolNames, mapping);
- Mapping val = new Mapping(poolName, mapping.getPriority());
+ Mapping val = new Mapping(mapping.getPoolName(), mapping.getPriority());
Mapping oldValue = userMappings.put(mapping.getName(), val);
if (oldValue != null) {
throw new AssertionError("Duplicate mapping for user " + mapping.getName() + "; "
@@ -56,7 +55,7 @@ class UserPoolMapping {
break;
}
case DEFAULT: {
- String poolName = getValidPoolName(poolNames, mapping);
+ String poolName = mapping.getPoolName();
if (defaultPoolName != null) {
throw new AssertionError("Duplicate default mapping; "
+ defaultPoolName + " and " + poolName);
@@ -80,13 +79,4 @@ class UserPoolMapping {
if (userMapping != null) return userMapping.fullPoolName;
return defaultPoolName;
}
-
- private static String getValidPoolName(Set<String> poolNames, TmpUserMapping mapping) {
- String poolName = mapping.getPoolName();
- // Should we really be validating here? The plan should be validated before applying.
- if (!poolNames.contains(mapping.getPoolName())) {
- throw new AssertionError("Invalid pool in the mapping " + poolName);
- }
- return poolName;
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
index 00501ee..0dd1433 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
@@ -18,18 +18,30 @@
package org.apache.hadoop.hive.ql.exec.tez;
-import java.util.concurrent.TimeoutException;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.registry.impl.TezAmInstance;
public class WmTezSession extends TezSessionPoolSession implements AmPluginNode {
private String poolName;
private double clusterFraction;
+ private String killReason = null;
private final Object amPluginInfoLock = new Object();
private AmPluginInfo amPluginInfo = null;
+ private SettableFuture<WmTezSession> amRegistryFuture = null;
+ private ScheduledFuture<?> timeoutTimer = null;
+ private final WorkloadManager wmParent;
/** The actual state of the guaranteed task, and the update state, for the session. */
// Note: hypothetically, a generic WM-aware-session should not know about guaranteed tasks.
@@ -41,34 +53,60 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
}
private final ActualWmState actualState = new ActualWmState();
- public WmTezSession(String sessionId, Manager parent,
+ public WmTezSession(String sessionId, WorkloadManager parent,
SessionExpirationTracker expiration, HiveConf conf) {
super(sessionId, parent, expiration, conf);
+ wmParent = parent;
}
- @Override
- public AmPluginInfo waitForAmPluginInfo(int timeoutMs)
- throws InterruptedException, TimeoutException {
+ /**
+  * Constructor for tests that accepts any {@code Manager} as the parent.
+  * It leaves {@code wmParent} null; note that {@code handleUpdateError()} dereferences
+  * {@code wmParent}, so it cannot be used on sessions created through this constructor.
+  */
+ @VisibleForTesting
+ WmTezSession(String sessionId, Manager testParent,
+ SessionExpirationTracker expiration, HiveConf conf) {
+ super(sessionId, testParent, expiration, conf);
+ wmParent = null;
+ }
+
+ /**
+  * Returns a future that completes with this session once AM plugin info is available.
+  * If the info is already present the future is returned completed. Only one outstanding
+  * wait is supported; a second call while one is pending gets a future that has already
+  * failed. The pending future is completed by {@code updateFromRegistry} and cancelled by
+  * the timeout task.
+  *
+  * @param timeoutMs timeout in milliseconds; no timeout task is scheduled if it is not positive
+  * @param timeoutPool the executor on which the timeout task is scheduled
+  * @return the future for the registry information
+  */
+ public ListenableFuture<WmTezSession> waitForAmRegistryAsync(
+ int timeoutMs, ScheduledExecutorService timeoutPool) {
+ SettableFuture<WmTezSession> future = SettableFuture.create();
synchronized (amPluginInfoLock) {
- if (amPluginInfo == null) {
- amPluginInfoLock.wait(timeoutMs);
- if (amPluginInfo == null) {
- throw new TimeoutException("No plugin information for " + getSessionId());
- }
+ if (amPluginInfo != null) {
+ future.set(this);
+ return future;
}
- return amPluginInfo;
+ if (amRegistryFuture != null) {
+ // We don't need this for now, so do not support it.
+ future.setException(new RuntimeException("Multiple waits are not supported"));
+ return future;
+ }
+ amRegistryFuture = future;
+ if (timeoutMs <= 0) return future;
+ // TODO: replace with withTimeout after we get the relevant guava upgrade.
+ this.timeoutTimer = timeoutPool.schedule(
+ new TimeoutRunnable(), timeoutMs, TimeUnit.MILLISECONDS);
}
+ return future;
}
+
@Override
void updateFromRegistry(TezAmInstance si) {
+ AmPluginInfo info = new AmPluginInfo(si.getHost(), si.getPluginPort(),
+ si.getPluginToken(), si.getPluginTokenJobId());
synchronized (amPluginInfoLock) {
- this.amPluginInfo = new AmPluginInfo(si.getHost(), si.getPluginPort(),
- si.getPluginToken(), si.getPluginTokenJobId());
- amPluginInfoLock.notifyAll();
+ this.amPluginInfo = info;
+ if (amRegistryFuture != null) {
+ amRegistryFuture.set(this);
+ amRegistryFuture = null;
+ }
+ if (timeoutTimer != null) {
+ timeoutTimer.cancel(true);
+ timeoutTimer = null;
+ }
}
}
+ @Override
public AmPluginInfo getAmPluginInfo() {
return amPluginInfo; // Only has final fields, no artifacts from the absence of sync.
}
@@ -85,6 +123,11 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
this.clusterFraction = fraction;
}
+ /** Resets the WM assignment of this session: clears the pool name and zeroes the cluster fraction. */
+ void clearWm() {
+ this.poolName = null;
+ this.clusterFraction = 0f;
+ }
+
double getClusterFraction() {
return this.clusterFraction;
}
@@ -118,4 +161,42 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
return (actualState.sent == actualState.target);
}
}
+
+ /**
+  * Reports this session to the parent workload manager via {@code addUpdateError}.
+  * NOTE(review): {@code wmParent} is null for sessions built with the test-only constructor,
+  * in which case this call would throw a NullPointerException - confirm tests never reach it.
+  */
+ public void handleUpdateError() {
+ wmParent.addUpdateError(this);
+ }
+
+ /** Identity-based hash code, consistent with the reference-equality {@code equals}. */
+ @Override
+ public int hashCode() {
+ return System.identityHashCode(this);
+ }
+
+ /** Reference equality: a session is only equal to itself. */
+ @Override
+ public boolean equals(Object obj) {
+ return obj == this;
+ }
+
+ /** @return true if a kill reason has been set on this session (i.e. it is non-null). */
+ boolean isIrrelevantForWm() {
+ return killReason != null;
+ }
+
+ /** @return the kill reason stored by {@code setIsIrrelevantForWm}, or null if none was set. */
+ String getReasonForKill() {
+ return killReason;
+ }
+
+ /**
+  * Stores the kill reason; a non-null value makes {@code isIrrelevantForWm()} return true.
+  * @param killReason the reason to record
+  */
+ void setIsIrrelevantForWm(String killReason) {
+ this.killReason = killReason;
+ }
+
+ /**
+  * Timeout task for a pending AM registry wait. Under {@code amPluginInfoLock} it clears the
+  * timer reference and, if a wait future exists and has not completed yet, cancels and
+  * forgets it.
+  */
+ private final class TimeoutRunnable implements Runnable {
+   @Override
+   public void run() {
+     synchronized (amPluginInfoLock) {
+       timeoutTimer = null;
+       final SettableFuture<WmTezSession> pending = amRegistryFuture;
+       if (pending != null && !pending.isDone()) {
+         pending.cancel(true);
+         amRegistryFuture = null;
+       }
+     }
+   }
+ }
}