You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC
svn commit: r1469642 [9/36] - in /incubator/tez/branches/TEZ-1: ./
example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/
example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/
tez-common/src/main/ tez-common/src/main/java/ te...
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,1576 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.dag.api.DAGConfiguration;
+import org.apache.tez.dag.api.DAGLocationHint;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.impl.TezBuilderUtils;
+import org.apache.tez.dag.api.records.AMInfo;
+import org.apache.tez.dag.app.client.ClientService;
+import org.apache.tez.dag.app.client.impl.TezClientService;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.dag.event.DAGFinishEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventType;
+import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.app.launcher.ContainerLauncher;
+import org.apache.tez.dag.app.launcher.ContainerLauncherImpl;
+import org.apache.tez.dag.app.local.LocalContainerRequestor;
+import org.apache.tez.dag.app.rm.AMSchedulerEvent;
+import org.apache.tez.dag.app.rm.AMSchedulerEventType;
+import org.apache.tez.dag.app.rm.ContainerAllocator;
+import org.apache.tez.dag.app.rm.ContainerRequestor;
+import org.apache.tez.dag.app.rm.NMCommunicatorEventType;
+import org.apache.tez.dag.app.rm.RMCommunicator;
+import org.apache.tez.dag.app.rm.RMCommunicatorEvent;
+import org.apache.tez.dag.app.rm.RMContainerRequestor;
+import org.apache.tez.dag.app.rm.RMContainerRequestor.ContainerRequest;
+import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
+import org.apache.tez.dag.app.rm.container.AMContainer;
+import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.dag.app.rm.container.AMContainerHelpers;
+import org.apache.tez.dag.app.rm.container.AMContainerMap;
+import org.apache.tez.dag.app.rm.container.AMContainerState;
+import org.apache.tez.dag.app.rm.node.AMNodeEventType;
+import org.apache.tez.dag.app.rm.node.AMNodeMap;
+import org.apache.tez.dag.app.speculate.DefaultSpeculator;
+import org.apache.tez.dag.app.speculate.Speculator;
+import org.apache.tez.dag.app.speculate.SpeculatorEvent;
+import org.apache.tez.dag.app.taskclean.TaskCleaner;
+import org.apache.tez.dag.app.taskclean.TaskCleanerImpl;
+import org.apache.tez.engine.common.security.JobTokenSecretManager;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+/**
+ * The Map-Reduce Application Master.
+ * The state machine is encapsulated in the implementation of Job interface.
+ * All state changes happens via Job interface. Each event
+ * results in a Finite State Transition in Job.
+ *
+ * MR AppMaster is the composition of loosely coupled services. The services
+ * interact with each other via events. The components resembles the
+ * Actors model. The component acts on received event and send out the
+ * events to other components.
+ * This keeps it highly concurrent with no or minimal synchronization needs.
+ *
+ * The events are dispatched by a central Dispatch mechanism. All components
+ * register to the Dispatcher.
+ *
+ * The information is shared across different components using AppContext.
+ */
+
+@SuppressWarnings("rawtypes")
+public class DAGAppMaster extends CompositeService {
+
+ private static final Log LOG = LogFactory.getLog(DAGAppMaster.class);
+
+ /**
+ * Priority of the DAGAppMaster shutdown hook.
+ */
+ public static final int SHUTDOWN_HOOK_PRIORITY = 30;
+
+ // Identity and environment of this AM attempt, captured at construction.
+ private Clock clock;
+ private final long startTime;
+ private final long appSubmitTime;
+ private String appName;
+ private final ApplicationAttemptId appAttemptID;
+ private final ContainerId containerID;
+ private final String nmHost;
+ private final int nmPort;
+ private final int nmHttpPort;
+ // Bookkeeping for containers and nodes known to this AM; populated in init().
+ private AMContainerMap containers;
+ private AMNodeMap nodes;
+ // TODO Metrics
+ //protected final MRAppMetrics metrics;
+ // TODO Recovery
+ //private Map<TezTaskID, TaskInfo> completedTasksFromPreviousRun;
+ private List<AMInfo> amInfos;
+ private AppContext context;
+ private Dispatcher dispatcher;
+ private ClientService clientService;
+ // TODO Recovery
+ //private Recovery recoveryServ;
+ // Component services composed by this AM; all wired up in init().
+ private ContainerLauncher containerLauncher;
+ private TaskCleaner taskCleaner;
+ private Speculator speculator;
+ private ContainerHeartbeatHandler containerHeartbeatHandler;
+ private TaskHeartbeatHandler taskHeartbeatHandler;
+ private TaskAttemptListener taskAttemptListener;
+ private JobTokenSecretManager jobTokenSecretManager =
+ new JobTokenSecretManager();
+ // TODODAGAM Define DAGID
+ private TezDAGID dagId;
+// private boolean newApiCommitter;
+ // Event dispatchers registered on the central Dispatcher in init().
+ private DagEventDispatcher dagEventDispatcher;
+ private VertexEventDispatcher vertexEventDispatcher;
+ private AbstractService stagingDirCleanerService;
+ private boolean inRecovery = false;
+ private SpeculatorEventDispatcher speculatorEventDispatcher;
+ private TaskSchedulerEventHandler taskSchedulerEventHandler;
+
+ private DAGLocationHint dagLocationHint;
+
+ // FIXME need to remove requestor and allocator
+ // private ContainerRequestor containerRequestor;
+ // private ContainerAllocator amScheduler;
+
+
+ private DAG dag;
+ private Credentials fsTokens = new Credentials(); // Filled during init
+ private UserGroupInformation currentUser; // Will be setup during init
+
+ /**
+ * Convenience constructor that uses the real system clock.
+ * Delegates to the full constructor below.
+ */
+ public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
+ ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
+ long appSubmitTime) {
+ this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort,
+ new SystemClock(), appSubmitTime);
+ }
+
+ /**
+ * Creates the DAG AppMaster, recording its start time and the NodeManager
+ * coordinates of the container this AM runs in.
+ *
+ * @param applicationAttemptId attempt id of this application run
+ * @param containerId container in which this AM executes
+ * @param nmHost NodeManager host for this container
+ * @param nmPort NodeManager RPC port
+ * @param nmHttpPort NodeManager HTTP port
+ * @param clock clock used for all time measurements (injectable for tests)
+ * @param appSubmitTime time at which the application was submitted
+ */
+ public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
+ ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
+ Clock clock, long appSubmitTime) {
+ super(DAGAppMaster.class.getName());
+ this.clock = clock;
+ this.startTime = clock.getTime();
+ this.appSubmitTime = appSubmitTime;
+ this.appAttemptID = applicationAttemptId;
+ this.containerID = containerId;
+ this.nmHost = nmHost;
+ this.nmPort = nmPort;
+ this.nmHttpPort = nmHttpPort;
+ // TODO Metrics
+ //this.metrics = MRAppMetrics.create();
+ // Fix: the message previously said "MRAppMaster" although this class
+ // is the DAG AppMaster.
+ LOG.info("Created DAGAppMaster for application " + applicationAttemptId);
+ }
+
+ @Override
+ public void init(final Configuration conf) {
+
+ // Any error on the central dispatcher thread must bring the AM down.
+ conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
+
+ // Security / UGI setup and DAG location hints must precede context
+ // creation and DAG construction.
+ downloadTokensAndSetupUGI(conf);
+ setupDAGLocationHint(conf);
+
+ context = new RunningAppContext(conf);
+
+ // Job name is the same as the app name until we support DAG of jobs
+ // for an app later
+ appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>");
+
+ conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, appAttemptID.getAttemptId());
+
+ // Single DAG per app for now; DAG id 1 within this application.
+ dagId = new TezDAGID(appAttemptID.getApplicationId(), 1);
+
+ // TODO Committer.
+ // committer = createOutputCommitter(conf);
+
+ // TODO Recovery
+ /*
+ boolean recoveryEnabled = conf.getBoolean(
+ MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+ boolean recoverySupportedByCommitter = committer.isRecoverySupported();
+ if (recoveryEnabled && recoverySupportedByCommitter
+ && appAttemptID.getAttemptId() > 1) {
+ LOG.info("Recovery is enabled. "
+ + "Will try to recover from previous life on best effort basis.");
+ recoveryServ = createRecoveryService(context);
+ addIfService(recoveryServ);
+ dispatcher = recoveryServ.getDispatcher();
+ clock = recoveryServ.getClock();
+ inRecovery = true;
+ } else {
+ LOG.info("Not starting RecoveryService: recoveryEnabled: "
+ + recoveryEnabled + " recoverySupportedByCommitter: "
+ + recoverySupportedByCommitter + " ApplicationAttemptID: "
+ + appAttemptID.getAttemptId());
+ dispatcher = createDispatcher();
+ addIfService(dispatcher);
+ }
+ */
+
+ dispatcher = createDispatcher();
+ addIfService(dispatcher);
+
+ // Heartbeat handlers must exist before the TaskAttemptListener that
+ // feeds them.
+ taskHeartbeatHandler = createTaskHeartbeatHandler(context, conf);
+ addIfService(taskHeartbeatHandler);
+
+ containerHeartbeatHandler = createContainerHeartbeatHandler(context, conf);
+ addIfService(containerHeartbeatHandler);
+
+ //service to handle requests to TaskUmbilicalProtocol
+ taskAttemptListener = createTaskAttemptListener(context,
+ taskHeartbeatHandler, containerHeartbeatHandler);
+ addIfService(taskAttemptListener);
+
+ // Container / node bookkeeping, also registered as event handlers.
+ containers = new AMContainerMap(containerHeartbeatHandler,
+ taskAttemptListener, context);
+ addIfService(containers);
+ dispatcher.register(AMContainerEventType.class, containers);
+
+ nodes = new AMNodeMap(dispatcher.getEventHandler(), context);
+ addIfService(nodes);
+ dispatcher.register(AMNodeEventType.class, nodes);
+
+ //service to do the task cleanup
+ taskCleaner = createTaskCleaner(context);
+ addIfService(taskCleaner);
+
+ // FIXME TODO DAGClient
+ //service to handle requests from JobClient
+ clientService = new TezClientService();
+ addIfService(clientService);
+
+ // TODO JobHistory
+ /*
+ //service to log job history events
+ jobHistoryEventHandler = createJobHistoryHandler(context);
+ dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
+ jobHistoryEventHandler);
+ */
+
+ this.dagEventDispatcher = new DagEventDispatcher();
+ this.vertexEventDispatcher = new VertexEventDispatcher();
+
+ //register the event dispatchers
+ dispatcher.register(DAGEventType.class, dagEventDispatcher);
+ dispatcher.register(VertexEventType.class, vertexEventDispatcher);
+ dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
+ dispatcher.register(TaskAttemptEventType.class,
+ new TaskAttemptEventDispatcher());
+ dispatcher.register(TaskCleaner.EventType.class, taskCleaner);
+
+ // FIXME handle speculation
+ // speculator = createSpeculator(conf, context);
+ // addIfService(speculator);
+ // NOTE: the speculator itself is not created; the dispatcher below only
+ // routes (and possibly drops) speculator events.
+ speculatorEventDispatcher = new SpeculatorEventDispatcher(conf);
+ dispatcher.register(Speculator.EventType.class,
+ speculatorEventDispatcher);
+
+ // TODO XXX: Rename to NMComm
+ // corresponding service to launch allocated containers via NodeManager
+ // containerLauncher = createNMCommunicator(context);
+ containerLauncher = createContainerLauncher(context);
+ addIfService(containerLauncher);
+ dispatcher.register(NMCommunicatorEventType.class, containerLauncher);
+
+ // service to allocate containers from RM (if non-uber) or to fake it (uber)
+ /*
+ // FIXME remove requestor and allocator
+ containerRequestor = createContainerRequestor(clientService, context);
+ addIfService(containerRequestor);
+ dispatcher.register(RMCommunicatorEventType.class, containerRequestor);
+
+ amScheduler = createAMScheduler(containerRequestor, context);
+ addIfService(amScheduler);
+ dispatcher.register(AMSchedulerEventType.class, amScheduler);
+ */
+
+ // Scheduling of containers is routed through the TaskSchedulerEventHandler.
+ taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
+ clientService);
+ addIfService(taskSchedulerEventHandler);
+ dispatcher.register(AMSchedulerEventType.class,
+ taskSchedulerEventHandler);
+
+ // Add the staging directory cleaner before the history server but after
+ // the container allocator so the staging directory is cleaned after
+ // the history has been flushed but before unregistering with the RM.
+ this.stagingDirCleanerService = createStagingDirCleaningService();
+ addService(stagingDirCleanerService);
+
+
+ // Add the JobHistoryEventHandler last so that it is properly stopped first.
+ // This will guarantee that all history-events are flushed before AM goes
+ // ahead with shutdown.
+ // Note: Even though JobHistoryEventHandler is started last, if any
+ // component creates a JobHistoryEvent in the meanwhile, it will be just be
+ // queued inside the JobHistoryEventHandler
+ // TODO JobHistory
+ //addIfService(this.jobHistoryEventHandler);
+
+ super.init(conf);
+ } // end of init()
+
+ /** Factory for the central event dispatcher; overridable for tests. */
+ protected Dispatcher createDispatcher() {
+ return new AsyncDispatcher();
+ }
+
+// private OutputCommitter createOutputCommitter(Configuration conf) {
+// OutputCommitter committer = null;
+//
+// LOG.info("OutputCommitter set in config "
+// + conf.get("mapred.output.committer.class"));
+//
+// if (newApiCommitter) {
+// TezTaskID taskID =
+// TezBuilderUtils.newTaskId(dagId, -1, 0);
+// TezTaskAttemptID attemptID =
+// TezBuilderUtils.newTaskAttemptId(taskID, 0);
+// TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
+// TezMRTypeConverter.fromTez(attemptID));
+// OutputFormat outputFormat;
+// try {
+// outputFormat = ReflectionUtils.newInstance(taskContext
+// .getOutputFormatClass(), conf);
+// committer = outputFormat.getOutputCommitter(taskContext);
+// } catch (Exception e) {
+// throw new YarnException(e);
+// }
+// } else {
+// committer = ReflectionUtils.newInstance(conf.getClass(
+// "mapred.output.committer.class", FileOutputCommitter.class,
+// org.apache.hadoop.mapred.OutputCommitter.class), conf);
+// }
+// LOG.info("OutputCommitter is " + committer.getClass().getName());
+// return committer;
+// }
+
+// protected boolean keepJobFiles(JobConf conf) {
+// return (conf.getKeepTaskFilesPattern() != null || conf
+// .getKeepFailedTaskFiles());
+// }
+
+ /**
+ * Create the default file System for this job.
+ * Overridable so tests can substitute a mock filesystem.
+ * @param conf the conf object
+ * @return the default filesystem for this job
+ * @throws IOException if the filesystem cannot be obtained
+ */
+ protected FileSystem getFileSystem(Configuration conf) throws IOException {
+ return FileSystem.get(conf);
+ }
+
+ /**
+ * clean up staging directories for the job.
+ * @throws IOException
+ */
+ // TODO DAG Cleanup staging directory as a user task, or a post dag plugin.
+// public void cleanupStagingDir() throws IOException {
+// /* make sure we clean the staging files */
+// String jobTempDir = null;
+// FileSystem fs = getFileSystem(getConfig());
+// try {
+// if (!keepJobFiles(new JobConf(getConfig()))) {
+// jobTempDir = getConfig().get(MRJobConfig.MAPREDUCE_JOB_DIR);
+// if (jobTempDir == null) {
+// LOG.warn("Job Staging directory is null");
+// return;
+// }
+// Path jobTempDirPath = new Path(jobTempDir);
+// LOG.info("Deleting staging directory " + FileSystem.getDefaultUri(getConfig()) +
+// " " + jobTempDir);
+// fs.delete(jobTempDirPath, true);
+// }
+// } catch(IOException io) {
+// LOG.error("Failed to cleanup staging dir " + jobTempDir, io);
+// }
+// }
+
+ /**
+ * Exit call. Just in a function call to enable testing: tests can override
+ * this to prevent the JVM from actually terminating.
+ */
+ protected void sysexit() {
+ System.exit(0);
+ }
+ /**
+ * Alternate DAG-finish handler that performs shutdown on a dedicated
+ * thread, waiting for containers (and, eventually, task attempts) to reach
+ * their terminal state before stopping services and exiting the JVM.
+ */
+ protected class JobFinishEventHandlerCR implements EventHandler<DAGFinishEvent> {
+ // Considering TaskAttempts are marked as completed before a container exit,
+ // it's very likely that a Container may not have "completed" by the time a
+ // job completes. This would imply that TaskAttempts may not be at a FINAL
+ // internal state (state machine state), and cleanup would not have happened.
+
+ // Since the shutdown handler has been called in the same thread which
+ // is handling all other async events, creating a separate thread for shutdown.
+ //
+ // For now, checking to see if all containers have COMPLETED, with a 5
+ // second timeout before the exit.
+ public void handle(DAGFinishEvent event) {
+ LOG.info("Handling JobFinished Event");
+ AMShutdownRunnable r = new AMShutdownRunnable();
+ Thread t = new Thread(r, "AMShutdownThread");
+ t.start();
+ }
+
+ // TODO Job End Notification
+ /*
+ protected void maybeSendJobEndNotification() {
+ if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
+ try {
+ LOG.info("Job end notification started for jobID : "
+ + job.getID());
+ JobEndNotifier notifier = new JobEndNotifier();
+ notifier.setConf(getConfig());
+ notifier.notify(job.getReport());
+ } catch (InterruptedException ie) {
+ LOG.warn("Job end notification interrupted for jobID : "
+ + job.getReport().getDAGId(), ie);
+ }
+ }
+ }
+ */
+
+ // Stops all composed services; best-effort, failures are only logged.
+ protected void stopAllServices() {
+ try {
+ // Stop all services
+ // This will also send the final report to the ResourceManager
+ LOG.info("Calling stop for all the services");
+ stop();
+
+ } catch (Throwable t) {
+ LOG.warn("Graceful stop failed ", t);
+ }
+ }
+
+ // Terminates the JVM via the overridable sysexit() hook.
+ protected void exit() {
+ LOG.info("Exiting MR AppMaster..GoodBye!");
+ sysexit();
+ }
+
+ private void stopAM() {
+ stopAllServices();
+ exit();
+ }
+
+ // True when every container tracked in the context has COMPLETED.
+ protected boolean allContainersComplete() {
+ for (AMContainer amContainer : context.getAllContainers().values()) {
+ if (amContainer.getState() != AMContainerState.COMPLETED) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ protected boolean allTaskAttemptsComplete() {
+ // TODO XXX: Implement.
+ // TaskAttempts will transition to their final state machine state only
+ // after a container is complete and sends out a TA_TERMINATED event.
+ return true;
+ }
+
+ private class AMShutdownRunnable implements Runnable {
+ @Override
+ public void run() {
+ // TODO Job End Notification
+ //maybeSendJobEndNotification();
+ // TODO XXX Add a timeout.
+ LOG.info("Waiting for all containers and TaskAttempts to complete");
+ if (!dag.isUber()) {
+ while (!allContainersComplete() || !allTaskAttemptsComplete()) {
+ try {
+ // NOTE(review): nothing notifies this monitor, so wait(100)
+ // effectively behaves as a 100ms sleep between polls — confirm
+ // this polling loop is intentional.
+ synchronized (this) {
+ wait(100l);
+ }
+ } catch (InterruptedException e) {
+ LOG.info("AM Shutdown Thread interrupted. Exiting");
+ break;
+ }
+ }
+ LOG.info("All Containers and TaskAttempts Complete. Stopping services");
+ } else {
+ LOG.info("Uberized job. Not waiting for all containers to finish");
+ }
+ stopAM();
+ LOG.info("AM Shutdown Thread Completing");
+ }
+ }
+ }
+
+ /**
+ * Handles the terminal DAG-finished event: waits briefly so clients can
+ * observe the final state, stops all composed services (which also sends
+ * the final report to the ResourceManager), then exits the JVM.
+ */
+ private class DAGFinishEventHandler implements EventHandler<DAGFinishEvent> {
+ @Override
+ public void handle(DAGFinishEvent event) {
+ // job has finished
+ // this is the only job, so shut down the Appmaster
+ // note in a workflow scenario, this may lead to creation of a new
+ // job (FIXME?)
+ // TODO Job End Notification
+ /*
+ // Send job-end notification
+ if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
+ try {
+ LOG.info("Job end notification started for jobID : "
+ + job.getReport().getDAGId());
+ JobEndNotifier notifier = new JobEndNotifier();
+ notifier.setConf(getConfig());
+ notifier.notify(job.getReport());
+ } catch (InterruptedException ie) {
+ LOG.warn("Job end notification interrupted for jobID : "
+ + job.getReport().getDAGId(), ie);
+ }
+ }
+ */
+ // TODO:currently just wait for some time so clients can know the
+ // final states. Will be removed once RM come on.
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ // Fix: restore the interrupt status instead of printStackTrace();
+ // the shutdown sequence below still proceeds either way.
+ Thread.currentThread().interrupt();
+ }
+
+ try {
+ // Stop all services
+ // This will also send the final report to the ResourceManager
+ LOG.info("Calling stop for all the services");
+ stop();
+
+ } catch (Throwable t) {
+ LOG.warn("Graceful stop failed ", t);
+ }
+
+ //Bring the process down by force.
+ //Not needed after HADOOP-7140
+ LOG.info("Exiting MR AppMaster..GoodBye!");
+ sysexit();
+ }
+ }
+
+ /**
+ * create an event handler that handles the job finish event.
+ * Overridable so tests or subclasses can substitute shutdown behavior
+ * (e.g. {@link JobFinishEventHandlerCR}).
+ * @return the dag finish event handler.
+ */
+ protected EventHandler<DAGFinishEvent> createDAGFinishEventHandler() {
+ return new DAGFinishEventHandler();
+ }
+
+ /**
+ * Create the recovery service.
+ * @return an instance of the recovery service.
+ */
+ // TODO Recovery
+ /*
+ protected Recovery createRecoveryService(AppContext appContext) {
+ return new RecoveryService(appContext, getCommitter());
+ }
+ */
+
+ /**
+ * Create the RMContainerRequestor. The returned router defers choosing
+ * between the local and RM-backed requestor until start(), when the DAG's
+ * uber-mode is known.
+ *
+ * @param clientService
+ * the MR Client Service.
+ * @param appContext
+ * the application context.
+ * @return an instance of the RMContainerRequestor.
+ */
+ protected ContainerRequestor createContainerRequestor(
+ ClientService clientService, AppContext appContext) {
+ return new ContainerRequestorRouter(clientService, appContext);
+ }
+
+ /**
+ * Create the AM Scheduler. Like the requestor, the returned router picks
+ * its concrete allocator at start() time.
+ *
+ * @param requestor
+ * The Container Requestor.
+ * @param appContext
+ * the application context.
+ * @return an instance of the AMScheduler.
+ */
+ protected ContainerAllocator createAMScheduler(ContainerRequestor requestor,
+ AppContext appContext) {
+ return new AMSchedulerRouter(requestor, appContext);
+ }
+
+ /**
+ * Create and initialize (but don't start) a single dag.
+ * Also publishes the DAG on the shared AppContext and registers the
+ * DAG-finish handler with the dispatcher, so calling this has side
+ * effects beyond constructing the object.
+ */
+ protected DAG createDAG(Configuration conf) {
+
+ // create single job
+ DAG newDag =
+ new DAGImpl(dagId, appAttemptID, conf, dispatcher.getEventHandler(),
+ taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
+ // TODO Recovery
+ //completedTasksFromPreviousRun,
+ // TODO Metrics
+ //metrics,
+ //committer, newApiCommitter,
+ currentUser.getUserName(), appSubmitTime,
+ //amInfos,
+ taskHeartbeatHandler, context, dagLocationHint);
+ ((RunningAppContext) context).setDAG(newDag);
+
+ // Shutdown path: fires when the DAG reaches a terminal state.
+ dispatcher.register(DAGFinishEvent.Type.class,
+ createDAGFinishEventHandler());
+ return newDag;
+ } // end createDag()
+
+
+ /**
+ * Obtain the tokens needed by the job and put them in the UGI.
+ *
+ * Captures the current user; when security is enabled, additionally reads
+ * the localized tokens file from the job submit directory, adds all tokens
+ * to {@code fsTokens}, and registers each token with the current user's
+ * UGI for use by the AppMaster itself.
+ *
+ * @param conf configuration used to read the token storage file
+ */
+ protected void downloadTokensAndSetupUGI(Configuration conf) {
+
+ try {
+ this.currentUser = UserGroupInformation.getCurrentUser();
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ // Read the file-system tokens from the localized tokens-file.
+ Path jobSubmitDir =
+ FileContext.getLocalFSFileContext().makeQualified(
+ new Path(new File(MRJobConfig.JOB_SUBMIT_DIR)
+ .getAbsolutePath()));
+ Path jobTokenFile =
+ new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
+ fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
+ LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
+ + jobTokenFile);
+
+ for (Token<? extends TokenIdentifier> tk : fsTokens.getAllTokens()) {
+ if (LOG.isDebugEnabled()) {
+ // Fix: missing leading space previously rendered as
+ // "Token of kind Xin current ugi ...".
+ LOG.debug("Token of kind " + tk.getKind()
+ + " in current ugi in the AppMaster for service "
+ + tk.getService());
+ }
+ currentUser.addToken(tk); // For use by AppMaster itself.
+ }
+ }
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+ }
+
+ /**
+ * Loads the DAG location hints. When the configured hint file exists on
+ * the local filesystem it is parsed; otherwise an empty hint set is used.
+ * A failure while reading the file is fatal and surfaces as a
+ * YarnException.
+ */
+ protected void setupDAGLocationHint(Configuration conf) {
+ String hintFilePath =
+ conf.get(DAGConfiguration.DAG_LOCATION_HINT_RESOURCE_FILE,
+ DAGConfiguration.DEFAULT_DAG_LOCATION_HINT_RESOURCE_FILE);
+ try {
+ if (new File(hintFilePath).exists()) {
+ this.dagLocationHint =
+ DAGLocationHint.initDAGDagLocationHint(hintFilePath);
+ } else {
+ this.dagLocationHint = new DAGLocationHint();
+ }
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+ }
+
+ /**
+ * Registers the given object as a child service of this CompositeService,
+ * but only when it actually implements Service; anything else is ignored.
+ */
+ protected void addIfService(Object object) {
+ if (!(object instanceof Service)) {
+ return;
+ }
+ addService((Service) object);
+ }
+
+ // TODO JobHistory
+ /*
+ protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+ AppContext context) {
+ return new JobHistoryEventHandler2(context, getStartCount());
+ }
+ */
+
+ /** Factory for the service that (eventually) cleans the staging dir on stop. */
+ protected AbstractService createStagingDirCleaningService() {
+ return new StagingDirCleaningService();
+ }
+
+ /**
+ * Instantiates the configured Speculator implementation via its
+ * (Configuration, AppContext) constructor.
+ *
+ * @param conf source of the speculator class name
+ * @param context shared application context passed to the speculator
+ * @return the constructed speculator
+ * @throws YarnException if the class cannot be constructed
+ */
+ protected Speculator createSpeculator(Configuration conf, AppContext context) {
+ Class<? extends Speculator> speculatorClass;
+
+ try {
+ speculatorClass
+ // "yarn.mapreduce.job.speculator.class"
+ = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR,
+ DefaultSpeculator.class,
+ Speculator.class);
+ Constructor<? extends Speculator> speculatorConstructor
+ = speculatorClass.getConstructor
+ (Configuration.class, AppContext.class);
+ return speculatorConstructor.newInstance(conf, context);
+ } catch (InstantiationException ex) {
+ throw speculatorFailure(ex);
+ } catch (IllegalAccessException ex) {
+ throw speculatorFailure(ex);
+ } catch (InvocationTargetException ex) {
+ throw speculatorFailure(ex);
+ } catch (NoSuchMethodException ex) {
+ throw speculatorFailure(ex);
+ }
+ }
+
+ /**
+ * Logs a speculator construction failure and wraps it in a YarnException.
+ * Single definition replacing four identical copies of the log+throw
+ * sequence (no multi-catch, to stay compatible with Java 6).
+ */
+ private YarnException speculatorFailure(Exception ex) {
+ LOG.error("Can't make a speculator -- check "
+ + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+ return new YarnException(ex);
+ }
+
+ /**
+ * Builds the listener that services TaskUmbilicalProtocol requests from
+ * running task attempts, wired to both heartbeat handlers and the job
+ * token secret manager.
+ */
+ protected TaskAttemptListener createTaskAttemptListener(AppContext context,
+ TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh) {
+ return new TaskAttemptListenerImpTezDag(
+ context, thh, chh, jobTokenSecretManager);
+ }
+
+ /**
+ * Builds the handler that tracks task attempt liveness, sized by the
+ * configured task-listener thread count.
+ */
+ protected TaskHeartbeatHandler createTaskHeartbeatHandler(AppContext context,
+ Configuration conf) {
+ int listenerThreads = conf.getInt(
+ MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT,
+ MRJobConfig.DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT);
+ return new TaskHeartbeatHandler(context, listenerThreads);
+ }
+
+ /**
+ * Builds the handler that tracks container liveness. Reuses the task
+ * listener thread-count setting for sizing.
+ */
+ protected ContainerHeartbeatHandler createContainerHeartbeatHandler(AppContext context,
+ Configuration conf) {
+ // TODO XXX: Define a CONTAINER_LISTENER_THREAD_COUNT
+ int listenerThreads = conf.getInt(
+ MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT,
+ MRJobConfig.DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT);
+ return new ContainerHeartbeatHandler(context, listenerThreads);
+ }
+
+
+ /** Factory for the task cleanup service; overridable for tests. */
+ protected TaskCleaner createTaskCleaner(AppContext context) {
+ return new TaskCleanerImpl(context);
+ }
+
+ /** Factory for the service that launches containers via the NodeManager. */
+ protected ContainerLauncher
+ createContainerLauncher(final AppContext context) {
+ return new ContainerLauncherImpl(context);
+ }
+
+ //TODO:should have an interface for MRClientService
+ /*
+ protected ClientService createClientService(AppContext context) {
+ return new MRClientService(context);
+ }
+ */
+
+ /** @return the application id this AM attempt belongs to. */
+ public ApplicationId getAppID() {
+ return appAttemptID.getApplicationId();
+ }
+
+ /** @return this AM's application attempt id. */
+ public ApplicationAttemptId getAttemptID() {
+ return appAttemptID;
+ }
+
+ /** @return the id of the single DAG managed by this AM. */
+ public TezDAGID getDAGId() {
+ return dagId;
+ }
+
+ /** @return the attempt number, i.e. how many times this AM has started. */
+ public int getStartCount() {
+ return appAttemptID.getAttemptId();
+ }
+
+ /** @return the shared application context. */
+ public AppContext getContext() {
+ return context;
+ }
+
+ /** @return the central event dispatcher. */
+ public Dispatcher getDispatcher() {
+ return dispatcher;
+ }
+
+ // TODO Recovery
+ /*
+ public Map<TezTaskID, TaskInfo> getCompletedTaskFromPreviousRun() {
+ return completedTasksFromPreviousRun;
+ }
+ */
+
+ // NOTE(review): amInfos is never populated in the visible code, so this
+ // may return null — confirm before relying on it.
+ public List<AMInfo> getAllAMInfos() {
+ return amInfos;
+ }
+
+ /** @return the container launcher service created during init(). */
+ public ContainerLauncher getContainerLauncher() {
+ return containerLauncher;
+ }
+
+ /** @return the task umbilical listener created during init(). */
+ public TaskAttemptListener getTaskAttemptListener() {
+ return taskAttemptListener;
+ }
+
+ /**
+ * By the time life-cycle of this router starts, job-init would have already
+ * happened. The concrete requestor (local vs RM-backed) is chosen at
+ * start() based on the DAG's uber-mode; all requestor calls then delegate
+ * to that instance.
+ */
+ private final class ContainerRequestorRouter extends AbstractService
+ implements ContainerRequestor {
+ private final ClientService clientService;
+ private final AppContext context;
+ // The delegate; null until start() runs.
+ private ContainerRequestor real;
+
+ public ContainerRequestorRouter(ClientService clientService,
+ AppContext appContext) {
+ super(ContainerRequestorRouter.class.getName());
+ this.clientService = clientService;
+ this.context = appContext;
+ }
+
+ @Override
+ public void start() {
+ // Uber (single-container) DAGs fake allocation locally; otherwise
+ // talk to the ResourceManager.
+ if (dag.isUber()) {
+ real = new LocalContainerRequestor(clientService,
+ context);
+ } else {
+ real = new RMContainerRequestor(clientService, context);
+ }
+ ((Service)this.real).init(getConfig());
+ ((Service)this.real).start();
+ super.start();
+ }
+
+ @Override
+ public void stop() {
+ if (real != null) {
+ ((Service) real).stop();
+ }
+ super.stop();
+ }
+
+ @Override
+ public void handle(RMCommunicatorEvent event) {
+ real.handle(event);
+ }
+
+ @Override
+ public Resource getAvailableResources() {
+ return real.getAvailableResources();
+ }
+
+ @Override
+ public void addContainerReq(ContainerRequest req) {
+ real.addContainerReq(req);
+ }
+
+ @Override
+ public void decContainerReq(ContainerRequest req) {
+ real.decContainerReq(req);
+ }
+
+ // Forwards the signalled flag to the underlying RMCommunicator.
+ public void setSignalled(boolean isSignalled) {
+ ((RMCommunicator) real).setSignalled(isSignalled);
+ }
+
+ @Override
+ public Map<ApplicationAccessType, String> getApplicationACLs() {
+ return ((RMCommunicator)real).getApplicationAcls();
+ }
+ }
+
+ /**
+ * By the time life-cycle of this router starts, job-init would have already
+ * happened.
+ */
+ private final class AMSchedulerRouter extends AbstractService
+ implements ContainerAllocator {
+ private final ContainerRequestor requestor;
+ private final AppContext context;
+ private ContainerAllocator containerAllocator;
+
+ AMSchedulerRouter(ContainerRequestor requestor,
+ AppContext context) {
+ super(AMSchedulerRouter.class.getName());
+ this.requestor = requestor;
+ this.context = context;
+ }
+
+ @Override
+ public synchronized void start() {
+ // TODO LocalContainerAllocator
+ /*
+ if (job.isUber()) {
+ this.containerAllocator = new LocalContainerAllocator(this.context,
+ jobId, nmHost, nmPort, nmHttpPort, containerID,
+ (MRxTaskUmbilicalProtocol) taskAttemptListener, taskAttemptListener,
+ (RMCommunicator) this.requestor);
+ } else {
+ this.containerAllocator = new RMContainerAllocator(this.requestor,
+ this.context);
+ }
+ */
+ // TODO Fix ContainerAllocator?
+ this.containerAllocator = null;
+ //new RMContainerAllocator(this.requestor,this.context);
+
+ ((Service)this.containerAllocator).init(getConfig());
+ ((Service)this.containerAllocator).start();
+ super.start();
+ }
+
+ @Override
+ public synchronized void stop() {
+ if (containerAllocator != null) {
+ ((Service) this.containerAllocator).stop();
+ super.stop();
+ }
+ }
+
+ @Override
+ public void handle(AMSchedulerEvent event) {
+ this.containerAllocator.handle(event);
+ }
+ }
+
  /**
   * @return the handler tracking task liveness via heartbeats.
   */
  public TaskHeartbeatHandler getTaskHeartbeatHandler() {
    return taskHeartbeatHandler;
  }
+
  /**
   * Service whose only responsibility is cleaning up the staging directory
   * when the AM stops. The actual cleanup call is currently commented out
   * (not yet enabled).
   */
  private final class StagingDirCleaningService extends AbstractService {
    StagingDirCleaningService() {
      super(StagingDirCleaningService.class.getName());
    }

    @Override
    public synchronized void stop() {
//      try {
//        cleanupStagingDir();
//      } catch (IOException io) {
//        LOG.error("Failed to cleanup staging dir: ", io);
//      }
      super.stop();
    }
  }
+
+ private class RunningAppContext implements AppContext {
+
+ private DAG dag;
+ private final Configuration conf;
+ private final ClusterInfo clusterInfo = new ClusterInfo();
+ private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+ private final Lock rLock = rwLock.readLock();
+ private final Lock wLock = rwLock.writeLock();
+
+ public RunningAppContext(Configuration config) {
+ this.conf = config;
+ }
+
+ @Override
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return appAttemptID;
+ }
+
+ @Override
+ public ApplicationId getApplicationID() {
+ return appAttemptID.getApplicationId();
+ }
+
+ @Override
+ public String getApplicationName() {
+ return appName;
+ }
+
+ @Override
+ public long getStartTime() {
+ return startTime;
+ }
+
+ @Override
+ public DAG getDAG() {
+ try {
+ rLock.lock();
+ return dag;
+ } finally {
+ rLock.unlock();
+ }
+ }
+
+ @Override
+ public EventHandler getEventHandler() {
+ return dispatcher.getEventHandler();
+ }
+
+ @Override
+ public String getUser() {
+ return this.conf.get(MRJobConfig.USER_NAME);
+ }
+
+ @Override
+ public Clock getClock() {
+ return clock;
+ }
+
+ @Override
+ public ClusterInfo getClusterInfo() {
+ return this.clusterInfo;
+ }
+
+ @Override
+ public AMContainerMap getAllContainers() {
+ return containers;
+ }
+
+ @Override
+ public AMNodeMap getAllNodes() {
+ return nodes;
+ }
+
+ @Override
+ public Map<ApplicationAccessType, String> getApplicationACLs() {
+ if (getServiceState() != STATE.STARTED) {
+ throw new YarnException(
+ "Cannot get ApplicationACLs before all services have started");
+ }
+ return taskSchedulerEventHandler.getApplicationAcls();
+ }
+
+ @Override
+ public TezDAGID getDAGID() {
+ try {
+ rLock.lock();
+ return dag.getID();
+ } finally {
+ rLock.unlock();
+ }
+ }
+
+ @Override
+ public void setDAG(DAG dag) {
+ try {
+ wLock.lock();
+ this.dag = dag;
+ } finally {
+ wLock.unlock();
+ }
+ }
+
+ }
+
  /**
   * Service start: records AMInfo for this attempt, builds the DAG from
   * configuration, synchronously initializes it, switches to uber mode if
   * applicable, starts all composed services, and finally kicks off DAG
   * execution via {@link #startDags()}.
   */
  @SuppressWarnings("unchecked")
  @Override
  public void start() {

    // TODO Recovery
    // Pull completedTasks etc from recovery
    /*
    if (inRecovery) {
      completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
      amInfos = recoveryServ.getAMInfos();
    }
    */

    // / Create the AMInfo for the current AppMaster
    if (amInfos == null) {
      amInfos = new LinkedList<AMInfo>();
    }
    AMInfo amInfo =
        TezBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost,
            nmPort, nmHttpPort);
    amInfos.add(amInfo);

    // /////////////////// Create the job itself.
    dag = createDAG(getConfig());

    // End of creating the job.

    // TODO: JobHistory
    // Send out an MR AM inited event for this AM and all previous AMs.
    /*
    for (AMInfo info : amInfos) {
      dispatcher.getEventHandler().handle(
          new JobHistoryEvent(job.getID(), new AMStartedEvent(info
              .getAppAttemptId(), info.getStartTime(), info.getContainerId(),
              info.getNodeManagerHost(), info.getNodeManagerPort(), info
                  .getNodeManagerHttpPort())));
    }
    */

    // metrics system init is really init & start.
    // It's more test friendly to put it here.
    DefaultMetricsSystem.initialize("MRAppMaster");

    // create a job event for job initialization
    DAGEvent initDagEvent = new DAGEvent(dag.getID(), DAGEventType.DAG_INIT);
    // Send init to the job (this does NOT trigger job execution)
    // This is a synchronous call, not an event through dispatcher. We want
    // job-init to be done completely here.
    dagEventDispatcher.handle(initDagEvent);


    // JobImpl's InitTransition is done (call above is synchronous), so the
    // "uber-decision" (MR-1220) has been made. Query job and switch to
    // ubermode if appropriate (by registering different container-allocator
    // and container-launcher services/event-handlers).

    if (dag.isUber()) {
      // Uber mode: everything runs inside this container; speculation off.
      speculatorEventDispatcher.disableSpeculation();
      LOG.info("MRAppMaster uberizing job " + dag.getID()
          + " in local container (\"uber-AM\") on node "
          + nmHost + ":" + nmPort + ".");
    } else {
      // send init to speculator only for non-uber jobs.
      // This won't yet start as dispatcher isn't started yet.
      dispatcher.getEventHandler().handle(
          new SpeculatorEvent(dag.getID(), clock.getTime()));
      LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
          + "job " + dag.getID() + ".");
    }

    //start all the components
    super.start();

    // All components have started, start the job.
    startDags();
  }
