You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2017/11/21 02:30:50 UTC
hive git commit: HIVE-18025: Push resource plan changes to
tez/unmanaged sessions (Prasanth Jayachandran reviewed by Sergey Shelukhin)
Repository: hive
Updated Branches:
refs/heads/master 21008897c -> 6c79fa75b
HIVE-18025: Push resource plan changes to tez/unmanaged sessions (Prasanth Jayachandran reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6c79fa75
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6c79fa75
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6c79fa75
Branch: refs/heads/master
Commit: 6c79fa75bef225f7ed1ac5c858879323d2e36825
Parents: 2100889
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Mon Nov 20 18:30:33 2017 -0800
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Mon Nov 20 18:30:33 2017 -0800
----------------------------------------------------------------------
.../hive/jdbc/TestTriggersNoTezSessionPool.java | 16 +++---
.../jdbc/TestTriggersTezSessionPoolManager.java | 16 +++---
.../org/apache/hadoop/hive/ql/exec/DDLTask.java | 58 +++++++++++---------
.../hive/ql/exec/tez/TezSessionPoolManager.java | 50 +++++++++--------
.../hive/ql/exec/tez/TezSessionPoolSession.java | 22 ++++++--
.../hive/ql/exec/tez/WorkloadManager.java | 19 ++++---
.../ql/wm/MetastoreGlobalTriggersFetcher.java | 38 -------------
.../apache/hive/service/server/HiveServer2.java | 58 +++++++++-----------
8 files changed, 128 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/6c79fa75/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java
index bcce3dc..33ef2eb 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java
@@ -16,17 +16,16 @@
package org.apache.hive.jdbc;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
import java.util.List;
+import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+import org.apache.hadoop.hive.metastore.api.WMPool;
+import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
import org.apache.hadoop.hive.ql.wm.Action;
import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
import org.apache.hadoop.hive.ql.wm.Expression;
import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
-import org.apache.hadoop.hive.ql.wm.MetastoreGlobalTriggersFetcher;
import org.apache.hadoop.hive.ql.wm.Trigger;
import org.junit.Test;
@@ -56,8 +55,11 @@ public class TestTriggersNoTezSessionPool extends AbstractJdbcTriggersTest {
@Override
void setupTriggers(final List<Trigger> triggers) throws Exception {
- MetastoreGlobalTriggersFetcher triggersFetcher = mock(MetastoreGlobalTriggersFetcher.class);
- when(triggersFetcher.fetch()).thenReturn(triggers);
- TezSessionPoolManager.getInstance().setGlobalTriggersFetcher(triggersFetcher);
+ WMFullResourcePlan rp = new WMFullResourcePlan(
+ new WMResourcePlan("rp"), null);
+ for (Trigger trigger : triggers) {
+ rp.addToTriggers(wmTriggerFromTrigger(trigger));
+ }
+ TezSessionPoolManager.getInstance().updateTriggers(rp);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/6c79fa75/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
index b377275..a00c2e0 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
@@ -16,18 +16,17 @@
package org.apache.hive.jdbc;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+import org.apache.hadoop.hive.metastore.api.WMPool;
+import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
import org.apache.hadoop.hive.ql.wm.Action;
import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
import org.apache.hadoop.hive.ql.wm.Expression;
import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
-import org.apache.hadoop.hive.ql.wm.MetastoreGlobalTriggersFetcher;
import org.apache.hadoop.hive.ql.wm.Trigger;
import org.junit.Test;
@@ -245,8 +244,11 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
@Override
protected void setupTriggers(final List<Trigger> triggers) throws Exception {
- MetastoreGlobalTriggersFetcher triggersFetcher = mock(MetastoreGlobalTriggersFetcher.class);
- when(triggersFetcher.fetch()).thenReturn(triggers);
- TezSessionPoolManager.getInstance().setGlobalTriggersFetcher(triggersFetcher);
+ WMFullResourcePlan rp = new WMFullResourcePlan(
+ new WMResourcePlan("rp"), null);
+ for (Trigger trigger : triggers) {
+ rp.addToTriggers(wmTriggerFromTrigger(trigger));
+ }
+ TezSessionPoolManager.getInstance().updateTriggers(rp);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/6c79fa75/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index e7af5e0..b3d7a03 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -114,6 +114,7 @@ import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.ArchiveUtils.PartSpecInfo;
import org.apache.hadoop.hive.ql.exec.FunctionInfo.FunctionResource;
+import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer;
@@ -713,48 +714,51 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
resourcePlan.setDefaultPoolPath(desc.getDefaultPoolPath());
}
+ final WorkloadManager wm = WorkloadManager.getInstance();
+ final TezSessionPoolManager pm = TezSessionPoolManager.getInstance();
boolean isActivate = false, isInTest = HiveConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST);
- WorkloadManager wm = null;
if (desc.getStatus() != null) {
resourcePlan.setStatus(desc.getStatus());
isActivate = desc.getStatus() == WMResourcePlanStatus.ACTIVE;
- if (isActivate) {
- wm = WorkloadManager.getInstance();
- if (wm == null && !isInTest) {
- throw new HiveException("Resource plan can only be activated when WM is enabled");
- }
- }
}
WMFullResourcePlan appliedRp = db.alterResourcePlan(
- desc.getRpName(), resourcePlan, desc.isEnableActivate());
- if (!isActivate || (wm == null && isInTest)) return 0;
- assert wm != null;
+ desc.getRpName(), resourcePlan, desc.isEnableActivate());
+ if (!isActivate || (wm == null && isInTest) || (pm == null && isInTest)) {
+ return 0;
+ }
if (appliedRp == null) {
throw new HiveException("Cannot get a resource plan to apply");
// TODO: shut down HS2?
}
final String name = (desc.getNewName() != null) ? desc.getNewName() : desc.getRpName();
LOG.info("Activating a new resource plan " + name + ": " + appliedRp);
- // Note: as per our current constraints, the behavior of two parallel activates is
- // undefined; although only one will succeed and the other will receive exception.
- // We need proper (semi-)transactional modifications to support this without hacks.
- ListenableFuture<Boolean> future = wm.updateResourcePlanAsync(appliedRp);
- boolean isOk = false;
- try {
- // Note: we may add an async option in future. For now, let the task fail for the user.
- future.get();
- isOk = true;
- LOG.info("Successfully activated resource plan " + name);
- return 0;
- } catch (InterruptedException | ExecutionException e) {
- throw new HiveException(e);
- } finally {
- if (!isOk) {
- LOG.error("Failed to activate resource plan " + name);
- // TODO: shut down HS2?
+ if (wm != null) {
+ // Note: as per our current constraints, the behavior of two parallel activates is
+ // undefined; although only one will succeed and the other will receive exception.
+ // We need proper (semi-)transactional modifications to support this without hacks.
+ ListenableFuture<Boolean> future = wm.updateResourcePlanAsync(appliedRp);
+ boolean isOk = false;
+ try {
+ // Note: we may add an async option in future. For now, let the task fail for the user.
+ future.get();
+ isOk = true;
+ LOG.info("Successfully activated resource plan " + name);
+ return 0;
+ } catch (InterruptedException | ExecutionException e) {
+ throw new HiveException(e);
+ } finally {
+ if (!isOk) {
+ LOG.error("Failed to activate resource plan " + name);
+ // TODO: shut down HS2?
+ }
}
}
+ if (pm != null) {
+ pm.updateTriggers(appliedRp);
+ LOG.info("Updated tez session pool manager with active resource plan: {}", appliedRp.getPlan().getName());
+ }
+ return 0;
}
private int dropResourcePlan(Hive db, DropResourcePlanDesc desc) throws HiveException {
http://git-wip-us.apache.org/repos/asf/hive/blob/6c79fa75/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 4e48f15..8417ebb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -30,10 +30,11 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+import org.apache.hadoop.hive.metastore.api.WMTrigger;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolSession.Manager;
-import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.wm.MetastoreGlobalTriggersFetcher;
+import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
import org.apache.hadoop.hive.ql.wm.Trigger;
import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
@@ -83,13 +84,13 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
/** This is used to close non-default sessions, and also all sessions when stopping. */
private final List<TezSessionState> openSessions = new LinkedList<>();
- private MetastoreGlobalTriggersFetcher globalTriggersFetcher;
+ private final List<Trigger> triggers = new LinkedList<>();
private SessionTriggerProvider sessionTriggerProvider;
private TriggerActionHandler triggerActionHandler;
private TriggerValidatorRunnable triggerValidatorRunnable;
/** Note: this is not thread-safe. */
- public static TezSessionPoolManager getInstance() throws Exception {
+ public static TezSessionPoolManager getInstance() {
TezSessionPoolManager local = instance;
if (local == null) {
instance = local = new TezSessionPoolManager();
@@ -183,15 +184,10 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
}
public void initTriggers(final HiveConf conf) throws HiveException {
- if (globalTriggersFetcher == null) {
- Hive db = Hive.get(conf);
- globalTriggersFetcher = new MetastoreGlobalTriggersFetcher(db);
- }
-
if (triggerValidatorRunnable == null) {
final long triggerValidationIntervalMs = HiveConf.getTimeVar(conf, ConfVars
.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS);
- sessionTriggerProvider = new SessionTriggerProvider(openSessions, globalTriggersFetcher.fetch());
+ sessionTriggerProvider = new SessionTriggerProvider(openSessions, triggers);
triggerActionHandler = new KillTriggerActionHandler();
triggerValidatorRunnable = new TriggerValidatorRunnable(sessionTriggerProvider, triggerActionHandler);
startTriggerValidator(triggerValidationIntervalMs);
@@ -349,6 +345,10 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
expirationTracker.stop();
}
+ if (triggerValidatorRunnable != null) {
+ stopTriggerValidator();
+ }
+
instance = null;
}
@@ -502,14 +502,26 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
public void registerOpenSession(TezSessionPoolSession session) {
synchronized (openSessions) {
openSessions.add(session);
+ updateSessions();
}
- updateSessionsTriggers();
}
- private void updateSessionsTriggers() {
- if (sessionTriggerProvider != null && globalTriggersFetcher != null) {
+ private void updateSessions() {
+ if (sessionTriggerProvider != null) {
sessionTriggerProvider.setSessions(Collections.unmodifiableList(openSessions));
- sessionTriggerProvider.setTriggers(Collections.unmodifiableList(globalTriggersFetcher.fetch()));
+ }
+ }
+
+ public void updateTriggers(final WMFullResourcePlan appliedRp) {
+ if (sessionTriggerProvider != null && appliedRp != null) {
+ List<WMTrigger> wmTriggers = appliedRp.getTriggers();
+ List<Trigger> triggers = new ArrayList<>();
+ if (appliedRp.isSetTriggers()) {
+ for (WMTrigger wmTrigger : wmTriggers) {
+ triggers.add(ExecutionTrigger.fromWMTrigger(wmTrigger));
+ }
+ }
+ sessionTriggerProvider.setTriggers(Collections.unmodifiableList(triggers));
}
}
@@ -521,11 +533,11 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
}
synchronized (openSessions) {
openSessions.remove(session);
+ updateSessions();
}
if (defaultSessionPool != null) {
defaultSessionPool.notifyClosed(session);
}
- updateSessionsTriggers();
}
@VisibleForTesting
@@ -534,13 +546,7 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
}
- @VisibleForTesting
- public void setGlobalTriggersFetcher(MetastoreGlobalTriggersFetcher metastoreGlobalTriggersFetcher) {
- this.globalTriggersFetcher = metastoreGlobalTriggersFetcher;
- updateSessionsTriggers();
- }
-
- public List<String> getTriggerCounterNames() {
+ List<String> getTriggerCounterNames() {
List<String> counterNames = new ArrayList<>();
if (sessionTriggerProvider != null) {
List<Trigger> activeTriggers = sessionTriggerProvider.getTriggers();
http://git-wip-us.apache.org/repos/asf/hive/blob/6c79fa75/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
index 769b24a..b3ccd24 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
@@ -73,14 +73,24 @@ class TezSessionPoolSession extends TezSessionState {
}
public static abstract class AbstractTriggerValidator {
+ private ScheduledExecutorService scheduledExecutorService = null;
abstract Runnable getTriggerValidatorRunnable();
- public void startTriggerValidator(long triggerValidationIntervalMs) {
- final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TriggerValidator").build());
- Runnable triggerValidatorRunnable = getTriggerValidatorRunnable();
- scheduledExecutorService.scheduleWithFixedDelay(triggerValidatorRunnable, triggerValidationIntervalMs,
- triggerValidationIntervalMs, TimeUnit.MILLISECONDS);
+ void startTriggerValidator(long triggerValidationIntervalMs) {
+ if (scheduledExecutorService == null) {
+ scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TriggerValidator").build());
+ Runnable triggerValidatorRunnable = getTriggerValidatorRunnable();
+ scheduledExecutorService.scheduleWithFixedDelay(triggerValidatorRunnable, triggerValidationIntervalMs,
+ triggerValidationIntervalMs, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ void stopTriggerValidator() {
+ if (scheduledExecutorService != null) {
+ scheduledExecutorService.shutdownNow();
+ scheduledExecutorService = null;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/6c79fa75/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 3990f95..d304701 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -209,15 +209,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
wmThread = new Thread(() -> runWmThread(), "Workload management master");
wmThread.setDaemon(true);
-
- final long triggerValidationIntervalMs = HiveConf.getTimeVar(conf,
- HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS);
- TriggerActionHandler triggerActionHandler = new KillMoveTriggerActionHandler(this);
- triggerValidatorRunnable = new PerPoolTriggerValidatorRunnable(perPoolProviders, triggerActionHandler,
- triggerValidationIntervalMs);
- startTriggerValidator(triggerValidationIntervalMs); // TODO: why is this not in start
-
- org.apache.hadoop.metrics2.util.MBeans.register("HiveServer2", "WorkloadManager", this);
}
private static int determineQueryParallelism(WMFullResourcePlan plan) {
@@ -236,6 +227,13 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
allocationManager.start();
wmThread.start();
initRpFuture.get(); // Wait for the initial resource plan to be applied.
+
+ final long triggerValidationIntervalMs = HiveConf.getTimeVar(conf,
+ HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS);
+ TriggerActionHandler triggerActionHandler = new KillMoveTriggerActionHandler(this);
+ triggerValidatorRunnable = new PerPoolTriggerValidatorRunnable(perPoolProviders, triggerActionHandler,
+ triggerValidationIntervalMs);
+ startTriggerValidator(triggerValidationIntervalMs);
}
public void stop() throws Exception {
@@ -256,6 +254,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
workPool.shutdownNow();
timeoutPool.shutdownNow();
+ if (triggerValidatorRunnable != null) {
+ stopTriggerValidator();
+ }
INSTANCE = null;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/6c79fa75/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java
deleted file mode 100644
index 87c007f..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.wm;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.hive.ql.metadata.Hive;
-
-/**
- * Fetch global (non-llap) rules from metastore
- */
-public class MetastoreGlobalTriggersFetcher {
- private static final String GLOBAL_TRIGGER_NAME = "global";
- private Hive db;
-
- public MetastoreGlobalTriggersFetcher(final Hive db) {
- this.db = db;
- }
-
- public List<Trigger> fetch() {
- // TODO: this entire class will go away, DDLTask will push RP to TezSessionPoolManager where triggers are available
- return new ArrayList<>();
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/6c79fa75/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 5a6d0c8..c3afa19 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -18,16 +18,6 @@
package org.apache.hive.service.server;
-import org.apache.hadoop.hive.metastore.api.WMMapping;
-
-import org.apache.hadoop.hive.metastore.api.WMPool;
-
-import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
-
-import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
-
-import com.google.common.collect.Lists;
-
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
@@ -66,11 +56,10 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.RawStore;
-import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+import org.apache.hadoop.hive.metastore.api.WMPool;
+import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
import org.apache.hadoop.hive.metastore.cache.CachedStore;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
@@ -107,6 +96,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
/**
* HiveServer2.
@@ -195,29 +185,31 @@ public class HiveServer2 extends CompositeService {
throw new RuntimeException("Failed to get metastore connection", e);
}
- // Initialize workload management.
- String wmQueue = HiveConf.getVar(hiveConf, ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE);
- if (wmQueue != null && !wmQueue.isEmpty()) {
- WMFullResourcePlan resourcePlan;
- try {
- resourcePlan = sessionHive.getActiveResourcePlan();
- } catch (HiveException e) {
- throw new RuntimeException(e);
- }
- if (resourcePlan == null) {
- if (!HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_IN_TEST)) {
- LOG.error("Cannot activate workload management - no active resource plan");
- } else {
- LOG.info("Creating a default resource plan for test");
- resourcePlan = createTestResourcePlan();
- }
+ WMFullResourcePlan resourcePlan;
+ try {
+ resourcePlan = sessionHive.getActiveResourcePlan();
+ } catch (HiveException e) {
+ throw new RuntimeException(e);
+ }
+
+ if (resourcePlan == null) {
+ if (!HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_IN_TEST)) {
+ LOG.error("Cannot activate workload management - no active resource plan");
+ } else {
+ LOG.info("Creating a default resource plan for test");
+ resourcePlan = createTestResourcePlan();
}
- if (resourcePlan != null) {
+ }
+
+ if (resourcePlan != null) {
+ // Initialize workload management.
+ String wmQueue = HiveConf.getVar(hiveConf, ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE);
+ if (wmQueue != null && !wmQueue.isEmpty()) {
LOG.info("Initializing workload management");
wm = WorkloadManager.create(wmQueue, hiveConf, resourcePlan);
- } else {
- wm = null;
}
+ tezSessionPoolManager.updateTriggers(resourcePlan);
+ LOG.info("Updated tez session pool manager with active resource plan: {}", resourcePlan.getPlan().getName());
}
// Create views registry