You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ss...@apache.org on 2015/07/28 23:58:20 UTC
hive git commit: HIVE-11393. LLAP: Fix API usage to work with
evolving Tez APIs - TEZ-{2651, 2652, 2653}. (Siddharth Seth)
Repository: hive
Updated Branches:
refs/heads/llap 6bdb903e4 -> ef454511d
HIVE-11393. LLAP: Fix API usage to work with evolving Tez APIs - TEZ-{2651,2652,2653}. (Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ef454511
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ef454511
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ef454511
Branch: refs/heads/llap
Commit: ef454511dd8614de0a5d30466d9a7c6bb2c3b10b
Parents: 6bdb903
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Jul 28 14:57:52 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Jul 28 14:57:52 2015 -0700
----------------------------------------------------------------------
.../llap/tezplugins/LlapContainerLauncher.java | 2 +-
.../llap/tezplugins/LlapTaskCommunicator.java | 37 +++++++-------
.../dag/app/rm/LlapTaskSchedulerService.java | 9 +++-
.../app/rm/TestLlapTaskSchedulerService.java | 13 ++++-
.../hadoop/hive/ql/exec/tez/DagUtils.java | 39 ++++++++-------
.../hive/ql/exec/tez/TezSessionState.java | 51 +++++++++-----------
6 files changed, 82 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/ef454511/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java
index 3f1f58f..07703a2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java
@@ -25,7 +25,7 @@ public class LlapContainerLauncher extends ContainerLauncher {
private static final Logger LOG = LoggerFactory.getLogger(LlapContainerLauncher.class);
public LlapContainerLauncher(ContainerLauncherContext containerLauncherContext) {
- super(LlapContainerLauncher.class.getName(), containerLauncherContext);
+ super(containerLauncherContext);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/ef454511/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index dc06c97..44fd7e3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -106,12 +106,13 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
BASE_SUBMIT_WORK_REQUEST = baseBuilder.build();
credentialMap = new ConcurrentHashMap<>();
- sourceStateTracker = new SourceStateTracker(getTaskCommunicatorContext(), this);
+ sourceStateTracker = new SourceStateTracker(getContext(), this);
}
@Override
- public void serviceInit(Configuration conf) throws Exception {
- super.serviceInit(conf);
+ public void initialize() throws Exception {
+ super.initialize();
+ Configuration conf = getConf();
int numThreads = conf.getInt(LlapConfiguration.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS,
LlapConfiguration.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS_DEFAULT);
this.communicator = new TaskCommunicator(numThreads, conf);
@@ -124,14 +125,14 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
}
@Override
- public void serviceStart() {
- super.serviceStart();
+ public void start() {
+ super.start();
this.communicator.start();
}
@Override
- public void serviceStop() {
- super.serviceStop();
+ public void shutdown() {
+ super.shutdown();
if (this.communicator != null) {
this.communicator.stop();
}
@@ -139,7 +140,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
@Override
protected void startRpcServer() {
- Configuration conf = getConfig();
+ Configuration conf = getConf();
try {
JobTokenSecretManager jobTokenSecretManager =
new JobTokenSecretManager();
@@ -232,7 +233,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
// Have to register this up front right now. Otherwise, it's possible for the task to start
// sending out status/DONE/KILLED/FAILED messages before TAImpl knows how to handle them.
- getTaskCommunicatorContext()
+ getContext()
.taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
communicator.sendSubmitWork(requestProto, host, port,
new TaskCommunicator.ExecuteRequestCallback<SubmitWorkResponseProto>() {
@@ -255,14 +256,14 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
LOG.info(
"Unable to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
containerId + ", Service Busy");
- getTaskCommunicatorContext().taskKilled(taskSpec.getTaskAttemptID(),
+ getContext().taskKilled(taskSpec.getTaskAttemptID(),
TaskAttemptEndReason.SERVICE_BUSY, "Service Busy");
} else {
// All others from the remote service cause the task to FAIL.
LOG.info(
"Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
containerId, t);
- getTaskCommunicatorContext()
+ getContext()
.taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
t.toString());
}
@@ -272,14 +273,14 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
LOG.info(
"Unable to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
containerId + ", Communication Error");
- getTaskCommunicatorContext().taskKilled(taskSpec.getTaskAttemptID(),
+ getContext().taskKilled(taskSpec.getTaskAttemptID(),
TaskAttemptEndReason.COMMUNICATION_ERROR, "Communication Error");
} else {
// Anything else is a FAIL.
LOG.info(
"Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
containerId, t);
- getTaskCommunicatorContext()
+ getContext()
.taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
t.getMessage());
}
@@ -406,11 +407,11 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
builder.setAmPort(getAddress().getPort());
Credentials taskCredentials = new Credentials();
// Credentials can change across DAGs. Ideally construct only once per DAG.
- taskCredentials.addAll(getTaskCommunicatorContext().getCredentials());
+ taskCredentials.addAll(getContext().getCredentials());
ByteBuffer credentialsBinary = credentialMap.get(taskSpec.getDAGName());
if (credentialsBinary == null) {
- credentialsBinary = serializeCredentials(getTaskCommunicatorContext().getCredentials());
+ credentialsBinary = serializeCredentials(getContext().getCredentials());
credentialMap.putIfAbsent(taskSpec.getDAGName(), credentialsBinary.duplicate());
} else {
credentialsBinary = credentialsBinary.duplicate();
@@ -459,7 +460,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
@Override
public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException {
// TODO Unregister the task for state updates, which could in turn unregister the node.
- getTaskCommunicatorContext().taskKilled(taskAttemptId,
+ getContext().taskKilled(taskAttemptId,
TaskAttemptEndReason.EXTERNAL_PREEMPTION, "Attempt preempted");
entityTracker.unregisterTaskAttempt(taskAttemptId);
}
@@ -598,8 +599,8 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
if (biMap != null) {
synchronized(biMap) {
for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) {
- getTaskCommunicatorContext().taskAlive(entry.getValue());
- getTaskCommunicatorContext().containerAlive(entry.getKey());
+ getContext().taskAlive(entry.getValue());
+ getContext().containerAlive(entry.getKey());
}
}
} else {
http://git-wip-us.apache.org/repos/asf/hive/blob/ef454511/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
index b6ee3d8..f31c6a5 100644
--- a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
+++ b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
@@ -68,6 +68,8 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.serviceplugins.api.TaskScheduler;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
@@ -159,7 +161,12 @@ public class LlapTaskSchedulerService extends TaskScheduler {
public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock clock) {
super(taskSchedulerContext);
this.clock = clock;
- this.conf = taskSchedulerContext.getInitialConfiguration();
+ try {
+ this.conf = TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload());
+ } catch (IOException e) {
+ throw new TezUncheckedException(
+ "Failed to parse user payload for " + LlapTaskSchedulerService.class.getSimpleName(), e);
+ }
this.containerFactory = new ContainerFactory(taskSchedulerContext.getApplicationAttemptId(),
taskSchedulerContext.getCustomClusterIdentifier());
this.memoryPerInstance =
http://git-wip-us.apache.org/repos/asf/hive/blob/ef454511/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
index 245c140..3737e55 100644
--- a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
+++ b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
@@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.app.ControlledClock;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
@@ -301,7 +303,8 @@ public class TestLlapTaskSchedulerService {
doReturn(appAttemptId).when(mockAppCallback).getApplicationAttemptId();
doReturn(11111l).when(mockAppCallback).getCustomClusterIdentifier();
- doReturn(conf).when(mockAppCallback).getInitialConfiguration();
+ UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
+ doReturn(userPayload).when(mockAppCallback).getInitialUserPayload();
ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, clock);
@@ -362,7 +365,13 @@ public class TestLlapTaskSchedulerService {
public LlapTaskSchedulerServiceForTest(
TaskSchedulerContext appClient, Clock clock) {
super(appClient, clock);
- this.inTest = appClient.getInitialConfiguration().getBoolean(LLAP_TASK_SCHEDULER_IN_TEST, false);
+ Configuration conf;
+ try {
+ conf = TezUtils.createConfFromUserPayload(appClient.getInitialUserPayload());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ this.inTest = conf.getBoolean(LLAP_TASK_SCHEDULER_IN_TEST, false);
}
protected void schedulePendingTasks() {
http://git-wip-us.apache.org/repos/asf/hive/blob/ef454511/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 45b092c..bd69744 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -108,6 +108,7 @@ import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
@@ -625,15 +626,13 @@ public class DagUtils {
procClassName = MergeFileTezProcessor.class.getName();
}
- String serviceName = findServiceName(mapWork);
+ VertexExecutionContext executionContext = createVertexExecutionContext(mapWork);
map = Vertex.create(mapWork.getName(), ProcessorDescriptor.create(procClassName)
- .setUserPayload(serializedConf), numTasks, getContainerResource(conf))
- .setConf(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, serviceName)
- .setConf(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, serviceName)
- .setConf(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, serviceName);
+ .setUserPayload(serializedConf), numTasks, getContainerResource(conf));
map.setTaskEnvironment(getContainerEnvironment(conf, true));
+ map.setExecutionContext(executionContext);
map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
assert mapWork.getAliasToWork().keySet().size() == 1;
@@ -671,11 +670,17 @@ public class DagUtils {
return conf;
}
- private String findServiceName(BaseWork work) {
- String serviceName = TezSessionState.DEFAULT_SERVICE;
- if (work.getLlapMode()) serviceName = TezSessionState.LLAP_SERVICE;
- if (work.getUberMode()) serviceName = TezSessionState.LOCAL_SERVICE;
- return serviceName;
+ private VertexExecutionContext createVertexExecutionContext(BaseWork work) {
+ VertexExecutionContext vertexExecutionContext = VertexExecutionContext.createExecuteInContainers(true);
+ if (work.getLlapMode()) {
+ vertexExecutionContext = VertexExecutionContext
+ .create(TezSessionState.LLAP_SERVICE, TezSessionState.LLAP_SERVICE,
+ TezSessionState.LLAP_SERVICE);
+ }
+ if (work.getUberMode()) {
+ vertexExecutionContext = VertexExecutionContext.createExecuteInAm(true);
+ }
+ return vertexExecutionContext;
}
/*
@@ -692,20 +697,18 @@ public class DagUtils {
// create the directories FileSinkOperators need
Utilities.createTmpDirs(conf, reduceWork);
- String serviceName = findServiceName(reduceWork);
+ VertexExecutionContext vertexExecutionContext = createVertexExecutionContext(reduceWork);
// create the vertex
Vertex reducer = Vertex.create(reduceWork.getName(),
ProcessorDescriptor.create(ReduceTezProcessor.class.getName()).
- setUserPayload(TezUtils.createUserPayloadFromConf(conf)),
- reduceWork.isAutoReduceParallelism()?
- reduceWork.getMaxReduceTasks():
- reduceWork.getNumReduceTasks(), getContainerResource(conf))
- .setConf(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, serviceName)
- .setConf(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, serviceName)
- .setConf(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, serviceName);
+ setUserPayload(TezUtils.createUserPayloadFromConf(conf)),
+ reduceWork.isAutoReduceParallelism() ?
+ reduceWork.getMaxReduceTasks() :
+ reduceWork.getNumReduceTasks(), getContainerResource(conf));
reducer.setTaskEnvironment(getContainerEnvironment(conf, false));
+ reducer.setExecutionContext(vertexExecutionContext);
reducer.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
http://git-wip-us.apache.org/repos/asf/hive/blob/ef454511/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 34e8cc8..ac460b3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -24,10 +24,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -46,22 +44,23 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.optimizer.physical.LlapDecider.LlapMode;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.tez.client.TezClient;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.PreWarmVertex;
import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
-import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.api.TezException;
+import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
+import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
+import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
/**
* Holds session state related to Tez
@@ -71,14 +70,9 @@ public class TezSessionState {
private static final Log LOG = LogFactory.getLog(TezSessionState.class.getName());
private static final String TEZ_DIR = "_tez_session_dir";
public static final String LLAP_SERVICE = "LLAP";
- public static final String DEFAULT_SERVICE = TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT;
- public static final String LOCAL_SERVICE = TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT;
private static final String LLAP_SCHEDULER = "org.apache.tez.dag.app.rm.LlapTaskSchedulerService";
private static final String LLAP_LAUNCHER = "org.apache.hadoop.hive.llap.tezplugins.LlapContainerLauncher";
private static final String LLAP_TASK_COMMUNICATOR = "org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator";
- private static final String LLAP_SERVICE_SCHEDULER = LLAP_SERVICE + ":" + LLAP_SCHEDULER;
- private static final String LLAP_SERVICE_LAUNCHER = LLAP_SERVICE + ":" + LLAP_LAUNCHER;
- private static final String LLAP_SERVICE_TASK_COMMUNICATOR = LLAP_SERVICE + ":" + LLAP_TASK_COMMUNICATOR;
private HiveConf conf;
private Path tezScratchDir;
@@ -212,25 +206,23 @@ public class TezSessionState {
// set up the staging directory to use
tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString());
+ ServicePluginsDescriptor servicePluginsDescriptor;
+ UserPayload servicePluginPayload = TezUtils.createUserPayloadFromConf(tezConfig);
+
if (llapMode) {
// we need plugins to handle llap and uber mode
- tezConfig.setStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS, DEFAULT_SERVICE, LOCAL_SERVICE,
- LLAP_SERVICE_SCHEDULER);
-
- tezConfig.setStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS, DEFAULT_SERVICE,
- LOCAL_SERVICE, LLAP_SERVICE_LAUNCHER);
-
- tezConfig.setStrings(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS, DEFAULT_SERVICE,
- LOCAL_SERVICE, LLAP_SERVICE_TASK_COMMUNICATOR);
+ servicePluginsDescriptor = ServicePluginsDescriptor.create(true,
+ new TaskSchedulerDescriptor[]{
+ TaskSchedulerDescriptor.create(LLAP_SERVICE, LLAP_SCHEDULER)
+ .setUserPayload(servicePluginPayload)},
+ new ContainerLauncherDescriptor[]{
+ ContainerLauncherDescriptor.create(LLAP_SERVICE, LLAP_LAUNCHER)},
+ new TaskCommunicatorDescriptor[]{
+ TaskCommunicatorDescriptor.create(LLAP_SERVICE, LLAP_TASK_COMMUNICATOR)
+ .setUserPayload(servicePluginPayload)});
} else {
// we need plugins to handle llap and uber mode
- tezConfig.setStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS, DEFAULT_SERVICE, LOCAL_SERVICE);
-
- tezConfig.setStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS, DEFAULT_SERVICE,
- LOCAL_SERVICE);
-
- tezConfig.setStrings(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS, DEFAULT_SERVICE,
- LOCAL_SERVICE);
+ servicePluginsDescriptor = ServicePluginsDescriptor.create(true);
}
// container prewarming. tell the am how many containers we need
@@ -242,8 +234,9 @@ public class TezSessionState {
tezConfig.setInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, n);
}
- session = TezClient.create("HIVE-" + sessionId, tezConfig, true,
- commonLocalResources, null);
+ session = TezClient.newBuilder("HIVE-" + sessionId, tezConfig).setIsSession(true)
+ .setLocalResources(commonLocalResources)
+ .setServicePluginDescriptor(servicePluginsDescriptor).build();
LOG.info("Opening new Tez Session (id: " + sessionId
+ ", scratch dir: " + tezScratchDir + ")");