+
+ /**
+ * This can be overridden to instantiate multiple jobs and create a
+ * workflow.
+ *
+ * TODO: Rework the design to actually support this. Currently much of the
+ * job stuff has been moved to init() above to support uberization (MR-1220).
+ * In a typical workflow, one presumably would want to uberize only a subset
+ * of the jobs (the "small" ones), which is awkward with the current design.
+ */
+ @SuppressWarnings("unchecked")
+ protected void startDags() {
+ /** create a job-start event to get this ball rolling */
+ DAGEvent startDagEvent = new DAGEvent(dag.getID(), DAGEventType.DAG_START);
+ /** send the job-start event. this triggers the job execution. */
+ dispatcher.getEventHandler().handle(startDagEvent);
+ }
+
+ private class DagEventDispatcher implements EventHandler<DAGEvent> {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void handle(DAGEvent event) {
+ ((EventHandler<DAGEvent>)context.getDAG()).handle(event);
+ }
+ }
+
+ private class TaskEventDispatcher implements EventHandler<TaskEvent> {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void handle(TaskEvent event) {
+ Task task =
+ context.getDAG().getVertex(event.getTaskID().getVertexID()).
+ getTask(event.getTaskID());
+ ((EventHandler<TaskEvent>)task).handle(event);
+ }
+ }
+
+ private class TaskAttemptEventDispatcher
+ implements EventHandler<TaskAttemptEvent> {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void handle(TaskAttemptEvent event) {
+ DAG dag = context.getDAG();
+ Task task =
+ dag.getVertex(event.getTaskAttemptID().getTaskID().getVertexID()).
+ getTask(event.getTaskAttemptID().getTaskID());
+ TaskAttempt attempt = task.getAttempt(event.getTaskAttemptID());
+ ((EventHandler<TaskAttemptEvent>) attempt).handle(event);
+ }
+ }
+
+ private class VertexEventDispatcher
+ implements EventHandler<VertexEvent> {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void handle(VertexEvent event) {
+ DAG dag = context.getDAG();
+ org.apache.tez.dag.app.dag.Vertex vertex =
+ dag.getVertex(event.getVertexId());
+ ((EventHandler<VertexEvent>) vertex).handle(event);
+ }
+ }
+
  /**
   * Sink for speculation events. Once disabled (uber mode), all events are
   * dropped permanently; speculation handling itself is not yet wired up,
   * so events are currently dropped in all cases (see FIX comment below).
   */
  private class SpeculatorEventDispatcher implements
      EventHandler<SpeculatorEvent> {
    // NOTE(review): conf is currently unused; presumably retained for the
    // per-vertex speculation check described below — confirm before removing.
    private final Configuration conf;
    // volatile: flipped by disableSpeculation() from the service thread,
    // read by the dispatcher thread.
    private volatile boolean disabled;

    public SpeculatorEventDispatcher(Configuration config) {
      this.conf = config;
    }

    @Override
    public void handle(SpeculatorEvent event) {
      if (disabled) {
        return;
      }

      // FIX handle speculation events properly
      // if vertex has speculation enabled then handle event else drop it
      // speculator.handle(event);
    }

    /** Permanently stops delivery of speculation events (used in uber mode). */
    public void disableSpeculation() {
      disabled = true;
    }

  }
