You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2017/11/21 02:30:50 UTC

hive git commit: HIVE-18025: Push resource plan changes to tez/unmanaged sessions (Prasanth Jayachandran reviewed by Sergey Shelukhin)

Repository: hive
Updated Branches:
  refs/heads/master 21008897c -> 6c79fa75b


HIVE-18025: Push resource plan changes to tez/unmanaged sessions (Prasanth Jayachandran reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6c79fa75
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6c79fa75
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6c79fa75

Branch: refs/heads/master
Commit: 6c79fa75bef225f7ed1ac5c858879323d2e36825
Parents: 2100889
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Mon Nov 20 18:30:33 2017 -0800
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Mon Nov 20 18:30:33 2017 -0800

----------------------------------------------------------------------
 .../hive/jdbc/TestTriggersNoTezSessionPool.java | 16 +++---
 .../jdbc/TestTriggersTezSessionPoolManager.java | 16 +++---
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 58 +++++++++++---------
 .../hive/ql/exec/tez/TezSessionPoolManager.java | 50 +++++++++--------
 .../hive/ql/exec/tez/TezSessionPoolSession.java | 22 ++++++--
 .../hive/ql/exec/tez/WorkloadManager.java       | 19 ++++---
 .../ql/wm/MetastoreGlobalTriggersFetcher.java   | 38 -------------
 .../apache/hive/service/server/HiveServer2.java | 58 +++++++++-----------
 8 files changed, 128 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/6c79fa75/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java
index bcce3dc..33ef2eb 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java
@@ -16,17 +16,16 @@
 
 package org.apache.hive.jdbc;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 import java.util.List;
 
+import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+import org.apache.hadoop.hive.metastore.api.WMPool;
+import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
 import org.apache.hadoop.hive.ql.wm.Action;
 import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
 import org.apache.hadoop.hive.ql.wm.Expression;
 import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
-import org.apache.hadoop.hive.ql.wm.MetastoreGlobalTriggersFetcher;
 import org.apache.hadoop.hive.ql.wm.Trigger;
 import org.junit.Test;
 
@@ -56,8 +55,11 @@ public class TestTriggersNoTezSessionPool extends AbstractJdbcTriggersTest {
 
   @Override
   void setupTriggers(final List<Trigger> triggers) throws Exception {
-    MetastoreGlobalTriggersFetcher triggersFetcher = mock(MetastoreGlobalTriggersFetcher.class);
-    when(triggersFetcher.fetch()).thenReturn(triggers);
-    TezSessionPoolManager.getInstance().setGlobalTriggersFetcher(triggersFetcher);
+    WMFullResourcePlan rp = new WMFullResourcePlan(
+      new WMResourcePlan("rp"), null);
+    for (Trigger trigger : triggers) {
+      rp.addToTriggers(wmTriggerFromTrigger(trigger));
+    }
+    TezSessionPoolManager.getInstance().updateTriggers(rp);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/6c79fa75/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
index b377275..a00c2e0 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
@@ -16,18 +16,17 @@
 
 package org.apache.hive.jdbc;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+import org.apache.hadoop.hive.metastore.api.WMPool;
+import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
 import org.apache.hadoop.hive.ql.wm.Action;
 import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
 import org.apache.hadoop.hive.ql.wm.Expression;
 import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
-import org.apache.hadoop.hive.ql.wm.MetastoreGlobalTriggersFetcher;
 import org.apache.hadoop.hive.ql.wm.Trigger;
 import org.junit.Test;
 
@@ -245,8 +244,11 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
 
   @Override
   protected void setupTriggers(final List<Trigger> triggers) throws Exception {
-    MetastoreGlobalTriggersFetcher triggersFetcher = mock(MetastoreGlobalTriggersFetcher.class);
-    when(triggersFetcher.fetch()).thenReturn(triggers);
-    TezSessionPoolManager.getInstance().setGlobalTriggersFetcher(triggersFetcher);
+    WMFullResourcePlan rp = new WMFullResourcePlan(
+      new WMResourcePlan("rp"), null);
+    for (Trigger trigger : triggers) {
+      rp.addToTriggers(wmTriggerFromTrigger(trigger));
+    }
+    TezSessionPoolManager.getInstance().updateTriggers(rp);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/6c79fa75/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index e7af5e0..b3d7a03 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -114,6 +114,7 @@ import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.ArchiveUtils.PartSpecInfo;
 import org.apache.hadoop.hive.ql.exec.FunctionInfo.FunctionResource;
+import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer;
@@ -713,48 +714,51 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       resourcePlan.setDefaultPoolPath(desc.getDefaultPoolPath());
     }
 
+    final WorkloadManager wm = WorkloadManager.getInstance();
+    final TezSessionPoolManager pm = TezSessionPoolManager.getInstance();
     boolean isActivate = false, isInTest = HiveConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST);
-    WorkloadManager wm = null;
     if (desc.getStatus() != null) {
       resourcePlan.setStatus(desc.getStatus());
       isActivate = desc.getStatus() == WMResourcePlanStatus.ACTIVE;
-      if (isActivate) {
-        wm = WorkloadManager.getInstance();
-        if (wm == null && !isInTest) {
-          throw new HiveException("Resource plan can only be activated when WM is enabled");
-        }
-      }
     }
 
     WMFullResourcePlan appliedRp = db.alterResourcePlan(
-        desc.getRpName(), resourcePlan, desc.isEnableActivate());
-    if (!isActivate || (wm == null && isInTest)) return 0;
-    assert wm != null;
+      desc.getRpName(), resourcePlan, desc.isEnableActivate());
+    if (!isActivate || (wm == null && isInTest) || (pm == null && isInTest)) {
+      return 0;
+    }
     if (appliedRp == null) {
       throw new HiveException("Cannot get a resource plan to apply");
       // TODO: shut down HS2?
     }
     final String name = (desc.getNewName() != null) ? desc.getNewName() : desc.getRpName();
     LOG.info("Activating a new resource plan " + name + ": " + appliedRp);
-    // Note: as per our current constraints, the behavior of two parallel activates is
-    //       undefined; although only one will succeed and the other will receive exception.
-    //       We need proper (semi-)transactional modifications to support this without hacks.
-    ListenableFuture<Boolean> future = wm.updateResourcePlanAsync(appliedRp);
-    boolean isOk = false;
-    try {
-      // Note: we may add an async option in future. For now, let the task fail for the user.
-      future.get();
-      isOk = true;
-      LOG.info("Successfully activated resource plan " + name);
-      return 0;
-    } catch (InterruptedException | ExecutionException e) {
-      throw new HiveException(e);
-    } finally {
-      if (!isOk) {
-        LOG.error("Failed to activate resource plan " + name);
-        // TODO: shut down HS2?
+    if (wm != null) {
+      // Note: as per our current constraints, the behavior of two parallel activates is
+      //       undefined; although only one will succeed and the other will receive exception.
+      //       We need proper (semi-)transactional modifications to support this without hacks.
+      ListenableFuture<Boolean> future = wm.updateResourcePlanAsync(appliedRp);
+      boolean isOk = false;
+      try {
+        // Note: we may add an async option in future. For now, let the task fail for the user.
+        future.get();
+        isOk = true;
+        LOG.info("Successfully activated resource plan " + name);
+        return 0;
+      } catch (InterruptedException | ExecutionException e) {
+        throw new HiveException(e);
+      } finally {
+        if (!isOk) {
+          LOG.error("Failed to activate resource plan " + name);
+          // TODO: shut down HS2?
+        }
       }
     }
+    if (pm != null) {
+      pm.updateTriggers(appliedRp);
+      LOG.info("Updated tez session pool manager with active resource plan: {}", appliedRp.getPlan().getName());
+    }
+    return 0;
   }
 
   private int dropResourcePlan(Hive db, DropResourcePlanDesc desc) throws HiveException {

http://git-wip-us.apache.org/repos/asf/hive/blob/6c79fa75/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 4e48f15..8417ebb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -30,10 +30,11 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+import org.apache.hadoop.hive.metastore.api.WMTrigger;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolSession.Manager;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.wm.MetastoreGlobalTriggersFetcher;
+import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
 import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
 import org.apache.hadoop.hive.ql.wm.Trigger;
 import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
@@ -83,13 +84,13 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
 
   /** This is used to close non-default sessions, and also all sessions when stopping. */
   private final List<TezSessionState> openSessions = new LinkedList<>();
-  private MetastoreGlobalTriggersFetcher globalTriggersFetcher;
+  private final List<Trigger> triggers = new LinkedList<>();
   private SessionTriggerProvider sessionTriggerProvider;
   private TriggerActionHandler triggerActionHandler;
   private TriggerValidatorRunnable triggerValidatorRunnable;
 
   /** Note: this is not thread-safe. */
-  public static TezSessionPoolManager getInstance() throws Exception {
+  public static TezSessionPoolManager getInstance() {
     TezSessionPoolManager local = instance;
     if (local == null) {
       instance = local = new TezSessionPoolManager();
@@ -183,15 +184,10 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
   }
 
   public void initTriggers(final HiveConf conf) throws HiveException {
-    if (globalTriggersFetcher == null) {
-      Hive db = Hive.get(conf);
-      globalTriggersFetcher = new MetastoreGlobalTriggersFetcher(db);
-    }
-
     if (triggerValidatorRunnable == null) {
       final long triggerValidationIntervalMs = HiveConf.getTimeVar(conf, ConfVars
         .HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS);
-      sessionTriggerProvider = new SessionTriggerProvider(openSessions, globalTriggersFetcher.fetch());
+      sessionTriggerProvider = new SessionTriggerProvider(openSessions, triggers);
       triggerActionHandler = new KillTriggerActionHandler();
       triggerValidatorRunnable = new TriggerValidatorRunnable(sessionTriggerProvider, triggerActionHandler);
       startTriggerValidator(triggerValidationIntervalMs);
@@ -349,6 +345,10 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
       expirationTracker.stop();
     }
 
+    if (triggerValidatorRunnable != null) {
+      stopTriggerValidator();
+    }
+
     instance = null;
   }
 
@@ -502,14 +502,26 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
   public void registerOpenSession(TezSessionPoolSession session) {
     synchronized (openSessions) {
       openSessions.add(session);
+      updateSessions();
     }
-    updateSessionsTriggers();
   }
 
-  private void updateSessionsTriggers() {
-    if (sessionTriggerProvider != null && globalTriggersFetcher != null) {
+  private void updateSessions() {
+    if (sessionTriggerProvider != null) {
       sessionTriggerProvider.setSessions(Collections.unmodifiableList(openSessions));
-      sessionTriggerProvider.setTriggers(Collections.unmodifiableList(globalTriggersFetcher.fetch()));
+    }
+  }
+
+  public void updateTriggers(final WMFullResourcePlan appliedRp) {
+    if (sessionTriggerProvider != null && appliedRp != null) {
+      List<WMTrigger> wmTriggers = appliedRp.getTriggers();
+      List<Trigger> triggers = new ArrayList<>();
+      if (appliedRp.isSetTriggers()) {
+        for (WMTrigger wmTrigger : wmTriggers) {
+          triggers.add(ExecutionTrigger.fromWMTrigger(wmTrigger));
+        }
+      }
+      sessionTriggerProvider.setTriggers(Collections.unmodifiableList(triggers));
     }
   }
 
@@ -521,11 +533,11 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
     }
     synchronized (openSessions) {
       openSessions.remove(session);
+      updateSessions();
     }
     if (defaultSessionPool != null) {
       defaultSessionPool.notifyClosed(session);
     }
-    updateSessionsTriggers();
   }
 
   @VisibleForTesting
@@ -534,13 +546,7 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
   }
 
 
-  @VisibleForTesting
-  public void setGlobalTriggersFetcher(MetastoreGlobalTriggersFetcher metastoreGlobalTriggersFetcher) {
-    this.globalTriggersFetcher = metastoreGlobalTriggersFetcher;
-    updateSessionsTriggers();
-  }
-
-  public List<String> getTriggerCounterNames() {
+  List<String> getTriggerCounterNames() {
     List<String> counterNames = new ArrayList<>();
     if (sessionTriggerProvider != null) {
       List<Trigger> activeTriggers = sessionTriggerProvider.getTriggers();

http://git-wip-us.apache.org/repos/asf/hive/blob/6c79fa75/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
index 769b24a..b3ccd24 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
@@ -73,14 +73,24 @@ class TezSessionPoolSession extends TezSessionState {
   }
 
   public static abstract class AbstractTriggerValidator {
+    private ScheduledExecutorService scheduledExecutorService = null;
     abstract Runnable getTriggerValidatorRunnable();
 
-    public void startTriggerValidator(long triggerValidationIntervalMs) {
-      final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TriggerValidator").build());
-      Runnable triggerValidatorRunnable = getTriggerValidatorRunnable();
-      scheduledExecutorService.scheduleWithFixedDelay(triggerValidatorRunnable, triggerValidationIntervalMs,
-        triggerValidationIntervalMs, TimeUnit.MILLISECONDS);
+    void startTriggerValidator(long triggerValidationIntervalMs) {
+      if (scheduledExecutorService == null) {
+        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
+          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TriggerValidator").build());
+        Runnable triggerValidatorRunnable = getTriggerValidatorRunnable();
+        scheduledExecutorService.scheduleWithFixedDelay(triggerValidatorRunnable, triggerValidationIntervalMs,
+          triggerValidationIntervalMs, TimeUnit.MILLISECONDS);
+      }
+    }
+
+    void stopTriggerValidator() {
+      if (scheduledExecutorService != null) {
+        scheduledExecutorService.shutdownNow();
+        scheduledExecutorService = null;
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/6c79fa75/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 3990f95..d304701 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -209,15 +209,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
 
     wmThread = new Thread(() -> runWmThread(), "Workload management master");
     wmThread.setDaemon(true);
-
-    final long triggerValidationIntervalMs = HiveConf.getTimeVar(conf,
-      HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS);
-    TriggerActionHandler triggerActionHandler = new KillMoveTriggerActionHandler(this);
-    triggerValidatorRunnable = new PerPoolTriggerValidatorRunnable(perPoolProviders, triggerActionHandler,
-      triggerValidationIntervalMs);
-    startTriggerValidator(triggerValidationIntervalMs); // TODO: why is this not in start
-
-    org.apache.hadoop.metrics2.util.MBeans.register("HiveServer2", "WorkloadManager", this);
   }
 
   private static int determineQueryParallelism(WMFullResourcePlan plan) {
@@ -236,6 +227,13 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     allocationManager.start();
     wmThread.start();
     initRpFuture.get(); // Wait for the initial resource plan to be applied.
+
+    final long triggerValidationIntervalMs = HiveConf.getTimeVar(conf,
+      HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS);
+    TriggerActionHandler triggerActionHandler = new KillMoveTriggerActionHandler(this);
+    triggerValidatorRunnable = new PerPoolTriggerValidatorRunnable(perPoolProviders, triggerActionHandler,
+      triggerValidationIntervalMs);
+    startTriggerValidator(triggerValidationIntervalMs);
   }
 
   public void stop() throws Exception {
@@ -256,6 +254,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     workPool.shutdownNow();
     timeoutPool.shutdownNow();
 
+    if (triggerValidatorRunnable != null) {
+      stopTriggerValidator();
+    }
     INSTANCE = null;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/6c79fa75/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java
deleted file mode 100644
index 87c007f..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.wm;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.hive.ql.metadata.Hive;
-
-/**
- * Fetch global (non-llap) rules from metastore
- */
-public class MetastoreGlobalTriggersFetcher {
-  private static final String GLOBAL_TRIGGER_NAME = "global";
-  private Hive db;
-
-  public MetastoreGlobalTriggersFetcher(final Hive db) {
-    this.db = db;
-  }
-
-  public List<Trigger> fetch() {
-    // TODO: this entire class will go away, DDLTask will push RP to TezSessionPoolManager where triggers are available
-    return new ArrayList<>();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/6c79fa75/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 5a6d0c8..c3afa19 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -18,16 +18,6 @@
 
 package org.apache.hive.service.server;
 
-import org.apache.hadoop.hive.metastore.api.WMMapping;
-
-import org.apache.hadoop.hive.metastore.api.WMPool;
-
-import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
-
-import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
-
-import com.google.common.collect.Lists;
-
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
@@ -66,11 +56,10 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.RawStore;
-import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+import org.apache.hadoop.hive.metastore.api.WMPool;
+import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
 import org.apache.hadoop.hive.metastore.cache.CachedStore;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
 import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
@@ -107,6 +96,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
 
 /**
  * HiveServer2.
@@ -195,29 +185,31 @@ public class HiveServer2 extends CompositeService {
       throw new RuntimeException("Failed to get metastore connection", e);
     }
 
-    // Initialize workload management.
-    String wmQueue = HiveConf.getVar(hiveConf, ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE);
-    if (wmQueue != null && !wmQueue.isEmpty()) {
-      WMFullResourcePlan resourcePlan;
-      try {
-        resourcePlan = sessionHive.getActiveResourcePlan();
-      } catch (HiveException e) {
-        throw new RuntimeException(e);
-      }
-      if (resourcePlan == null) {
-        if (!HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_IN_TEST)) {
-          LOG.error("Cannot activate workload management - no active resource plan");
-        } else {
-          LOG.info("Creating a default resource plan for test");
-          resourcePlan = createTestResourcePlan();
-        }
+    WMFullResourcePlan resourcePlan;
+    try {
+      resourcePlan = sessionHive.getActiveResourcePlan();
+    } catch (HiveException e) {
+      throw new RuntimeException(e);
+    }
+
+    if (resourcePlan == null) {
+      if (!HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_IN_TEST)) {
+        LOG.error("Cannot activate workload management - no active resource plan");
+      } else {
+        LOG.info("Creating a default resource plan for test");
+        resourcePlan = createTestResourcePlan();
       }
-      if (resourcePlan != null) {
+    }
+
+    if (resourcePlan != null) {
+      // Initialize workload management.
+      String wmQueue = HiveConf.getVar(hiveConf, ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE);
+      if (wmQueue != null && !wmQueue.isEmpty()) {
         LOG.info("Initializing workload management");
         wm = WorkloadManager.create(wmQueue, hiveConf, resourcePlan);
-      } else {
-        wm = null;
       }
+      tezSessionPoolManager.updateTriggers(resourcePlan);
+      LOG.info("Updated tez session pool manager with active resource plan: {}", resourcePlan.getPlan().getName());
     }
 
     // Create views registry