You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/12/15 00:00:03 UTC
[1/2] hive git commit: HIVE-18153 : refactor reopen and file management in TezTask (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master ca96613da -> e120bd8b0
HIVE-18153 : refactor reopen and file management in TezTask (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/89dbf4e9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/89dbf4e9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/89dbf4e9
Branch: refs/heads/master
Commit: 89dbf4e904592318da954eaf94548ec1b130e17c
Parents: ca96613
Author: sergey <se...@apache.org>
Authored: Thu Dec 14 15:53:44 2017 -0800
Committer: sergey <se...@apache.org>
Committed: Thu Dec 14 15:53:44 2017 -0800
----------------------------------------------------------------------
ql/pom.xml | 2 -
.../hadoop/hive/ql/exec/mr/ExecDriver.java | 1 +
.../hadoop/hive/ql/exec/tez/DagUtils.java | 102 ++++-----
.../hadoop/hive/ql/exec/tez/TezSessionPool.java | 23 +-
.../hive/ql/exec/tez/TezSessionPoolManager.java | 36 +---
.../hive/ql/exec/tez/TezSessionPoolSession.java | 20 +-
.../hive/ql/exec/tez/TezSessionState.java | 208 +++++++++++--------
.../apache/hadoop/hive/ql/exec/tez/TezTask.java | 189 ++++++-----------
.../hive/ql/exec/tez/WorkloadManager.java | 118 +++++++----
.../ql/exec/tez/monitoring/TezJobMonitor.java | 2 +-
.../ql/udf/generic/GenericUDTFGetSplits.java | 5 +-
.../hive/ql/exec/tez/SampleTezSessionState.java | 7 +-
.../hive/ql/exec/tez/TestTezSessionPool.java | 30 ++-
.../hadoop/hive/ql/exec/tez/TestTezTask.java | 73 ++-----
.../hive/ql/exec/tez/TestWorkloadManager.java | 10 +-
15 files changed, 378 insertions(+), 448 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/pom.xml
----------------------------------------------------------------------
diff --git a/ql/pom.xml b/ql/pom.xml
index f35a4c8..cbf71cd 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -225,8 +225,6 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-registry</artifactId>
<version>${hadoop.version}</version>
- <optional>true</optional>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 88a75ed..3f470eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -400,6 +400,7 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
SessionState ss = SessionState.get();
// TODO: why is there a TezSession in MR ExecDriver?
if (ss != null && HiveConf.getVar(job, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+ // TODO: this is the only place that uses keepTmpDir. Why?
TezSessionPoolManager.closeIfNotDefault(ss.getTezSession(), true);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 6c1afa6..e4a6f62 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -17,14 +17,13 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
-
import javax.security.auth.login.LoginException;
-
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
@@ -40,7 +39,6 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.tez.mapreduce.common.MRInputSplitDistributor;
@@ -541,16 +539,15 @@ public class DagUtils {
}
}
- private Vertex createVertex(JobConf conf, MergeJoinWork mergeJoinWork, LocalResource appJarLr,
- List<LocalResource> additionalLr, FileSystem fs, Path mrScratchDir, Context ctx,
- VertexType vertexType)
- throws Exception {
+ private Vertex createVertex(JobConf conf, MergeJoinWork mergeJoinWork, FileSystem fs,
+ Path mrScratchDir, Context ctx, VertexType vertexType,
+ Map<String, LocalResource> localResources) throws Exception {
Utilities.setMergeWork(conf, mergeJoinWork, mrScratchDir, false);
if (mergeJoinWork.getMainWork() instanceof MapWork) {
List<BaseWork> mapWorkList = mergeJoinWork.getBaseWorkList();
MapWork mapWork = (MapWork) (mergeJoinWork.getMainWork());
- Vertex mergeVx =
- createVertex(conf, mapWork, appJarLr, additionalLr, fs, mrScratchDir, ctx, vertexType);
+ Vertex mergeVx = createVertex(
+ conf, mapWork, fs, mrScratchDir, ctx, vertexType, localResources);
conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
// mapreduce.tez.input.initializer.serialize.event.payload should be set
@@ -580,10 +577,8 @@ public class DagUtils {
mergeVx.setVertexManagerPlugin(desc);
return mergeVx;
} else {
- Vertex mergeVx =
- createVertex(conf, (ReduceWork) mergeJoinWork.getMainWork(), appJarLr, additionalLr, fs,
- mrScratchDir, ctx);
- return mergeVx;
+ return createVertex(conf,
+ (ReduceWork) mergeJoinWork.getMainWork(), fs, mrScratchDir, ctx, localResources);
}
}
@@ -591,11 +586,8 @@ public class DagUtils {
* Helper function to create Vertex from MapWork.
*/
private Vertex createVertex(JobConf conf, MapWork mapWork,
- LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs,
- Path mrScratchDir, Context ctx, VertexType vertexType)
- throws Exception {
-
- Path tezDir = getTezDir(mrScratchDir);
+ FileSystem fs, Path mrScratchDir, Context ctx, VertexType vertexType,
+ Map<String, LocalResource> localResources) throws Exception {
// set up the operator plan
Utilities.cacheMapWork(conf, mapWork, mrScratchDir);
@@ -726,13 +718,6 @@ public class DagUtils {
// Add the actual source input
String alias = mapWork.getAliasToWork().keySet().iterator().next();
map.addDataSource(alias, dataSource);
-
- Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
- localResources.put(getBaseName(appJarLr), appJarLr);
- for (LocalResource lr: additionalLr) {
- localResources.put(getBaseName(lr), lr);
- }
-
map.addTaskLocalFiles(localResources);
return map;
}
@@ -772,9 +757,9 @@ public class DagUtils {
/*
* Helper function to create Vertex for given ReduceWork.
*/
- private Vertex createVertex(JobConf conf, ReduceWork reduceWork,
- LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs,
- Path mrScratchDir, Context ctx) throws Exception {
+ private Vertex createVertex(JobConf conf, ReduceWork reduceWork, FileSystem fs,
+ Path mrScratchDir, Context ctx, Map<String, LocalResource> localResources)
+ throws Exception {
// set up operator plan
conf.set(Utilities.INPUT_NAME, reduceWork.getName());
@@ -796,17 +781,28 @@ public class DagUtils {
reducer.setTaskEnvironment(getContainerEnvironment(conf, false));
reducer.setExecutionContext(vertexExecutionContext);
reducer.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
-
- Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
- localResources.put(getBaseName(appJarLr), appJarLr);
- for (LocalResource lr: additionalLr) {
- localResources.put(getBaseName(lr), lr);
- }
reducer.addTaskLocalFiles(localResources);
-
return reducer;
}
+ public static Map<String, LocalResource> createTezLrMap(
+ LocalResource appJarLr, Collection<LocalResource> additionalLr) {
+ // Note: interestingly this would exclude LLAP app jars that the session adds for LLAP case.
+ // Of course it doesn't matter because vertices run ON LLAP and have those jars, and
+ // moreover we anyway don't localize jars for the vertices on LLAP; but in theory
+ // this is still crappy code that assumes there's one and only one app jar.
+ Map<String, LocalResource> localResources = new HashMap<>();
+ if (appJarLr != null) {
+ localResources.put(getBaseName(appJarLr), appJarLr);
+ }
+ if (additionalLr != null) {
+ for (LocalResource lr: additionalLr) {
+ localResources.put(getBaseName(lr), lr);
+ }
+ }
+ return localResources;
+ }
+
/*
* Helper method to create a yarn local resource.
*/
@@ -1064,7 +1060,7 @@ public class DagUtils {
/*
* Helper function to retrieve the basename of a local resource
*/
- public String getBaseName(LocalResource lr) {
+ public static String getBaseName(LocalResource lr) {
return FilenameUtils.getName(lr.getResource().getFile());
}
@@ -1254,30 +1250,26 @@ public class DagUtils {
* @param work The instance of BaseWork representing the actual work to be performed
* by this vertex.
* @param scratchDir HDFS scratch dir for this execution unit.
- * @param appJarLr Local resource for hive-exec.
- * @param additionalLr
* @param fileSystem FS corresponding to scratchDir and LocalResources
* @param ctx This query's context
* @return Vertex
*/
@SuppressWarnings("deprecation")
public Vertex createVertex(JobConf conf, BaseWork work,
- Path scratchDir, LocalResource appJarLr,
- List<LocalResource> additionalLr, FileSystem fileSystem, Context ctx, boolean hasChildren,
- TezWork tezWork, VertexType vertexType) throws Exception {
+ Path scratchDir, FileSystem fileSystem, Context ctx, boolean hasChildren,
+ TezWork tezWork, VertexType vertexType, Map<String, LocalResource> localResources) throws Exception {
Vertex v = null;
// simply dispatch the call to the right method for the actual (sub-) type of
// BaseWork.
if (work instanceof MapWork) {
- v = createVertex(conf, (MapWork) work, appJarLr, additionalLr, fileSystem, scratchDir, ctx,
- vertexType);
+ v = createVertex(
+ conf, (MapWork) work, fileSystem, scratchDir, ctx, vertexType, localResources);
} else if (work instanceof ReduceWork) {
- v = createVertex(conf, (ReduceWork) work, appJarLr,
- additionalLr, fileSystem, scratchDir, ctx);
+ v = createVertex(conf, (ReduceWork) work, fileSystem, scratchDir, ctx, localResources);
} else if (work instanceof MergeJoinWork) {
- v = createVertex(conf, (MergeJoinWork) work, appJarLr, additionalLr, fileSystem, scratchDir,
- ctx, vertexType);
+ v = createVertex(
+ conf, (MergeJoinWork) work, fileSystem, scratchDir, ctx, vertexType, localResources);
// set VertexManagerPlugin if whether it's a cross product destination vertex
List<String> crossProductSources = new ArrayList<>();
for (BaseWork parentWork : tezWork.getParents(work)) {
@@ -1522,4 +1514,18 @@ public class DagUtils {
// -Xmx not specified
return -1;
}
+
+ // The utility of this method is not certain.
+ public static Map<String, LocalResource> getResourcesUpdatableForAm(
+ Collection<LocalResource> allNonAppResources) {
+ HashMap<String, LocalResource> allNonAppFileResources = new HashMap<>();
+ if (allNonAppResources == null) return allNonAppFileResources;
+ for (LocalResource lr : allNonAppResources) {
+ if (lr.getType() == LocalResourceType.FILE) {
+ // TEZ AM will only localize FILE (no script operators in the AM)
+ allNonAppFileResources.put(DagUtils.getBaseName(lr), lr);
+ }
+ }
+ return allNonAppFileResources;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
index 3bcf657..6e2dfe1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
@@ -217,7 +217,10 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
}
}
// If there are async requests, satisfy them first.
- if (!asyncRequests.isEmpty() && session.tryUse(false)) {
+ if (!asyncRequests.isEmpty()) {
+ if (!session.tryUse(false)) {
+ return true; // Session has expired and will be returned to us later.
+ }
future = asyncRequests.poll();
}
if (future == null) {
@@ -238,24 +241,12 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
return true;
}
- void replaceSession(SessionType oldSession, boolean keepTmpDir,
- String[] additionalFilesArray) throws Exception {
- // Retain the stuff from the old session.
+ void replaceSession(SessionType oldSession) throws Exception {
// Re-setting the queue config is an old hack that we may remove in future.
SessionType newSession = sessionObjFactory.create(oldSession);
- Path scratchDir = oldSession.getTezScratchDir();
String queueName = oldSession.getQueueName();
- Set<String> additionalFiles = null;
- if (additionalFilesArray != null) {
- additionalFiles = new HashSet<>();
- for (String file : additionalFilesArray) {
- additionalFiles.add(file);
- }
- } else {
- additionalFiles = oldSession.getAdditionalFilesNotFromConf();
- }
try {
- oldSession.close(keepTmpDir);
+ oldSession.close(false);
} finally {
poolLock.lock();
try {
@@ -280,7 +271,7 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
// probably just get rid of the thread local usage in TezSessionState.
SessionState.setCurrentSessionState(parentSessionState);
}
- newSession.open(additionalFiles, scratchDir);
+ newSession.open();
if (!putSessionBack(newSession, false)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Closing an unneeded session " + newSession
http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 8417ebb..3c1b8d0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -18,16 +18,14 @@
package org.apache.hadoop.hive.ql.exec.tez;
+import org.apache.hadoop.hive.ql.exec.tez.TezSessionState.HiveResources;
+
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
@@ -43,7 +41,6 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tez.dag.api.TezConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import com.google.common.annotations.VisibleForTesting;
/**
@@ -444,44 +441,35 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
/** Reopens the session that was found to not be running. */
@Override
- public TezSessionState reopen(TezSessionState sessionState,
- Configuration conf, String[] additionalFiles) throws Exception {
+ public TezSessionState reopen(TezSessionState sessionState) throws Exception {
HiveConf sessionConf = sessionState.getConf();
if (sessionState.getQueueName() != null
&& sessionConf.get(TezConfiguration.TEZ_QUEUE_NAME) == null) {
sessionConf.set(TezConfiguration.TEZ_QUEUE_NAME, sessionState.getQueueName());
}
- reopenInternal(sessionState, additionalFiles);
+ reopenInternal(sessionState);
return sessionState;
}
static void reopenInternal(
- TezSessionState sessionState, String[] additionalFiles) throws Exception {
- Set<String> oldAdditionalFiles = sessionState.getAdditionalFilesNotFromConf();
- // TODO: implies the session files and the array are the same if not null; why? very brittle
- if ((oldAdditionalFiles == null || oldAdditionalFiles.isEmpty())
- && (additionalFiles != null)) {
- oldAdditionalFiles = new HashSet<>();
- for (String file : additionalFiles) {
- oldAdditionalFiles.add(file);
- }
- }
+ TezSessionState sessionState) throws Exception {
+ HiveResources resources = sessionState.extractHiveResources();
// TODO: close basically resets the object to a bunch of nulls.
// We should ideally not reuse the object because it's pointless and error-prone.
- sessionState.close(true);
+ sessionState.close(false);
// Note: scratchdir is reused implicitly because the sessionId is the same.
- sessionState.open(oldAdditionalFiles, null);
+ sessionState.open(resources);
}
- public void closeNonDefaultSessions(boolean keepTmpDir) throws Exception {
+ public void closeNonDefaultSessions() throws Exception {
List<TezSessionState> sessionsToClose = null;
synchronized (openSessions) {
sessionsToClose = new ArrayList<TezSessionState>(openSessions);
}
for (TezSessionState sessionState : sessionsToClose) {
System.err.println("Shutting down tez session.");
- closeIfNotDefault(sessionState, keepTmpDir);
+ closeIfNotDefault(sessionState, false);
}
}
@@ -492,9 +480,7 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
if (queueName == null) {
LOG.warn("Pool session has a null queue: " + oldSession);
}
- TezSessionPoolSession newSession = createAndInitSession(
- queueName, oldSession.isDefault(), oldSession.getConf());
- defaultSessionPool.replaceSession(oldSession, false, null);
+ defaultSessionPool.replaceSession(oldSession);
}
/** Called by TezSessionPoolSession when opened. */
http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
index b3ccd24..96ade50 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
@@ -19,11 +19,8 @@
package org.apache.hadoop.hive.ql.exec.tez;
import com.google.common.util.concurrent.SettableFuture;
-
import org.apache.hadoop.hive.registry.impl.TezAmInstance;
-
import org.apache.hadoop.conf.Configuration;
-
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Collection;
@@ -31,9 +28,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-
import javax.security.auth.login.LoginException;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -42,7 +37,6 @@ import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
import org.apache.hadoop.hive.registry.impl.TezAmInstance;
import org.apache.tez.dag.api.TezException;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -66,8 +60,7 @@ class TezSessionPoolSession extends TezSessionState {
void returnAfterUse(TezSessionPoolSession session) throws Exception;
- TezSessionState reopen(TezSessionState session, Configuration conf,
- String[] inputOutputJars) throws Exception;
+ TezSessionState reopen(TezSessionState session) throws Exception;
void destroy(TezSessionState session) throws Exception;
}
@@ -128,10 +121,10 @@ class TezSessionPoolSession extends TezSessionState {
}
@Override
- protected void openInternal(Collection<String> additionalFiles,
- boolean isAsync, LogHelper console, Path scratchDir)
+ protected void openInternal(String[] additionalFiles,
+ boolean isAsync, LogHelper console, HiveResources resources)
throws IOException, LoginException, URISyntaxException, TezException {
- super.openInternal(additionalFiles, isAsync, console, scratchDir);
+ super.openInternal(additionalFiles, isAsync, console, resources);
parent.registerOpenSession(this);
if (expirationTracker != null) {
expirationTracker.addToExpirationQueue(this);
@@ -206,9 +199,8 @@ class TezSessionPoolSession extends TezSessionState {
}
@Override
- public TezSessionState reopen(
- Configuration conf, String[] inputOutputJars) throws Exception {
- return parent.reopen(this, conf, inputOutputJars);
+ public TezSessionState reopen() throws Exception {
+ return parent.reopen(this);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index dd879fc..5e892c6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -17,7 +17,8 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
-import java.util.Collection;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
@@ -37,7 +38,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.login.LoginException;
-
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.conf.Configuration;
@@ -90,7 +90,6 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
-
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
@@ -126,8 +125,18 @@ public class TezSessionState {
private AtomicReference<String> ownerThread = new AtomicReference<>(null);
- private final Set<String> additionalFilesNotFromConf = new HashSet<String>();
- private final Set<LocalResource> localizedResources = new HashSet<LocalResource>();
+ public static final class HiveResources {
+ public HiveResources(Path dagResourcesDir) {
+ this.dagResourcesDir = dagResourcesDir;
+ }
+ /** A directory that will contain resources related to DAGs and specified in configs. */
+ public final Path dagResourcesDir;
+ public final Set<String> additionalFilesNotFromConf = new HashSet<>();
+ /** Localized resources of this session; both from conf and not from conf (above). */
+ public final Set<LocalResource> localizedResources = new HashSet<>();
+ }
+
+ private HiveResources resources;
@JsonProperty("doAsEnabled")
private boolean doAsEnabled;
private boolean isLegacyLlapMode;
@@ -201,45 +210,32 @@ public class TezSessionState {
}
public void open() throws IOException, LoginException, URISyntaxException, TezException {
- Set<String> noFiles = null;
- open(noFiles, null);
+ String[] noFiles = null;
+ open(noFiles);
}
/**
* Creates a tez session. A session is tied to either a cli/hs2 session. You can
* submit multiple DAGs against a session (as long as they are executed serially).
- * @throws IOException
- * @throws URISyntaxException
- * @throws LoginException
- * @throws TezException
- * @throws InterruptedException
*/
- public void open(String[] additionalFiles)
+ public void open(String[] additionalFilesNotFromConf)
throws IOException, LoginException, URISyntaxException, TezException {
- openInternal(setFromArray(additionalFiles), false, null, null);
+ openInternal(additionalFilesNotFromConf, false, null, null);
}
- private static Set<String> setFromArray(String[] additionalFiles) {
- if (additionalFiles == null) return null;
- Set<String> files = new HashSet<>();
- for (String originalFile : additionalFiles) {
- files.add(originalFile);
- }
- return files;
+
+ public void open(HiveResources resources)
+ throws LoginException, IOException, URISyntaxException, TezException {
+ openInternal(null, false, null, resources);
}
public void beginOpen(String[] additionalFiles, LogHelper console)
throws IOException, LoginException, URISyntaxException, TezException {
- openInternal(setFromArray(additionalFiles), true, console, null);
+ openInternal(additionalFiles, true, console, null);
}
- public void open(Collection<String> additionalFiles, Path scratchDir)
- throws LoginException, IOException, URISyntaxException, TezException {
- openInternal(additionalFiles, false, null, scratchDir);
- }
-
- protected void openInternal(Collection<String> additionalFiles,
- boolean isAsync, LogHelper console, Path scratchDir)
+ protected void openInternal(String[] additionalFilesNotFromConf,
+ boolean isAsync, LogHelper console, HiveResources resources)
throws IOException, LoginException, URISyntaxException, TezException {
// TODO Why is the queue name set again. It has already been setup via setQueueName. Do only one of the two.
String confQueueName = conf.get(TezConfiguration.TEZ_QUEUE_NAME);
@@ -258,25 +254,25 @@ public class TezSessionState {
user = ugi.getShortUserName();
LOG.info("User of session id " + sessionId + " is " + user);
- // create the tez tmp dir
- tezScratchDir = scratchDir == null ? createTezDir(sessionId) : scratchDir;
-
- additionalFilesNotFromConf.clear();
- if (additionalFiles != null) {
- additionalFilesNotFromConf.addAll(additionalFiles);
+ // Create the tez tmp dir and a directory for Hive resources.
+ tezScratchDir = createTezDir(sessionId, null);
+ if (resources != null) {
+ // If we are getting the resources externally, don't relocalize anything.
+ this.resources = resources;
+ } else {
+ this.resources = new HiveResources(createTezDir(sessionId, "resources"));
+ ensureLocalResources(conf, additionalFilesNotFromConf);
}
- refreshLocalResourcesFromConf(conf);
-
// unless already installed on all the cluster nodes, we'll have to
// localize hive-exec.jar as well.
appJarLr = createJarLocalResource(utils.getExecJarPathLocal());
// configuration for the application master
final Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
- commonLocalResources.put(utils.getBaseName(appJarLr), appJarLr);
- for (LocalResource lr : localizedResources) {
- commonLocalResources.put(utils.getBaseName(lr), lr);
+ commonLocalResources.put(DagUtils.getBaseName(appJarLr), appJarLr);
+ for (LocalResource lr : this.resources.localizedResources) {
+ commonLocalResources.put(DagUtils.getBaseName(lr), lr);
}
if (llapMode) {
@@ -284,7 +280,7 @@ public class TezSessionState {
addJarLRByClass(LlapTaskSchedulerService.class, commonLocalResources);
addJarLRByClass(LlapProtocolClientImpl.class, commonLocalResources);
addJarLRByClass(LlapProtocolClientProxy.class, commonLocalResources);
- addJarLRByClassName("org.apache.hadoop.registry.client.api.RegistryOperations", commonLocalResources);
+ addJarLRByClass(RegistryOperations.class, commonLocalResources);
}
// Create environment for AM.
@@ -556,36 +552,54 @@ public class TezSessionState {
tezConf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, modifyStr);
}
- public void refreshLocalResourcesFromConf(HiveConf conf)
- throws IOException, LoginException, URISyntaxException, TezException {
-
- String dir = tezScratchDir.toString();
-
- localizedResources.clear();
+ /** This is called in openInternal and in TezTask.updateSession to localize conf resources. */
+ public void ensureLocalResources(Configuration conf, String[] newFilesNotFromConf)
+ throws IOException, LoginException, URISyntaxException, TezException {
+ String dir = resources.dagResourcesDir.toString();
+ resources.localizedResources.clear();
- // these are local resources set through add file, jar, etc
+ // Always localize files from conf; duplicates are handled on FS level.
+ // TODO: we could do the same thing as below and only localize if missing.
+ // That could be especially valuable given that this is almost always the same set.
List<LocalResource> lrs = utils.localizeTempFilesFromConf(dir, conf);
if (lrs != null) {
- localizedResources.addAll(lrs);
+ resources.localizedResources.addAll(lrs);
}
- // these are local resources that are set through the mr "tmpjars" property; skip session files.
- List<LocalResource> handlerLr = utils.localizeTempFiles(dir, conf,
- additionalFilesNotFromConf.toArray(new String[additionalFilesNotFromConf.size()]),
- DagUtils.getTempFilesFromConf(conf));
-
- if (handlerLr != null) {
- localizedResources.addAll(handlerLr);
+ // Localize the non-conf resources that are missing from the current list.
+ List<LocalResource> newResources = null;
+ if (newFilesNotFromConf != null && newFilesNotFromConf.length > 0) {
+ boolean hasResources = !resources.additionalFilesNotFromConf.isEmpty();
+ if (hasResources) {
+ for (String s : newFilesNotFromConf) {
+ hasResources = resources.additionalFilesNotFromConf.contains(s);
+ if (!hasResources) break;
+ }
+ }
+ if (!hasResources) {
+ String[] skipFilesFromConf = DagUtils.getTempFilesFromConf(conf);
+ newResources = utils.localizeTempFiles(dir, conf, newFilesNotFromConf, skipFilesFromConf);
+ if (newResources != null) {
+ resources.localizedResources.addAll(newResources);
+ }
+ for (String fullName : newFilesNotFromConf) {
+ resources.additionalFilesNotFromConf.add(fullName);
+ }
+ }
}
- }
- public boolean hasResources(String[] localAmResources) {
- if (localAmResources == null || localAmResources.length == 0) return true;
- if (additionalFilesNotFromConf.isEmpty()) return false;
- for (String s : localAmResources) {
- if (!additionalFilesNotFromConf.contains(s)) return false;
+ // Finally add the files to AM. The old code seems to do this twice, first for all the new
+ // resources regardless of type; and then for all the session resources that are not of type
+ // file (see branch-1 calls to addAppMasterLocalFiles: from updateSession and with resourceMap
+ // from submit).
+ // TODO: Do we really need all this nonsense?
+ if (newResources != null && !newResources.isEmpty()) {
+ session.addAppMasterLocalFiles(DagUtils.createTezLrMap(null, newResources));
+ }
+ if (!resources.localizedResources.isEmpty()) {
+ session.addAppMasterLocalFiles(
+ DagUtils.getResourcesUpdatableForAm(resources.localizedResources));
}
- return true;
}
/**
@@ -593,11 +607,11 @@ public class TezSessionState {
* further DAGs can be executed against it. Only called by session management classes; some
* sessions should not simply be closed by users - e.g. pool sessions need to be restarted.
*
- * @param keepTmpDir
+ * @param keepDagFilesDir
* whether or not to remove the scratch dir at the same time.
* @throws Exception
*/
- void close(boolean keepTmpDir) throws Exception {
+ void close(boolean keepDagFilesDir) throws Exception {
if (session != null) {
LOG.info("Closing Tez Session");
closeClient(session);
@@ -618,20 +632,16 @@ public class TezSessionState {
}
}
- if (!keepTmpDir) {
- cleanupScratchDir();
+ cleanupScratchDir();
+ if (!keepDagFilesDir) {
+ cleanupDagResources();
}
session = null;
sessionFuture = null;
console = null;
tezScratchDir = null;
+ // Do not reset dag resources; if it wasn't cleaned it's still needed.
appJarLr = null;
- additionalFilesNotFromConf.clear();
- localizedResources.clear();
- }
-
- public Set<String> getAdditionalFilesNotFromConf() {
- return additionalFilesNotFromConf;
}
private void closeClient(TezClient client) throws TezException,
@@ -643,12 +653,18 @@ public class TezSessionState {
}
}
- protected final void cleanupScratchDir () throws IOException {
+ protected final void cleanupScratchDir() throws IOException {
FileSystem fs = tezScratchDir.getFileSystem(conf);
fs.delete(tezScratchDir, true);
tezScratchDir = null;
}
+ protected final void cleanupDagResources() throws IOException {
+ FileSystem fs = resources.dagResourcesDir.getFileSystem(conf);
+ fs.delete(resources.dagResourcesDir, true);
+ resources = null;
+ }
+
public String getSessionId() {
return sessionId;
}
@@ -675,10 +691,6 @@ public class TezSessionState {
return session;
}
- public Path getTezScratchDir() {
- return tezScratchDir;
- }
-
public LocalResource getAppJarLr() {
return appJarLr;
}
@@ -687,11 +699,11 @@ public class TezSessionState {
* createTezDir creates a temporary directory in the scratchDir folder to
* be used with Tez. Assumes scratchDir exists.
*/
- private Path createTezDir(String sessionId) throws IOException {
+ private Path createTezDir(String sessionId, String suffix) throws IOException {
// tez needs its own scratch dir (per session)
// TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool.
Path tezDir = new Path(SessionState.get().getHdfsScratchDirURIString(), TEZ_DIR);
- tezDir = new Path(tezDir, sessionId);
+ tezDir = new Path(tezDir, sessionId + ((suffix == null) ? "" : ("-" + suffix)));
FileSystem fs = tezDir.getFileSystem(conf);
FsPermission fsPermission = new FsPermission(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION));
fs.mkdirs(tezDir, fsPermission);
@@ -759,9 +771,8 @@ public class TezSessionState {
final File jar =
new File(Utilities.jarFinderGetJar(clazz));
final String localJarPath = jar.toURI().toURL().toExternalForm();
- final LocalResource jarLr =
- createJarLocalResource(localJarPath);
- lrMap.put(utils.getBaseName(jarLr), jarLr);
+ final LocalResource jarLr = createJarLocalResource(localJarPath);
+ lrMap.put(DagUtils.getBaseName(jarLr), jarLr);
}
private String getSha(final Path localFile) throws IOException, IllegalArgumentException {
@@ -808,7 +819,7 @@ public class TezSessionState {
}
public List<LocalResource> getLocalizedResources() {
- return new ArrayList<>(localizedResources);
+ return new ArrayList<>(resources.localizedResources);
}
public String getUser() {
@@ -849,10 +860,9 @@ public class TezSessionState {
TezSessionPoolManager.getInstance().returnSession(this);
}
- public TezSessionState reopen(
- Configuration conf, String[] inputOutputJars) throws Exception {
+ public TezSessionState reopen() throws Exception {
// By default, TezSessionPoolManager handles this for both pool and non-pool session.
- return TezSessionPoolManager.getInstance().reopen(this, conf, inputOutputJars);
+ return TezSessionPoolManager.getInstance().reopen(this);
}
public void destroy() throws Exception {
@@ -875,4 +885,28 @@ public class TezSessionState {
public KillQuery getKillQuery() {
return killQuery;
}
+
+ public HiveResources extractHiveResources() {
+ HiveResources result = resources;
+ resources = null;
+ return result;
+ }
+
+ public Path replaceHiveResources(HiveResources resources, boolean isAsync) {
+ Path dir = null;
+ if (this.resources != null) {
+ dir = this.resources.dagResourcesDir;
+ if (!isAsync) {
+ try {
+ dir.getFileSystem(conf).delete(dir, true);
+ } catch (Exception ex) {
+ LOG.error("Failed to delete the old resources directory "
+ + dir + "; ignoring " + ex.getLocalizedMessage());
+ }
+ dir = null;
+ }
+ }
+ this.resources = resources;
+ return dir;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 8795cfc..27799a8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -18,8 +18,8 @@
package org.apache.hadoop.hive.ql.exec.tez;
+import org.apache.hive.common.util.Ref;
import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput;
-
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
@@ -31,9 +31,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
import javax.annotation.Nullable;
-
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -67,7 +65,6 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.tez.client.CallerContext;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.counters.CounterGroup;
@@ -87,7 +84,6 @@ import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.VertexStatus;
import org.json.JSONObject;
-
import com.google.common.annotations.VisibleForTesting;
/**
@@ -133,7 +129,7 @@ public class TezTask extends Task<TezWork> {
int rc = 1;
boolean cleanContext = false;
Context ctx = null;
- TezSessionState session = null;
+ Ref<TezSessionState> sessionRef = Ref.from(null);
try {
// Get or create Context object. If we create it we have to clean it later as well.
@@ -147,15 +143,15 @@ public class TezTask extends Task<TezWork> {
WmContext wmContext = new WmContext(System.currentTimeMillis(), queryId);
ctx.setWmContext(wmContext);
}
-
// Need to remove this static hack. But this is the way currently to get a session.
SessionState ss = SessionState.get();
// Note: given that we return pool sessions to the pool in the finally block below, and that
// we need to set the global to null to do that, this "reuse" may be pointless.
- session = ss.getTezSession();
+ TezSessionState session = sessionRef.value = ss.getTezSession();
if (session != null && !session.isOpen()) {
LOG.warn("The session: " + session + " has not been opened");
}
+
// We only need a username for UGI to use for groups; getGroups will fetch the groups
// based on Hadoop configuration, as documented at
// https://hadoop.apache.org/docs/r2.8.0/hadoop-project-dist/hadoop-common/GroupsMapping.html
@@ -163,57 +159,51 @@ public class TezTask extends Task<TezWork> {
MappingInput mi = (userName == null) ? new MappingInput("anonymous", null)
: new MappingInput(ss.getUserName(),
UserGroupInformation.createRemoteUser(ss.getUserName()).getGroups());
+
WmContext wmContext = ctx.getWmContext();
- session = WorkloadManagerFederation.getSession(session, conf, mi, getWork().getLlapMode(), wmContext);
+ // jobConf will hold all the configuration for hadoop, tez, and hive
+ JobConf jobConf = utils.createConfiguration(conf);
+ // Get all user jars from work (e.g. input format stuff).
+ String[] allNonConfFiles = work.configureJobConfAndExtractJars(jobConf);
+ // DAG scratch dir. We get a session from the pool, so this may differ from the Tez session's dir.
+ // TODO: we could perhaps reuse the same directory for HiveResources?
+ Path scratchDir = utils.createTezDir(ctx.getMRScratchDir(), conf);
+ CallerContext callerContext = CallerContext.create(
+ "HIVE", queryPlan.getQueryId(), "HIVE_QUERY_ID", queryPlan.getQueryStr());
+
+ session = sessionRef.value = WorkloadManagerFederation.getSession(
+ sessionRef.value, conf, mi, getWork().getLlapMode(), wmContext);
- LOG.info("Subscribed to counters: {} for queryId: {}", wmContext.getSubscribedCounters(), wmContext.getQueryId());
- ss.setTezSession(session);
try {
- // jobConf will hold all the configuration for hadoop, tez, and hive
- JobConf jobConf = utils.createConfiguration(conf);
-
- // Get all user jars from work (e.g. input format stuff).
- String[] inputOutputJars = work.configureJobConfAndExtractJars(jobConf);
-
- // we will localize all the files (jars, plans, hashtables) to the
- // scratch dir. let's create this and tmp first.
- Path scratchDir = ctx.getMRScratchDir();
-
- // create the tez tmp dir
- scratchDir = utils.createTezDir(scratchDir, conf);
+ ss.setTezSession(session);
+ LOG.info("Subscribed to counters: {} for queryId: {}", wmContext.getSubscribedCounters(),
+ wmContext.getQueryId());
- // This is used to compare global and vertex resources. Global resources are originally
- // derived from session conf via localizeTempFilesFromConf. So, use that here.
- Configuration sessionConf = (session.getConf() != null) ? session.getConf() : conf;
- Map<String,LocalResource> inputOutputLocalResources =
- getExtraLocalResources(jobConf, scratchDir, inputOutputJars, sessionConf);
+ // Ensure the session is open and has the necessary local resources.
+ // This would refresh any conf resources and also local resources.
+ ensureSessionHasResources(session, allNonConfFiles);
- // Ensure the session is open and has the necessary local resources
- updateSession(session, jobConf, scratchDir, inputOutputJars, inputOutputLocalResources);
+ // This combines the resources (jars etc.) that come from conf with those that do not.
+ List<LocalResource> allNonAppResources = session.getLocalizedResources();
+ logResources(allNonAppResources);
- List<LocalResource> additionalLr = session.getLocalizedResources();
- logResources(additionalLr);
-
- // unless already installed on all the cluster nodes, we'll have to
- // localize hive-exec.jar as well.
- LocalResource appJarLr = session.getAppJarLr();
+ Map<String, LocalResource> allResources = DagUtils.createTezLrMap(
+ session.getAppJarLr(), allNonAppResources);
// next we translate the TezWork to a Tez DAG
- DAG dag = build(jobConf, work, scratchDir, appJarLr, additionalLr, ctx);
- CallerContext callerContext = CallerContext.create(
- "HIVE", queryPlan.getQueryId(),
- "HIVE_QUERY_ID", queryPlan.getQueryStr());
+ DAG dag = build(jobConf, work, scratchDir, ctx, allResources);
dag.setCallerContext(callerContext);
- // Add the extra resources to the dag
- addExtraResourcesToDag(session, dag, inputOutputJars, inputOutputLocalResources);
+ // Note: we no longer call addTaskLocalFiles because all the resources are correctly
+ // updated in the session resource lists now, and thus added to vertices.
+ // If something breaks, dag.addTaskLocalFiles might need to be called here.
// Check isShutdown opportunistically; it's never unset.
if (this.isShutdown) {
throw new HiveException("Operation cancelled");
}
- DAGClient dagClient = submit(jobConf, dag, scratchDir, appJarLr, session,
- additionalLr, inputOutputJars, inputOutputLocalResources);
+ DAGClient dagClient = submit(jobConf, dag, sessionRef);
+ session = sessionRef.value;
boolean wasShutdown = false;
synchronized (dagClientLock) {
assert this.dagClient == null;
@@ -251,7 +241,9 @@ public class TezTask extends Task<TezWork> {
// We return this to the pool even if it's unusable; reopen is supposed to handle this.
wmContext = ctx.getWmContext();
try {
- session.returnToSessionManager();
+ if (sessionRef.value != null) {
+ sessionRef.value.returnToSessionManager();
+ }
} catch (Exception e) {
LOG.error("Failed to return session: {} to pool", session, e);
throw e;
@@ -340,61 +332,23 @@ public class TezTask extends Task<TezWork> {
}
/**
- * Converted the list of jars into local resources
- */
- Map<String,LocalResource> getExtraLocalResources(JobConf jobConf, Path scratchDir,
- String[] inputOutputJars, Configuration sessionConf) throws Exception {
- final Map<String,LocalResource> resources = new HashMap<String,LocalResource>();
- // Skip the files already in session local resources...
- final List<LocalResource> localResources = utils.localizeTempFiles(scratchDir.toString(),
- jobConf, inputOutputJars, DagUtils.getTempFilesFromConf(sessionConf));
- if (null != localResources) {
- for (LocalResource lr : localResources) {
- resources.put(utils.getBaseName(lr), lr);
- }
- }
- return resources;
- }
-
- /**
* Ensures that the Tez Session is open and the AM has all necessary jars configured.
*/
- void updateSession(TezSessionState session,
- JobConf jobConf, Path scratchDir, String[] inputOutputJars,
- Map<String,LocalResource> extraResources) throws Exception {
- final boolean missingLocalResources = !session
- .hasResources(inputOutputJars);
-
+ @VisibleForTesting
+ void ensureSessionHasResources(
+ TezSessionState session, String[] nonConfResources) throws Exception {
TezClient client = session.getSession();
// TODO null can also mean that this operation was interrupted. Should we really try to re-create the session in that case ?
if (client == null) {
+ // Note: the only sane case where this can happen is the non-pool one. We should get rid
+ // of it; in the non-pool case perf doesn't matter, so we might as well open at get time
+ // and then call update like we do in the else branch.
// Can happen if the user sets the tez flag after the session was established.
LOG.info("Tez session hasn't been created yet. Opening session");
- session.open(inputOutputJars);
+ session.open(nonConfResources);
} else {
LOG.info("Session is already open");
-
- // Ensure the open session has the necessary resources (StorageHandler)
- if (missingLocalResources) {
- LOG.info("Tez session missing resources," +
- " adding additional necessary resources");
- client.addAppMasterLocalFiles(extraResources);
- }
-
- session.refreshLocalResourcesFromConf(conf);
- }
- }
-
- /**
- * Adds any necessary resources that must be localized in each vertex to the DAG.
- */
- void addExtraResourcesToDag(TezSessionState session, DAG dag,
- String[] inputOutputJars,
- Map<String,LocalResource> inputOutputLocalResources) throws Exception {
- if (!session.hasResources(inputOutputJars)) {
- if (null != inputOutputLocalResources) {
- dag.addTaskLocalFiles(inputOutputLocalResources);
- }
+ session.ensureLocalResources(conf, nonConfResources);
}
}
@@ -406,9 +360,8 @@ public class TezTask extends Task<TezWork> {
}
}
- DAG build(JobConf conf, TezWork work, Path scratchDir,
- LocalResource appJarLr, List<LocalResource> additionalLr, Context ctx)
- throws Exception {
+ DAG build(JobConf conf, TezWork work, Path scratchDir, Context ctx,
+ Map<String, LocalResource> vertexResources) throws Exception {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_BUILD_DAG);
@@ -426,7 +379,7 @@ public class TezTask extends Task<TezWork> {
DAG dag = DAG.create(dagName);
// set some info for the query
- JSONObject json = new JSONObject(new LinkedHashMap()).put("context", "Hive")
+ JSONObject json = new JSONObject(new LinkedHashMap<>()).put("context", "Hive")
.put("description", ctx.getCmd());
String dagInfo = json.toString();
@@ -474,7 +427,6 @@ public class TezTask extends Task<TezWork> {
// For a vertex group, all Outputs use the same Key-class, Val-class and partitioner.
// Pick any one source vertex to figure out the Edge configuration.
-
// now hook up the children
for (BaseWork v: children) {
@@ -488,9 +440,8 @@ public class TezTask extends Task<TezWork> {
// Regular vertices
JobConf wxConf = utils.initializeVertexConf(conf, ctx, w);
checkOutputSpec(w, wxConf);
- Vertex wx =
- utils.createVertex(wxConf, w, scratchDir, appJarLr, additionalLr, fs, ctx, !isFinal,
- work, work.getVertexType(w));
+ Vertex wx = utils.createVertex(wxConf, w, scratchDir, fs, ctx, !isFinal,
+ work, work.getVertexType(w), vertexResources);
if (w.getReservedMemoryMB() > 0) {
// If reversedMemoryMB is set, make memory allocation fraction adjustment as needed
double frac = DagUtils.adjustMemoryReserveFraction(w.getReservedMemoryMB(), super.conf);
@@ -548,38 +499,28 @@ public class TezTask extends Task<TezWork> {
dag.setAccessControls(ac);
}
- DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
- LocalResource appJarLr, TezSessionState sessionState,
- List<LocalResource> additionalLr, String[] inputOutputJars,
- Map<String,LocalResource> inputOutputLocalResources)
- throws Exception {
+ private TezSessionState getNewTezSessionOnError(
+ TezSessionState oldSession) throws Exception {
+ // Note: we don't pass the config to reopen. If the session was already open, it would
+ // have kept running with its current config - preserve that behavior.
+ TezSessionState newSession = oldSession.reopen();
+ console.printInfo("Session re-established.");
+ return newSession;
+ }
+
+ DAGClient submit(JobConf conf, DAG dag, Ref<TezSessionState> sessionStateRef) throws Exception {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
DAGClient dagClient = null;
-
- Map<String, LocalResource> resourceMap = new HashMap<String, LocalResource>();
- if (additionalLr != null) {
- for (LocalResource lr: additionalLr) {
- if (lr.getType() == LocalResourceType.FILE) {
- // TEZ AM will only localize FILE (no script operators in the AM)
- resourceMap.put(utils.getBaseName(lr), lr);
- }
- }
- }
-
+ TezSessionState sessionState = sessionStateRef.value;
try {
try {
// ready to start execution on the cluster
- sessionState.getSession().addAppMasterLocalFiles(resourceMap);
dagClient = sessionState.getSession().submitDAG(dag);
} catch (SessionNotRunning nr) {
console.printInfo("Tez session was closed. Reopening...");
-
- // close the old one, but keep the tmp files around
- // conf is passed in only for the case when session conf is null (tests and legacy paths?)
- sessionState = sessionState.reopen(conf, inputOutputJars);
+ sessionStateRef.value = sessionState = getNewTezSessionOnError(sessionState);
console.printInfo("Session re-established.");
-
dagClient = sessionState.getSession().submitDAG(dag);
}
} catch (Exception e) {
@@ -587,14 +528,12 @@ public class TezTask extends Task<TezWork> {
try {
console.printInfo("Dag submit failed due to " + e.getMessage() + " stack trace: "
+ Arrays.toString(e.getStackTrace()) + " retrying...");
- // TODO: this is temporary, need to refactor how reopen is invoked.
- WmContext oldCtx = sessionState.getWmContext();
- sessionState = sessionState.reopen(conf, inputOutputJars);
- sessionState.setWmContext(oldCtx);
+ sessionStateRef.value = sessionState = getNewTezSessionOnError(sessionState);
dagClient = sessionState.getSession().submitDAG(dag);
} catch (Exception retryException) {
// we failed to submit after retrying. Destroy session and bail.
sessionState.destroy();
+ sessionStateRef.value = null;
throw retryException;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index dbdbbf2..1f4843d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -17,6 +17,14 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -39,9 +47,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
-
import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService;
@@ -50,6 +57,7 @@ import org.apache.hadoop.hive.metastore.api.WMPool;
import org.apache.hadoop.hive.metastore.api.WMPoolTrigger;
import org.apache.hadoop.hive.metastore.api.WMTrigger;
import org.apache.hadoop.hive.ql.exec.tez.AmPluginNode.AmPluginInfo;
+import org.apache.hadoop.hive.ql.exec.tez.TezSessionState.HiveResources;
import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.KillQuery;
@@ -67,15 +75,6 @@ import org.codehaus.jackson.map.SerializationConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
/** Workload management entry point for HS2.
* Note on how this class operates.
@@ -342,6 +341,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
private List<WmTezSession> toRestartInUse = new LinkedList<>(),
toDestroyNoRestart = new LinkedList<>();
private Map<WmTezSession, KillQueryContext> toKillQuery = new IdentityHashMap<>();
+ private List<Path> pathsToDelete = Lists.newArrayList();
}
private void runWmThread() {
@@ -440,7 +440,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
try {
WmEvent wmEvent = new WmEvent(WmEvent.EventType.RESTART);
// Note: sessions in toRestart are always in use, so they cannot expire in parallel.
- tezAmPool.replaceSession(toRestart, false, null);
+ tezAmPool.replaceSession(toRestart);
wmEvent.endEvent(toRestart);
} catch (Exception ex) {
LOG.error("Failed to restart an old session; ignoring", ex);
@@ -463,6 +463,19 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
});
}
context.toDestroyNoRestart.clear();
+
+ // 4. Delete unneeded directories that were replaced by other ones via reopen.
+ for (final Path path : context.pathsToDelete) {
+ LOG.info("Deleting {}", path);
+ workPool.submit(() -> {
+ try {
+ path.getFileSystem(conf).delete(path, true);
+ } catch (Exception ex) {
+ LOG.error("Failed to delete an old path; ignoring " + ex.getMessage());
+ }
+ });
+ }
+ context.pathsToDelete.clear();
}
/**
@@ -654,7 +667,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
if (LOG.isDebugEnabled()) {
LOG.info("Processing changes for pool " + poolName + ": " + pools.get(poolName));
}
- processPoolChangesOnMasterThread(poolName, hasRequeues);
+ processPoolChangesOnMasterThread(poolName, hasRequeues, syncWork);
}
@@ -852,7 +865,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
case OK:
// If pool didn't exist, checkAndRemoveSessionFromItsPool wouldn't have returned OK.
PoolState pool = pools.get(poolName);
- SessionInitContext sw = new SessionInitContext(future, poolName, session.getQueryId(), session.getWmContext());
+ SessionInitContext sw = new SessionInitContext(future, poolName, session.getQueryId(),
+ session.getWmContext(), session.extractHiveResources());
// We have just removed the session from the same pool, so don't check concurrency here.
pool.initializingSessions.add(sw);
ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync();
@@ -953,7 +967,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
state = new PoolState(fullName, qp, fraction);
} else {
// This will also take care of the queries if query parallelism changed.
- state.update(qp, fraction, syncWork.toKillQuery, e);
+ state.update(qp, fraction, syncWork, e);
poolsToRedistribute.add(fullName);
}
state.setTriggers(new LinkedList<Trigger>());
@@ -988,7 +1002,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
if (oldPools != null && !oldPools.isEmpty()) {
// Looks like some pools were removed; kill running queries, re-queue the queued ones.
for (PoolState oldPool : oldPools.values()) {
- oldPool.destroy(syncWork.toKillQuery, e.getRequests, e.toReuse);
+ oldPool.destroy(syncWork, e.getRequests, e.toReuse);
}
}
@@ -1027,7 +1041,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
return deltaSessions + toTransfer;
}
- @SuppressWarnings("unchecked")
private void failOnFutureFailure(ListenableFuture<?> future) {
Futures.addCallback(future, FATAL_ERROR_CALLBACK);
}
@@ -1088,7 +1101,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
- private void processPoolChangesOnMasterThread(String poolName, boolean hasRequeues) throws Exception {
+ private void processPoolChangesOnMasterThread(
+ String poolName, boolean hasRequeues, WmThreadSyncWork syncWork) throws Exception {
PoolState pool = pools.get(poolName);
if (pool == null) return; // Might be from before the new resource plan.
@@ -1109,15 +1123,15 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// Note that in theory, we are guaranteed to have a session waiting for us here, but
// the expiration, failures, etc. may cause one to be missing pending restart.
// See SessionInitContext javadoc.
- SessionInitContext sw = new SessionInitContext(queueReq.future, poolName, queueReq.queryId,
- queueReq.wmContext);
+ SessionInitContext sw = new SessionInitContext(
+ queueReq.future, poolName, queueReq.queryId, queueReq.wmContext, null);
ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync();
Futures.addCallback(getFuture, sw);
// It is possible that all the async methods returned on the same thread because the
// session with registry data and stuff was available in the pool.
// If this happens, we'll take the session out here and "cancel" the init so we skip
// processing the message that the successful init has queued for us.
- boolean isDone = sw.extractSessionAndCancelIfDone(pool.sessions);
+ boolean isDone = sw.extractSessionAndCancelIfDone(pool.sessions, syncWork.pathsToDelete);
if (!isDone) {
pool.initializingSessions.add(sw);
}
@@ -1458,22 +1472,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
@Override
- public TezSessionState reopen(TezSessionState session, Configuration conf,
- String[] additionalFiles) throws Exception {
+ public TezSessionState reopen(TezSessionState session) throws Exception {
WmTezSession wmTezSession = ensureOwnedSession(session);
HiveConf sessionConf = wmTezSession.getConf();
if (sessionConf == null) {
+ // TODO: can this ever happen?
LOG.warn("Session configuration is null for " + wmTezSession);
sessionConf = new HiveConf(conf, WorkloadManager.class);
}
- // TODO: ideally, we should handle reopen the same way no matter what. However, the cases
- // with additional files will have to wait until HIVE-17827 is unfucked, because it's
- // difficult to determine how the additionalFiles are to be propagated/reused between
- // two sessions. Once the update logic is encapsulated in the session we can remove this.
- if (additionalFiles != null && additionalFiles.length > 0) {
- TezSessionPoolManager.reopenInternal(session, additionalFiles);
- return session;
- }
SettableFuture<WmTezSession> future = SettableFuture.create();
currentLock.lock();
@@ -1493,7 +1499,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
public void closeAndReopenExpiredSession(TezSessionPoolSession session) throws Exception {
// By definition, this session is not in use and can no longer be in use, so it only
// affects the session pool. We can handle this inline.
- tezAmPool.replaceSession(ensureOwnedSession(session), false, null);
+ tezAmPool.replaceSession(ensureOwnedSession(session));
}
// ======= VARIOUS UTILITY METHOD
@@ -1637,7 +1643,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
public void update(int queryParallelism, double fraction,
- Map<WmTezSession, KillQueryContext> toKill, EventState e) {
+ WmThreadSyncWork syncWork, EventState e) {
this.finalFraction = this.finalFractionRemaining = fraction;
this.queryParallelism = queryParallelism;
// TODO: two possible improvements
@@ -1646,7 +1652,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// If we could somehow restart queries we could instead put them at the front
// of the queue (esp. in conjunction with (1)) and rerun them.
if (queryParallelism < getTotalActiveSessions()) {
- extractAllSessionsToKill("The query pool was resized by administrator", e.toReuse, toKill);
+ extractAllSessionsToKill("The query pool was resized by administrator",
+ e.toReuse, syncWork);
}
// We will requeue, and not kill, the queries that are not running yet.
// Insert them all before the get requests from this iteration.
@@ -1656,9 +1663,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
}
- public void destroy(Map<WmTezSession, KillQueryContext> toKill,
+ public void destroy(WmThreadSyncWork syncWork,
LinkedList<GetRequest> globalQueue, IdentityHashMap<WmTezSession, GetRequest> toReuse) {
- extractAllSessionsToKill("The query pool was removed by administrator", toReuse, toKill);
+ extractAllSessionsToKill("The query pool was removed by administrator", toReuse, syncWork);
// All the pending get requests should just be requeued elsewhere.
// Note that we never queue session reuse so sessionToReuse would be null.
globalQueue.addAll(0, queue);
@@ -1694,19 +1701,22 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
private void extractAllSessionsToKill(String killReason,
IdentityHashMap<WmTezSession, GetRequest> toReuse,
- Map<WmTezSession, KillQueryContext> toKill) {
+ WmThreadSyncWork syncWork) {
for (WmTezSession sessionToKill : sessions) {
- resetRemovedSessionToKill(toKill, new KillQueryContext(sessionToKill, killReason), toReuse);
+ resetRemovedSessionToKill(syncWork.toKillQuery,
+ new KillQueryContext(sessionToKill, killReason), toReuse);
}
sessions.clear();
for (SessionInitContext initCtx : initializingSessions) {
// It is possible that the background init thread has finished in parallel, queued
// the message for us but also returned the session to the user.
- WmTezSession sessionToKill = initCtx.cancelAndExtractSessionIfDone(killReason);
+ WmTezSession sessionToKill = initCtx.cancelAndExtractSessionIfDone(
+ killReason, syncWork.pathsToDelete);
if (sessionToKill == null) {
continue; // Async op in progress; the callback will take care of this.
}
- resetRemovedSessionToKill(toKill, new KillQueryContext(sessionToKill, killReason), toReuse);
+ resetRemovedSessionToKill(syncWork.toKillQuery,
+ new KillQueryContext(sessionToKill, killReason), toReuse);
}
initializingSessions.clear();
}
@@ -1740,14 +1750,18 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
private SettableFuture<WmTezSession> future;
private SessionInitState state;
private String cancelReason;
+ private HiveResources prelocalizedResources;
+ private Path pathToDelete;
private WmContext wmContext;
- public SessionInitContext(SettableFuture<WmTezSession> future, String poolName, String queryId,
- final WmContext wmContext) {
+ public SessionInitContext(SettableFuture<WmTezSession> future,
+ String poolName, String queryId, WmContext wmContext,
+ HiveResources prelocalizedResources) {
this.state = SessionInitState.GETTING;
this.future = future;
this.poolName = poolName;
this.queryId = queryId;
+ this.prelocalizedResources = prelocalizedResources;
this.wmContext = wmContext;
}
@@ -1765,7 +1779,12 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
session.setPoolName(poolName);
session.setQueueName(yarnQueue);
session.setQueryId(queryId);
- session.setWmContext(wmContext);
+ if (prelocalizedResources != null) {
+ pathToDelete = session.replaceHiveResources(prelocalizedResources, true);
+ }
+ if (wmContext != null) {
+ session.setWmContext(wmContext);
+ }
this.session = session;
this.state = SessionInitState.WAITING_FOR_REGISTRY;
break;
@@ -1855,7 +1874,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
session.setQueryId(null);
// We can just restart the session if we have received one.
try {
- tezAmPool.replaceSession(session, false, null);
+ tezAmPool.replaceSession(session);
} catch (Exception e) {
LOG.error("Failed to restart a failed session", e);
}
@@ -1863,7 +1882,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
/** Cancel the async operation (even if it's done), and return the session if done. */
- public WmTezSession cancelAndExtractSessionIfDone(String cancelReason) {
+ public WmTezSession cancelAndExtractSessionIfDone(String cancelReason, List<Path> toDelete) {
lock.lock();
try {
SessionInitState state = this.state;
@@ -1872,6 +1891,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
if (state == SessionInitState.DONE) {
WmTezSession result = this.session;
this.session = null;
+ if (pathToDelete != null) {
+ toDelete.add(pathToDelete);
+ }
return result;
} else {
// In the states where a background operation is in progress, wait for the callback.
@@ -1887,11 +1909,15 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
/** Extracts the session and cancel the operation, both only if done. */
- public boolean extractSessionAndCancelIfDone(List<WmTezSession> results) {
+ public boolean extractSessionAndCancelIfDone(
+ List<WmTezSession> results, List<Path> toDelete) {
lock.lock();
try {
if (state != SessionInitState.DONE) return false;
this.state = SessionInitState.CANCELED;
+ if (pathToDelete != null) {
+ toDelete.add(pathToDelete);
+ }
if (this.session != null) {
results.add(this.session);
} // Otherwise we have failed; the callback has taken care of the failure.
http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
index 9726af1..5ade1f3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
@@ -92,7 +92,7 @@ public class TezJobMonitor {
try {
// TODO: why does this only kill non-default sessions?
// Nothing for workload management since that only deals with default ones.
- TezSessionPoolManager.getInstance().closeNonDefaultSessions(false);
+ TezSessionPoolManager.getInstance().closeNonDefaultSessions();
} catch (Exception e) {
// ignore
}
http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index 4148a8a..d6ae171 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -356,9 +356,8 @@ public class GenericUDTFGetSplits extends GenericUDTF {
// Update the queryId to use the generated applicationId. See comment below about
// why this is done.
HiveConf.setVar(wxConf, HiveConf.ConfVars.HIVEQUERYID, applicationId.toString());
- Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, appJarLr,
- new ArrayList<LocalResource>(), fs, ctx, false, work,
- work.getVertexType(mapWork));
+ Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, fs, ctx, false, work,
+ work.getVertexType(mapWork), DagUtils.createTezLrMap(appJarLr, null));
String vertexName = wx.getName();
dag.addVertex(wx);
utils.addCredentials(mapWork, dag);
http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
index 5248454..0a47cda 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
@@ -20,16 +20,12 @@ package org.apache.hadoop.hive.ql.exec.tez;
import com.google.common.util.concurrent.Futures;
-
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
-import java.util.Collection;
import java.util.concurrent.ScheduledExecutorService;
import javax.security.auth.login.LoginException;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.UserGroupInformation;
@@ -83,8 +79,7 @@ public class SampleTezSessionState extends WmTezSession {
}
@Override
- public void open(Collection<String> additionalFiles, Path scratchDir)
- throws LoginException, IOException {
+ public void open(HiveResources resources) throws LoginException, IOException {
open();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
index 829ea8c..8fbe9a7 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
@@ -20,20 +20,16 @@ package org.apache.hadoop.hive.ql.exec.tez;
import static org.junit.Assert.*;
-import java.util.HashSet;
-import java.util.Set;
-
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
-
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
public class TestTezSessionPool {
@@ -190,22 +186,22 @@ public class TestTezSessionPool {
Mockito.when(session.isDefault()).thenReturn(false);
Mockito.when(session.getConf()).thenReturn(conf);
- poolManager.reopen(session, conf, null);
+ poolManager.reopen(session);
- Mockito.verify(session).close(true);
- Mockito.verify(session).open(new HashSet<String>(), null);
+ Mockito.verify(session).close(false);
+ Mockito.verify(session).open(Mockito.<TezSessionState.HiveResources>any());
// mocked session starts with default queue
assertEquals("default", session.getQueueName());
// user explicitly specified queue name
conf.set("tez.queue.name", "tezq1");
- poolManager.reopen(session, conf, null);
+ poolManager.reopen(session);
assertEquals("tezq1", poolManager.getSession(null, conf, false, false).getQueueName());
// user unsets queue name, will fallback to default session queue
conf.unset("tez.queue.name");
- poolManager.reopen(session, conf, null);
+ poolManager.reopen(session);
assertEquals("default", poolManager.getSession(null, conf, false, false).getQueueName());
// session.open will unset the queue name from conf but Mockito intercepts the open call
@@ -213,17 +209,17 @@ public class TestTezSessionPool {
conf.unset("tez.queue.name");
// change session's default queue to tezq1 and rerun test sequence
Mockito.when(session.getQueueName()).thenReturn("tezq1");
- poolManager.reopen(session, conf, null);
+ poolManager.reopen(session);
assertEquals("tezq1", poolManager.getSession(null, conf, false, false).getQueueName());
// user sets default queue now
conf.set("tez.queue.name", "default");
- poolManager.reopen(session, conf, null);
+ poolManager.reopen(session);
assertEquals("default", poolManager.getSession(null, conf, false, false).getQueueName());
// user does not specify queue so use session default
conf.unset("tez.queue.name");
- poolManager.reopen(session, conf, null);
+ poolManager.reopen(session);
assertEquals("tezq1", poolManager.getSession(null, conf, false, false).getQueueName());
} catch (Exception e) {
e.printStackTrace();
@@ -328,10 +324,10 @@ public class TestTezSessionPool {
Mockito.when(session.isDefault()).thenReturn(false);
Mockito.when(session.getConf()).thenReturn(conf);
- poolManager.reopen(session, conf, null);
+ poolManager.reopen(session);
- Mockito.verify(session).close(true);
- Mockito.verify(session).open(new HashSet<String>(), null);
+ Mockito.verify(session).close(false);
+ Mockito.verify(session).open(Mockito.<TezSessionState.HiveResources>any());
}
@Test
http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index 47aa936..44d2b66 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -23,16 +23,16 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hive.common.util.Ref;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -96,8 +96,8 @@ public class TestTezTask {
when(utils.getTezDir(any(Path.class))).thenReturn(path);
when(
utils.createVertex(any(JobConf.class), any(BaseWork.class), any(Path.class),
- any(LocalResource.class), any(List.class), any(FileSystem.class), any(Context.class),
- anyBoolean(), any(TezWork.class), any(VertexType.class))).thenAnswer(
+ any(FileSystem.class), any(Context.class),
+ anyBoolean(), any(TezWork.class), any(VertexType.class), any(Map.class))).thenAnswer(
new Answer<Vertex>() {
@Override
@@ -163,7 +163,7 @@ public class TestTezTask {
task.setQueryPlan(mockQueryPlan);
conf = new JobConf();
- appLr = mock(LocalResource.class);
+ appLr = createResource("foo.jar");
HiveConf hiveConf = new HiveConf();
hiveConf
@@ -173,8 +173,7 @@ public class TestTezTask {
session = mock(TezClient.class);
sessionState = mock(TezSessionState.class);
when(sessionState.getSession()).thenReturn(session);
- when(sessionState.reopen(any(Configuration.class), any(String[].class)))
- .thenReturn(sessionState);
+ when(sessionState.reopen()).thenReturn(sessionState);
when(session.submitDAG(any(DAG.class)))
.thenThrow(new SessionNotRunning(""))
.thenReturn(mock(DAGClient.class));
@@ -192,7 +191,8 @@ public class TestTezTask {
@Test
public void testBuildDag() throws IllegalArgumentException, IOException, Exception {
- DAG dag = task.build(conf, work, path, appLr, null, new Context(conf));
+ DAG dag = task.build(conf, work, path, new Context(conf),
+ DagUtils.createTezLrMap(appLr, null));
for (BaseWork w: work.getAllWork()) {
Vertex v = dag.getVertex(w.getName());
assertNotNull(v);
@@ -212,17 +212,17 @@ public class TestTezTask {
@Test
public void testEmptyWork() throws IllegalArgumentException, IOException, Exception {
- DAG dag = task.build(conf, new TezWork("", null), path, appLr, null, new Context(conf));
+ DAG dag = task.build(conf, new TezWork("", null), path, new Context(conf),
+ DagUtils.createTezLrMap(appLr, null));
assertEquals(dag.getVertices().size(), 0);
}
@Test
public void testSubmit() throws Exception {
DAG dag = DAG.create("test");
- task.submit(conf, dag, path, appLr, sessionState, Collections.<LocalResource> emptyList(),
- new String[0], Collections.<String,LocalResource> emptyMap());
+ task.submit(conf, dag, Ref.from(sessionState));
// validate close/reopen
- verify(sessionState, times(1)).reopen(any(Configuration.class), any(String[].class));
+ verify(sessionState, times(1)).reopen();
verify(session, times(2)).submitDAG(any(DAG.class));
}
@@ -235,53 +235,22 @@ public class TestTezTask {
@Test
public void testExistingSessionGetsStorageHandlerResources() throws Exception {
final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"};
- LocalResource res = mock(LocalResource.class);
- final List<LocalResource> resources = Collections.singletonList(res);
- final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
- resMap.put("foo.jar", res);
-
- when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars, null))
- .thenReturn(resources);
- when(utils.getBaseName(res)).thenReturn("foo.jar");
- when(sessionState.isOpen()).thenReturn(true);
- when(sessionState.isOpening()).thenReturn(false);
- when(sessionState.hasResources(inputOutputJars)).thenReturn(false);
- task.updateSession(sessionState, conf, path, inputOutputJars, resMap);
- verify(session).addAppMasterLocalFiles(resMap);
- }
-
- @Test
- public void testExtraResourcesAddedToDag() throws Exception {
- final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"};
- LocalResource res = mock(LocalResource.class);
+ LocalResource res = createResource(inputOutputJars[0]);
final List<LocalResource> resources = Collections.singletonList(res);
- final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
- resMap.put("foo.jar", res);
- DAG dag = mock(DAG.class);
- when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars, null))
- .thenReturn(resources);
- when(utils.getBaseName(res)).thenReturn("foo.jar");
+ when(utils.localizeTempFiles(anyString(), any(Configuration.class), eq(inputOutputJars),
+ any(String[].class))).thenReturn(resources);
when(sessionState.isOpen()).thenReturn(true);
when(sessionState.isOpening()).thenReturn(false);
- when(sessionState.hasResources(inputOutputJars)).thenReturn(false);
- task.addExtraResourcesToDag(sessionState, dag, inputOutputJars, resMap);
- verify(dag).addTaskLocalFiles(resMap);
+ task.ensureSessionHasResources(sessionState, inputOutputJars);
+ // TODO: ideally we should have a test for session itself.
+ verify(sessionState).ensureLocalResources(any(Configuration.class), eq(inputOutputJars));
}
- @Test
- public void testGetExtraLocalResources() throws Exception {
- final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"};
+ private static LocalResource createResource(String url) {
LocalResource res = mock(LocalResource.class);
- final List<LocalResource> resources = Collections.singletonList(res);
- final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
- resMap.put("foo.jar", res);
-
- when(utils.localizeTempFiles(eq(path.toString()), eq(conf), eq(inputOutputJars),
- Mockito.<String[]>any())).thenReturn(resources);
- when(utils.getBaseName(res)).thenReturn("foo.jar");
-
- assertEquals(resMap, task.getExtraLocalResources(conf, path, inputOutputJars, null));
+ when(res.getResource()).thenReturn(URL.fromPath(new Path(url)));
+ return res;
}
@Test
http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index c58e450..fc8f66a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -42,7 +42,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
@@ -212,9 +211,8 @@ public class TestWorkloadManager {
}
@Override
- public TezSessionState reopen(
- TezSessionState session, Configuration conf, String[] additionalFiles) throws Exception {
- session = super.reopen(session, conf, additionalFiles);
+ public TezSessionState reopen(TezSessionState session) throws Exception {
+ session = super.reopen(session);
ensureWm();
return session;
}
@@ -274,7 +272,7 @@ public class TestWorkloadManager {
null, new MappingInput("user", null), conf, null);
assertEquals(1.0, session.getClusterFraction(), EPSILON);
qam.assertWasCalledAndReset();
- WmTezSession session2 = (WmTezSession) session.reopen(conf, null);
+ WmTezSession session2 = (WmTezSession) session.reopen();
assertNotSame(session, session2);
wm.addTestEvent().get();
assertEquals(session2.toString(), 1.0, session2.getClusterFraction(), EPSILON);
@@ -682,7 +680,7 @@ public class TestWorkloadManager {
waitForThreadToBlock(cdl1, t1);
checkError(error);
// Replacing it directly in the pool should unblock get.
- pool.replaceSession(oob, false, null);
+ pool.replaceSession(oob);
t1.join();
assertNotNull(sessionA1.get());
assertEquals("A", sessionA1.get().getPoolName());
[2/2] hive git commit: HIVE-18003 : add explicit jdbc connection
string args for mappings (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Posted by se...@apache.org.
HIVE-18003 : add explicit jdbc connection string args for mappings (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e120bd8b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e120bd8b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e120bd8b
Branch: refs/heads/master
Commit: e120bd8b0b8b74516651f3ae9e4e7d3a170b0d4d
Parents: 89dbf4e
Author: sergey <se...@apache.org>
Authored: Thu Dec 14 15:55:25 2017 -0800
Committer: sergey <se...@apache.org>
Committed: Thu Dec 14 15:55:25 2017 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 6 +-
.../org/apache/hive/jdbc/HiveConnection.java | 5 +
jdbc/src/java/org/apache/hive/jdbc/Utils.java | 1 +
.../apache/hadoop/hive/ql/exec/tez/TezTask.java | 10 +-
.../hive/ql/exec/tez/UserPoolMapping.java | 38 +++++-
.../hive/ql/exec/tez/WorkloadManager.java | 30 +++--
.../hive/ql/exec/tez/TestWorkloadManager.java | 117 ++++++++++++-------
7 files changed, 147 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7a81612..711dfbd 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2435,9 +2435,13 @@ public class HiveConf extends Configuration {
HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE("hive.server2.tez.interactive.queue", "",
"A single YARN queues to use for Hive Interactive sessions. When this is specified,\n" +
"workload management is enabled and used for these sessions."),
- HIVE_SERVER2_TEZ_WM_WORKER_THREADS("hive.server2.tez.wm.worker.threads", 4,
+ HIVE_SERVER2_WM_WORKER_THREADS("hive.server2.wm.worker.threads", 4,
"Number of worker threads to use to perform the synchronous operations with Tez\n" +
"sessions for workload management (e.g. opening, closing, etc.)"),
+ HIVE_SERVER2_WM_ALLOW_ANY_POOL_VIA_JDBC("hive.server2.wm.allow.any.pool.via.jdbc", false,
+ "Applies when a user specifies a target WM pool in the JDBC connection string. If\n" +
+ "false, the user can only specify a pool he is mapped to (e.g. make a choice among\n" +
+ "multiple group mappings); if true, the user can specify any existing pool."),
HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT("hive.server2.tez.wm.am.registry.timeout", "30s",
new TimeValidator(TimeUnit.SECONDS),
"The timeout for AM registry registration, after which (on attempting to use the\n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
index fc937e6..45acf13 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
@@ -138,6 +138,7 @@ public class HiveConnection implements java.sql.Connection {
private TProtocolVersion protocol;
private int fetchSize = HiveStatement.DEFAULT_FETCH_SIZE;
private String initFile = null;
+ private String wmPool = null;
private Properties clientInfo;
/**
@@ -178,6 +179,7 @@ public class HiveConnection implements java.sql.Connection {
if (sessConfMap.containsKey(JdbcConnectionParams.INIT_FILE)) {
initFile = sessConfMap.get(JdbcConnectionParams.INIT_FILE);
}
+ wmPool = sessConfMap.get(JdbcConnectionParams.WM_POOL);
// add supported protocols
supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1);
@@ -680,6 +682,9 @@ public class HiveConnection implements java.sql.Connection {
// set the fetchSize
openConf.put("set:hiveconf:hive.server2.thrift.resultset.default.fetch.size",
Integer.toString(fetchSize));
+ if (wmPool != null) {
+ openConf.put("set:hivevar:wmpool", wmPool);
+ }
// set the session configuration
Map<String, String> sessVars = connParams.getSessionVars();
http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/Utils.java b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
index 855de88..bb13682 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/Utils.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
@@ -123,6 +123,7 @@ public class Utils {
// Set the fetchSize
static final String FETCH_SIZE = "fetchSize";
static final String INIT_FILE = "initFile";
+ static final String WM_POOL = "wmPool";
// --------------- Begin 2 way ssl options -------------------------
// Use two way ssl. This param will take effect only when ssl=true
http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 27799a8..1a24b44 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -156,9 +156,13 @@ public class TezTask extends Task<TezWork> {
// based on Hadoop configuration, as documented at
// https://hadoop.apache.org/docs/r2.8.0/hadoop-project-dist/hadoop-common/GroupsMapping.html
String userName = ss.getUserName();
- MappingInput mi = (userName == null) ? new MappingInput("anonymous", null)
- : new MappingInput(ss.getUserName(),
- UserGroupInformation.createRemoteUser(ss.getUserName()).getGroups());
+ List<String> groups = null;
+ if (userName == null) {
+ userName = "anonymous";
+ } else {
+ groups = UserGroupInformation.createRemoteUser(ss.getUserName()).getGroups();
+ }
+ MappingInput mi = new MappingInput(userName, groups, ss.getHiveVariables().get("wmpool"));
WmContext wmContext = ctx.getWmContext();
// jobConf will hold all the configuration for hadoop, tez, and hive
http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
index 33ee8f7..5919f3f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.Set;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -54,13 +57,25 @@ class UserPoolMapping {
/** Contains all the information necessary to map a query to a pool. */
public static final class MappingInput {
- private final String userName;
+ private final String userName, wmPool;
private final List<String> groups;
// TODO: we may add app name, etc. later
- public MappingInput(String userName, List<String> groups) {
+ public MappingInput(String userName, List<String> groups, String wmPool) {
this.userName = userName;
this.groups = groups;
+ this.wmPool = wmPool;
+ }
+
+ // TODO: move these into tests when there are fewer conflicting patches pending.
+ @VisibleForTesting
+ public MappingInput(String userName) {
+ this(userName, null);
+ }
+
+ @VisibleForTesting
+ public MappingInput(String userName, List<String> groups) {
+ this(userName, groups, null);
}
public List<String> getGroups() {
@@ -73,7 +88,7 @@ class UserPoolMapping {
@Override
public String toString() {
- return getUserName() + "; groups " + groups;
+ return "{" + getUserName() + "; pool " + wmPool + "; groups " + groups + "}";
}
}
@@ -107,17 +122,32 @@ class UserPoolMapping {
}
}
- public String mapSessionToPoolName(MappingInput input) {
+ public String mapSessionToPoolName(MappingInput input, boolean allowAnyPool, Set<String> pools) {
+ if (allowAnyPool && input.wmPool != null) {
+ return (pools == null || pools.contains(input.wmPool)) ? input.wmPool : null;
+ }
// For equal-priority rules, user rules come first because they are more specific (arbitrary).
Mapping mapping = userMappings.get(input.getUserName());
+ boolean isExplicitMatch = false;
+ if (mapping != null) {
+ isExplicitMatch = isExplicitPoolMatch(input, mapping);
+ if (isExplicitMatch) return mapping.fullPoolName;
+ }
for (String group : input.getGroups()) {
Mapping candidate = groupMappings.get(group);
if (candidate == null) continue;
+ isExplicitMatch = isExplicitPoolMatch(input, candidate);
+ if (isExplicitMatch) return candidate.fullPoolName;
if (mapping == null || candidate.priority < mapping.priority) {
mapping = candidate;
}
}
+ if (input.wmPool != null && !isExplicitMatch) return null;
if (mapping != null) return mapping.fullPoolName;
return defaultPoolPath;
}
+
+ private static boolean isExplicitPoolMatch(MappingInput input, Mapping mapping) {
+ return input.wmPool != null && input.wmPool.equals(mapping.fullPoolName);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 1f4843d..f0481fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -103,6 +103,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
private final QueryAllocationManager allocationManager;
private final String yarnQueue;
private final int amRegistryTimeoutMs;
+ private final boolean allowAnyPool;
// Note: it's not clear that we need to track this - unlike PoolManager we don't have non-pool
// sessions, so the pool itself could internally track the sessions it gave out, since
// calling close on an unopened session is probably harmless.
@@ -204,11 +205,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// Only creates the expiration tracker if expiration is configured.
expirationTracker = SessionExpirationTracker.create(conf, this);
- workPool = Executors.newFixedThreadPool(HiveConf.getIntVar(conf, ConfVars.HIVE_SERVER2_TEZ_WM_WORKER_THREADS),
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Workload management worker %d").build());
+ workPool = Executors.newFixedThreadPool(HiveConf.getIntVar(
+ conf, ConfVars.HIVE_SERVER2_WM_WORKER_THREADS), new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("Workload management worker %d").build());
- timeoutPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("Workload management timeout thread").build());
+ timeoutPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
+ .setDaemon(true).setNameFormat("Workload management timeout thread").build());
+
+ allowAnyPool = HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_ALLOW_ANY_POOL_VIA_JDBC);
wmThread = new Thread(() -> runWmThread(), "Workload management master");
wmThread.setDaemon(true);
@@ -1047,7 +1051,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
private void queueGetRequestOnMasterThread(
GetRequest req, HashSet<String> poolsToRedistribute, WmThreadSyncWork syncWork) {
- String poolName = userPoolMapping.mapSessionToPoolName(req.mappingInput);
+ String poolName = userPoolMapping.mapSessionToPoolName(
+ req.mappingInput, allowAnyPool, allowAnyPool ? pools.keySet() : null);
if (poolName == null) {
req.future.setException(new NoPoolMappingException(
"Cannot find any pool mapping for " + req.mappingInput));
@@ -1319,8 +1324,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
}
+ @VisibleForTesting
public WmTezSession getSession(
- TezSessionState session, MappingInput input, HiveConf conf, final WmContext wmContext) throws Exception {
+ TezSessionState session, MappingInput input, HiveConf conf) throws Exception {
+ return getSession(session, input, conf, null);
+ }
+
+ public WmTezSession getSession(TezSessionState session, MappingInput input, HiveConf conf,
+ final WmContext wmContext) throws Exception {
WmEvent wmEvent = new WmEvent(WmEvent.EventType.GET);
// Note: not actually used for pool sessions; verify some things like doAs are not set.
validateConfig(conf);
@@ -1936,8 +1947,11 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
boolean isManaged(MappingInput input) {
// This is always replaced atomically, so we don't care about concurrency here.
- if (userPoolMapping != null) {
- String mappedPool = userPoolMapping.mapSessionToPoolName(input);
+ UserPoolMapping mapping = userPoolMapping;
+ if (mapping != null) {
+ // Don't pass in the pool set - not thread safe; if the user is trying to force us to
+ // use a non-existent pool, we want to fail anyway. We will fail later during get.
+ String mappedPool = mapping.mapSessionToPoolName(input, allowAnyPool, null);
LOG.info("Mapping input: {} mapped to pool: {}", input, mappedPool);
return true;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index fc8f66a..98f5c58 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -32,7 +32,6 @@ import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.SettableFuture;
import java.lang.Thread.State;
@@ -86,7 +85,7 @@ public class TestWorkloadManager {
cdl.countDown();
}
try {
- session.set((WmTezSession) wm.getSession(old, new MappingInput(userName, null), conf, null));
+ session.set((WmTezSession) wm.getSession(old, new MappingInput(userName), conf));
} catch (Throwable e) {
error.compareAndSet(null, e);
}
@@ -227,17 +226,17 @@ public class TestWorkloadManager {
TezSessionState nonPool = mock(TezSessionState.class);
when(nonPool.getConf()).thenReturn(conf);
doNothing().when(nonPool).close(anyBoolean());
- TezSessionState session = wm.getSession(nonPool, new MappingInput("user", null), conf, null);
+ TezSessionState session = wm.getSession(nonPool, new MappingInput("user"), conf);
verify(nonPool).close(anyBoolean());
assertNotSame(nonPool, session);
session.returnToSessionManager();
TezSessionPoolSession diffPool = mock(TezSessionPoolSession.class);
when(diffPool.getConf()).thenReturn(conf);
doNothing().when(diffPool).returnToSessionManager();
- session = wm.getSession(diffPool, new MappingInput("user", null), conf, null);
+ session = wm.getSession(diffPool, new MappingInput("user"), conf);
verify(diffPool).returnToSessionManager();
assertNotSame(diffPool, session);
- TezSessionState session2 = wm.getSession(session, new MappingInput("user", null), conf, null);
+ TezSessionState session2 = wm.getSession(session, new MappingInput("user"), conf);
assertSame(session, session2);
}
@@ -249,11 +248,11 @@ public class TestWorkloadManager {
wm.start();
// The queue should be ignored.
conf.set(TezConfiguration.TEZ_QUEUE_NAME, "test2");
- TezSessionState session = wm.getSession(null, new MappingInput("user", null), conf, null);
+ TezSessionState session = wm.getSession(null, new MappingInput("user"), conf);
assertEquals("test", session.getQueueName());
assertEquals("test", conf.get(TezConfiguration.TEZ_QUEUE_NAME));
session.setQueueName("test2");
- session = wm.getSession(session, new MappingInput("user", null), conf, null);
+ session = wm.getSession(session, new MappingInput("user"), conf);
assertEquals("test", session.getQueueName());
}
@@ -269,7 +268,7 @@ public class TestWorkloadManager {
WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam);
wm.start();
WmTezSession session = (WmTezSession) wm.getSession(
- null, new MappingInput("user", null), conf, null);
+ null, new MappingInput("user"), conf);
assertEquals(1.0, session.getClusterFraction(), EPSILON);
qam.assertWasCalledAndReset();
WmTezSession session2 = (WmTezSession) session.reopen();
@@ -287,10 +286,10 @@ public class TestWorkloadManager {
MockQam qam = new MockQam();
WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, qam);
wm.start();
- WmTezSession session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf, null);
+ WmTezSession session = (WmTezSession) wm.getSession(null, new MappingInput("user"), conf);
assertEquals(1.0, session.getClusterFraction(), EPSILON);
qam.assertWasCalledAndReset();
- WmTezSession session2 = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf, null);
+ WmTezSession session2 = (WmTezSession) wm.getSession(null, new MappingInput("user"), conf);
assertEquals(0.5, session.getClusterFraction(), EPSILON);
assertEquals(0.5, session2.getClusterFraction(), EPSILON);
qam.assertWasCalledAndReset();
@@ -301,7 +300,7 @@ public class TestWorkloadManager {
qam.assertWasCalledAndReset();
// We never lose pool session, so we should still be able to get.
- session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf, null);
+ session = (WmTezSession) wm.getSession(null, new MappingInput("user"), conf);
session.returnToSessionManager();
assertEquals(1.0, session2.getClusterFraction(), EPSILON);
assertEquals(0.0, session.getClusterFraction(), EPSILON);
@@ -322,20 +321,20 @@ public class TestWorkloadManager {
assertEquals(5, wm.getNumSessions());
// Get all the 5 sessions; validate cluster fractions.
WmTezSession session05of06 = (WmTezSession) wm.getSession(
- null, new MappingInput("p1", null), conf, null);
+ null, new MappingInput("p1"), conf);
assertEquals(0.3, session05of06.getClusterFraction(), EPSILON);
WmTezSession session03of06 = (WmTezSession) wm.getSession(
- null, new MappingInput("p2", null), conf, null);
+ null, new MappingInput("p2"), conf);
assertEquals(0.18, session03of06.getClusterFraction(), EPSILON);
WmTezSession session03of06_2 = (WmTezSession) wm.getSession(
- null, new MappingInput("p2", null), conf, null);
+ null, new MappingInput("p2"), conf);
assertEquals(0.09, session03of06.getClusterFraction(), EPSILON);
assertEquals(0.09, session03of06_2.getClusterFraction(), EPSILON);
WmTezSession session02of06 = (WmTezSession) wm.getSession(
- null,new MappingInput("r1", null), conf, null);
+ null,new MappingInput("r1"), conf);
assertEquals(0.12, session02of06.getClusterFraction(), EPSILON);
WmTezSession session04 = (WmTezSession) wm.getSession(
- null, new MappingInput("r2", null), conf, null);
+ null, new MappingInput("r2"), conf);
assertEquals(0.4, session04.getClusterFraction(), EPSILON);
session05of06.returnToSessionManager();
session03of06.returnToSessionManager();
@@ -347,6 +346,7 @@ public class TestWorkloadManager {
@Test(timeout = 10000)
public void testMappings() throws Exception {
HiveConf conf = createConf();
+ conf.set(ConfVars.HIVE_SERVER2_WM_ALLOW_ANY_POOL_VIA_JDBC.varname, "false");
MockQam qam = new MockQam();
WMFullResourcePlan plan = new WMFullResourcePlan(plan(),
Lists.newArrayList(pool("u0"), pool("g0"), pool("g1"), pool("u2")));
@@ -363,6 +363,31 @@ public class TestWorkloadManager {
verifyMapping(wm, conf, new MappingInput("u0", groups("g0")), "u0");
verifyMapping(wm, conf, new MappingInput("u2", groups("g1")), "g1");
verifyMapping(wm, conf, new MappingInput("u2", groups("g0", "g1")), "g0");
+ // Check explicit pool specifications - valid cases where priority is changed.
+ verifyMapping(wm, conf, new MappingInput("u0", groups("g1"), "g1"), "g1");
+ verifyMapping(wm, conf, new MappingInput("u2", groups("g1"), "u2"), "u2");
+ verifyMapping(wm, conf, new MappingInput("zzz", groups("g0", "g1"), "g1"), "g1");
+ // Explicit pool specification - invalid - there's no mapping that matches.
+ try {
+ TezSessionState r = wm.getSession(
+ null, new MappingInput("u0", groups("g0", "g1"), "u2"), conf);
+ fail("Expected failure, but got " + r);
+ } catch (Exception ex) {
+ // Expected.
+ }
+ // Now allow the users to specify any pools.
+ conf.set(ConfVars.HIVE_SERVER2_WM_ALLOW_ANY_POOL_VIA_JDBC.varname, "true");
+ wm = new WorkloadManagerForTest("test", conf, qam, plan);
+ wm.start();
+ verifyMapping(wm, conf, new MappingInput("u0", groups("g0", "g1"), "u2"), "u2");
+ // A pool that doesn't exist still shouldn't work, even when any pool is allowed.
+ try {
+ TezSessionState r = wm.getSession(
+ null, new MappingInput("u0", groups("g0", "g1"), "zzz"), conf);
+ fail("Expected failure, but got " + r);
+ } catch (Exception ex) {
+ // Expected.
+ }
}
private static void verifyMapping(
@@ -372,6 +397,9 @@ public class TestWorkloadManager {
session.returnToSessionManager();
}
+
+
+
@Test(timeout=10000)
public void testQueueing() throws Exception {
final HiveConf conf = createConf();
@@ -381,9 +409,9 @@ public class TestWorkloadManager {
plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B")));
final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
wm.start();
- WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null),
- sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null),
- sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf, null);
+ WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf),
+ sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf),
+ sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B"), conf);
final AtomicReference<WmTezSession> sessionA3 = new AtomicReference<>(),
sessionA4 = new AtomicReference<>();
final AtomicReference<Throwable> error = new AtomicReference<>();
@@ -397,7 +425,7 @@ public class TestWorkloadManager {
assertNull(sessionA4.get());
checkError(error);
// While threads are blocked on A, we should still be able to get and return a B session.
- WmTezSession sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf, null);
+ WmTezSession sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B"), conf);
sessionB1.returnToSessionManager();
sessionB2.returnToSessionManager();
assertNull(sessionA3.get());
@@ -425,8 +453,8 @@ public class TestWorkloadManager {
plan.getPlan().setDefaultPoolPath("A");
final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
wm.start();
- WmTezSession session1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null),
- session2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null);
+ WmTezSession session1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf),
+ session2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
assertEquals(0.5, session1.getClusterFraction(), EPSILON);
assertEquals(0.5, session2.getClusterFraction(), EPSILON);
qam.assertWasCalledAndReset();
@@ -448,19 +476,19 @@ public class TestWorkloadManager {
final WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, qam);
wm.start();
WmTezSession session1 = (WmTezSession) wm.getSession(
- null, new MappingInput("user", null), conf, null);
+ null, new MappingInput("user"), conf);
// First, try to reuse from the same pool - should "just work".
WmTezSession session1a = (WmTezSession) wm.getSession(
- session1, new MappingInput("user", null), conf, null);
+ session1, new MappingInput("user"), conf);
assertSame(session1, session1a);
assertEquals(1.0, session1.getClusterFraction(), EPSILON);
// Should still be able to get the 2nd session.
WmTezSession session2 = (WmTezSession) wm.getSession(
- null, new MappingInput("user", null), conf, null);
+ null, new MappingInput("user"), conf);
// Now try to reuse with no other sessions remaining. Should still work.
WmTezSession session2a = (WmTezSession) wm.getSession(
- session2, new MappingInput("user", null), conf, null);
+ session2, new MappingInput("user"), conf);
assertSame(session2, session2a);
assertEquals(0.5, session1.getClusterFraction(), EPSILON);
assertEquals(0.5, session2.getClusterFraction(), EPSILON);
@@ -517,19 +545,19 @@ public class TestWorkloadManager {
plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B")));
final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
wm.start();
- WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null),
- sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null);
+ WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf),
+ sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
assertEquals("A", sessionA1.getPoolName());
assertEquals(0.3f, sessionA1.getClusterFraction(), EPSILON);
assertEquals("A", sessionA2.getPoolName());
assertEquals(0.3f, sessionA2.getClusterFraction(), EPSILON);
- WmTezSession sessionB1 = (WmTezSession) wm.getSession(sessionA1, new MappingInput("B", null), conf, null);
+ WmTezSession sessionB1 = (WmTezSession) wm.getSession(sessionA1, new MappingInput("B"), conf);
assertSame(sessionA1, sessionB1);
assertEquals("B", sessionB1.getPoolName());
assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON);
assertEquals(0.6f, sessionA2.getClusterFraction(), EPSILON); // A1 removed from A.
// Make sure that we can still get a session from A.
- WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null);
+ WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
assertEquals("A", sessionA3.getPoolName());
assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON);
assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON);
@@ -549,7 +577,7 @@ public class TestWorkloadManager {
wm.start();
// One session will be running, the other will be queued in "A"
- WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("U", null), conf, null);
+ WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("U"), conf);
assertEquals("A", sessionA1.getPoolName());
assertEquals(0.5f, sessionA1.getClusterFraction(), EPSILON);
final AtomicReference<WmTezSession> sessionA2 = new AtomicReference<>();
@@ -574,7 +602,7 @@ public class TestWorkloadManager {
assertEquals(0.4f, sessionA2.get().getClusterFraction(), EPSILON);
// The new session will also go to B now.
sessionA2.get().returnToSessionManager();
- WmTezSession sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("U", null), conf, null);
+ WmTezSession sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("U"), conf);
assertEquals("B", sessionB1.getPoolName());
assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON);
sessionA1.returnToSessionManager();
@@ -598,11 +626,11 @@ public class TestWorkloadManager {
// A: 1/1 running, 1 queued; B: 2/2 running, C: 1/2 running, D: 1/1 running, 1 queued.
// Total: 5/6 running.
- WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null),
- sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf, null),
- sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf, null),
- sessionC1 = (WmTezSession) wm.getSession(null, new MappingInput("C", null), conf, null),
- sessionD1 = (WmTezSession) wm.getSession(null, new MappingInput("D", null), conf, null);
+ WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf),
+ sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B"), conf),
+ sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B"), conf),
+ sessionC1 = (WmTezSession) wm.getSession(null, new MappingInput("C"), conf),
+ sessionD1 = (WmTezSession) wm.getSession(null, new MappingInput("D"), conf);
final AtomicReference<WmTezSession> sessionA2 = new AtomicReference<>(),
sessionD2 = new AtomicReference<>();
final AtomicReference<Throwable> error = new AtomicReference<>();
@@ -738,7 +766,7 @@ public class TestWorkloadManager {
final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
wm.start();
- WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null);
+ WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
// [A: 1, B: 0]
Map<String, SessionTriggerProvider> allSessionProviders = wm.getAllSessionTriggerProviders();
@@ -762,7 +790,7 @@ public class TestWorkloadManager {
assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON);
assertEquals("B", sessionA1.getPoolName());
- WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null);
+ WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
// [A: 1, B: 1]
allSessionProviders = wm.getAllSessionTriggerProviders();
assertEquals(1, allSessionProviders.get("A").getSessions().size());
@@ -789,7 +817,7 @@ public class TestWorkloadManager {
assertEquals("B", sessionA2.getPoolName());
assertEquals("B", sessionA1.getPoolName());
- WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null);
+ WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
// [A: 1, B: 2]
allSessionProviders = wm.getAllSessionTriggerProviders();
assertEquals(1, allSessionProviders.get("A").getSessions().size());
@@ -829,7 +857,7 @@ public class TestWorkloadManager {
final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
wm.start();
- WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null);
+ WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
// [A: 1, B: 0, B.x: 0, B.y: 0, C: 0]
Map<String, SessionTriggerProvider> allSessionProviders = wm.getAllSessionTriggerProviders();
@@ -887,7 +915,8 @@ public class TestWorkloadManager {
assertTrue(allSessionProviders.get("B.x").getSessions().contains(sessionA1));
assertEquals("B.x", sessionA1.getPoolName());
- WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null);
+ WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+
// [A: 1, B: 0, B.x: 1, B.y: 0, C: 0]
allSessionProviders = wm.getAllSessionTriggerProviders();
assertEquals(1, allSessionProviders.get("A").getSessions().size());
@@ -986,7 +1015,7 @@ public class TestWorkloadManager {
failedWait.setException(new Exception("foo"));
theOnlySession.setWaitForAmRegistryFuture(failedWait);
try {
- TezSessionState r = wm.getSession(null, new MappingInput("A", null), conf, null);
+ TezSessionState r = wm.getSession(null, new MappingInput("A"), conf);
fail("Expected an error but got " + r);
} catch (Exception ex) {
// Expected.
@@ -1037,7 +1066,7 @@ public class TestWorkloadManager {
assertEquals(0f, oldSession.getClusterFraction(), EPSILON);
pool.returnSession(theOnlySession);
// Make sure we can actually get a session still - parallelism/etc. should not be affected.
- WmTezSession result = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null);
+ WmTezSession result = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
assertEquals(sessionPoolName, result.getPoolName());
assertEquals(1f, result.getClusterFraction(), EPSILON);
result.returnToSessionManager();