+
+ private static void validateInputParam(String value, String param)
+ throws IOException {
+ if (value == null) {
+ String msg = param + " is null";
+ LOG.error(msg);
+ throw new IOException(msg);
+ }
+ }
+
+ public static void main(String[] args) {
+ try {
+ Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+ // TODO: Deprecated keys?
+ //DeprecatedKeys.init();
+ String containerIdStr =
+ System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV);
+ String nodeHostString = System.getenv(ApplicationConstants.NM_HOST_ENV);
+ String nodePortString = System.getenv(ApplicationConstants.NM_PORT_ENV);
+ String nodeHttpPortString =
+ System.getenv(ApplicationConstants.NM_HTTP_PORT_ENV);
+ String appSubmitTimeStr =
+ System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
+
+ validateInputParam(containerIdStr,
+ ApplicationConstants.AM_CONTAINER_ID_ENV);
+ validateInputParam(nodeHostString, ApplicationConstants.NM_HOST_ENV);
+ validateInputParam(nodePortString, ApplicationConstants.NM_PORT_ENV);
+ validateInputParam(nodeHttpPortString,
+ ApplicationConstants.NM_HTTP_PORT_ENV);
+ validateInputParam(appSubmitTimeStr,
+ ApplicationConstants.APP_SUBMIT_TIME_ENV);
+
+ ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+ ApplicationAttemptId applicationAttemptId =
+ containerId.getApplicationAttemptId();
+ long appSubmitTime = Long.parseLong(appSubmitTimeStr);
+
+ DAGAppMaster appMaster =
+ new DAGAppMaster(applicationAttemptId, containerId, nodeHostString,
+ Integer.parseInt(nodePortString),
+ Integer.parseInt(nodeHttpPortString), appSubmitTime);
+ ShutdownHookManager.get().addShutdownHook(
+ new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
+
+ Options opts = getCliOptions();
+ CommandLine cliParser = new GnuParser().parse(opts, args);
+
+ // Default to running mr if nothing specified.
+ // TODO change this once the cleint is ready.
+ String type;
+ DAGConfiguration dagConf = null;
+ if (cliParser.hasOption(OPT_PREDEFINED)) {
+ LOG.info("Running with PreDefined configuration");
+ type = cliParser.getOptionValue(OPT_PREDEFINED, "mr");
+ LOG.info("Running job type: " + type);
+
+ if (type.equals("mr")) {
+ dagConf = (DAGConfiguration)createDAGConfigurationForMR();
+ } else if (type.equals("mrr")) {
+ dagConf = (DAGConfiguration)createDAGConfigurationForMRR();
+ }
+ } else {
+ dagConf = new DAGConfiguration();
+ dagConf.addResource(TezConfiguration.DAG_AM_PLAN_CONFIG_XML);
+ }
+
+ LOG.info("XXXX Running a DAG with "
+ + dagConf.getVertices().length + " vertices ");
+ for (String v : dagConf.getVertices()) {
+ LOG.info("XXXX DAG has vertex " + v);
+ }
+
+ String jobUserName = System
+ .getenv(ApplicationConstants.Environment.USER.name());
+
+ // Do not automatically close FileSystem objects so that in case of
+ // SIGTERM I have a chance to write out the job history. I'll be closing
+ // the objects myself.
+ dagConf.setBoolean("fs.automatic.close", false);
+
+ // TODO TEZ HACK - user name in DAGConfiguration
+ dagConf.set(MRJobConfig.USER_NAME, jobUserName);
+
+ initAndStartAppMaster(appMaster, new YarnConfiguration(dagConf),
+ jobUserName);
+
+ } catch (Throwable t) {
+ LOG.fatal("Error starting MRAppMaster", t);
+ System.exit(1);
+ }
+ }
+
  // CLI option name: when present, the AM runs one of the predefined MR/MRR
  // test jobs instead of reading the DAG plan from configuration.
  private static String OPT_PREDEFINED = "predefined";

  /**
   * @return the command-line options the AM understands (currently only
   *     the "predefined" job-type switch).
   */
  private static Options getCliOptions() {
    Options opts = new Options();
    opts.addOption(OPT_PREDEFINED, true,
        "Whether to run the predefined MR/MRR jobs");
    return opts;
  }
