You are viewing a plain-text version of this content. The canonical version is available at the original archive link, which is not preserved in this plain-text copy.
Posted to commits@hive.apache.org by se...@apache.org on 2017/11/09 21:41:21 UTC
hive git commit: HIVE-18028 : fix WM based on cluster smoke test;
add logging (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master 82cb3d57a -> 3914a1b29
HIVE-18028 : fix WM based on cluster smoke test; add logging (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3914a1b2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3914a1b2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3914a1b2
Branch: refs/heads/master
Commit: 3914a1b29248093dfebabfb1852f443941e489c9
Parents: 82cb3d5
Author: sergey <se...@apache.org>
Authored: Thu Nov 9 13:36:01 2017 -0800
Committer: sergey <se...@apache.org>
Committed: Thu Nov 9 13:36:01 2017 -0800
----------------------------------------------------------------------
.../ql/exec/tez/GuaranteedTasksAllocator.java | 1 +
.../hadoop/hive/ql/exec/tez/TezSessionPool.java | 2 -
.../hive/ql/exec/tez/WorkloadManager.java | 79 ++++++++------------
.../ql/exec/tez/WorkloadManagerFederation.java | 19 +++--
.../hadoop/hive/metastore/ObjectStore.java | 9 ++-
5 files changed, 53 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/3914a1b2/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
index 53dd698..0cb2076 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
@@ -147,6 +147,7 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
// HS2 session pool paths, and this patch removes the last one (reopen).
UpdateQueryRequestProto request = UpdateQueryRequestProto
.newBuilder().setGuaranteedTaskCount(intAlloc).build();
+ LOG.info("Updating {} with {} guaranteed tasks", session.getSessionId(), intAlloc);
amCommunicator.sendUpdateQuery(request, (AmPluginNode)session, new UpdateCallback(session));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3914a1b2/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
index fa0eedb..03a0682 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
@@ -312,8 +312,6 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
HiveConf conf = session.getConf();
conf.set(ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME.varname, amRegistryName);
conf.set(ConfVars.HIVESESSIONID.varname, session.getSessionId());
- // TODO: can be enable temporarily for testing
- // conf.set(LlapTaskSchedulerService.LLAP_PLUGIN_ENDPOINT_ENABLED, "true");
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3914a1b2/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 039881f..16f5dce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -17,25 +17,6 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
-import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput;
-import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
-
-import org.apache.hadoop.hive.ql.wm.Trigger.Action;
-
-import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
-
-import org.apache.hadoop.hive.metastore.api.WMPoolTrigger;
-
-import org.apache.hadoop.hive.metastore.api.WMTrigger;
-
-import org.apache.commons.lang3.StringUtils;
-
-import org.apache.hadoop.hive.metastore.api.WMPool;
-
-import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
-
-import org.apache.hadoop.hive.ql.session.SessionState;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
@@ -62,12 +43,23 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService;
+import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+import org.apache.hadoop.hive.metastore.api.WMPool;
+import org.apache.hadoop.hive.metastore.api.WMPoolTrigger;
+import org.apache.hadoop.hive.metastore.api.WMTrigger;
+import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
+import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
import org.apache.hadoop.hive.ql.wm.Trigger;
+import org.apache.hadoop.hive.ql.wm.Trigger.Action;
import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
import org.apache.tez.dag.api.TezConfiguration;
import org.slf4j.Logger;
@@ -78,8 +70,6 @@ import org.slf4j.LoggerFactory;
public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValidator
implements TezSessionPoolSession.Manager, SessionExpirationTracker.RestartImpl {
private static final Logger LOG = LoggerFactory.getLogger(WorkloadManager.class);
- // TODO: this is a temporary setting that will go away, so it's not in HiveConf.
- public static final String TEST_WM_CONFIG = "hive.test.workload.management";
private static final char POOL_SEPARATOR = '.';
private static final String POOL_SEPARATOR_STR = "" + POOL_SEPARATOR;
@@ -147,10 +137,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
return INSTANCE;
}
- public static boolean isInUse(Configuration conf) {
- return INSTANCE != null && conf.getBoolean(TEST_WM_CONFIG, false);
- }
-
/** Called once, when HS2 initializes. */
public static WorkloadManager create(String yarnQueue, HiveConf conf, WMFullResourcePlan plan) {
assert INSTANCE == null;
@@ -300,7 +286,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
currentLock.unlock();
}
try {
- LOG.debug("Processing current events");
+ LOG.info("Processing current events");
processCurrentEvents(currentEvents, syncWork);
scheduleWork(syncWork);
} catch (InterruptedException ex) {
@@ -328,7 +314,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// 1. Restart pool sessions.
for (final WmTezSession toRestart : context.toRestartInUse) {
- LOG.debug("Replacing " + toRestart + " with a new session");
+ LOG.info("Replacing " + toRestart + " with a new session");
workPool.submit(() -> {
try {
// Note: sessions in toRestart are always in use, so they cannot expire in parallel.
@@ -341,7 +327,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
context.toRestartInUse.clear();
// 2. Destroy the sessions that we don't need anymore.
for (final WmTezSession toDestroy : context.toDestroyNoRestart) {
- LOG.debug("Closing " + toDestroy + " without restart");
+ LOG.info("Closing " + toDestroy + " without restart");
workPool.submit(() -> {
try {
toDestroy.close(false);
@@ -375,7 +361,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
if (e.toReturn.remove(sessionToDestroy)) {
LOG.warn("The session was both destroyed and returned by the user; destroying");
}
- LOG.debug("Destroying {}", sessionToDestroy);
+ LOG.info("Destroying {}", sessionToDestroy);
Boolean shouldReturn = handleReturnedInUseSessionOnMasterThread(
e, sessionToDestroy, poolsToRedistribute);
if (shouldReturn == null || shouldReturn) {
@@ -387,7 +373,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// 2. Now handle actual returns. Sessions may be returned to the pool or may trigger expires.
for (WmTezSession sessionToReturn: e.toReturn) {
- LOG.debug("Returning {}", sessionToReturn);
+ LOG.info("Returning {}", sessionToReturn);
Boolean shouldReturn = handleReturnedInUseSessionOnMasterThread(
e, sessionToReturn, poolsToRedistribute);
if (shouldReturn == null) {
@@ -405,7 +391,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// 3. Reopen is essentially just destroy + get a new session for a session in use.
for (Map.Entry<WmTezSession, SettableFuture<WmTezSession>> entry : e.toReopen.entrySet()) {
- LOG.debug("Reopening {}", entry.getKey());
+ LOG.info("Reopening {}", entry.getKey());
handeReopenRequestOnMasterThread(
e, entry.getKey(), entry.getValue(), poolsToRedistribute, syncWork);
}
@@ -413,7 +399,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// 4. All the sessions in use that were not destroyed or returned with a failed update now die.
for (WmTezSession sessionWithUpdateError : e.updateErrors) {
- LOG.debug("Update failed for {}", sessionWithUpdateError);
+ LOG.info("Update failed for {}", sessionWithUpdateError);
handleUpdateErrorOnMasterThread(sessionWithUpdateError, e, syncWork, poolsToRedistribute);
}
e.updateErrors.clear();
@@ -421,7 +407,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// 5. Now apply a resource plan if any. This is expected to be pretty rare.
boolean hasRequeues = false;
if (e.resourcePlanToApply != null) {
- LOG.debug("Applying new resource plan");
+ LOG.info("Applying new resource plan");
int getReqCount = e.getRequests.size();
applyNewResourcePlanOnMasterThread(e, syncWork, poolsToRedistribute);
hasRequeues = getReqCount != e.getRequests.size();
@@ -435,14 +421,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// want query level fairness, and don't want the get in queue to hold up a session.
GetRequest req;
while ((req = e.getRequests.pollFirst()) != null) {
- LOG.debug("Processing a new get request from " + req.mappingInput);
+ LOG.info("Processing a new get request from " + req.mappingInput);
queueGetRequestOnMasterThread(req, poolsToRedistribute, syncWork);
}
e.toReuse.clear();
// 7. If there was a cluster state change, make sure we redistribute all the pools.
if (e.hasClusterStateChanged) {
- LOG.debug("Processing a cluster state change");
+ LOG.info("Processing a cluster state change");
poolsToRedistribute.addAll(pools.keySet());
e.hasClusterStateChanged = false;
}
@@ -450,7 +436,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// 8. Finally, for all the pools that have changes, promote queued queries and rebalance.
for (String poolName : poolsToRedistribute) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Processing changes for pool " + poolName + ": " + pools.get(poolName));
+ LOG.info("Processing changes for pool " + poolName + ": " + pools.get(poolName));
}
processPoolChangesOnMasterThread(poolName, syncWork, hasRequeues);
}
@@ -487,7 +473,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
} finally {
sw.lock.unlock();
}
- LOG.debug("Processing " + ((session == null) ? "failed" : "successful")
+ LOG.info("Processing " + ((session == null) ? "failed" : "successful")
+ " initialization result for pool " + sw.poolName);
// We could not have removed the pool for this session, or we would have CANCELED the init.
PoolState pool = pools.get(sw.poolName);
@@ -514,7 +500,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// the invalid combination of destroy and return themselves, as well as the actual
// statement that destroys or returns it.
if (e.updateErrors.remove(session)) {
- LOG.debug("Ignoring an update error for a session being destroyed or returned");
+ LOG.info("Ignoring an update error for a session being destroyed or returned");
}
SettableFuture<WmTezSession> future = e.toReopen.remove(session);
if (future != null) {
@@ -531,7 +517,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
SettableFuture<WmTezSession> future, HashSet<String> poolsToRedistribute,
WmThreadSyncWork syncWork) throws Exception {
if (e.updateErrors.remove(session)) {
- LOG.debug("Ignoring an update error for a session being reopened");
+ LOG.info("Ignoring an update error for a session being reopened");
}
GetRequest reuseRequest = e.toReuse.remove(session);
if (reuseRequest != null) {
@@ -743,7 +729,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
int queriesToStart = Math.min(pool.queue.size(),
pool.queryParallelism - pool.getTotalActiveSessions());
if (queriesToStart > 0) {
- LOG.debug("Starting {} queries in pool {}", queriesToStart, pool);
+ LOG.info("Starting {} queries in pool {}", queriesToStart, pool);
}
if (hasRequeues) {
// Sort the queue - we may have put items here out of order.
@@ -1083,6 +1069,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
@VisibleForTesting
protected WmTezSession createSessionObject(String sessionId, HiveConf conf) {
conf = (conf == null) ? new HiveConf(this.conf) : conf;
+ conf.set(LlapTaskSchedulerService.LLAP_PLUGIN_ENDPOINT_ENABLED, "true");
return new WmTezSession(sessionId, this, expirationTracker, conf);
}
@@ -1299,7 +1286,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
oldState = state;
switch (oldState) {
case GETTING: {
- LOG.debug("Received a session from AM pool {}", session);
+ LOG.info("Received a session from AM pool {}", session);
assert this.state == SessionInitState.GETTING;
session.setPoolName(poolName);
session.setQueueName(yarnQueue);
@@ -1364,24 +1351,24 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
public void onFailure(Throwable t) {
SettableFuture<WmTezSession> future;
WmTezSession session;
- boolean wasCANCELED = false;
+ boolean wasCanceled = false;
lock.lock();
try {
- wasCANCELED = (state == SessionInitState.CANCELED);
+ wasCanceled = (state == SessionInitState.CANCELED);
session = this.session;
future = this.future;
this.future = null;
this.session = null;
- if (!wasCANCELED) {
+ if (!wasCanceled) {
this.state = SessionInitState.DONE;
}
} finally {
lock.unlock();
}
future.setException(t);
- if (!wasCANCELED) {
+ if (!wasCanceled) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Queueing the initialization failure with " + session);
+ LOG.info("Queueing the initialization failure with " + session);
}
notifyInitializationCompleted(this); // Report failure to the main thread.
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3914a1b2/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java
index 70adc33..c7ed534 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java
@@ -16,28 +16,37 @@
* limitations under the License.
*/package org.apache.hadoop.hive.ql.exec.tez;
+import org.slf4j.LoggerFactory;
+
+import org.slf4j.Logger;
+
import java.util.Set;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput;
public class WorkloadManagerFederation {
+ private static final Logger LOG = LoggerFactory.getLogger(WorkloadManagerFederation.class);
public static TezSessionState getSession(TezSessionState session, HiveConf conf,
MappingInput input, boolean isUnmanagedLlapMode, Set<String> desiredCounters) throws Exception {
// 1. If WM is not present just go to unmanaged.
- if (!WorkloadManager.isInUse(conf)) {
+ WorkloadManager wm = WorkloadManager.getInstance();
+ if (wm == null) {
+ LOG.debug("Using unmanaged session - WM is not initialized");
return getUnmanagedSession(session, conf, desiredCounters, isUnmanagedLlapMode);
}
- WorkloadManager wm = WorkloadManager.getInstance();
- // We will ask WM for preliminary mapping. This allows us to escape to the unmanaged path
- // quickly in the common case. It's still possible that resource plan will be updated and
- // so our preliminary mapping won't work out. We'll handle that below.
+ // 2. We will ask WM for a preliminary mapping. This allows us to escape to the unmanaged path
+ // quickly in the common case. It's still possible that resource plan will be updated and
+ // our preliminary mapping won't work out. We'll handle that below.
if (!wm.isManaged(input)) {
+ LOG.info("Using unmanaged session - no mapping for " + input);
return getUnmanagedSession(session, conf, desiredCounters, isUnmanagedLlapMode);
}
+ // 3. Finally, try WM.
try {
// Note: this may just block to wait for a session based on parallelism.
+ LOG.info("Getting a WM session for " + input);
TezSessionState result = wm.getSession(session, input, conf);
desiredCounters.addAll(wm.getTriggerCounterNames());
return result;
http://git-wip-us.apache.org/repos/asf/hive/blob/3914a1b2/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 9e9239b..c73b991 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -9686,20 +9686,21 @@ public class ObjectStore implements RawStore, Configurable {
public WMFullResourcePlan getActiveResourcePlan() throws MetaException {
boolean commited = false;
Query query = null;
+ WMFullResourcePlan result = null;
try {
openTransaction();
query = pm.newQuery(MWMResourcePlan.class, "status == activeStatus");
query.declareParameters("java.lang.String activeStatus");
query.setUnique(true);
MWMResourcePlan mResourcePlan = (MWMResourcePlan) query.execute(Status.ACTIVE.toString());
- commited = commitTransaction();
- if (mResourcePlan == null) {
- return null; // No active plan.
+ if (mResourcePlan != null) {
+ result = fullFromMResourcePlan(mResourcePlan);
}
- return fullFromMResourcePlan(mResourcePlan);
+ commited = commitTransaction();
} finally {
rollbackAndCleanup(commited, query);
}
+ return result;
}
private WMFullResourcePlan switchStatus(String name, MWMResourcePlan mResourcePlan,