You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/26 01:48:53 UTC
[6/7] tez git commit: TEZ-2708. Rename classes and variables post
TEZ-2003 changes. (sseth)
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
deleted file mode 100644
index 07d269d..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
+++ /dev/null
@@ -1,410 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.launcher;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
-import org.apache.tez.serviceplugins.api.ContainerLauncher;
-import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
-import org.apache.tez.serviceplugins.api.ContainerStopRequest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy;
-import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezUncheckedException;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-
-// TODO See what part of this lifecycle and state management can be simplified.
-// Ideally, no state - only sendStart / sendStop.
-
-// TODO Review this entire code and clean it up.
-
-/**
- * This class is responsible for launching of containers.
- */
-public class ContainerLauncherImpl extends ContainerLauncher {
-
- // TODO Ensure the same thread is used to launch / stop the same container. Or - ensure event ordering.
- static final Logger LOG = LoggerFactory.getLogger(ContainerLauncherImpl.class);
-
- private final ConcurrentHashMap<ContainerId, Container> containers =
- new ConcurrentHashMap<>();
- protected ThreadPoolExecutor launcherPool;
- protected static final int INITIAL_POOL_SIZE = 10;
- private final int limitOnPoolSize;
- private final Configuration conf;
- private Thread eventHandlingThread;
- protected BlockingQueue<ContainerOp> eventQueue = new LinkedBlockingQueue<>();
- private ContainerManagementProtocolProxy cmProxy;
- private AtomicBoolean serviceStopped = new AtomicBoolean(false);
-
- private Container getContainer(ContainerOp event) {
- ContainerId id = event.getBaseOperation().getContainerId();
- Container c = containers.get(id);
- if(c == null) {
- c = new Container(id,
- event.getBaseOperation().getNodeId().toString(), event.getBaseOperation().getContainerToken());
- Container old = containers.putIfAbsent(id, c);
- if(old != null) {
- c = old;
- }
- }
- return c;
- }
-
- private void removeContainerIfDone(ContainerId id) {
- Container c = containers.get(id);
- if(c != null && c.isCompletelyDone()) {
- containers.remove(id);
- }
- }
-
-
- private static enum ContainerState {
- PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH
- }
-
- private class Container {
- private ContainerState state;
- // store enough information to be able to cleanup the container
- private ContainerId containerID;
- final private String containerMgrAddress;
- private Token containerToken;
-
- public Container(ContainerId containerID,
- String containerMgrAddress, Token containerToken) {
- this.state = ContainerState.PREP;
- this.containerMgrAddress = containerMgrAddress;
- this.containerID = containerID;
- this.containerToken = containerToken;
- }
-
- public synchronized boolean isCompletelyDone() {
- return state == ContainerState.DONE || state == ContainerState.FAILED;
- }
-
- @SuppressWarnings("unchecked")
- public synchronized void launch(ContainerLaunchRequest event) {
- LOG.info("Launching Container with Id: " + event.getContainerId());
- if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
- state = ContainerState.DONE;
- sendContainerLaunchFailedMsg(event.getContainerId(),
- "Container was killed before it was launched");
- return;
- }
-
- ContainerManagementProtocolProxyData proxy = null;
- try {
-
- proxy = getCMProxy(containerID, containerMgrAddress,
- containerToken);
-
- // Construct the actual Container
- ContainerLaunchContext containerLaunchContext =
- event.getContainerLaunchContext();
-
- // Now launch the actual container
- StartContainerRequest startRequest = Records
- .newRecord(StartContainerRequest.class);
- startRequest.setContainerToken(event.getContainerToken());
- startRequest.setContainerLaunchContext(containerLaunchContext);
-
- StartContainersResponse response =
- proxy.getContainerManagementProtocol().startContainers(
- StartContainersRequest.newInstance(
- Collections.singletonList(startRequest)));
- if (response.getFailedRequests() != null
- && !response.getFailedRequests().isEmpty()) {
- throw response.getFailedRequests().get(containerID).deSerialize();
- }
-
- // after launching, send launched event to task attempt to move
- // it from ASSIGNED to RUNNING state
- getContext().containerLaunched(containerID);
- this.state = ContainerState.RUNNING;
- } catch (Throwable t) {
- String message = "Container launch failed for " + containerID + " : "
- + ExceptionUtils.getStackTrace(t);
- this.state = ContainerState.FAILED;
- sendContainerLaunchFailedMsg(containerID, message);
- } finally {
- if (proxy != null) {
- cmProxy.mayBeCloseProxy(proxy);
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- public synchronized void kill() {
-
- if(isCompletelyDone()) {
- return;
- }
- if(this.state == ContainerState.PREP) {
- this.state = ContainerState.KILLED_BEFORE_LAUNCH;
- } else {
- LOG.info("Sending a stop request to the NM for ContainerId: "
- + containerID);
-
- ContainerManagementProtocolProxyData proxy = null;
- try {
- proxy = getCMProxy(this.containerID, this.containerMgrAddress,
- this.containerToken);
-
- // kill the remote container if already launched
- StopContainersRequest stopRequest = Records
- .newRecord(StopContainersRequest.class);
- stopRequest.setContainerIds(Collections.singletonList(containerID));
-
- proxy.getContainerManagementProtocol().stopContainers(stopRequest);
-
- // If stopContainer returns without an error, assuming the stop made
- // it over to the NodeManager.
- getContext().containerStopRequested(containerID);
- } catch (Throwable t) {
-
- // ignore the cleanup failure
- String message = "cleanup failed for container "
- + this.containerID + " : "
- + ExceptionUtils.getStackTrace(t);
- getContext().containerStopFailed(containerID, message);
- LOG.warn(message);
- this.state = ContainerState.DONE;
- return;
- } finally {
- if (proxy != null) {
- cmProxy.mayBeCloseProxy(proxy);
- }
- }
- this.state = ContainerState.DONE;
- }
- }
- }
-
- public ContainerLauncherImpl(ContainerLauncherContext containerLauncherContext) {
- super(containerLauncherContext);
- try {
- this.conf = TezUtils.createConfFromUserPayload(containerLauncherContext.getInitialUserPayload());
- } catch (IOException e) {
- throw new TezUncheckedException(
- "Failed to parse user payload for " + ContainerLauncherImpl.class.getSimpleName(), e);
- }
- conf.setInt(
- CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
- 0);
- this.limitOnPoolSize = conf.getInt(
- TezConfiguration.TEZ_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT,
- TezConfiguration.TEZ_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT_DEFAULT);
- LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize);
- }
-
- @Override
- public void start() {
- // pass a copy of config to ContainerManagementProtocolProxy until YARN-3497 is fixed
- cmProxy =
- new ContainerManagementProtocolProxy(conf);
-
- ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
- "ContainerLauncher #%d").setDaemon(true).build();
-
- // Start with a default core-pool size of 10 and change it dynamically.
- launcherPool = new ThreadPoolExecutor(INITIAL_POOL_SIZE,
- Integer.MAX_VALUE, 1, TimeUnit.HOURS,
- new LinkedBlockingQueue<Runnable>(),
- tf, new CustomizedRejectedExecutionHandler());
- eventHandlingThread = new Thread() {
- @Override
- public void run() {
- ContainerOp event = null;
- while (!Thread.currentThread().isInterrupted()) {
- try {
- event = eventQueue.take();
- } catch (InterruptedException e) {
- if(!serviceStopped.get()) {
- LOG.error("Returning, interrupted : " + e);
- }
- return;
- }
- int poolSize = launcherPool.getCorePoolSize();
-
- // See if we need up the pool size only if haven't reached the
- // maximum limit yet.
- if (poolSize != limitOnPoolSize) {
-
- // nodes where containers will run at *this* point of time. This is
- // *not* the cluster size and doesn't need to be.
- int numNodes =
- getContext().getNumNodes(TezConstants.getTezYarnServicePluginName());
- int idealPoolSize = Math.min(limitOnPoolSize, numNodes);
-
- if (poolSize < idealPoolSize) {
- // Bump up the pool size to idealPoolSize+INITIAL_POOL_SIZE, the
- // later is just a buffer so we are not always increasing the
- // pool-size
- int newPoolSize = Math.min(limitOnPoolSize, idealPoolSize
- + INITIAL_POOL_SIZE);
- LOG.info("Setting ContainerLauncher pool size to " + newPoolSize
- + " as number-of-nodes to talk to is " + numNodes);
- launcherPool.setCorePoolSize(newPoolSize);
- }
- }
-
- // the events from the queue are handled in parallel
- // using a thread pool
- launcherPool.execute(createEventProcessor(event));
-
- // TODO: Group launching of multiple containers to a single
- // NodeManager into a single connection
- }
- }
- };
- eventHandlingThread.setName("ContainerLauncher Event Handler");
- eventHandlingThread.start();
- }
-
- private void shutdownAllContainers() {
- for (Container ct : this.containers.values()) {
- if (ct != null) {
- ct.kill();
- }
- }
- }
-
- @Override
- public void shutdown() {
- if(!serviceStopped.compareAndSet(false, true)) {
- LOG.info("Ignoring multiple stops");
- return;
- }
- // shutdown any containers that might be left running
- shutdownAllContainers();
- if (eventHandlingThread != null) {
- eventHandlingThread.interrupt();
- }
- if (launcherPool != null) {
- launcherPool.shutdownNow();
- }
- }
-
- protected EventProcessor createEventProcessor(ContainerOp event) {
- return new EventProcessor(event);
- }
-
- protected ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData getCMProxy(
- ContainerId containerID, final String containerManagerBindAddr,
- Token containerToken) throws IOException {
- return cmProxy.getProxy(containerManagerBindAddr, containerID);
- }
-
- /**
- * Setup and start the container on remote nodemanager.
- */
- class EventProcessor implements Runnable {
- private ContainerOp event;
-
- EventProcessor(ContainerOp event) {
- this.event = event;
- }
-
- @Override
- public void run() {
- LOG.info("Processing operation {}", event.toString());
-
- // Load ContainerManager tokens before creating a connection.
- // TODO: Do it only once per NodeManager.
- ContainerId containerID = event.getBaseOperation().getContainerId();
-
- Container c = getContainer(event);
- switch(event.getOpType()) {
- case LAUNCH_REQUEST:
- ContainerLaunchRequest launchRequest = event.getLaunchRequest();
- c.launch(launchRequest);
- break;
- case STOP_REQUEST:
- c.kill();
- break;
- }
- removeContainerIfDone(containerID);
- }
- }
-
- /**
- * ThreadPoolExecutor.submit may fail if you are submitting task
- * when ThreadPoolExecutor is shutting down (DAGAppMaster is shutting down).
- * Use this CustomizedRejectedExecutionHandler to just logging rather than abort the application.
- */
- private static class CustomizedRejectedExecutionHandler implements RejectedExecutionHandler {
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- LOG.warn("Can't submit task to ThreadPoolExecutor:" + executor);
- }
- }
-
- @SuppressWarnings("unchecked")
- void sendContainerLaunchFailedMsg(ContainerId containerId,
- String message) {
- LOG.error(message);
- getContext().containerLaunchFailed(containerId, message);
- }
-
-
- @Override
- public void launchContainer(ContainerLaunchRequest launchRequest) {
- try {
- eventQueue.put(new ContainerOp(ContainerOp.OPType.LAUNCH_REQUEST, launchRequest));
- } catch (InterruptedException e) {
- throw new TezUncheckedException(e);
- }
- }
-
- @Override
- public void stopContainer(ContainerStopRequest stopRequest) {
- try {
- eventQueue.put(new ContainerOp(ContainerOp.OPType.STOP_REQUEST, stopRequest));
- } catch (InterruptedException e) {
- throw new TezUncheckedException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
new file mode 100644
index 0000000..15a10bd
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.launcher;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.UnknownHostException;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
+import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
+import org.apache.tez.serviceplugins.api.ContainerLauncher;
+import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
+import org.apache.tez.serviceplugins.api.ContainerStopRequest;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ContainerLauncherContextImpl;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.rm.ContainerLauncherEvent;
+import org.apache.tez.dag.app.rm.ContainerLauncherLaunchRequestEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Routes container launch and stop events to the {@link ContainerLauncher} plugin selected by
+ * each event's launcher id, and forwards the service lifecycle (init, start, stop) to every
+ * configured launcher.
+ */
+public class ContainerLauncherManager extends AbstractService
+    implements EventHandler<ContainerLauncherEvent> {
+
+  static final Logger LOG = LoggerFactory.getLogger(ContainerLauncherManager.class);
+
+  // Parallel arrays: index i holds launcher i, its context and its lifecycle wrapper. The index
+  // is the launcher id carried by each ContainerLauncherEvent (see handle()).
+  @VisibleForTesting
+  final ContainerLauncher[] containerLaunchers;
+  @VisibleForTesting
+  final ContainerLauncherContext[] containerLauncherContexts;
+  protected final ServicePluginLifecycleAbstractService[] containerLauncherServiceWrappers;
+  private final AppContext appContext;
+
+  /**
+   * Creates a manager around a single, already constructed launcher, which becomes launcher 0.
+   *
+   * @param containerLauncher the launcher that all events are routed to
+   * @param context used to resolve scheduler and task-communicator names in {@link #handle}
+   */
+  @VisibleForTesting
+  public ContainerLauncherManager(ContainerLauncher containerLauncher, AppContext context) {
+    super(ContainerLauncherManager.class.getName());
+    this.appContext = context;
+    containerLaunchers = new ContainerLauncher[] {containerLauncher};
+    containerLauncherContexts = new ContainerLauncherContext[] {containerLauncher.getContext()};
+    containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[] {
+        new ServicePluginLifecycleAbstractService<>(containerLauncher)};
+  }
+
+  /**
+   * Creates one launcher per descriptor. A descriptor's position in the list becomes the
+   * launcher id used to route events to it.
+   *
+   * @param context the application context
+   * @param taskCommunicatorManagerInterface passed to every launcher context and to the uber
+   *     launcher
+   * @param workingDirectory passed to the uber launcher
+   * @param containerLauncherDescriptors launchers to create; must be non-null and non-empty
+   * @param isPureLocalMode passed to the uber launcher
+   * @throws IllegalArgumentException if {@code containerLauncherDescriptors} is null or empty
+   */
+  public ContainerLauncherManager(AppContext context,
+      TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
+      String workingDirectory,
+      List<NamedEntityDescriptor> containerLauncherDescriptors,
+      boolean isPureLocalMode) {
+    super(ContainerLauncherManager.class.getName());
+
+    this.appContext = context;
+    Preconditions.checkArgument(
+        containerLauncherDescriptors != null && !containerLauncherDescriptors.isEmpty(),
+        "ContainerLauncherDescriptors must be specified");
+    int numLaunchers = containerLauncherDescriptors.size();
+    containerLauncherContexts = new ContainerLauncherContext[numLaunchers];
+    containerLaunchers = new ContainerLauncher[numLaunchers];
+    containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[numLaunchers];
+
+    for (int i = 0; i < numLaunchers; i++) {
+      NamedEntityDescriptor descriptor = containerLauncherDescriptors.get(i);
+      UserPayload userPayload = descriptor.getUserPayload();
+      ContainerLauncherContext containerLauncherContext =
+          new ContainerLauncherContextImpl(context, taskCommunicatorManagerInterface, userPayload);
+      containerLauncherContexts[i] = containerLauncherContext;
+      containerLaunchers[i] = createContainerLauncher(descriptor, context,
+          containerLauncherContext, taskCommunicatorManagerInterface, workingDirectory, i,
+          isPureLocalMode);
+      containerLauncherServiceWrappers[i] =
+          new ServicePluginLifecycleAbstractService<>(containerLaunchers[i]);
+    }
+  }
+
+  /**
+   * Instantiates the launcher for one descriptor: the built-in YARN launcher, the built-in uber
+   * launcher, or a custom class named by the descriptor. {@code containerLauncherIndex} is
+   * currently unused.
+   */
+  @VisibleForTesting
+  ContainerLauncher createContainerLauncher(
+      NamedEntityDescriptor containerLauncherDescriptor,
+      AppContext context,
+      ContainerLauncherContext containerLauncherContext,
+      TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
+      String workingDirectory,
+      int containerLauncherIndex,
+      boolean isPureLocalMode) {
+    String entityName = containerLauncherDescriptor.getEntityName();
+    if (entityName.equals(TezConstants.getTezYarnServicePluginName())) {
+      return createYarnContainerLauncher(containerLauncherContext);
+    } else if (entityName.equals(TezConstants.getTezUberServicePluginName())) {
+      return createUberContainerLauncher(containerLauncherContext, context,
+          taskCommunicatorManagerInterface, workingDirectory, isPureLocalMode);
+    } else {
+      return createCustomContainerLauncher(containerLauncherContext, containerLauncherDescriptor);
+    }
+  }
+
+  /** Creates the default YARN launcher, {@link TezContainerLauncherImpl}. */
+  @VisibleForTesting
+  ContainerLauncher createYarnContainerLauncher(ContainerLauncherContext containerLauncherContext) {
+    LOG.info("Creating DefaultContainerLauncher");
+    return new TezContainerLauncherImpl(containerLauncherContext);
+  }
+
+  /**
+   * Creates the {@link LocalContainerLauncher} used for the uber service plugin.
+   *
+   * @throws TezUncheckedException wrapping an {@link UnknownHostException} from the
+   *     {@link LocalContainerLauncher} constructor
+   */
+  @VisibleForTesting
+  ContainerLauncher createUberContainerLauncher(ContainerLauncherContext containerLauncherContext,
+      AppContext context,
+      TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
+      String workingDirectory,
+      boolean isPureLocalMode) {
+    LOG.info("Creating LocalContainerLauncher");
+    // TODO Post TEZ-2003. LocalContainerLauncher is special cased, since it makes use of
+    // extensive internals which are only available at runtime. Will likely require
+    // some kind of runtime binding of parameters in the payload to work correctly.
+    try {
+      return new LocalContainerLauncher(containerLauncherContext, context,
+          taskCommunicatorManagerInterface, workingDirectory, isPureLocalMode);
+    } catch (UnknownHostException e) {
+      throw new TezUncheckedException(e);
+    }
+  }
+
+  /**
+   * Loads the descriptor's class by name and invokes its public constructor that takes a single
+   * {@link ContainerLauncherContext}.
+   *
+   * @throws TezUncheckedException if no such constructor exists or instantiation fails
+   */
+  @VisibleForTesting
+  @SuppressWarnings("unchecked")
+  ContainerLauncher createCustomContainerLauncher(ContainerLauncherContext containerLauncherContext,
+      NamedEntityDescriptor containerLauncherDescriptor) {
+    String className = containerLauncherDescriptor.getClassName();
+    LOG.info("Creating container launcher {}:{} ", containerLauncherDescriptor.getEntityName(),
+        className);
+    Class<? extends ContainerLauncher> containerLauncherClazz =
+        (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz(className);
+    try {
+      Constructor<? extends ContainerLauncher> ctor =
+          containerLauncherClazz.getConstructor(ContainerLauncherContext.class);
+      return ctor.newInstance(containerLauncherContext);
+    } catch (NoSuchMethodException | InvocationTargetException | InstantiationException
+        | IllegalAccessException e) {
+      throw new TezUncheckedException("Failed to instantiate container launcher " + className, e);
+    }
+  }
+
+  /** Initializes each launcher's lifecycle wrapper with {@code conf}, in launcher-id order. */
+  @Override
+  public void serviceInit(Configuration conf) {
+    for (ServicePluginLifecycleAbstractService wrapper : containerLauncherServiceWrappers) {
+      wrapper.init(conf);
+    }
+  }
+
+  /** Starts each launcher's lifecycle wrapper, in launcher-id order. */
+  @Override
+  public void serviceStart() {
+    for (ServicePluginLifecycleAbstractService wrapper : containerLauncherServiceWrappers) {
+      wrapper.start();
+    }
+  }
+
+  /**
+   * Stops each launcher's lifecycle wrapper. Every wrapper is attempted even if an earlier one
+   * fails, so one misbehaving plugin cannot leave the others running; the first failure is
+   * rethrown after all wrappers have been attempted.
+   */
+  @Override
+  public void serviceStop() {
+    RuntimeException firstFailure = null;
+    for (int i = 0; i < containerLauncherServiceWrappers.length; i++) {
+      try {
+        containerLauncherServiceWrappers[i].stop();
+      } catch (RuntimeException e) {
+        LOG.warn("Failed to stop container launcher with id {}", i, e);
+        if (firstFailure == null) {
+          firstFailure = e;
+        }
+      }
+    }
+    if (firstFailure != null) {
+      throw firstFailure;
+    }
+  }
+
+  /** Hook invoked for a completed DAG. Currently a no-op. */
+  public void dagComplete(DAG dag) {
+    // Nothing required at the moment. Containers are shared across DAGs
+  }
+
+  /** Hook indicating that a new DAG has been submitted. Currently a no-op. */
+  public void dagSubmitted() {
+    // Nothing to do right now. Indicates that a new DAG has been submitted and
+    // the context has updated information.
+  }
+
+  /**
+   * Converts {@code event} into a {@link ContainerLaunchRequest} or {@link ContainerStopRequest},
+   * resolving scheduler and task-communicator names through the {@link AppContext}, and passes it
+   * to the launcher whose index equals {@code event.getLauncherId()}.
+   */
+  @Override
+  public void handle(ContainerLauncherEvent event) {
+    int launcherId = event.getLauncherId();
+    String schedulerName = appContext.getTaskSchedulerName(event.getSchedulerId());
+    String taskCommName = appContext.getTaskCommunicatorName(event.getTaskCommId());
+    switch (event.getType()) {
+      case CONTAINER_LAUNCH_REQUEST:
+        ContainerLauncherLaunchRequestEvent launchEvent =
+            (ContainerLauncherLaunchRequestEvent) event;
+        ContainerLaunchRequest launchRequest =
+            new ContainerLaunchRequest(launchEvent.getNodeId(), launchEvent.getContainerId(),
+                launchEvent.getContainerToken(), launchEvent.getContainerLaunchContext(),
+                launchEvent.getContainer(), schedulerName, taskCommName);
+        containerLaunchers[launcherId].launchContainer(launchRequest);
+        break;
+      case CONTAINER_STOP_REQUEST:
+        ContainerStopRequest stopRequest =
+            new ContainerStopRequest(event.getNodeId(), event.getContainerId(),
+                event.getContainerToken(), schedulerName, taskCommName);
+        containerLaunchers[launcherId].stopContainer(stopRequest);
+        break;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
deleted file mode 100644
index b56bd5b..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.launcher;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.net.UnknownHostException;
-import java.util.List;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.common.ReflectionUtils;
-import org.apache.tez.dag.api.NamedEntityDescriptor;
-import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
-import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
-import org.apache.tez.serviceplugins.api.ContainerLauncher;
-import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
-import org.apache.tez.serviceplugins.api.ContainerStopRequest;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.ContainerLauncherContextImpl;
-import org.apache.tez.dag.app.TaskAttemptListener;
-import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
-import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ContainerLauncherRouter extends AbstractService
- implements EventHandler<NMCommunicatorEvent> {
-
- static final Logger LOG = LoggerFactory.getLogger(ContainerLauncherImpl.class);
-
- @VisibleForTesting
- final ContainerLauncher containerLaunchers[];
- @VisibleForTesting
- final ContainerLauncherContext containerLauncherContexts[];
- protected final ServicePluginLifecycleAbstractService[] containerLauncherServiceWrappers;
- private final AppContext appContext;
-
- @VisibleForTesting
- public ContainerLauncherRouter(ContainerLauncher containerLauncher, AppContext context) {
- super(ContainerLauncherRouter.class.getName());
- this.appContext = context;
- containerLaunchers = new ContainerLauncher[] {containerLauncher};
- containerLauncherContexts = new ContainerLauncherContext[] {containerLauncher.getContext()};
- containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[]{
- new ServicePluginLifecycleAbstractService<>(containerLauncher)};
- }
-
- // Accepting conf to setup final parameters, if required.
- public ContainerLauncherRouter(AppContext context,
- TaskAttemptListener taskAttemptListener,
- String workingDirectory,
- List<NamedEntityDescriptor> containerLauncherDescriptors,
- boolean isPureLocalMode) {
- super(ContainerLauncherRouter.class.getName());
-
- this.appContext = context;
- Preconditions.checkArgument(
- containerLauncherDescriptors != null && !containerLauncherDescriptors.isEmpty(),
- "ContainerLauncherDescriptors must be specified");
- containerLauncherContexts = new ContainerLauncherContext[containerLauncherDescriptors.size()];
- containerLaunchers = new ContainerLauncher[containerLauncherDescriptors.size()];
- containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[containerLauncherDescriptors.size()];
-
-
- for (int i = 0; i < containerLauncherDescriptors.size(); i++) {
- UserPayload userPayload = containerLauncherDescriptors.get(i).getUserPayload();
- ContainerLauncherContext containerLauncherContext =
- new ContainerLauncherContextImpl(context, taskAttemptListener, userPayload);
- containerLauncherContexts[i] = containerLauncherContext;
- containerLaunchers[i] = createContainerLauncher(containerLauncherDescriptors.get(i), context,
- containerLauncherContext, taskAttemptListener, workingDirectory, i, isPureLocalMode);
- containerLauncherServiceWrappers[i] = new ServicePluginLifecycleAbstractService<>(containerLaunchers[i]);
- }
- }
-
- @VisibleForTesting
- ContainerLauncher createContainerLauncher(
- NamedEntityDescriptor containerLauncherDescriptor,
- AppContext context,
- ContainerLauncherContext containerLauncherContext,
- TaskAttemptListener taskAttemptListener,
- String workingDirectory,
- int containerLauncherIndex,
- boolean isPureLocalMode) {
- if (containerLauncherDescriptor.getEntityName().equals(
- TezConstants.getTezYarnServicePluginName())) {
- return createYarnContainerLauncher(containerLauncherContext);
- } else if (containerLauncherDescriptor.getEntityName()
- .equals(TezConstants.getTezUberServicePluginName())) {
- return createUberContainerLauncher(containerLauncherContext, context, taskAttemptListener,
- workingDirectory, isPureLocalMode);
- } else {
- return createCustomContainerLauncher(containerLauncherContext, containerLauncherDescriptor);
- }
- }
-
- @VisibleForTesting
- ContainerLauncher createYarnContainerLauncher(ContainerLauncherContext containerLauncherContext) {
- LOG.info("Creating DefaultContainerLauncher");
- return new ContainerLauncherImpl(containerLauncherContext);
- }
-
- @VisibleForTesting
- ContainerLauncher createUberContainerLauncher(ContainerLauncherContext containerLauncherContext,
- AppContext context,
- TaskAttemptListener taskAttemptListener,
- String workingDirectory,
- boolean isPureLocalMode) {
- LOG.info("Creating LocalContainerLauncher");
- // TODO Post TEZ-2003. LocalContainerLauncher is special cased, since it makes use of
- // extensive internals which are only available at runtime. Will likely require
- // some kind of runtime binding of parameters in the payload to work correctly.
- try {
- return
- new LocalContainerLauncher(containerLauncherContext, context, taskAttemptListener,
- workingDirectory, isPureLocalMode);
- } catch (UnknownHostException e) {
- throw new TezUncheckedException(e);
- }
- }
-
- @VisibleForTesting
- @SuppressWarnings("unchecked")
- ContainerLauncher createCustomContainerLauncher(ContainerLauncherContext containerLauncherContext,
- NamedEntityDescriptor containerLauncherDescriptor) {
- LOG.info("Creating container launcher {}:{} ", containerLauncherDescriptor.getEntityName(),
- containerLauncherDescriptor.getClassName());
- Class<? extends ContainerLauncher> containerLauncherClazz =
- (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz(
- containerLauncherDescriptor.getClassName());
- try {
- Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz
- .getConstructor(ContainerLauncherContext.class);
- return ctor.newInstance(containerLauncherContext);
- } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
- throw new TezUncheckedException(e);
- }
-
- }
-
- @Override
- public void serviceInit(Configuration conf) {
- for (int i = 0 ; i < containerLaunchers.length ; i++) {
- containerLauncherServiceWrappers[i].init(conf);
- }
- }
-
- @Override
- public void serviceStart() {
- for (int i = 0 ; i < containerLaunchers.length ; i++) {
- containerLauncherServiceWrappers[i].start();
- }
- }
-
- @Override
- public void serviceStop() {
- for (int i = 0 ; i < containerLaunchers.length ; i++) {
- containerLauncherServiceWrappers[i].stop();
- }
- }
-
- public void dagComplete(DAG dag) {
- // Nothing required at the moment. Containers are shared across DAGs
- }
-
- public void dagSubmitted() {
- // Nothing to do right now. Indicates that a new DAG has been submitted and
- // the context has updated information.
- }
-
-
- @Override
- public void handle(NMCommunicatorEvent event) {
- int launcherId = event.getLauncherId();
- String schedulerName = appContext.getTaskSchedulerName(event.getSchedulerId());
- String taskCommName = appContext.getTaskCommunicatorName(event.getTaskCommId());
- switch (event.getType()) {
- case CONTAINER_LAUNCH_REQUEST:
- NMCommunicatorLaunchRequestEvent launchEvent = (NMCommunicatorLaunchRequestEvent) event;
- ContainerLaunchRequest launchRequest =
- new ContainerLaunchRequest(launchEvent.getNodeId(), launchEvent.getContainerId(),
- launchEvent.getContainerToken(), launchEvent.getContainerLaunchContext(),
- launchEvent.getContainer(), schedulerName,
- taskCommName);
- containerLaunchers[launcherId].launchContainer(launchRequest);
- break;
- case CONTAINER_STOP_REQUEST:
- ContainerStopRequest stopRequest =
- new ContainerStopRequest(event.getNodeId(), event.getContainerId(),
- event.getContainerToken(), schedulerName, taskCommName);
- containerLaunchers[launcherId].stopContainer(stopRequest);
- break;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index 1d3e6df..6cd6fce 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -63,7 +63,7 @@ import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
@@ -83,7 +83,7 @@ public class LocalContainerLauncher extends ContainerLauncher {
private final AppContext context;
private final AtomicBoolean serviceStopped = new AtomicBoolean(false);
private final String workingDirectory;
- private final TaskAttemptListener tal;
+ private final TaskCommunicatorManagerInterface tal;
private final Map<String, String> localEnv;
private final ExecutionContext executionContext;
private final int numExecutors;
@@ -105,7 +105,7 @@ public class LocalContainerLauncher extends ContainerLauncher {
public LocalContainerLauncher(ContainerLauncherContext containerLauncherContext,
AppContext context,
- TaskAttemptListener taskAttemptListener,
+ TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
String workingDirectory,
boolean isPureLocalMode) throws UnknownHostException {
// TODO Post TEZ-2003. Most of this information is dynamic and only available after the AM
@@ -114,7 +114,7 @@ public class LocalContainerLauncher extends ContainerLauncher {
// after the AM starts up.
super(containerLauncherContext);
this.context = context;
- this.tal = taskAttemptListener;
+ this.tal = taskCommunicatorManagerInterface;
this.workingDirectory = workingDirectory;
this.isPureLocalMode = isPureLocalMode;
if (isPureLocalMode) {
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
new file mode 100644
index 0000000..3556c51
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
@@ -0,0 +1,456 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.launcher;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
+import org.apache.tez.serviceplugins.api.ContainerLauncher;
+import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
+import org.apache.tez.serviceplugins.api.ContainerStopRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy;
+import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+// TODO See what part of this lifecycle and state management can be simplified.
+// Ideally, no state - only sendStart / sendStop.
+
+// TODO Review this entire code and clean it up.
+
+/**
+ * Launches and stops containers on YARN NodeManagers.
+ *
+ * <p>{@link #launchContainer} and {@link #stopContainer} only enqueue the request. A single
+ * dispatcher thread drains the queue and hands each operation to a thread pool; the pool's
+ * core size is raised as the number of nodes reported by the context grows, capped by the
+ * value configured under {@link TezConfiguration#TEZ_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT}.
+ */
+public class TezContainerLauncherImpl extends ContainerLauncher {
+
+  // TODO Ensure the same thread is used to launch / stop the same container. Or - ensure event ordering.
+  static final Logger LOG = LoggerFactory.getLogger(TezContainerLauncherImpl.class);
+
+  // Tracked containers; an entry is removed after an operation leaves it FAILED or DONE.
+  private final ConcurrentHashMap<ContainerId, Container> containers =
+      new ConcurrentHashMap<>();
+  protected ThreadPoolExecutor launcherPool;
+  protected static final int INITIAL_POOL_SIZE = 10;
+  private final int limitOnPoolSize;
+  private final Configuration conf;
+  private Thread eventHandlingThread;
+  protected BlockingQueue<ContainerOp> eventQueue = new LinkedBlockingQueue<>();
+  private ContainerManagementProtocolProxy cmProxy;
+  private final AtomicBoolean serviceStopped = new AtomicBoolean(false);
+
+  /**
+   * Returns the tracked state for the container targeted by {@code event}, registering a new
+   * PREP entry if none exists. {@code putIfAbsent} ensures concurrent callers share one instance.
+   */
+  private Container getContainer(ContainerOp event) {
+    ContainerId id = event.getBaseOperation().getContainerId();
+    Container c = containers.get(id);
+    if (c == null) {
+      c = new Container(id,
+          event.getBaseOperation().getNodeId().toString(), event.getBaseOperation().getContainerToken());
+      Container old = containers.putIfAbsent(id, c);
+      if (old != null) {
+        c = old;
+      }
+    }
+    return c;
+  }
+
+  /** Stops tracking {@code id} if its container has reached a terminal state. */
+  private void removeContainerIfDone(ContainerId id) {
+    Container c = containers.get(id);
+    if (c != null && c.isCompletelyDone()) {
+      // Conditional remove: never drops a different instance registered under the same id.
+      containers.remove(id, c);
+    }
+  }
+
+
+  /**
+   * Per-container lifecycle. KILLED_BEFORE_LAUNCH records a stop that arrived while the
+   * container was still in PREP, so that the later launch request is abandoned.
+   */
+  private enum ContainerState {
+    PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH
+  }
+
+  /** Launch/stop state machine for one container; transitions are synchronized on the instance. */
+  private class Container {
+    private ContainerState state;
+    // store enough information to be able to cleanup the container
+    private final ContainerId containerID;
+    private final String containerMgrAddress;
+    private final Token containerToken;
+
+    public Container(ContainerId containerID,
+        String containerMgrAddress, Token containerToken) {
+      this.state = ContainerState.PREP;
+      this.containerMgrAddress = containerMgrAddress;
+      this.containerID = containerID;
+      this.containerToken = containerToken;
+    }
+
+    /** Returns true once the container is DONE or FAILED. */
+    public synchronized boolean isCompletelyDone() {
+      return state == ContainerState.DONE || state == ContainerState.FAILED;
+    }
+
+    /**
+     * Starts the container on its NodeManager and reports the result to the context.
+     * If a stop was already recorded (KILLED_BEFORE_LAUNCH), nothing is started and the
+     * launch is reported as failed instead.
+     */
+    public synchronized void launch(ContainerLaunchRequest event) {
+      LOG.info("Launching Container with Id: " + event.getContainerId());
+      if (this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
+        state = ContainerState.DONE;
+        sendContainerLaunchFailedMsg(event.getContainerId(),
+            "Container was killed before it was launched");
+        return;
+      }
+
+      ContainerManagementProtocolProxyData proxy = null;
+      try {
+        proxy = getCMProxy(containerID, containerMgrAddress,
+            containerToken);
+
+        // Launch context supplied with the request
+        ContainerLaunchContext containerLaunchContext =
+            event.getContainerLaunchContext();
+
+        // Now launch the actual container
+        StartContainerRequest startRequest = Records
+            .newRecord(StartContainerRequest.class);
+        startRequest.setContainerToken(event.getContainerToken());
+        startRequest.setContainerLaunchContext(containerLaunchContext);
+
+        StartContainersResponse response =
+            proxy.getContainerManagementProtocol().startContainers(
+                StartContainersRequest.newInstance(
+                    Collections.singletonList(startRequest)));
+        if (response.getFailedRequests() != null
+            && !response.getFailedRequests().isEmpty()) {
+          // A single request was sent; rethrow its failure so the catch below reports it.
+          throw response.getFailedRequests().get(containerID).deSerialize();
+        }
+
+        // after launching, send launched event to task attempt to move
+        // it from ASSIGNED to RUNNING state
+        getContext().containerLaunched(containerID);
+        this.state = ContainerState.RUNNING;
+      } catch (Throwable t) {
+        String message = "Container launch failed for " + containerID + " : "
+            + ExceptionUtils.getStackTrace(t);
+        this.state = ContainerState.FAILED;
+        sendContainerLaunchFailedMsg(containerID, message);
+      } finally {
+        if (proxy != null) {
+          cmProxy.mayBeCloseProxy(proxy);
+        }
+      }
+    }
+
+    /**
+     * Stops the container.
+     *
+     * <p>A container still in PREP is only marked KILLED_BEFORE_LAUNCH. A launched container
+     * gets a stop request sent to its NodeManager and ends up DONE whether or not that request
+     * succeeds. Repeated calls are no-ops: in particular a second stop for a container that was
+     * never launched must not contact the NM or mark it DONE, because a DONE container is
+     * untracked and its pending launch request would then start it anyway.
+     */
+    public synchronized void kill() {
+      if (isCompletelyDone() || this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
+        return;
+      }
+      if (this.state == ContainerState.PREP) {
+        this.state = ContainerState.KILLED_BEFORE_LAUNCH;
+        return;
+      }
+
+      LOG.info("Sending a stop request to the NM for ContainerId: "
+          + containerID);
+
+      ContainerManagementProtocolProxyData proxy = null;
+      try {
+        proxy = getCMProxy(this.containerID, this.containerMgrAddress,
+            this.containerToken);
+
+        // kill the remote container if already launched
+        StopContainersRequest stopRequest = Records
+            .newRecord(StopContainersRequest.class);
+        stopRequest.setContainerIds(Collections.singletonList(containerID));
+
+        proxy.getContainerManagementProtocol().stopContainers(stopRequest);
+
+        // If stopContainer returns without an error, assuming the stop made
+        // it over to the NodeManager.
+        getContext().containerStopRequested(containerID);
+      } catch (Throwable t) {
+        // Report the failure; the container is still marked DONE below.
+        String message = "cleanup failed for container "
+            + this.containerID + " : "
+            + ExceptionUtils.getStackTrace(t);
+        getContext().containerStopFailed(containerID, message);
+        LOG.warn(message);
+      } finally {
+        if (proxy != null) {
+          cmProxy.mayBeCloseProxy(proxy);
+        }
+        this.state = ContainerState.DONE;
+      }
+    }
+  }
+
+  /**
+   * Creates the launcher from the context's user payload.
+   *
+   * @param containerLauncherContext context whose initial user payload holds the launcher
+   *     configuration; also handed to the superclass
+   * @throws TezUncheckedException if the user payload cannot be parsed
+   */
+  public TezContainerLauncherImpl(ContainerLauncherContext containerLauncherContext) {
+    super(containerLauncherContext);
+    try {
+      this.conf = TezUtils.createConfFromUserPayload(containerLauncherContext.getInitialUserPayload());
+    } catch (IOException e) {
+      throw new TezUncheckedException(
+          "Failed to parse user payload for " + TezContainerLauncherImpl.class.getSimpleName(), e);
+    }
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
+        0);
+    this.limitOnPoolSize = conf.getInt(
+        TezConfiguration.TEZ_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT,
+        TezConfiguration.TEZ_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT_DEFAULT);
+    LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize);
+  }
+
+  /** Creates the NM protocol proxy and the launcher pool, then starts the dispatcher thread. */
+  @Override
+  public void start() {
+    // pass a copy of config to ContainerManagementProtocolProxy until YARN-3497 is fixed
+    cmProxy =
+        new ContainerManagementProtocolProxy(conf);
+
+    ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
+        "ContainerLauncher #%d").setDaemon(true).build();
+
+    // Start with a default core-pool size of 10 and change it dynamically.
+    launcherPool = new ThreadPoolExecutor(INITIAL_POOL_SIZE,
+        Integer.MAX_VALUE, 1, TimeUnit.HOURS,
+        new LinkedBlockingQueue<Runnable>(),
+        tf, new CustomizedRejectedExecutionHandler());
+    eventHandlingThread = new Thread() {
+      @Override
+      public void run() {
+        ContainerOp event = null;
+        while (!Thread.currentThread().isInterrupted()) {
+          try {
+            event = eventQueue.take();
+          } catch (InterruptedException e) {
+            if (!serviceStopped.get()) {
+              LOG.error("Returning, interrupted : " + e);
+            }
+            return;
+          }
+          int poolSize = launcherPool.getCorePoolSize();
+
+          // Only consider growing the pool while the core size is not already
+          // at the configured limit.
+          if (poolSize != limitOnPoolSize) {
+
+            // nodes where containers will run at *this* point of time. This is
+            // *not* the cluster size and doesn't need to be.
+            int numNodes =
+                getContext().getNumNodes(TezConstants.getTezYarnServicePluginName());
+            int idealPoolSize = Math.min(limitOnPoolSize, numNodes);
+
+            if (poolSize < idealPoolSize) {
+              // Bump up the pool size to idealPoolSize+INITIAL_POOL_SIZE, the
+              // latter is just a buffer so we are not always increasing the
+              // pool-size
+              int newPoolSize = Math.min(limitOnPoolSize, idealPoolSize
+                  + INITIAL_POOL_SIZE);
+              LOG.info("Setting ContainerLauncher pool size to " + newPoolSize
+                  + " as number-of-nodes to talk to is " + numNodes);
+              launcherPool.setCorePoolSize(newPoolSize);
+            }
+          }
+
+          // the events from the queue are handled in parallel
+          // using a thread pool
+          launcherPool.execute(createEventProcessor(event));
+
+          // TODO: Group launching of multiple containers to a single
+          // NodeManager into a single connection
+        }
+      }
+    };
+    eventHandlingThread.setName("ContainerLauncher Event Handler");
+    eventHandlingThread.start();
+  }
+
+  /** Calls kill() on every container that is still tracked. */
+  private void shutdownAllContainers() {
+    for (Container ct : this.containers.values()) {
+      if (ct != null) {
+        ct.kill();
+      }
+    }
+  }
+
+  /**
+   * Stops all tracked containers, then interrupts the dispatcher thread and shuts the worker
+   * pool down immediately. Only the first call has any effect.
+   */
+  @Override
+  public void shutdown() {
+    if (!serviceStopped.compareAndSet(false, true)) {
+      LOG.info("Ignoring multiple stops");
+      return;
+    }
+    // shutdown any containers that might be left running
+    shutdownAllContainers();
+    if (eventHandlingThread != null) {
+      eventHandlingThread.interrupt();
+    }
+    if (launcherPool != null) {
+      launcherPool.shutdownNow();
+    }
+  }
+
+  /** Creates the runnable that executes one queued operation on the launcher pool. */
+  protected EventProcessor createEventProcessor(ContainerOp event) {
+    return new EventProcessor(event);
+  }
+
+  /** Returns an NM proxy for {@code containerManagerBindAddr}; {@code containerToken} is not used here. */
+  protected ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData getCMProxy(
+      ContainerId containerID, final String containerManagerBindAddr,
+      Token containerToken) throws IOException {
+    return cmProxy.getProxy(containerManagerBindAddr, containerID);
+  }
+
+  /**
+   * Applies one queued launch or stop operation to its container, then drops the container
+   * from tracking if the operation left it in a terminal state.
+   */
+  class EventProcessor implements Runnable {
+    private final ContainerOp event;
+
+    EventProcessor(ContainerOp event) {
+      this.event = event;
+    }
+
+    @Override
+    public void run() {
+      LOG.info("Processing operation {}", event.toString());
+
+      ContainerId containerID = event.getBaseOperation().getContainerId();
+
+      Container c = getContainer(event);
+      switch (event.getOpType()) {
+        case LAUNCH_REQUEST:
+          ContainerLaunchRequest launchRequest = event.getLaunchRequest();
+          c.launch(launchRequest);
+          break;
+        case STOP_REQUEST:
+          c.kill();
+          break;
+      }
+      removeContainerIfDone(containerID);
+    }
+  }
+
+  /**
+   * ThreadPoolExecutor.execute may reject tasks while the pool is shutting down (e.g. when the
+   * DAGAppMaster is shutting down). This handler only logs the rejection instead of throwing.
+   */
+  private static class CustomizedRejectedExecutionHandler implements RejectedExecutionHandler {
+    @Override
+    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+      LOG.warn("Can't submit task to ThreadPoolExecutor:" + executor);
+    }
+  }
+
+  /** Logs {@code message} and reports the launch failure of {@code containerId} to the context. */
+  void sendContainerLaunchFailedMsg(ContainerId containerId,
+      String message) {
+    LOG.error(message);
+    getContext().containerLaunchFailed(containerId, message);
+  }
+
+  @Override
+  public void launchContainer(ContainerLaunchRequest launchRequest) {
+    enqueue(new ContainerOp(ContainerOp.OPType.LAUNCH_REQUEST, launchRequest));
+  }
+
+  @Override
+  public void stopContainer(ContainerStopRequest stopRequest) {
+    enqueue(new ContainerOp(ContainerOp.OPType.STOP_REQUEST, stopRequest));
+  }
+
+  /**
+   * Queues {@code op} for the dispatcher thread.
+   *
+   * @throws TezUncheckedException if interrupted while waiting; the interrupt flag is restored
+   *     so callers further up the stack can still observe it
+   */
+  private void enqueue(ContainerOp op) {
+    try {
+      eventQueue.put(op);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new TezUncheckedException(e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherEvent.java
new file mode 100644
index 0000000..add3254
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherEvent.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import java.util.Objects;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Base event asking a container launcher to launch or stop a container.
+ *
+ * <p>Besides the container's id, node and token, it records which launcher should handle the
+ * request and which task scheduler and task communicator the container is associated with.
+ * Equality and hash code consider only the container id, container token and node id.
+ */
+public class ContainerLauncherEvent extends AbstractEvent<ContainerLauncherEventType> {
+
+  private final ContainerId containerId;
+  private final NodeId nodeId;
+  private final Token containerToken;
+  private final int launcherId;
+  private final int schedulerId;
+  private final int taskCommId;
+
+  /**
+   * @param containerId    the container to act on
+   * @param nodeId         the node the container is on
+   * @param containerToken the container's token
+   * @param type           launch or stop
+   * @param launcherId     identifier of the container launcher that should handle this event
+   * @param schedulerId    identifier of the task scheduler associated with the container
+   * @param taskCommId     identifier of the task communicator associated with the container
+   */
+  public ContainerLauncherEvent(ContainerId containerId, NodeId nodeId,
+      Token containerToken, ContainerLauncherEventType type,
+      int launcherId, int schedulerId, int taskCommId) {
+    super(type);
+    this.containerId = containerId;
+    this.nodeId = nodeId;
+    this.containerToken = containerToken;
+    this.launcherId = launcherId;
+    this.schedulerId = schedulerId;
+    this.taskCommId = taskCommId;
+  }
+
+  public ContainerId getContainerId() {
+    return this.containerId;
+  }
+
+  public NodeId getNodeId() {
+    return this.nodeId;
+  }
+
+  public Token getContainerToken() {
+    return this.containerToken;
+  }
+
+  public int getLauncherId() {
+    return launcherId;
+  }
+
+  public int getSchedulerId() {
+    return schedulerId;
+  }
+
+  public int getTaskCommId() {
+    return taskCommId;
+  }
+
+  @Override
+  public String toString() {
+    return super.toString() + " for container " + containerId + ", nodeId: "
+        + nodeId + ", launcherId: " + launcherId + ", schedulerId=" + schedulerId
+        + ", taskCommId=" + taskCommId;
+  }
+
+  /**
+   * @deprecated Misspelled and never overrode {@link Object#toString()}; use
+   *     {@link #toString()} instead. Retained only so existing callers still compile.
+   */
+  @Deprecated
+  public String toSrting() {
+    return toString();
+  }
+
+  @Override
+  public int hashCode() {
+    // Only containerId, containerToken and nodeId participate; must stay consistent with equals().
+    return Objects.hash(containerId, containerToken, nodeId);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    ContainerLauncherEvent other = (ContainerLauncherEvent) obj;
+    return Objects.equals(containerId, other.containerId)
+        && Objects.equals(containerToken, other.containerToken)
+        && Objects.equals(nodeId, other.nodeId);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherEventType.java
new file mode 100644
index 0000000..079509c
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherEventType.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+// TODO - Re-use the events in ContainerLauncher..
+/**
+ * Kinds of request carried by a {@link ContainerLauncherEvent}.
+ */
+public enum ContainerLauncherEventType {
+  /** Launch a container; used by {@link ContainerLauncherLaunchRequestEvent}. */
+  CONTAINER_LAUNCH_REQUEST,
+  /** Stop a container; used by {@link ContainerLauncherStopRequestEvent}. */
+  CONTAINER_STOP_REQUEST,
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherLaunchRequestEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherLaunchRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherLaunchRequestEvent.java
new file mode 100644
index 0000000..411451e
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherLaunchRequestEvent.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+
+/**
+ * Request to launch {@code container} with the given {@link ContainerLaunchContext}. The
+ * container id, node id and token handed to the base event are taken from {@code container}.
+ */
+public class ContainerLauncherLaunchRequestEvent extends ContainerLauncherEvent {
+
+  private final ContainerLaunchContext clc;
+  private final Container container;
+
+  /**
+   * @param clc         launch context for the container
+   * @param container   the container to launch; must not be null (it is dereferenced here)
+   * @param launcherId  identifier of the container launcher that should handle the request
+   * @param schedulerId identifier of the task scheduler associated with the container
+   * @param taskCommId  task communicator index for the specific container being launched
+   */
+  public ContainerLauncherLaunchRequestEvent(ContainerLaunchContext clc,
+      Container container, int launcherId, int schedulerId,
+      int taskCommId) {
+    super(container.getId(), container.getNodeId(), container.getContainerToken(),
+        ContainerLauncherEventType.CONTAINER_LAUNCH_REQUEST, launcherId, schedulerId, taskCommId);
+    this.clc = clc;
+    this.container = container;
+  }
+
+  /** Returns the launch context supplied at construction. */
+  public ContainerLaunchContext getContainerLaunchContext() {
+    return this.clc;
+  }
+
+  /** Returns the container to launch. */
+  public Container getContainer() {
+    return container;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null || o.getClass() != getClass() || !super.equals(o)) {
+      return false;
+    }
+    ContainerLauncherLaunchRequestEvent other = (ContainerLauncherLaunchRequestEvent) o;
+    if (clc == null ? other.clc != null : !clc.equals(other.clc)) {
+      return false;
+    }
+    return container == null ? other.container == null : container.equals(other.container);
+  }
+
+  @Override
+  public int hashCode() {
+    final int multiplier = 7001;
+    int h = super.hashCode();
+    h = multiplier * h + (clc == null ? 0 : clc.hashCode());
+    h = multiplier * h + (container == null ? 0 : container.hashCode());
+    return h;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherStopRequestEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherStopRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherStopRequestEvent.java
new file mode 100644
index 0000000..69e7d30
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherStopRequestEvent.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
+
+/**
+ * Request to stop the container identified by {@code containerId} on {@code nodeId}.
+ */
+public class ContainerLauncherStopRequestEvent extends ContainerLauncherEvent {
+
+  /**
+   * @param containerId    the container to stop
+   * @param nodeId         the node the container is on
+   * @param containerToken the container's token
+   * @param launcherId     identifier of the container launcher that should handle the request
+   * @param schedulerId    identifier of the task scheduler associated with the container
+   * @param taskCommId     identifier of the task communicator associated with the container
+   */
+  public ContainerLauncherStopRequestEvent(ContainerId containerId, NodeId nodeId,
+      Token containerToken, int launcherId, int schedulerId, int taskCommId) {
+    super(containerId, nodeId, containerToken, ContainerLauncherEventType.CONTAINER_STOP_REQUEST,
+        launcherId, schedulerId, taskCommId);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
deleted file mode 100644
index dc50c37..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.rm;
-
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.event.AbstractEvent;
-
-public class NMCommunicatorEvent extends AbstractEvent<NMCommunicatorEventType> {
-
- private final ContainerId containerId;
- private final NodeId nodeId;
- private final Token containerToken;
- private final int launcherId;
- private final int schedulerId;
- private final int taskCommId;
-
- public NMCommunicatorEvent(ContainerId containerId, NodeId nodeId,
- Token containerToken, NMCommunicatorEventType type, int launcherId,
- int schedulerId, int taskCommId) {
- super(type);
- this.containerId = containerId;
- this.nodeId = nodeId;
- this.containerToken = containerToken;
- this.launcherId = launcherId;
- this.schedulerId = schedulerId;
- this.taskCommId = taskCommId;
- }
-
- public ContainerId getContainerId() {
- return this.containerId;
- }
-
- public NodeId getNodeId() {
- return this.nodeId;
- }
-
- public Token getContainerToken() {
- return this.containerToken;
- }
-
- public int getLauncherId() {
- return launcherId;
- }
-
- public int getSchedulerId() {
- return schedulerId;
- }
-
- public int getTaskCommId() {
- return taskCommId;
- }
-
- public String toSrting() {
- return super.toString() + " for container " + containerId + ", nodeId: "
- + nodeId + ", launcherId: " + launcherId + ", schedulerId=" + schedulerId +
- ", taskCommId=" + taskCommId;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result
- + ((containerId == null) ? 0 : containerId.hashCode());
- result = prime * result
- + ((containerToken == null) ? 0 : containerToken.hashCode());
- result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- NMCommunicatorEvent other = (NMCommunicatorEvent) obj;
- if (containerId == null) {
- if (other.containerId != null)
- return false;
- } else if (!containerId.equals(other.containerId))
- return false;
- if (containerToken == null) {
- if (other.containerToken != null)
- return false;
- } else if (!containerToken.equals(other.containerToken))
- return false;
- if (nodeId == null) {
- if (other.nodeId != null)
- return false;
- } else if (!nodeId.equals(other.nodeId))
- return false;
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEventType.java
deleted file mode 100644
index 9f3d989..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEventType.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.rm;
-
-// TODO - Re-use the events in ContainerLauncher..
-public enum NMCommunicatorEventType {
- CONTAINER_LAUNCH_REQUEST,
- CONTAINER_STOP_REQUEST
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
deleted file mode 100644
index c57b6be..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.rm;
-
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-
-public class NMCommunicatorLaunchRequestEvent extends NMCommunicatorEvent {
-
- private final ContainerLaunchContext clc;
- private final Container container;
- // The task communicator index for the specific container being launched.
-
- public NMCommunicatorLaunchRequestEvent(ContainerLaunchContext clc,
- Container container, int launcherId, int schedulerId, int taskCommId) {
- super(container.getId(), container.getNodeId(), container
- .getContainerToken(), NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST,
- launcherId, schedulerId, taskCommId);
- this.clc = clc;
- this.container = container;
- }
-
- public ContainerLaunchContext getContainerLaunchContext() {
- return this.clc;
- }
-
- public Container getContainer() {
- return container;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
-
- NMCommunicatorLaunchRequestEvent that = (NMCommunicatorLaunchRequestEvent) o;
-
- if (clc != null ? !clc.equals(that.clc) : that.clc != null) {
- return false;
- }
- if (container != null ? !container.equals(that.container) : that.container != null) {
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode() {
- int result = super.hashCode();
- result = 7001 * result + (clc != null ? clc.hashCode() : 0);
- result = 7001 * result + (container != null ? container.hashCode() : 0);
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
deleted file mode 100644
index 352f450..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.rm;
-
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Token;
-
-public class NMCommunicatorStopRequestEvent extends NMCommunicatorEvent {
-
- public NMCommunicatorStopRequestEvent(ContainerId containerId, NodeId nodeId,
- Token containerToken, int launcherId, int schedulerId, int taskCommId) {
- super(containerId, nodeId, containerToken,
- NMCommunicatorEventType.CONTAINER_STOP_REQUEST, launcherId, schedulerId, taskCommId);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
index 2a9797f..37aa96b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
@@ -33,7 +33,7 @@ import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
public class TaskSchedulerContextImpl implements TaskSchedulerContext {
- private final TaskSchedulerEventHandler tseh;
+ private final TaskSchedulerManager taskSchedulerManager;
private final AppContext appContext;
private final int schedulerId;
private final String trackingUrl;
@@ -42,11 +42,11 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext {
private final int clientPort;
private final UserPayload initialUserPayload;
- public TaskSchedulerContextImpl(TaskSchedulerEventHandler tseh, AppContext appContext,
+ public TaskSchedulerContextImpl(TaskSchedulerManager taskSchedulerManager, AppContext appContext,
int schedulerId, String trackingUrl, long customClusterIdentifier,
String appHostname, int clientPort,
UserPayload initialUserPayload) {
- this.tseh = tseh;
+ this.taskSchedulerManager = taskSchedulerManager;
this.appContext = appContext;
this.schedulerId = schedulerId;
this.trackingUrl = trackingUrl;
@@ -62,54 +62,55 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext {
// taskAllocated() upcall and deallocateTask() downcall
@Override
public void taskAllocated(Object task, Object appCookie, Container container) {
- tseh.taskAllocated(schedulerId, task, appCookie, container);
+ taskSchedulerManager.taskAllocated(schedulerId, task, appCookie, container);
}
@Override
public void containerCompleted(Object taskLastAllocated, ContainerStatus containerStatus) {
- tseh.containerCompleted(schedulerId, taskLastAllocated, containerStatus);
+ taskSchedulerManager.containerCompleted(schedulerId, taskLastAllocated, containerStatus);
}
@Override
public void containerBeingReleased(ContainerId containerId) {
- tseh.containerBeingReleased(schedulerId, containerId);
+ taskSchedulerManager.containerBeingReleased(schedulerId, containerId);
}
@Override
public void nodesUpdated(List<NodeReport> updatedNodes) {
- tseh.nodesUpdated(schedulerId, updatedNodes);
+ taskSchedulerManager.nodesUpdated(schedulerId, updatedNodes);
}
@Override
public void appShutdownRequested() {
- tseh.appShutdownRequested(schedulerId);
+ taskSchedulerManager.appShutdownRequested(schedulerId);
}
@Override
public void setApplicationRegistrationData(Resource maxContainerCapability,
Map<ApplicationAccessType, String> appAcls,
ByteBuffer clientAMSecretKey) {
- tseh.setApplicationRegistrationData(schedulerId, maxContainerCapability, appAcls, clientAMSecretKey);
+ taskSchedulerManager.setApplicationRegistrationData(schedulerId, maxContainerCapability, appAcls,
+ clientAMSecretKey);
}
@Override
public void onError(Throwable t) {
- tseh.onError(schedulerId, t);
+ taskSchedulerManager.onError(schedulerId, t);
}
@Override
public float getProgress() {
- return tseh.getProgress(schedulerId);
+ return taskSchedulerManager.getProgress(schedulerId);
}
@Override
public void preemptContainer(ContainerId containerId) {
- tseh.preemptContainer(schedulerId, containerId);
+ taskSchedulerManager.preemptContainer(schedulerId, containerId);
}
@Override
public AppFinalStatus getFinalAppStatus() {
- return tseh.getFinalAppStatus();
+ return taskSchedulerManager.getFinalAppStatus();
}
@Override
@@ -130,7 +131,7 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext {
@Override
public ContainerSignatureMatcher getContainerSignatureMatcher() {
- return tseh.getContainerSignatureMatcher();
+ return taskSchedulerManager.getContainerSignatureMatcher();
}
@Override