+
+ //TODO remove once client is in place
+ private static Path getMRBaseDir() throws IOException {
+ Path basePath = MRApps.getStagingAreaDir(new Configuration(),
+ UserGroupInformation.getCurrentUser().getShortUserName());
+ return new Path(basePath, "dagTest");
+ }
+
+ private static Path getMRRBaseDir() throws IOException {
+ Path basePath = MRApps.getStagingAreaDir(new Configuration(),
+ UserGroupInformation.getCurrentUser().getShortUserName());
+ return new Path(basePath, "mrrTest");
+ }
+
+ private static String getConfFileName(String vertexName) {
+ return MRJobConfig.JOB_CONF_FILE + "_" + vertexName;
+ }
+
  // TODO remove once client is in place
  /**
   * Builds APPLICATION-visibility file LocalResources for each named file
   * under remoteBaseDir, keyed (and sorted, via TreeMap) by resource name.
   *
   * @param remoteBaseDir base directory on the default FileSystem
   * @param resourceNames file names, relative to remoteBaseDir, to localize
   * @return map of resource name to LocalResource
   * @throws IOException if the FileSystem cannot be accessed
   */
  private static Map<String, LocalResource> createLocalResources(
      Path remoteBaseDir, String[] resourceNames) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    Map<String, LocalResource> localResources = new TreeMap<String, LocalResource>();

    for (String resourceName : resourceNames) {
      Path remoteFile = new Path(remoteBaseDir, resourceName);
      localResources.put(resourceName, AMContainerHelpers.createLocalResource(
          fs, remoteFile, LocalResourceType.FILE,
          LocalResourceVisibility.APPLICATION));
      LOG.info("Localizing file " + resourceName + " from location "
          + remoteFile.toString());
    }
    return localResources;
  }
