You are viewing a plain-text version of this content. The link to the canonical version (originally attached to the word "here") was not preserved in this copy.
Posted to commits@hive.apache.org by se...@apache.org on 2017/11/09 21:41:21 UTC

hive git commit: HIVE-18028 : fix WM based on cluster smoke test; add logging (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/master 82cb3d57a -> 3914a1b29


HIVE-18028 : fix WM based on cluster smoke test; add logging (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3914a1b2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3914a1b2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3914a1b2

Branch: refs/heads/master
Commit: 3914a1b29248093dfebabfb1852f443941e489c9
Parents: 82cb3d5
Author: sergey <se...@apache.org>
Authored: Thu Nov 9 13:36:01 2017 -0800
Committer: sergey <se...@apache.org>
Committed: Thu Nov 9 13:36:01 2017 -0800

----------------------------------------------------------------------
 .../ql/exec/tez/GuaranteedTasksAllocator.java   |  1 +
 .../hadoop/hive/ql/exec/tez/TezSessionPool.java |  2 -
 .../hive/ql/exec/tez/WorkloadManager.java       | 79 ++++++++------------
 .../ql/exec/tez/WorkloadManagerFederation.java  | 19 +++--
 .../hadoop/hive/metastore/ObjectStore.java      |  9 ++-
 5 files changed, 53 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/3914a1b2/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
index 53dd698..0cb2076 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
@@ -147,6 +147,7 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
     //       HS2 session pool paths, and this patch removes the last one (reopen).
     UpdateQueryRequestProto request = UpdateQueryRequestProto
         .newBuilder().setGuaranteedTaskCount(intAlloc).build();
+    LOG.info("Updating {} with {} guaranteed tasks", session.getSessionId(), intAlloc);
     amCommunicator.sendUpdateQuery(request, (AmPluginNode)session, new UpdateCallback(session));
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/3914a1b2/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
index fa0eedb..03a0682 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
@@ -312,8 +312,6 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
       HiveConf conf = session.getConf();
       conf.set(ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME.varname, amRegistryName);
       conf.set(ConfVars.HIVESESSIONID.varname, session.getSessionId());
-      // TODO: can be enable temporarily for testing
-      // conf.set(LlapTaskSchedulerService.LLAP_PLUGIN_ENDPOINT_ENABLED, "true");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/3914a1b2/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 039881f..16f5dce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -17,25 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput;
-import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
-
-import org.apache.hadoop.hive.ql.wm.Trigger.Action;
-
-import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
-
-import org.apache.hadoop.hive.metastore.api.WMPoolTrigger;
-
-import org.apache.hadoop.hive.metastore.api.WMTrigger;
-
-import org.apache.commons.lang3.StringUtils;
-
-import org.apache.hadoop.hive.metastore.api.WMPool;
-
-import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
-
-import org.apache.hadoop.hive.ql.session.SessionState;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.FutureCallback;
@@ -62,12 +43,23 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService;
+import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+import org.apache.hadoop.hive.metastore.api.WMPool;
+import org.apache.hadoop.hive.metastore.api.WMPoolTrigger;
+import org.apache.hadoop.hive.metastore.api.WMTrigger;
+import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
+import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
 import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
 import org.apache.hadoop.hive.ql.wm.Trigger;
+import org.apache.hadoop.hive.ql.wm.Trigger.Action;
 import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.slf4j.Logger;
@@ -78,8 +70,6 @@ import org.slf4j.LoggerFactory;
 public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValidator
     implements TezSessionPoolSession.Manager, SessionExpirationTracker.RestartImpl {
   private static final Logger LOG = LoggerFactory.getLogger(WorkloadManager.class);
-  // TODO: this is a temporary setting that will go away, so it's not in HiveConf.
-  public static final String TEST_WM_CONFIG = "hive.test.workload.management";
   private static final char POOL_SEPARATOR = '.';
   private static final String POOL_SEPARATOR_STR = "" + POOL_SEPARATOR;
 
@@ -147,10 +137,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     return INSTANCE;
   }
 
-  public static boolean isInUse(Configuration conf) {
-    return INSTANCE != null && conf.getBoolean(TEST_WM_CONFIG, false);
-  }
-
   /** Called once, when HS2 initializes. */
   public static WorkloadManager create(String yarnQueue, HiveConf conf, WMFullResourcePlan plan) {
     assert INSTANCE == null;
@@ -300,7 +286,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
         currentLock.unlock();
       }
       try {
-        LOG.debug("Processing current events");
+        LOG.info("Processing current events");
         processCurrentEvents(currentEvents, syncWork);
         scheduleWork(syncWork);
       } catch (InterruptedException ex) {
@@ -328,7 +314,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
 
     // 1. Restart pool sessions.
     for (final WmTezSession toRestart : context.toRestartInUse) {
-      LOG.debug("Replacing " + toRestart + " with a new session");
+      LOG.info("Replacing " + toRestart + " with a new session");
       workPool.submit(() -> {
         try {
           // Note: sessions in toRestart are always in use, so they cannot expire in parallel.
@@ -341,7 +327,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     context.toRestartInUse.clear();
     // 2. Destroy the sessions that we don't need anymore.
     for (final WmTezSession toDestroy : context.toDestroyNoRestart) {
-      LOG.debug("Closing " + toDestroy + " without restart");
+      LOG.info("Closing " + toDestroy + " without restart");
       workPool.submit(() -> {
         try {
           toDestroy.close(false);
@@ -375,7 +361,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       if (e.toReturn.remove(sessionToDestroy)) {
         LOG.warn("The session was both destroyed and returned by the user; destroying");
       }
-      LOG.debug("Destroying {}", sessionToDestroy);
+      LOG.info("Destroying {}", sessionToDestroy);
       Boolean shouldReturn = handleReturnedInUseSessionOnMasterThread(
           e, sessionToDestroy, poolsToRedistribute);
       if (shouldReturn == null || shouldReturn) {
@@ -387,7 +373,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
 
     // 2. Now handle actual returns. Sessions may be returned to the pool or may trigger expires.
     for (WmTezSession sessionToReturn: e.toReturn) {
-      LOG.debug("Returning {}", sessionToReturn);
+      LOG.info("Returning {}", sessionToReturn);
       Boolean shouldReturn = handleReturnedInUseSessionOnMasterThread(
           e, sessionToReturn, poolsToRedistribute);
       if (shouldReturn == null) {
@@ -405,7 +391,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
 
     // 3. Reopen is essentially just destroy + get a new session for a session in use.
     for (Map.Entry<WmTezSession, SettableFuture<WmTezSession>> entry : e.toReopen.entrySet()) {
-      LOG.debug("Reopening {}", entry.getKey());
+      LOG.info("Reopening {}", entry.getKey());
       handeReopenRequestOnMasterThread(
           e, entry.getKey(), entry.getValue(), poolsToRedistribute, syncWork);
     }
@@ -413,7 +399,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
 
     // 4. All the sessions in use that were not destroyed or returned with a failed update now die.
     for (WmTezSession sessionWithUpdateError : e.updateErrors) {
-      LOG.debug("Update failed for {}", sessionWithUpdateError);
+      LOG.info("Update failed for {}", sessionWithUpdateError);
       handleUpdateErrorOnMasterThread(sessionWithUpdateError, e, syncWork, poolsToRedistribute);
     }
     e.updateErrors.clear();
@@ -421,7 +407,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     // 5. Now apply a resource plan if any. This is expected to be pretty rare.
     boolean hasRequeues = false;
     if (e.resourcePlanToApply != null) {
-      LOG.debug("Applying new resource plan");
+      LOG.info("Applying new resource plan");
       int getReqCount = e.getRequests.size();
       applyNewResourcePlanOnMasterThread(e, syncWork, poolsToRedistribute);
       hasRequeues = getReqCount != e.getRequests.size();
@@ -435,14 +421,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     //    want query level fairness, and don't want the get in queue to hold up a session.
     GetRequest req;
     while ((req = e.getRequests.pollFirst()) != null) {
-      LOG.debug("Processing a new get request from " + req.mappingInput);
+      LOG.info("Processing a new get request from " + req.mappingInput);
       queueGetRequestOnMasterThread(req, poolsToRedistribute, syncWork);
     }
     e.toReuse.clear();
 
     // 7. If there was a cluster state change, make sure we redistribute all the pools.
     if (e.hasClusterStateChanged) {
-      LOG.debug("Processing a cluster state change");
+      LOG.info("Processing a cluster state change");
       poolsToRedistribute.addAll(pools.keySet());
       e.hasClusterStateChanged = false;
     }
@@ -450,7 +436,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     // 8. Finally, for all the pools that have changes, promote queued queries and rebalance.
     for (String poolName : poolsToRedistribute) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Processing changes for pool " + poolName + ": " + pools.get(poolName));
+        LOG.info("Processing changes for pool " + poolName + ": " + pools.get(poolName));
       }
       processPoolChangesOnMasterThread(poolName, syncWork, hasRequeues);
     }
@@ -487,7 +473,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     } finally {
       sw.lock.unlock();
     }
-    LOG.debug("Processing " + ((session == null) ? "failed" : "successful")
+    LOG.info("Processing " + ((session == null) ? "failed" : "successful")
         + " initialization result for pool " + sw.poolName);
     // We could not have removed the pool for this session, or we would have CANCELED the init.
     PoolState pool = pools.get(sw.poolName);
@@ -514,7 +500,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     // the invalid combination of destroy and return themselves, as well as the actual
     // statement that destroys or returns it.
     if (e.updateErrors.remove(session)) {
-      LOG.debug("Ignoring an update error for a session being destroyed or returned");
+      LOG.info("Ignoring an update error for a session being destroyed or returned");
     }
     SettableFuture<WmTezSession> future = e.toReopen.remove(session);
     if (future != null) {
@@ -531,7 +517,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       SettableFuture<WmTezSession> future, HashSet<String> poolsToRedistribute,
       WmThreadSyncWork syncWork) throws Exception {
     if (e.updateErrors.remove(session)) {
-      LOG.debug("Ignoring an update error for a session being reopened");
+      LOG.info("Ignoring an update error for a session being reopened");
     }
     GetRequest reuseRequest = e.toReuse.remove(session);
     if (reuseRequest != null) {
@@ -743,7 +729,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     int queriesToStart = Math.min(pool.queue.size(),
         pool.queryParallelism - pool.getTotalActiveSessions());
     if (queriesToStart > 0) {
-      LOG.debug("Starting {} queries in pool {}", queriesToStart, pool);
+      LOG.info("Starting {} queries in pool {}", queriesToStart, pool);
     }
     if (hasRequeues) {
       // Sort the queue - we may have put items here out of order.
@@ -1083,6 +1069,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
   @VisibleForTesting
   protected WmTezSession createSessionObject(String sessionId, HiveConf conf) {
     conf = (conf == null) ? new HiveConf(this.conf) : conf;
+    conf.set(LlapTaskSchedulerService.LLAP_PLUGIN_ENDPOINT_ENABLED, "true");
     return new WmTezSession(sessionId, this, expirationTracker, conf);
   }
 
@@ -1299,7 +1286,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
         oldState = state;
         switch (oldState) {
         case GETTING: {
-          LOG.debug("Received a session from AM pool {}", session);
+          LOG.info("Received a session from AM pool {}", session);
           assert this.state == SessionInitState.GETTING;
           session.setPoolName(poolName);
           session.setQueueName(yarnQueue);
@@ -1364,24 +1351,24 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     public void onFailure(Throwable t) {
       SettableFuture<WmTezSession> future;
       WmTezSession session;
-      boolean wasCANCELED = false;
+      boolean wasCanceled = false;
       lock.lock();
       try {
-        wasCANCELED = (state == SessionInitState.CANCELED);
+        wasCanceled = (state == SessionInitState.CANCELED);
         session = this.session;
         future = this.future;
         this.future = null;
         this.session = null;
-        if (!wasCANCELED) {
+        if (!wasCanceled) {
           this.state = SessionInitState.DONE;
         }
       } finally {
         lock.unlock();
       }
       future.setException(t);
-      if (!wasCANCELED) {
+      if (!wasCanceled) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Queueing the initialization failure with " + session);
+          LOG.info("Queueing the initialization failure with " + session);
         }
         notifyInitializationCompleted(this); // Report failure to the main thread.
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/3914a1b2/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java
index 70adc33..c7ed534 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java
@@ -16,28 +16,37 @@
  * limitations under the License.
  */package org.apache.hadoop.hive.ql.exec.tez;
 
+import org.slf4j.LoggerFactory;
+
+import org.slf4j.Logger;
+
 import java.util.Set;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput;
 
 public class WorkloadManagerFederation {
+  private static final Logger LOG = LoggerFactory.getLogger(WorkloadManagerFederation.class);
 
   public static TezSessionState getSession(TezSessionState session, HiveConf conf,
       MappingInput input, boolean isUnmanagedLlapMode, Set<String> desiredCounters) throws Exception {
     // 1. If WM is not present just go to unmanaged.
-    if (!WorkloadManager.isInUse(conf)) {
+    WorkloadManager wm = WorkloadManager.getInstance();
+    if (wm == null) {
+      LOG.debug("Using unmanaged session - WM is not initialized");
       return getUnmanagedSession(session, conf, desiredCounters, isUnmanagedLlapMode);
     }
-    WorkloadManager wm = WorkloadManager.getInstance();
-    // We will ask WM for preliminary mapping. This allows us to escape to the unmanaged path
-    // quickly in the common case. It's still possible that resource plan will be updated and
-    // so our preliminary mapping won't work out. We'll handle that below.
+    // 2. We will ask WM for a preliminary mapping. This allows us to escape to the unmanaged path
+    //    quickly in the common case. It's still possible that resource plan will be updated and
+    //    our preliminary mapping won't work out. We'll handle that below.
     if (!wm.isManaged(input)) {
+      LOG.info("Using unmanaged session - no mapping for " + input);
       return getUnmanagedSession(session, conf, desiredCounters, isUnmanagedLlapMode);
     }
 
+    // 3. Finally, try WM.
     try {
       // Note: this may just block to wait for a session based on parallelism.
+      LOG.info("Getting a WM session for " + input);
       TezSessionState result = wm.getSession(session, input, conf);
       desiredCounters.addAll(wm.getTriggerCounterNames());
       return result;

http://git-wip-us.apache.org/repos/asf/hive/blob/3914a1b2/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 9e9239b..c73b991 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -9686,20 +9686,21 @@ public class ObjectStore implements RawStore, Configurable {
   public WMFullResourcePlan getActiveResourcePlan() throws MetaException {
     boolean commited = false;
     Query query = null;
+    WMFullResourcePlan result = null;
     try {
       openTransaction();
       query = pm.newQuery(MWMResourcePlan.class, "status == activeStatus");
       query.declareParameters("java.lang.String activeStatus");
       query.setUnique(true);
       MWMResourcePlan mResourcePlan = (MWMResourcePlan) query.execute(Status.ACTIVE.toString());
-      commited = commitTransaction();
-      if (mResourcePlan == null) {
-        return null; // No active plan.
+      if (mResourcePlan != null) {
+        result = fullFromMResourcePlan(mResourcePlan);
       }
-      return fullFromMResourcePlan(mResourcePlan);
+      commited = commitTransaction();
     } finally {
       rollbackAndCleanup(commited, query);
     }
+    return result;
   }
 
   private WMFullResourcePlan switchStatus(String name, MWMResourcePlan mResourcePlan,