+
+
+ private static String[] getMRLocalRsrcList() {
+ String[] resourceNames = new String[] { MRJobConfig.JOB_JAR,
+ MRJobConfig.JOB_SPLIT, MRJobConfig.JOB_SPLIT_METAINFO,
+ MRJobConfig.JOB_CONF_FILE };
+ return resourceNames;
+ }
+
+ private static String[] getMRRLocalRsrcList() {
+ String[] resourceNames = new String[] { MRJobConfig.JOB_JAR,
+ MRJobConfig.JOB_SPLIT, MRJobConfig.JOB_SPLIT_METAINFO,
+ MRJobConfig.JOB_CONF_FILE, getConfFileName("reduce1"),
+ getConfFileName("reduce2") };
+ return resourceNames;
+ }
+
+ private static Configuration createDAGConfigurationForMRR() throws IOException {
+ org.apache.tez.dag.api.DAG dag = new org.apache.tez.dag.api.DAG();
+ Vertex mapVertex = new Vertex("map",
+ "org.apache.tez.mapreduce.task.InitialTask", 6);
+ Vertex reduce1Vertex = new Vertex("reduce1",
+ "org.apache.tez.mapreduce.task.IntermediateTask", 3);
+ Vertex reduce2Vertex = new Vertex("reduce2",
+ "org.apache.tez.mapreduce.task.FinalTask", 3);
+ Edge edge1 = new Edge(mapVertex, reduce1Vertex, new EdgeProperty());
+ Edge edge2 = new Edge(reduce1Vertex, reduce2Vertex, new EdgeProperty());
+ Map<String, LocalResource> jobRsrcs = createLocalResources(getMRRBaseDir(),
+ getMRRLocalRsrcList());
+
+ Map<String, LocalResource> mapRsrcs = new HashMap<String, LocalResource>();
+ Map<String, LocalResource> reduce1Rsrcs = new HashMap<String, LocalResource>();
+ Map<String, LocalResource> reduce2Rsrcs = new HashMap<String, LocalResource>();
+
+ mapRsrcs.put(MRJobConfig.JOB_SPLIT, jobRsrcs.get(MRJobConfig.JOB_SPLIT));
+ mapRsrcs.put(MRJobConfig.JOB_SPLIT_METAINFO, jobRsrcs.get(MRJobConfig.JOB_SPLIT_METAINFO));
+ mapRsrcs.put(MRJobConfig.JOB_JAR, jobRsrcs.get(MRJobConfig.JOB_JAR));
+ mapRsrcs.put(MRJobConfig.JOB_CONF_FILE, jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
+ mapRsrcs.put(getConfFileName("map"), jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
+
+ reduce1Rsrcs.put(MRJobConfig.JOB_JAR, jobRsrcs.get(MRJobConfig.JOB_JAR));
+ reduce1Rsrcs.put(MRJobConfig.JOB_CONF_FILE, jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
+ reduce1Rsrcs.put(getConfFileName("reduce1"), jobRsrcs.get(getConfFileName("reduce1")));
+
+ reduce2Rsrcs.put(MRJobConfig.JOB_JAR, jobRsrcs.get(MRJobConfig.JOB_JAR));
+ reduce2Rsrcs.put(MRJobConfig.JOB_CONF_FILE, jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
+ reduce2Rsrcs.put(getConfFileName("reduce2"), jobRsrcs.get(getConfFileName("reduce2")));
+
+ Resource mapResource = BuilderUtils.newResource(
+ MRJobConfig.DEFAULT_MAP_MEMORY_MB,
+ MRJobConfig.DEFAULT_MAP_CPU_VCORES);
+ mapVertex.setTaskResource(mapResource);
+ mapVertex.setTaskLocalResources(mapRsrcs);
+ Resource reduceResource = BuilderUtils.newResource(
+ MRJobConfig.DEFAULT_REDUCE_MEMORY_MB,
+ MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
+ reduce1Vertex.setTaskResource(reduceResource);
+ reduce1Vertex.setTaskLocalResources(reduce1Rsrcs);
+
+ reduce1Vertex.setTaskResource(reduceResource);
+ reduce2Vertex.setTaskLocalResources(reduce2Rsrcs);
+
+ dag.addVertex(mapVertex);
+ dag.addVertex(reduce1Vertex);
+ dag.addVertex(reduce2Vertex);
+ dag.addEdge(edge1);
+ dag.addEdge(edge2);
+ dag.verify();
+ DAGConfiguration dagConf = dag.serializeDag();
+
+ dagConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
+ dagConf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+
+ return dagConf;
+ }
+
+ // TODO remove once client is in place
+ private static Configuration createDAGConfigurationForMR() throws IOException {
+ org.apache.tez.dag.api.DAG dag = new org.apache.tez.dag.api.DAG();
+ Vertex mapVertex = new Vertex("map",
+ "org.apache.tez.mapreduce.task.InitialTask", 6);
+ Vertex reduceVertex = new Vertex("reduce",
+ "org.apache.tez.mapreduce.task.FinalTask", 1);
+ Edge edge = new Edge(mapVertex, reduceVertex, new EdgeProperty());
+
+ Map<String, LocalResource> jobRsrcs = createLocalResources(getMRBaseDir(),
+ getMRLocalRsrcList());
+
+ Map<String, LocalResource> mapRsrcs = new HashMap<String, LocalResource>();
+ Map<String, LocalResource> reduceRsrcs = new HashMap<String, LocalResource>();
+
+ mapRsrcs.put(MRJobConfig.JOB_SPLIT, jobRsrcs.get(MRJobConfig.JOB_SPLIT));
+ mapRsrcs.put(MRJobConfig.JOB_SPLIT_METAINFO, jobRsrcs.get(MRJobConfig.JOB_SPLIT_METAINFO));
+ mapRsrcs.put(MRJobConfig.JOB_JAR, jobRsrcs.get(MRJobConfig.JOB_JAR));
+ mapRsrcs.put(MRJobConfig.JOB_CONF_FILE, jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
+ mapRsrcs.put(getConfFileName("map"), jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
+
+ reduceRsrcs.put(MRJobConfig.JOB_JAR, jobRsrcs.get(MRJobConfig.JOB_JAR));
+ reduceRsrcs.put(MRJobConfig.JOB_CONF_FILE, jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
+ reduceRsrcs.put(getConfFileName("reduce"), jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
+
+ Resource mapResource = BuilderUtils.newResource(
+ MRJobConfig.DEFAULT_MAP_MEMORY_MB,
+ MRJobConfig.DEFAULT_MAP_CPU_VCORES);
+ mapVertex.setTaskResource(mapResource);
+ mapVertex.setTaskLocalResources(mapRsrcs);
+ Resource reduceResource = BuilderUtils.newResource(
+ MRJobConfig.DEFAULT_REDUCE_MEMORY_MB,
+ MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
+ reduceVertex.setTaskResource(reduceResource);
+ reduceVertex.setTaskLocalResources(reduceRsrcs);
+ dag.addVertex(mapVertex);
+ dag.addVertex(reduceVertex);
+ dag.addEdge(edge);
+ dag.verify();
+ DAGConfiguration dagConf = dag.serializeDag();
+
+ dagConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
+ dagConf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+
+ return dagConf;
+ }
+
+ // The shutdown hook that runs when a signal is received AND during normal
+ // close of the JVM.
+ static class DAGAppMasterShutdownHook implements Runnable {
+ DAGAppMaster appMaster;
+ DAGAppMasterShutdownHook(DAGAppMaster appMaster) {
+ this.appMaster = appMaster;
+ }
+ public void run() {
+ LOG.info("DAGAppMaster received a signal. Signaling TaskScheduler and "
+ + "JobHistoryEventHandler.");
+ // Notify the JHEH and TaskScheduler that a SIGTERM has been received so
+ // that they don't take too long in shutting down
+
+ // Signal the task scheduler.
+ appMaster.taskSchedulerEventHandler.setSignalled(true);
+
+ // TODO: JobHistory
+ /*
+ if(appMaster.jobHistoryEventHandler != null) {
+ ((JobHistoryEventHandler2) appMaster.jobHistoryEventHandler)
+ .setSignalled(true);
+ }
+ */
+ appMaster.stop();
+ }
+ }
+
  // TODO XXX Does this really need to be a YarnConfiguration ?
  /**
   * Initializes and starts the given AppMaster as the job user (via a remote
   * UGI doAs), so filesystem and RPC calls made during init/start run with
   * that user's identity.
   */
  protected static void initAndStartAppMaster(final DAGAppMaster appMaster,
      final YarnConfiguration conf, String jobUserName) throws IOException,
      InterruptedException {
    UserGroupInformation.setConfiguration(conf);
    UserGroupInformation appMasterUgi = UserGroupInformation
        .createRemoteUser(jobUserName);
    appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        appMaster.init(conf);
        appMaster.start();
        return null;
      }
    });
  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/HeartbeatHandlerBase.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/HeartbeatHandlerBase.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/HeartbeatHandlerBase.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/HeartbeatHandlerBase.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,170 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+public abstract class HeartbeatHandlerBase<T> extends AbstractService {
+
+
+ protected int timeOut = 5 * 60 * 1000;// 5 mins
+ protected int timeOutCheckInterval = 30 * 1000; // 30 seconds.
+ protected Thread timeOutCheckerThread;
+ private final String name;
+
+ @SuppressWarnings("rawtypes")
+ protected final EventHandler eventHandler;
+ protected final Clock clock;
+ protected final AppContext appContext;
+
+ private ConcurrentMap<T, ReportTime> runningMap;
+ private volatile boolean stopped;
+
+ public HeartbeatHandlerBase(AppContext appContext, int numThreads, String name) {
+ super(name);
+ this.name = name;
+ this.eventHandler = appContext.getEventHandler();
+ this.clock = appContext.getClock();
+ this.appContext = appContext;
+ numThreads = numThreads == 0 ? 1 : numThreads;
+ this.runningMap = new ConcurrentHashMap<T, HeartbeatHandlerBase.ReportTime>(
+ 16, 0.75f, numThreads);
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ super.init(conf);
+ timeOut = getConfiguredTimeout(conf);
+ timeOutCheckInterval = getConfiguredTimeoutCheckInterval(conf);
+ }
+
+ @Override
+ public void start() {
+ timeOutCheckerThread = new Thread(createPingChecker());
+ timeOutCheckerThread.setName(name + " PingChecker");
+ timeOutCheckerThread.start();
+ super.start();
+ }
+
+ @Override
+ public void stop() {
+ stopped = true;
+ if (timeOutCheckerThread != null) {
+ timeOutCheckerThread.interrupt();
+ }
+ super.stop();
+ }
+
+ protected Runnable createPingChecker() {
+ return new PingChecker();
+ }
+ protected abstract int getConfiguredTimeout(Configuration conf);
+ protected abstract int getConfiguredTimeoutCheckInterval(Configuration conf);
+
+ public void progressing(T id) {
+ ReportTime time = runningMap.get(id);
+ if (time != null) {
+ time.setLastProgress(clock.getTime());
+ }
+ }
+
+ public void pinged(T id) {
+ ReportTime time = runningMap.get(id);
+ if (time != null) {
+ time.setLastPing(clock.getTime());
+ }
+ }
+
+ public void register(T id) {
+ runningMap.put(id, new ReportTime(clock.getTime()));
+ }
+
+ public void unregister(T id) {
+ runningMap.remove(id);
+ }
+
+
+
+ protected static class ReportTime {
+ private long lastPing;
+ private long lastProgress;
+
+ public ReportTime(long time) {
+ setLastProgress(time);
+ }
+
+ public synchronized void setLastPing(long time) {
+ lastPing = time;
+ }
+
+ public synchronized void setLastProgress(long time) {
+ lastProgress = time;
+ lastPing = time;
+ }
+
+ public synchronized long getLastPing() {
+ return lastPing;
+ }
+
+ public synchronized long getLastProgress() {
+ return lastProgress;
+ }
+ }
+
+ protected abstract boolean hasTimedOut(ReportTime report, long currentTime);
+
+ protected abstract void handleTimeOut(T t);
+
+ private class PingChecker implements Runnable {
+
+ @Override
+ public void run() {
+ while (!stopped && !Thread.currentThread().isInterrupted()) {
+ Iterator<Map.Entry<T, ReportTime>> iterator =
+ runningMap.entrySet().iterator();
+
+ // avoid calculating current time everytime in loop
+ long currentTime = clock.getTime();
+
+ while (iterator.hasNext()) {
+ Map.Entry<T, ReportTime> entry = iterator.next();
+ if(hasTimedOut(entry.getValue(), currentTime)) {
+ // Timed out. Removed from list and send out an event.
+ iterator.remove();
+ handleTimeOut(entry.getKey());
+ }
+ }
+ try {
+ Thread.sleep(timeOutCheckInterval);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ }
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/HeartbeatHandlerBase.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/JobEndNotifier.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/JobEndNotifier.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/JobEndNotifier.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/JobEndNotifier.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,187 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.Proxy;
+import java.net.URL;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.mortbay.log.Log;
+
+/**
+ * <p>This class handles job end notification. Submitters of jobs can choose to
+ * be notified of the end of a job by supplying a URL to which a connection
+ * will be established.
+ * <ul><li> The URL connection is fire and forget by default.</li> <li>
+ * User can specify number of retry attempts and a time interval at which to
+ * attempt retries</li><li>
+ * Cluster administrators can set final parameters to set maximum number of
+ * tries (0 would disable job end notification) and max time interval and a
+ * proxy if needed</li><li>
+ * The URL may contain sentinels which will be replaced by jobId and jobStatus
+ * (eg. SUCCEEDED/KILLED/FAILED) </li> </ul>
+ * </p>
+ */
+public class JobEndNotifier implements Configurable {
+ private static final String JOB_ID = "$jobId";
+ private static final String JOB_STATUS = "$jobStatus";
+
+ private Configuration conf;
+ protected String userUrl;
+ protected String proxyConf;
+ protected int numTries; //Number of tries to attempt notification
+ protected int waitInterval; //Time to wait between retrying notification
+ protected URL urlToNotify; //URL to notify read from the config
+ protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification
+
+ /**
+ * Parse the URL that needs to be notified of the end of the job, along
+ * with the number of retries in case of failure, the amount of time to
+ * wait between retries and proxy settings
+ * @param conf the configuration
+ */
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+
+ numTries = Math.min(
+ conf.getInt(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, 0) + 1
+ , conf.getInt(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, 1)
+ );
+ waitInterval = Math.min(
+ conf.getInt(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, 5)
+ , conf.getInt(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, 5)
+ );
+ waitInterval = (waitInterval < 0) ? 5 : waitInterval;
+
+ userUrl = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL);
+
+ proxyConf = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY);
+
+ //Configure the proxy to use if its set. It should be set like
+ //proxyType@proxyHostname:port
+ if(proxyConf != null && !proxyConf.equals("") &&
+ proxyConf.lastIndexOf(":") != -1) {
+ int typeIndex = proxyConf.indexOf("@");
+ Proxy.Type proxyType = Proxy.Type.HTTP;
+ if(typeIndex != -1 &&
+ proxyConf.substring(0, typeIndex).compareToIgnoreCase("socks") == 0) {
+ proxyType = Proxy.Type.SOCKS;
+ }
+ String hostname = proxyConf.substring(typeIndex + 1,
+ proxyConf.lastIndexOf(":"));
+ String portConf = proxyConf.substring(proxyConf.lastIndexOf(":") + 1);
+ try {
+ int port = Integer.parseInt(portConf);
+ proxyToUse = new Proxy(proxyType,
+ new InetSocketAddress(hostname, port));
+ Log.info("Job end notification using proxy type \"" + proxyType +
+ "\" hostname \"" + hostname + "\" and port \"" + port + "\"");
+ } catch(NumberFormatException nfe) {
+ Log.warn("Job end notification couldn't parse configured proxy's port "
+ + portConf + ". Not going to use a proxy");
+ }
+ }
+
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * Notify the URL just once. Use best effort. Timeout hard coded to 5
+ * seconds.
+ */
+ protected boolean notifyURLOnce() {
+ boolean success = false;
+ try {
+ Log.info("Job end notification trying " + urlToNotify);
+ HttpURLConnection conn =
+ (HttpURLConnection) urlToNotify.openConnection(proxyToUse);
+ conn.setConnectTimeout(5*1000);
+ conn.setReadTimeout(5*1000);
+ conn.setAllowUserInteraction(false);
+ if(conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
+ Log.warn("Job end notification to " + urlToNotify +" failed with code: "
+ + conn.getResponseCode() + " and message \"" + conn.getResponseMessage()
+ +"\"");
+ }
+ else {
+ success = true;
+ Log.info("Job end notification to " + urlToNotify + " succeeded");
+ }
+ } catch(IOException ioe) {
+ Log.warn("Job end notification to " + urlToNotify + " failed", ioe);
+ }
+ return success;
+ }
+
+ /**
+ * Notify a server of the completion of a submitted job. The user must have
+ * configured MRJobConfig.MR_JOB_END_NOTIFICATION_URL
+ * @param jobReport JobReport used to read JobId and JobStatus
+ * @throws InterruptedException
+ */
+ public void notify(JobReport jobReport)
+ throws InterruptedException {
+ // Do we need job-end notification?
+ if (userUrl == null) {
+ Log.info("Job end notification URL not set, skipping.");
+ return;
+ }
+
+ //Do string replacements for jobId and jobStatus
+ if (userUrl.contains(JOB_ID)) {
+ userUrl = userUrl.replace(JOB_ID, jobReport.getJobId().toString());
+ }
+ if (userUrl.contains(JOB_STATUS)) {
+ userUrl = userUrl.replace(JOB_STATUS, jobReport.getJobState().toString());
+ }
+
+ // Create the URL, ensure sanity
+ try {
+ urlToNotify = new URL(userUrl);
+ } catch (MalformedURLException mue) {
+ Log.warn("Job end notification couldn't parse " + userUrl, mue);
+ return;
+ }
+
+ // Send notification
+ boolean success = false;
+ while (numTries-- > 0 && !success) {
+ Log.info("Job end notification attempts left " + numTries);
+ success = notifyURLOnce();
+ if (!success) {
+ Thread.sleep(waitInterval);
+ }
+ }
+ if (!success) {
+ Log.warn("Job end notification failed to notify : " + urlToNotify);
+ } else {
+ Log.info("Job end notification succeeded for " + jobReport.getJobId());
+ }
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/JobEndNotifier.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/MRClientSecurityInfo.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/MRClientSecurityInfo.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/MRClientSecurityInfo.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/MRClientSecurityInfo.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,58 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+import java.lang.annotation.Annotation;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hadoop.yarn.security.client.ClientTokenSelector;
+
+/**
+ * Supplies security metadata for the MR client protocol: no Kerberos
+ * principal, and a client-token selector for {@link MRClientProtocolPB}.
+ */
+public class MRClientSecurityInfo extends SecurityInfo {
+
+  @Override
+  public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
+    // Kerberos is not used for this protocol.
+    return null;
+  }
+
+  @Override
+  public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
+    // Only the MR client protocol is token-secured here; any other
+    // protocol gets no token info.
+    if (protocol.equals(MRClientProtocolPB.class)) {
+      return new TokenInfo() {
+        @Override
+        public Class<? extends Annotation> annotationType() {
+          return null;
+        }
+
+        @Override
+        public Class<? extends TokenSelector<? extends TokenIdentifier>>
+            value() {
+          return ClientTokenSelector.class;
+        }
+      };
+    }
+    return null;
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/MRClientSecurityInfo.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,70 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+/**
+ * This class listens for changes to the state of a Task.
+ */
+public interface TaskAttemptListener {
+
+  /** @return the address this listener is bound to, for tasks to call back. */
+  InetSocketAddress getAddress();
+
+  /**
+   * Register a launched container so attempts running in it are accepted.
+   * @param containerId the container that has been launched.
+   */
+  void registerRunningContainer(ContainerId containerId);
+// void registerRunningJvm(WrappedJvmID jvmID, ContainerId containerId);
+
+  /**
+   * Associate a task attempt with the container it runs in.
+   * @param attemptId the attempt being launched.
+   * @param containerId the container hosting the attempt.
+   */
+  void registerTaskAttempt(TezTaskAttemptID attemptId, ContainerId containerId);
+
+// void registerTaskAttempt(TezTaskAttemptID attemptId, WrappedJvmID jvmId);
+
+  /**
+   * Unregister a container when it is no longer running.
+   * @param containerId the container to forget.
+   */
+  void unregisterRunningContainer(ContainerId containerId);
+
+// void unregisterRunningJvm(WrappedJvmID jvmID);
+
+  /**
+   * Unregister a task attempt once it has finished.
+   * @param attemptID the attempt to forget.
+   */
+  void unregisterTaskAttempt(TezTaskAttemptID attemptID);
+  /**
+   * Register a JVM with the listener. This should be called as soon as a
+   * JVM ID is assigned to a task attempt, before it has been launched.
+   * @param task the task itself for this JVM.
+   * @param jvmID The ID of the JVM .
+   */
+// void registerPendingTask(Task task, WrappedJvmID jvmID);
+
+  /**
+   * Register task attempt. This should be called when the JVM has been
+   * launched.
+   *
+   * @param attemptID
+   * the id of the attempt for this JVM.
+   * @param jvmID the ID of the JVM.
+   */
+// void registerLaunchedTask(TezTaskAttemptID attemptID, WrappedJvmID jvmID);
+
+  /**
+   * Unregister the JVM and the attempt associated with it. This should be
+   * called when the attempt/JVM has finished executing and is being cleaned up.
+   * @param attemptID the ID of the attempt.
+   * @param jvmID the ID of the JVM for that attempt.
+   */
+// void unregister(TezTaskAttemptID attemptID, WrappedJvmID jvmID);
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
------------------------------------------------------------------------------
svn:eol-style = native