You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC
svn commit: r1469642 [12/36] - in /incubator/tez/branches/TEZ-1: ./
example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/
example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/
tez-common/src/main/ tez-common/src/main/java/ t...
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,1241 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobACLsManager;
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
+import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DAGConfiguration;
+import org.apache.tez.dag.api.DAGLocationHint;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.client.impl.TezBuilderUtils;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.DAGReport;
+import org.apache.tez.dag.app.dag.DAGScheduler;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
+import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
+import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.dag.event.DAGFinishEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
+import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
+import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.engine.common.security.JobTokenIdentifier;
+import org.apache.tez.engine.common.security.JobTokenSecretManager;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezVertexID;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+/** Implementation of Job interface. Maintains the state machines of Job.
+ * The read and write calls use ReadWriteLock for concurrency.
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
+ EventHandler<DAGEvent> {
+
+ private static final Log LOG = LogFactory.getLog(DAGImpl.class);
+
+ //final fields
+ private final ApplicationAttemptId applicationAttemptId;
+ private final TezDAGID dagId;
+ private final Clock clock;
+ private final JobACLsManager aclsManager;
+ // OS-level user running the AM (System.getProperty("user.name")), used for ACL checks.
+ private final String username;
+ private final Map<JobACL, AccessControlList> jobACLs;
+
+ // TODO Recovery
+ //private final List<AMInfo> amInfos;
+ // Both locks are the two sides of a single ReentrantReadWriteLock
+ // created in the constructor.
+ private final Lock readLock;
+ private final Lock writeLock;
+ private final String dagName;
+ private final TaskAttemptListener taskAttemptListener;
+ private final TaskHeartbeatHandler taskHeartbeatHandler;
+ // Guards reads of the vertices map in getVertices(); NOTE(review): other
+ // accessors use readLock/writeLock instead — confirm intended locking scheme.
+ private final Object tasksSyncHandle = new Object();
+
+ private DAGScheduler dagScheduler;
+
+ /**
+ * maps nodes to tasks that have run on those nodes
+ */
+ private final HashMap<NodeId, List<TezTaskAttemptID>>
+ nodesToSucceededTaskAttempts = new HashMap<NodeId, List<TezTaskAttemptID>>();
+
+ private final EventHandler eventHandler;
+ // TODO Metrics
+ //private final MRAppMetrics metrics;
+ // Submitting user as passed by the caller (distinct from username above).
+ private final String userName;
+ private final String queueName;
+ private final long appSubmitTime;
+ private final AppContext appContext;
+
+ volatile Map<TezVertexID, Vertex> vertices =
+ new HashMap<TezVertexID, Vertex>();
+ private Map<String, EdgeProperty> edgeProperties =
+ new HashMap<String, EdgeProperty>();
+ private TezCounters dagCounters = new TezCounters();
+ // Guards lazy construction of fullCounters in mayBeConstructFinalFullCounters().
+ private Object fullCountersLock = new Object();
+ private TezCounters fullCounters = null;
+
+ public DAGConfiguration conf;
+
+ //fields initialized in init
+ private FileSystem fs;
+ private Path remoteJobSubmitDir;
+ public Path remoteJobConfFile;
+
+ private final List<String> diagnostics = new ArrayList<String>();
+
+ // Shared, stateless transition instances reused across all DAGImpl state machines.
+ private static final DiagnosticsUpdateTransition
+ DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
+ private static final InternalErrorTransition
+ INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
+ private static final CounterUpdateTransition COUNTER_UPDATE_TRANSITION =
+ new CounterUpdateTransition();
+ private static final DAGSchedulerUpdateTransition
+ DAG_SCHEDULER_UPDATE_TRANSITION = new DAGSchedulerUpdateTransition();
+
+ // Location hints for all vertices in DAG
+ private final DAGLocationHint dagLocationHint;
+
+ // Declarative definition of the DAG lifecycle:
+ // NEW -> INITED -> RUNNING -> {SUCCEEDED | FAILED | KILL_WAIT -> KILLED},
+ // with ERROR reachable from every state via INTERNAL_ERROR. The factory is
+ // shared (static); each DAGImpl instantiates its own state machine from it
+ // in the constructor. Transition order below is significant — do not reorder.
+ protected static final
+ StateMachineFactory<DAGImpl, DAGState, DAGEventType, DAGEvent>
+ stateMachineFactory
+ = new StateMachineFactory<DAGImpl, DAGState, DAGEventType, DAGEvent>
+ (DAGState.NEW)
+
+ // Transitions from NEW state
+ .addTransition(DAGState.NEW, DAGState.NEW,
+ DAGEventType.DAG_DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(DAGState.NEW, DAGState.NEW,
+ DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
+ .addTransition
+ (DAGState.NEW,
+ EnumSet.of(DAGState.INITED, DAGState.FAILED),
+ DAGEventType.DAG_INIT,
+ new InitTransition())
+ .addTransition(DAGState.NEW, DAGState.KILLED,
+ DAGEventType.DAG_KILL,
+ new KillNewJobTransition())
+ .addTransition(DAGState.NEW, DAGState.ERROR,
+ DAGEventType.INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+
+ // Transitions from INITED state
+ .addTransition(DAGState.INITED, DAGState.INITED,
+ DAGEventType.DAG_DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(DAGState.INITED, DAGState.INITED,
+ DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
+ .addTransition(DAGState.INITED, DAGState.RUNNING,
+ DAGEventType.DAG_START,
+ new StartTransition())
+ .addTransition(DAGState.INITED, DAGState.KILLED,
+ DAGEventType.DAG_KILL,
+ new KillInitedJobTransition())
+ .addTransition(DAGState.INITED, DAGState.ERROR,
+ DAGEventType.INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+
+ // Transitions from RUNNING state
+ .addTransition
+ (DAGState.RUNNING,
+ EnumSet.of(DAGState.RUNNING, DAGState.SUCCEEDED, DAGState.FAILED),
+ DAGEventType.DAG_VERTEX_COMPLETED,
+ new VertexCompletedTransition())
+ .addTransition
+ (DAGState.RUNNING,
+ EnumSet.of(DAGState.RUNNING, DAGState.SUCCEEDED, DAGState.FAILED),
+ DAGEventType.DAG_COMPLETED,
+ new JobNoTasksCompletedTransition())
+ .addTransition(DAGState.RUNNING, DAGState.KILL_WAIT,
+ DAGEventType.DAG_KILL, new KillVerticesTransition())
+ .addTransition(DAGState.RUNNING, DAGState.RUNNING,
+ DAGEventType.DAG_DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(DAGState.RUNNING, DAGState.RUNNING,
+ DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
+ .addTransition(DAGState.RUNNING, DAGState.RUNNING,
+ DAGEventType.DAG_SCHEDULER_UPDATE,
+ DAG_SCHEDULER_UPDATE_TRANSITION)
+ .addTransition(
+ DAGState.RUNNING,
+ DAGState.ERROR, DAGEventType.INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+
+ // Transitions from KILL_WAIT state.
+ .addTransition
+ (DAGState.KILL_WAIT,
+ EnumSet.of(DAGState.KILL_WAIT, DAGState.KILLED),
+ DAGEventType.DAG_VERTEX_COMPLETED,
+ new KillWaitTaskCompletedTransition())
+ .addTransition(DAGState.KILL_WAIT, DAGState.KILL_WAIT,
+ DAGEventType.DAG_DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(DAGState.KILL_WAIT, DAGState.KILL_WAIT,
+ DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
+ .addTransition(
+ DAGState.KILL_WAIT,
+ DAGState.ERROR, DAGEventType.INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+
+ // Transitions from SUCCEEDED state
+ .addTransition(DAGState.SUCCEEDED, DAGState.SUCCEEDED,
+ DAGEventType.DAG_DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(DAGState.SUCCEEDED, DAGState.SUCCEEDED,
+ DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
+ .addTransition(
+ DAGState.SUCCEEDED,
+ DAGState.ERROR, DAGEventType.INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+ // Ignore-able events
+ .addTransition(DAGState.SUCCEEDED, DAGState.SUCCEEDED,
+ EnumSet.of(DAGEventType.DAG_KILL,
+ DAGEventType.DAG_VERTEX_COMPLETED))
+
+ // Transitions from FAILED state
+ .addTransition(DAGState.FAILED, DAGState.FAILED,
+ DAGEventType.DAG_DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(DAGState.FAILED, DAGState.FAILED,
+ DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
+ .addTransition(
+ DAGState.FAILED,
+ DAGState.ERROR, DAGEventType.INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+ // Ignore-able events
+ .addTransition(DAGState.FAILED, DAGState.FAILED,
+ EnumSet.of(DAGEventType.DAG_KILL,
+ DAGEventType.DAG_VERTEX_COMPLETED))
+
+ // Transitions from KILLED state
+ .addTransition(DAGState.KILLED, DAGState.KILLED,
+ DAGEventType.DAG_DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(DAGState.KILLED, DAGState.KILLED,
+ DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
+ .addTransition(
+ DAGState.KILLED,
+ DAGState.ERROR, DAGEventType.INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+ // Ignore-able events
+ .addTransition(DAGState.KILLED, DAGState.KILLED,
+ EnumSet.of(DAGEventType.DAG_KILL,
+ DAGEventType.DAG_VERTEX_COMPLETED))
+
+ // No transitions from INTERNAL_ERROR state. Ignore all.
+ .addTransition(
+ DAGState.ERROR,
+ DAGState.ERROR,
+ EnumSet.of(DAGEventType.DAG_INIT,
+ DAGEventType.DAG_KILL,
+ DAGEventType.DAG_VERTEX_COMPLETED,
+ DAGEventType.DAG_DIAGNOSTIC_UPDATE,
+ DAGEventType.INTERNAL_ERROR))
+ .addTransition(DAGState.ERROR, DAGState.ERROR,
+ DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
+ // create the topology tables
+ .installTopology();
+
+ // Per-instance state machine built from the shared factory; all mutations
+ // go through handle() under writeLock.
+ private final StateMachine<DAGState, DAGEventType, DAGEvent> stateMachine;
+
+ //changing fields while the job is running
+ private int numVertices;
+ private int numCompletedVertices = 0;
+ private int numFailedVertices = 0;
+ private int numKilledVertices = 0;
+ private boolean isUber = false;
+
+ // Filesystem delegation tokens handed in at construction time.
+ private Credentials fsTokens;
+ // Job token minted in InitTransition.setup() to authenticate containers.
+ private Token<JobTokenIdentifier> jobToken;
+ private JobTokenSecretManager jobTokenSecretManager;
+
+ private long startTime;
+ private long finishTime;
+
+ // Constructs the DAG shell; no vertices exist until DAG_INIT is processed.
+ // Only field assignments happen here — all I/O is deferred to InitTransition.
+ public DAGImpl(TezDAGID dagId, ApplicationAttemptId applicationAttemptId,
+ Configuration conf, EventHandler eventHandler,
+ TaskAttemptListener taskAttemptListener,
+ JobTokenSecretManager jobTokenSecretManager,
+ Credentials fsTokenCredentials, Clock clock,
+ // TODO Metrics
+ //MRAppMetrics metrics,
+ String userName,
+ long appSubmitTime,
+ // TODO Recovery
+ //List<AMInfo> amInfos,
+ TaskHeartbeatHandler thh,
+ AppContext appContext,
+ DAGLocationHint dagLocationHint) {
+ this.applicationAttemptId = applicationAttemptId;
+ this.dagId = dagId;
+ this.dagName = conf.get(JobContext.JOB_NAME, "<missing job name>");
+ this.conf = new DAGConfiguration(conf);
+ // TODO Metrics
+ //this.metrics = metrics;
+ this.clock = clock;
+ // TODO Recovery
+ //this.amInfos = amInfos;
+ this.appContext = appContext;
+ this.userName = userName;
+ this.queueName = conf.get(MRJobConfig.QUEUE_NAME, "default");
+ this.appSubmitTime = appSubmitTime;
+
+ this.taskAttemptListener = taskAttemptListener;
+ this.taskHeartbeatHandler = thh;
+ this.eventHandler = eventHandler;
+ // Single lock instance backs both readLock and writeLock.
+ ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ this.readLock = readWriteLock.readLock();
+ this.writeLock = readWriteLock.writeLock();
+
+ this.fsTokens = fsTokenCredentials;
+ this.jobTokenSecretManager = jobTokenSecretManager;
+
+ this.aclsManager = new JobACLsManager(conf);
+ this.username = System.getProperty("user.name");
+ this.jobACLs = aclsManager.constructJobACLs(conf);
+
+ this.dagLocationHint = dagLocationHint;
+
+ // This "this leak" is okay because the retained pointer is in an
+ // instance variable.
+ stateMachine = stateMachineFactory.make(this);
+ }
+
+ // Accessor for subclasses/tests; the machine itself is immutable after construction.
+ protected StateMachine<DAGState, DAGEventType, DAGEvent> getStateMachine() {
+ return stateMachine;
+ }
+
+ // Returns the immutable DAG identifier; safe to call without locking.
+ @Override
+ public TezDAGID getID() {
+ return dagId;
+ }
+
+ // Returns the DAG-level configuration wrapper built in the constructor.
+ @Override
+ public DAGConfiguration getConf() {
+ return conf;
+ }
+
+ // Package-private accessor for the AM's central event dispatcher.
+ EventHandler getEventHandler() {
+ return this.eventHandler;
+ }
+
+ /**
+ * Checks whether the given user may perform the given job operation.
+ * An operation with no configured ACL is open to everyone.
+ */
+ @Override
+ public boolean checkAccess(UserGroupInformation callerUGI,
+     JobACL jobOperation) {
+   AccessControlList acl = jobACLs.get(jobOperation);
+   return acl == null
+       || aclsManager.checkAccess(callerUGI, jobOperation, username, acl);
+ }
+
+ // Looks up a vertex by id under the read lock; returns null if unknown.
+ @Override
+ public Vertex getVertex(TezVertexID vertexID) {
+ readLock.lock();
+ try {
+ return vertices.get(vertexID);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ // Always false here: isUber is never set true anywhere in this class.
+ @Override
+ public boolean isUber() {
+ return isUber;
+ }
+
+ @Override
+ public TezCounters getAllCounters() {
+
+ readLock.lock();
+
+ try {
+ DAGState state = getInternalState();
+ if (state == DAGState.ERROR || state == DAGState.FAILED
+ || state == DAGState.KILLED || state == DAGState.SUCCEEDED) {
+ this.mayBeConstructFinalFullCounters();
+ return fullCounters;
+ }
+
+ TezCounters counters = new TezCounters();
+ counters.incrAllCounters(dagCounters);
+ return incrTaskCounters(counters, vertices.values());
+
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * Folds every vertex's counters into the supplied accumulator and returns it.
+ */
+ public static TezCounters incrTaskCounters(
+     TezCounters counters, Collection<Vertex> vertices) {
+   for (Vertex vertex : vertices) {
+     counters.incrAllCounters(vertex.getAllCounters());
+   }
+   return counters;
+ }
+
+ // Returns the diagnostics list under the read lock.
+ // NOTE(review): this hands out the internal mutable list, not a copy —
+ // callers mutating or iterating it concurrently with addDiagnostic() would
+ // race; confirm callers treat it as read-only.
+ @Override
+ public List<String> getDiagnostics() {
+ readLock.lock();
+ try {
+ return diagnostics;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ // Builds a status report for the DAG. Currently a stub: both branches fall
+ // through to TezBuilderUtils.newDAGReport() — the commented MRBuilderUtils
+ // calls show the intended shape once report plumbing exists.
+ @Override
+ public DAGReport getReport() {
+ readLock.lock();
+ try {
+ DAGState state = getState();
+
+ // jobFile can be null if the job is not yet inited.
+ String jobFile =
+ remoteJobConfFile == null ? "" : remoteJobConfFile.toString();
+
+ StringBuilder diagsb = new StringBuilder();
+ for (String s : getDiagnostics()) {
+ diagsb.append(s).append("\n");
+ }
+
+ if (getInternalState() == DAGState.NEW) {
+ /*
+ return MRBuilderUtils.newJobReport(dagId, dagName, username, state,
+ appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f,
+ cleanupProgress, jobFile, amInfos, isUber, diagsb.toString());
+ */
+ // TODO
+ return TezBuilderUtils.newDAGReport();
+ }
+
+ // TODO
+ return TezBuilderUtils.newDAGReport();
+ /*
+ return MRBuilderUtils.newJobReport(dagId, dagName, username, state,
+ appSubmitTime, startTime, finishTime, setupProgress,
+ this.mapProgress, this.reduceProgress,
+ cleanupProgress, jobFile, amInfos, isUber, diagsb.toString());
+ */
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * Returns overall DAG progress as the mean of all vertex progress values.
+ *
+ * Fix: the original divided by getTotalVertices() unconditionally, which
+ * yields NaN (0.0f / 0) when called before any vertices are created
+ * (numVertices is only set during DAG_INIT). Guard the empty case.
+ */
+ @Override
+ public float getProgress() {
+   this.readLock.lock();
+   try {
+     int total = getTotalVertices();
+     if (total == 0) {
+       return 0.0f;
+     }
+     float progress = 0.0f;
+     for (Vertex v : getVertices().values()) {
+       progress += v.getProgress();
+     }
+     return progress / total;
+   } finally {
+     this.readLock.unlock();
+   }
+ }
+
+ // Returns an unmodifiable view over the live vertex map.
+ // NOTE(review): synchronizes on tasksSyncHandle while other accessors use
+ // readLock — confirm whether this divergent locking is intentional.
+ @Override
+ public Map<TezVertexID, Vertex> getVertices() {
+ synchronized (tasksSyncHandle) {
+ return Collections.unmodifiableMap(vertices);
+ }
+ }
+
+ // Public view of the current state; identical to getInternalState().
+ @Override
+ public DAGState getState() {
+ readLock.lock();
+ try {
+ return getStateMachine().getCurrentState();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /** Sends V_START to every root vertex (one with no incoming edges). */
+ protected void startRootVertices() {
+   for (Vertex vertex : vertices.values()) {
+     if (vertex.getInputVerticesCount() != 0) {
+       continue; // non-root vertices start when their inputs complete
+     }
+     eventHandler.handle(new VertexEvent(vertex.getVertexId(),
+         VertexEventType.V_START));
+   }
+ }
+
+ /** Sends V_INIT to every vertex; initialization proceeds asynchronously. */
+ protected void initializeVertices() {
+   for (Vertex vertex : vertices.values()) {
+     eventHandler.handle(new VertexEvent(vertex.getVertexId(),
+         VertexEventType.V_INIT));
+   }
+ }
+
+ @Override
+ /**
+ * The only entry point to change the DAG.
+ */
+ public void handle(DAGEvent event) {
+ LOG.info("DEBUG: Processing DAGEvent " + event.getDAGId() + " of type "
+ + event.getType() + " while in state " + getInternalState()
+ + ". Event: " + event);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing DAGEvent " + event.getDAGId() + " of type "
+ + event.getType() + " while in state " + getInternalState());
+ }
+ try {
+ writeLock.lock();
+ DAGState oldState = getInternalState();
+ try {
+ getStateMachine().doTransition(event.getType(), event);
+ } catch (InvalidStateTransitonException e) {
+ LOG.error("Can't handle this event at current state", e);
+ addDiagnostic("Invalid event " + event.getType() +
+ " on Job " + this.dagId);
+ eventHandler.handle(new DAGEvent(this.dagId,
+ DAGEventType.INTERNAL_ERROR));
+ }
+ //notify the eventhandler of state change
+ if (oldState != getInternalState()) {
+ LOG.info(dagId + " transitioned from " + oldState + " to "
+ + getInternalState());
+ }
+ }
+
+ finally {
+ writeLock.unlock();
+ }
+ }
+
+ // Current state machine state under the read lock; same value as getState().
+ @Private
+ public DAGState getInternalState() {
+ readLock.lock();
+ try {
+ return getStateMachine().getCurrentState();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ // Stamps finishTime from the injected Clock (testable time source).
+ void setFinishTime() {
+ finishTime = clock.getTime();
+ }
+
+ // Placeholder: will emit a job-submitted history event once JobHistory
+ // support lands; the commented block shows the intended payload.
+ void logJobHistorySubmittedEvent() {
+ // TODO JobHistory
+ /*
+ JobSubmittedEvent jse = new JobSubmittedEvent(job.dagId,
+ job.conf.get(MRJobConfig.JOB_NAME, "test"),
+ job.conf.get(MRJobConfig.USER_NAME, "mapred"),
+ job.appSubmitTime,
+ job.remoteJobConfFile.toString(),
+ job.jobACLs, job.queueName);
+ this.eventHandler.handle(new JobHistoryEvent(job.dagId, jse));
+ */
+ }
+
+ // Stamps the finish time; the history event itself is still TODO.
+ void logJobHistoryFinishedEvent() {
+ this.setFinishTime();
+ // TODO JobHistory
+ /*
+ JobFinishedEvent jfe = createJobFinishedEvent(this);
+ LOG.info("Calling handler for JobFinishedEvent ");
+ this.getEventHandler().handle(new JobHistoryEvent(this.dagId, jfe));
+ */
+ }
+
+ // Placeholder for the job-inited history event; currently a no-op.
+ void logJobHistoryInitedEvent() {
+ // TODO JobHistory
+ /*
+ JobInitedEvent jie =
+ new JobInitedEvent(job.oldJobId,
+ job.startTime,
+ job.numMapTasks, job.numReduceTasks,
+ job.getState().toString(),
+ job.isUber()); //Will transition to state running. Currently in INITED
+ job.eventHandler.handle(new JobHistoryEvent(job.dagId, jie));
+ JobInfoChangeEvent jice = new JobInfoChangeEvent(job.oldJobId,
+ job.appSubmitTime, job.startTime);
+ job.eventHandler.handle(new JobHistoryEvent(job.dagId, jice));
+ */
+ }
+
+ // Placeholder for the unsuccessful-completion history event; currently a no-op.
+ void logJobHistoryUnsuccesfulEvent() {
+ // TODO JobHistory
+ /*
+ JobUnsuccessfulCompletionEvent unsuccessfulJobEvent =
+ new JobUnsuccessfulCompletionEvent(oldJobId,
+ finishTime,
+ succeededMapTaskCount,
+ succeededReduceTaskCount,
+ finalState.toString());
+ eventHandler.handle(new JobHistoryEvent(dagId, unsuccessfulJobEvent));
+ */
+ }
+
+ // Placeholder for the killed-before-init history event; currently a no-op.
+ void logJobHistoryUnsuccesfulEventForNewJob() {
+ // TODO JobHistory
+ /*
+ JobUnsuccessfulCompletionEvent failedEvent =
+ new JobUnsuccessfulCompletionEvent(job.oldJobId,
+ job.finishTime, 0, 0,
+ DAGState.KILLED.toString());
+ job.eventHandler.handle(new JobHistoryEvent(job.dagId, failedEvent));
+ */
+ }
+
+ /**
+ * Create the default file System for this job. Protected so tests can
+ * substitute a mock filesystem.
+ * @param conf the conf object
+ * @return the default filesystem for this job
+ * @throws IOException
+ */
+ protected FileSystem getFileSystem(Configuration conf) throws IOException {
+ return FileSystem.get(conf);
+ }
+
+ /**
+ * Declares the DAG successful when every vertex has completed.
+ * @return DAGState.SUCCEEDED when all vertices are done, null otherwise
+ *         (null tells the caller the DAG is still running).
+ */
+ static DAGState checkJobCompleteSuccess(DAGImpl job) {
+   if (job.numCompletedVertices != job.vertices.size()) {
+     return null; // still vertices outstanding
+   }
+   // TODO: Maybe set cleanup progress. Otherwise job progress will
+   // always stay at 0.95 when reported from an AM.
+   // TODO DAG committer
+   job.logJobHistoryFinishedEvent();
+   return job.finished(DAGState.SUCCEEDED);
+ }
+
+ // Common terminal-state bookkeeping: stamps the finish time (if not already
+ // set) and notifies the AM via DAGFinishEvent. Returns finalState so
+ // transitions can use it directly as their result.
+ DAGState finished(DAGState finalState) {
+ // TODO Metrics
+ /*
+ if (getInternalState() == DAGState.RUNNING) {
+ metrics.endRunningJob(this);
+ }
+ */
+ if (finishTime == 0) setFinishTime();
+ eventHandler.handle(new DAGFinishEvent(dagId));
+
+ // TODO Metrics
+ /*
+ switch (finalState) {
+ case KILLED:
+ metrics.killedJob(this);
+ break;
+ case FAILED:
+ metrics.failedJob(this);
+ break;
+ case SUCCEEDED:
+ metrics.completedJob(this);
+ }
+ */
+ return finalState;
+ }
+
+ // Submitting user as supplied to the constructor.
+ @Override
+ public String getUserName() {
+ return userName;
+ }
+
+ // Queue name from configuration; defaults to "default".
+ @Override
+ public String getQueueName() {
+ return queueName;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.v2.app2.job.Job#getConfFile()
+ */
+ // Null until InitTransition.setup() computes the staging-dir path.
+ @Override
+ public Path getConfFile() {
+ return remoteJobConfFile;
+ }
+
+ // Job name from configuration; "<missing job name>" if unset.
+ @Override
+ public String getName() {
+ return dagName;
+ }
+
+ // Total vertex count; zero until DAG_INIT has been processed.
+ @Override
+ public int getTotalVertices() {
+ readLock.lock();
+ try {
+ return numVertices;
+ } finally {
+ readLock.unlock();
+ }
+
+ }
+
+ // Number of vertices that have reached a terminal state.
+ @Override
+ public int getCompletedVertices() {
+ readLock.lock();
+ try {
+ return numCompletedVertices;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.v2.app2.job.Job#getJobACLs()
+ */
+ // Read-only view; the underlying map is fixed at construction time.
+ @Override
+ public Map<JobACL, AccessControlList> getJobACLs() {
+ return Collections.unmodifiableMap(jobACLs);
+ }
+
+ // TODO Recovery
+ /*
+ @Override
+ public List<AMInfo> getAMInfos() {
+ return amInfos;
+ }
+ */
+
+ /**
+ * ChainMapper and ChainReducer must execute in parallel, so they're not
+ * compatible with uberization/LocalContainerLauncher (100% sequential).
+ */
+ private boolean isChainJob(Configuration conf) {
+   // Deliberately evaluate both sides (no short-circuit) to keep the same
+   // class-loading behavior as checking mapper and reducer independently.
+   boolean chainedMap =
+       derivesFrom(conf.get(MRJobConfig.MAP_CLASS_ATTR), ChainMapper.class);
+   boolean chainedReduce =
+       derivesFrom(conf.get(MRJobConfig.REDUCE_CLASS_ATTR), ChainReducer.class);
+   return chainedMap || chainedReduce;
+ }
+
+ // True iff className names a loadable class assignable to base; a missing
+ // or unresolvable class is treated as "not derived".
+ private static boolean derivesFrom(String className, Class<?> base) {
+   if (className == null) {
+     return false;
+   }
+   try {
+     return base.isAssignableFrom(Class.forName(className));
+   } catch (ClassNotFoundException ignored) {
+     // don't care; assume it's not derived
+     return false;
+   }
+ }
+
+ /*
+ private int getBlockSize() {
+ String inputClassName = conf.get(MRJobConfig.INPUT_FORMAT_CLASS_ATTR);
+ if (inputClassName != null) {
+ Class<?> inputClass - Class.forName(inputClassName);
+ if (FileInputFormat<K, V>)
+ }
+ }
+ */
+
+ /**
+ * Handles DAG_INIT: creates the staging/token setup, instantiates all
+ * vertices from the DAG configuration, wires their edges, and installs the
+ * scheduler. Returns INITED on success, FAILED on any IOException.
+ *
+ * Fix: setup() called String.replace("application", "job") and discarded
+ * the result — String is immutable, so the id string was never rewritten.
+ * The result is now assigned.
+ */
+ public static class InitTransition
+ implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
+
+   /**
+   * Note that this transition method is called directly (and synchronously)
+   * by MRAppMaster's init() method (i.e., no RPC, no thread-switching;
+   * just plain sequential call within AM context), so we can trigger
+   * modifications in AM state from here (at least, if AM is written that
+   * way; MR version is).
+   */
+   @Override
+   public DAGState transition(DAGImpl job, DAGEvent event) {
+     // TODO Metrics
+     //job.metrics.submittedJob(job);
+     //job.metrics.preparingJob(job);
+     try {
+       setup(job);
+       job.fs = job.getFileSystem(job.conf);
+
+       //log to job history
+       job.logJobHistorySubmittedEvent();
+
+       checkTaskLimits();
+
+       // TODO: Committer
+       /*
+       if (job.newApiCommitter) {
+         job.jobContext = new JobContextImpl(job.conf,
+             job.oldJobId);
+       } else {
+         job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
+             job.conf, job.oldJobId);
+       }
+
+       // do the setup
+       job.committer.setupJob(job.jobContext);
+       job.setupProgress = 1.0f;
+       */
+
+       // create the vertices
+       String[] vertexNames = job.getConf().getVertices();
+       job.numVertices = vertexNames.length;
+       for (int i=0; i < job.numVertices; ++i) {
+         VertexImpl v = createVertex(job, vertexNames[i], i);
+         job.addVertex(v);
+       }
+
+       job.edgeProperties = job.getConf().getEdgeProperties();
+
+       // setup the dag
+       for (Vertex v : job.vertices.values()) {
+         parseVertexEdges(job, v);
+       }
+
+       job.dagScheduler = new DAGSchedulerNaturalOrder(job, job.eventHandler);
+       //job.dagScheduler = new DAGSchedulerMRR(job, job.eventHandler);
+
+       // TODO Metrics
+       //job.metrics.endPreparingJob(job);
+       return DAGState.INITED;
+
+     } catch (IOException e) {
+       LOG.warn("Job init failed", e);
+       job.addDiagnostic("Job init failed : "
+           + StringUtils.stringifyException(e));
+       job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
+       // TODO Metrics
+       //job.metrics.endPreparingJob(job);
+       return job.finished(DAGState.FAILED);
+     }
+   }
+
+   // Builds one vertex (with its location hint) from the DAG configuration.
+   private VertexImpl createVertex(DAGImpl dag, String vertexName, int vId) {
+     TezVertexID vertexId = TezBuilderUtils.newVertexID(dag.getID(), vId);
+     return new VertexImpl(
+         vertexId, vertexName, dag.conf,
+         dag.eventHandler, dag.taskAttemptListener,
+         dag.jobTokenSecretManager, dag.jobToken, dag.fsTokens, dag.clock,
+         dag.taskHeartbeatHandler, dag.appContext,
+         dag.dagLocationHint.getVertexLocationHint(vertexName));
+   }
+
+   // Wires up a vertex's input and output edge maps from the configuration.
+   // Edge ids at index i correspond to the vertex name at index i.
+   private void parseVertexEdges(DAGImpl dag, Vertex vertex) {
+     String[] inVerticesNames =
+         dag.getConf().getInputVertices(vertex.getName());
+     List<String> inEdges =
+         dag.getConf().getInputEdgeIds(vertex.getName());
+     Map<Vertex, EdgeProperty> inVertices =
+         new HashMap<Vertex, EdgeProperty>();
+     for (int i=0; i < inVerticesNames.length; ++i) {
+       String vertexName = inVerticesNames[i];
+       inVertices.put(dag.getVertex(vertexName),
+           dag.edgeProperties.get(inEdges.get(i)));
+     }
+     vertex.setInputVertices(inVertices);
+
+     String[] outVerticesNames =
+         dag.getConf().getOutputVertices(vertex.getName());
+     List<String> outEdges =
+         dag.getConf().getOutputEdgeIds(vertex.getName());
+     Map<Vertex, EdgeProperty> outVertices =
+         new HashMap<Vertex, EdgeProperty>();
+     for (int i=0; i < outVerticesNames.length; ++i) {
+       String vertexName = outVerticesNames[i];
+       outVertices.put(dag.getVertex(vertexName),
+           dag.edgeProperties.get(outEdges.get(i)));
+     }
+     vertex.setOutputVertices(outVertices);
+   }
+
+   // Computes staging paths and mints the job token used to authenticate
+   // containers with the TaskAttemptListener.
+   protected void setup(DAGImpl job) throws IOException {
+
+     // FIX: String.replace returns a new string; the original discarded the
+     // result, so "application" was never rewritten to "job" in the id.
+     String dagIdString = job.dagId.toString().replace("application", "job");
+
+     String user =
+         UserGroupInformation.getCurrentUser().getShortUserName();
+     Path path = MRApps.getStagingAreaDir(job.conf, user);
+     if(LOG.isDebugEnabled()) {
+       LOG.debug("startJobs: parent=" + path + " child=" + dagIdString);
+     }
+
+     job.remoteJobSubmitDir =
+         FileSystem.get(job.conf).makeQualified(
+             new Path(path, dagIdString));
+     job.remoteJobConfFile =
+         new Path(job.remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
+
+     // Prepare the TaskAttemptListener server for authentication of Containers
+     // TaskAttemptListener gets the information via jobTokenSecretManager.
+     JobTokenIdentifier identifier =
+         new JobTokenIdentifier(new Text(dagIdString));
+     job.jobToken =
+         new Token<JobTokenIdentifier>(identifier, job.jobTokenSecretManager);
+     job.jobToken.setService(identifier.getJobId());
+     // Add it to the jobTokenSecretManager so that TaskAttemptListener server
+     // can authenticate containers(tasks)
+     job.jobTokenSecretManager.addTokenForJob(dagIdString, job.jobToken);
+     LOG.info("Adding job token for " + dagIdString
+         + " to jobTokenSecretManager");
+
+     // Upload the jobTokens onto the remote FS so that ContainerManager can
+     // localize it to be used by the Containers(tasks)
+     // NOTE(review): tokenStorage is populated but never written out here —
+     // presumably uploaded elsewhere; confirm before relying on it.
+     Credentials tokenStorage = new Credentials();
+     TokenCache.setJobToken(job.jobToken, tokenStorage);
+
+     if (UserGroupInformation.isSecurityEnabled()) {
+       tokenStorage.addAll(job.fsTokens);
+     }
+   }
+
+   /**
+   * If the number of tasks are greater than the configured value
+   * throw an exception that will fail job initialization
+   */
+   private void checkTaskLimits() {
+     // no code, for now
+   }
+ } // end of InitTransition
+
+ /** Handles DAG_START: INITED -> RUNNING. */
+ public static class StartTransition
+ implements SingleArcTransition<DAGImpl, DAGEvent> {
+   /**
+   * This transition executes in the event-dispatcher thread, though it's
+   * triggered in MRAppMaster's startJobs() method.
+   */
+   @Override
+   public void transition(DAGImpl dag, DAGEvent event) {
+     dag.startTime = dag.clock.getTime();
+     dag.initializeVertices();
+     dag.logJobHistoryInitedEvent();
+     // TODO Metrics
+     //dag.metrics.runningJob(dag);
+
+     // An empty DAG has nothing to run: report completion immediately.
+     if (dag.numVertices == 0) {
+       dag.eventHandler.handle(
+           new DAGEvent(dag.dagId, DAGEventType.DAG_COMPLETED));
+     }
+
+     // Kick off every vertex that has no incoming edges.
+     dag.startRootVertices();
+   }
+ }
+
+ // Aborts the DAG with the given final state. Committer abort and cleanup
+ // progress are still TODO; currently only logs the (stubbed) unsuccessful
+ // history event.
+ private void abortJob(
+ org.apache.hadoop.mapreduce.JobStatus.State finalState) {
+ // TODO: Committer
+ /*
+ try {
+ committer.abortJob(jobContext, finalState);
+ } catch (IOException e) {
+ LOG.warn("Could not abortJob", e);
+ }
+ if (finishTime == 0) setFinishTime();
+
+ cleanupProgress = 1.0f;
+ */
+
+ logJobHistoryUnsuccesfulEvent();
+ }
+
+ // JobFinishedEvent triggers the move of the history file out of the staging
+ // area. May need to create a new event type for this if JobFinished should
+ // not be generated for KilledJobs, etc.
+ /*
+ private static JobFinishedEvent createJobFinishedEvent(DAGImpl job) {
+
+ job.mayBeConstructFinalFullCounters();
+
+ JobFinishedEvent jfe = new JobFinishedEvent(
+ job.oldJobId, job.finishTime,
+ job.succeededMapTaskCount, job.succeededReduceTaskCount,
+ job.failedMapTaskCount, job.failedReduceTaskCount,
+ job.finalMapCounters,
+ job.finalReduceCounters,
+ job.fullCounters);
+ return jfe;
+ }
+ */
+
+ // Secondary index from vertex name to vertex; kept in sync with the
+ // id-keyed vertices map by addVertex().
+ Map<String, Vertex> vertexMap = new HashMap<String, Vertex>();
+ // Registers a vertex under both its id and its name.
+ void addVertex(Vertex v) {
+ vertices.put(v.getVertexId(), v);
+ vertexMap.put(v.getName(), v);
+ }
+
+ // Name-based lookup; returns null for an unknown vertex name.
+ Vertex getVertex(String vertexName) {
+ return vertexMap.get(vertexName);
+ }
+
+  /**
+   * Lazily builds the aggregated counters for the finished DAG. Guarded by
+   * fullCountersLock so construction happens at most once; later calls
+   * return immediately once fullCounters is non-null.
+   */
+  private void mayBeConstructFinalFullCounters() {
+    // Calculating full-counters. This should happen only once for the job.
+    synchronized (this.fullCountersLock) {
+      if (this.fullCounters != null) {
+        // Already constructed. Just return.
+        return;
+      }
+      this.constructFinalFullcounters();
+    }
+  }
+
+  @Private
+  public void constructFinalFullcounters() {
+    // Aggregate the DAG-level counters together with the counters reported
+    // by every vertex into a single TezCounters instance.
+    this.fullCounters = new TezCounters();
+    this.fullCounters.incrAllCounters(dagCounters);
+    for (Vertex vertex : vertices.values()) {
+      this.fullCounters.incrAllCounters(vertex.getAllCounters());
+    }
+  }
+
+  // Kill arrives while the DAG is still NEW: nothing has started yet, so
+  // just record the finish time, log the unsuccessful-job history event and
+  // move to KILLED.
+  private static class KillNewJobTransition
+      implements SingleArcTransition<DAGImpl, DAGEvent> {
+    @Override
+    public void transition(DAGImpl job, DAGEvent event) {
+      job.setFinishTime();
+      job.logJobHistoryUnsuccesfulEventForNewJob();
+      job.finished(DAGState.KILLED);
+    }
+  }
+
+  // Kill received in INITED state: abort (logs the unsuccessful history
+  // event), record a diagnostic, and finish as KILLED.
+  private static class KillInitedJobTransition
+      implements SingleArcTransition<DAGImpl, DAGEvent> {
+    @Override
+    public void transition(DAGImpl dag, DAGEvent event) {
+      dag.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
+      dag.addDiagnostic("Job received Kill in INITED state.");
+      dag.finished(DAGState.KILLED);
+    }
+  }
+
+ private static class KillVerticesTransition
+ implements SingleArcTransition<DAGImpl, DAGEvent> {
+ @Override
+ public void transition(DAGImpl job, DAGEvent event) {
+ job.addDiagnostic("Job received Kill while in RUNNING state.");
+ for (Vertex v : job.vertices.values()) {
+ job.eventHandler.handle(
+ new VertexEvent(v.getVertexId(), VertexEventType.V_KILL)
+ );
+ }
+ // TODO Metrics
+ //job.metrics.endRunningJob(job);
+ }
+ }
+
+  private static class VertexCompletedTransition implements
+      MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
+
+    /**
+     * Handles a vertex reaching a terminal state (SUCCEEDED / FAILED /
+     * KILLED): updates DAG-level completion counters, notifies the DAG
+     * scheduler, and decides whether the DAG itself is now finished.
+     */
+    @Override
+    public DAGState transition(DAGImpl job, DAGEvent event) {
+      // Count each terminal vertex exactly once here, whatever its outcome.
+      job.numCompletedVertices++;
+      LOG.info("Num completed vertices: " + job.numCompletedVertices);
+      DAGEventVertexCompleted vertexEvent = (DAGEventVertexCompleted) event;
+      Vertex vertex = job.vertices.get(vertexEvent.getVertexId());
+      if (vertexEvent.getVertexState() == VertexState.SUCCEEDED) {
+        vertexSucceeded(job, vertex);
+      } else if (vertexEvent.getVertexState() == VertexState.FAILED) {
+        vertexFailed(job, vertex);
+      } else if (vertexEvent.getVertexState() == VertexState.KILLED) {
+        vertexKilled(job, vertex);
+      }
+
+      job.dagScheduler.vertexCompleted(vertex);
+
+      return checkJobForCompletion(job);
+    }
+
+    /**
+     * @return FAILED if any vertex has failed, the success state if the
+     *         whole DAG is done, otherwise the current internal state.
+     */
+    protected DAGState checkJobForCompletion(DAGImpl job) {
+      // Check for job failure: any failed vertex fails the DAG.
+      if (job.numFailedVertices > 0) {
+        job.setFinishTime();
+
+        String diagnosticMsg = "Job failed as vertices failed. " +
+            " failedVertices:" + job.numFailedVertices +
+            " killedVertices:" + job.numKilledVertices;
+        LOG.info(diagnosticMsg);
+        job.addDiagnostic(diagnosticMsg);
+        job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
+        return job.finished(DAGState.FAILED);
+      }
+
+      DAGState jobCompleteSuccess = DAGImpl.checkJobCompleteSuccess(job);
+      if (jobCompleteSuccess != null) {
+        return jobCompleteSuccess;
+      }
+
+      // Return the current state; the job is not finished yet.
+      return job.getInternalState();
+    }
+
+    private void vertexSucceeded(DAGImpl job, Vertex vertex) {
+      // BUG FIX: the original incremented job.numCompletedVertices here as
+      // well, double-counting succeeded vertices since transition() already
+      // counts every completed vertex. That skewed the
+      // numCompletedVertices == vertices.size() completion check used while
+      // waiting for a kill to finish.
+      // TODO: Metrics
+      //job.metrics.completedTask(task);
+    }
+
+    private void vertexFailed(DAGImpl job, Vertex vertex) {
+      job.numFailedVertices++;
+      job.addDiagnostic("Vertex failed " + vertex.getVertexId());
+      // TODO: Metrics
+      //job.metrics.failedTask(task);
+    }
+
+    private void vertexKilled(DAGImpl job, Vertex vertex) {
+      job.numKilledVertices++;
+      job.addDiagnostic("Vertex killed " + vertex.getVertexId());
+      // TODO: Metrics
+      //job.metrics.killedTask(task);
+    }
+  }
+
+  // Transition for DAGs that contain no tasks at all: check immediately
+  // whether the job can be declared successfully complete.
+  static class JobNoTasksCompletedTransition implements
+      MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
+
+    @Override
+    public DAGState transition(DAGImpl job, DAGEvent event) {
+      DAGState completedState = DAGImpl.checkJobCompleteSuccess(job);
+      // If not complete yet, remain in the current state.
+      return (completedState != null) ? completedState
+          : job.getInternalState();
+    }
+  }
+
+  // While waiting for a kill to take effect, vertex completions are handled
+  // as usual; once every vertex has reported a terminal state the DAG is
+  // finalized as KILLED.
+  private static class KillWaitTaskCompletedTransition extends
+      VertexCompletedTransition {
+    @Override
+    protected DAGState checkJobForCompletion(DAGImpl job) {
+      // NOTE(review): this relies on numCompletedVertices being incremented
+      // exactly once per completed vertex — verify against the counting in
+      // VertexCompletedTransition.
+      if (job.numCompletedVertices == job.vertices.size()) {
+        job.setFinishTime();
+        job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
+        return job.finished(DAGState.KILLED);
+      }
+      //return the current state, Job not finished yet
+      return job.getInternalState();
+    }
+  }
+
+  /** Appends a human-readable diagnostic message to the DAG's diagnostics. */
+  private void addDiagnostic(String diag) {
+    diagnostics.add(diag);
+  }
+
+  // Appends an externally supplied diagnostic message to the DAG.
+  private static class DiagnosticsUpdateTransition implements
+      SingleArcTransition<DAGImpl, DAGEvent> {
+    @Override
+    public void transition(DAGImpl job, DAGEvent event) {
+      DAGEventDiagnosticsUpdate diagnosticsEvent =
+          (DAGEventDiagnosticsUpdate) event;
+      job.addDiagnostic(diagnosticsEvent.getDiagnosticUpdate());
+    }
+  }
+
+  // Applies a batch of incremental counter updates to the DAG counters.
+  private static class CounterUpdateTransition implements
+      SingleArcTransition<DAGImpl, DAGEvent> {
+    @Override
+    public void transition(DAGImpl job, DAGEvent event) {
+      DAGEventCounterUpdate counterEvent = (DAGEventCounterUpdate) event;
+      for (DAGEventCounterUpdate.CounterIncrementalUpdate update
+          : counterEvent.getCounterUpdates()) {
+        job.dagCounters.findCounter(update.getCounterKey())
+            .increment(update.getIncrementValue());
+      }
+    }
+  }
+
+  // Routes scheduler-update events to the DAG's pluggable scheduler.
+  private static class DAGSchedulerUpdateTransition implements
+      SingleArcTransition<DAGImpl, DAGEvent> {
+    @Override
+    public void transition(DAGImpl dag, DAGEvent event) {
+      DAGEventSchedulerUpdate schedulerEvent = (DAGEventSchedulerUpdate) event;
+      switch (schedulerEvent.getUpdateType()) {
+        case TA_SCHEDULE:
+          dag.dagScheduler.scheduleTask(schedulerEvent);
+          break;
+        default:
+          LOG.warn("Unknown DAGEventSchedulerUpdate:"
+              + schedulerEvent.getUpdateType());
+      }
+    }
+  }
+
+ private static class InternalErrorTransition implements
+ SingleArcTransition<DAGImpl, DAGEvent> {
+ @Override
+ public void transition(DAGImpl job, DAGEvent event) {
+ //TODO Is this JH event required.
+ job.setFinishTime();
+ job.logJobHistoryUnsuccesfulEventForNewJob();
+ job.finished(DAGState.ERROR);
+ }
+ }
+
+  /**
+   * Resolves the given vertex id to its name and returns the DAG-level
+   * location hint registered for that vertex.
+   */
+  @Override
+  public VertexLocationHint getVertexLocationHint(TezVertexID vertexId) {
+    return dagLocationHint.getVertexLocationHint(
+        getVertex(vertexId).getName());
+  }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,146 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.DAGScheduler;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
+
+@SuppressWarnings("rawtypes")
+/**
+ * DAG scheduler specialized for Map-Reduce-Reduce style chains. Tracks the
+ * currently running "partitioner" vertex (output producer) and "shuffler"
+ * vertex (its consumer) so that a pipelined shuffler can be scheduled at a
+ * priority just above the partitioner's natural depth-based priority.
+ */
+public class DAGSchedulerMRR implements DAGScheduler {
+
+  private static final Log LOG = LogFactory.getLog(DAGSchedulerMRR.class);
+
+  private final DAG dag;
+  private final EventHandler handler;
+  // Vertex currently producing partitioned output for the next stage.
+  private Vertex currentPartitioner = null;
+  // Vertex consuming the current partitioner's output, if any.
+  private Vertex currentShuffler = null;
+  // Distance-from-root of the current shuffler; 0 until one is seen.
+  private int currentShufflerDepth = 0;
+
+  public DAGSchedulerMRR(DAG dag, EventHandler dispatcher) {
+    this.dag = dag;
+    this.handler = dispatcher;
+  }
+
+  /**
+   * Called when a vertex completes. Vertices of an MRR chain are expected to
+   * finish in pipeline order: the current partitioner completes first, after
+   * which the current shuffler becomes the new partitioner.
+   */
+  @Override
+  public void vertexCompleted(Vertex vertex) {
+    if (currentPartitioner != null) {
+      if (vertex != currentPartitioner) {
+        // Use the vertex id in the message, consistent with the other
+        // diagnostics in this class (the original printed the raw Vertex).
+        String message = vertex.getVertexId() + " finished. Expecting "
+            + currentPartitioner.getVertexId() + " to finish.";
+        LOG.fatal(message);
+        throw new TezException(message);
+      }
+      LOG.info("Current partitioner " + currentPartitioner.getVertexId()
+          + " is completed. "
+          + (currentShuffler != null ? currentShuffler.getVertexId() : "null")
+          + " is new partitioner");
+      currentPartitioner = currentShuffler;
+      currentShuffler = null;
+    } else {
+      if (vertex != currentShuffler) {
+        // BUG FIX: guard against currentShuffler being null here — the
+        // original dereferenced it unconditionally and could throw a
+        // NullPointerException while building the error message.
+        String message = vertex.getVertexId() + " finished. Expecting "
+            + (currentShuffler != null
+                ? currentShuffler.getVertexId().toString()
+                : "no pending vertex")
+            + " to finish";
+        LOG.fatal(message);
+        throw new TezException(message);
+      }
+    }
+  }
+
+  /**
+   * Assigns a priority to the task attempt based on its vertex's distance
+   * from the DAG root, while maintaining the partitioner/shuffler roles so a
+   * pipelined shuffler runs just above the partitioner's natural priority.
+   */
+  @Override
+  public void scheduleTask(DAGEventSchedulerUpdate event) {
+    TaskAttempt attempt = event.getAttempt();
+    Vertex vertex = dag.getVertex(attempt.getID().getTaskID().getVertexID());
+    int vertexDistanceFromRoot = vertex.getDistanceFromRoot();
+    if (vertexDistanceFromRoot == 0) {
+      currentPartitioner = vertex;
+      LOG.info(vertex.getVertexId() + " is first partitioner");
+    }
+    if (vertexDistanceFromRoot > currentShufflerDepth) {
+      if (currentShuffler == null) {
+        currentShuffler = vertex;
+        currentShufflerDepth = vertexDistanceFromRoot;
+        LOG.info(currentShuffler.getVertexId() + " is new shuffler at depth " +
+            currentShufflerDepth);
+      } else {
+        if (currentShufflerDepth + 1 == vertexDistanceFromRoot &&
+            currentPartitioner == null) {
+          // The old shuffler's inputs are done; it becomes the partitioner
+          // feeding the newly started vertex one level deeper.
+          currentPartitioner = currentShuffler;
+          currentShuffler = vertex;
+          currentShufflerDepth = vertexDistanceFromRoot;
+          LOG.info("Shuffler " + currentPartitioner.getVertexId() +
+              " becomes partitioner as new shuffler " +
+              currentShuffler.getVertexId() + " has started at depth " +
+              currentShufflerDepth);
+        } else {
+          String message = vertex.getVertexId()
+              + " has scheduled tasks at depth " + vertexDistanceFromRoot
+              + " greater than depth " + currentShufflerDepth
+              + " of current shuffler " + currentShuffler.getVertexId()
+              + ". Unexpected.";
+          LOG.fatal(message);
+          throw new TezException(message);
+        }
+      }
+    }
+
+    // Natural priority by depth (lower numeric value = higher priority).
+    // The *3 spacing leaves room for the adjustments below; handles
+    // failures and retries.
+    int priority = (vertexDistanceFromRoot + 1) * 3;
+
+    if (currentShuffler == vertex) {
+      if (currentPartitioner != null) {
+        // Special priority for current reducers while current partitioners
+        // are still running: schedule at priority one higher than the
+        // natural priority of the previous vertex.
+        priority -= 4; // this == (partitionerDepth+1)*3 - 1
+      }
+    } else {
+      if (attempt.getIsRescheduled()) {
+        // Higher priority for retries of failed attempts so a faulty task
+        // fails fast ahead of its vertex peers. NOTE(review): this also
+        // fires for reruns caused by unavailable outputs — confirm desired.
+        priority -= 2;
+      }
+    }
+
+    LOG.info("Scheduling " + attempt.getID() +
+        " with depth " + vertexDistanceFromRoot +
+        " at priority " + priority);
+
+    TaskAttemptEventSchedule attemptEvent =
+        new TaskAttemptEventSchedule(attempt.getID(),
+            BuilderUtils.newPriority(priority));
+    sendEvent(attemptEvent);
+  }
+
+  /** Dispatches the schedule event via the DAG's event handler. */
+  @SuppressWarnings("unchecked")
+  void sendEvent(TaskAttemptEventSchedule event) {
+    handler.handle(event);
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,80 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.DAGScheduler;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
+
+@SuppressWarnings("rawtypes")
+/**
+ * DAG scheduler that assigns task priorities purely by a vertex's distance
+ * from the DAG root — deeper vertices get numerically larger (i.e. lower)
+ * priorities.
+ */
+public class DAGSchedulerNaturalOrder implements DAGScheduler {
+
+  private static final Log LOG =
+      LogFactory.getLog(DAGSchedulerNaturalOrder.class);
+
+  private final DAG dag;
+  private final EventHandler handler;
+
+  public DAGSchedulerNaturalOrder(DAG dag, EventHandler dispatcher) {
+    this.dag = dag;
+    this.handler = dispatcher;
+  }
+
+  // Vertex completion requires no bookkeeping for natural-order scheduling.
+  @Override
+  public void vertexCompleted(Vertex vertex) {
+  }
+
+  /**
+   * Schedules a task attempt at a priority derived from the owning vertex's
+   * depth, bumping retried attempts one priority level higher.
+   */
+  @Override
+  public void scheduleTask(DAGEventSchedulerUpdate event) {
+    TaskAttempt attempt = event.getAttempt();
+    Vertex owningVertex =
+        dag.getVertex(attempt.getID().getTaskID().getVertexID());
+    int depth = owningVertex.getDistanceFromRoot();
+
+    // Natural priority. Handles failures and retries.
+    int priority = (depth + 1) * 2;
+
+    if (attempt.getIsRescheduled()) {
+      // Higher priority for retries of failed attempts so a faulty task can
+      // fail fast ahead of its vertex peers. This may also trigger for
+      // reruns caused by unavailable outputs.
+      priority--;
+    }
+
+    LOG.info("Scheduling " + attempt.getID() + " at priority " + priority);
+
+    sendEvent(new TaskAttemptEventSchedule(attempt.getID(),
+        BuilderUtils.newPriority(priority)));
+  }
+
+  /** Dispatches the schedule event via the DAG's event handler. */
+  @SuppressWarnings("unchecked")
+  void sendEvent(TaskAttemptEventSchedule event) {
+    handler.handle(event);
+  }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,48 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.impl;
+
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.VertexScheduler;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+/**
+ * Starts all tasks immediately on vertex start
+ */
+/**
+ * Vertex scheduler that schedules every task of the managed vertex as soon
+ * as the vertex starts.
+ */
+public class ImmediateStartVertexScheduler implements VertexScheduler {
+  // The vertex whose tasks this scheduler manages.
+  final Vertex managedVertex;
+
+  ImmediateStartVertexScheduler(Vertex vertex) {
+    managedVertex = vertex;
+  }
+
+  /** Schedules all of the managed vertex's tasks up front. */
+  @Override
+  public void onVertexStarted() {
+    managedVertex.scheduleTasks(managedVertex.getTasks().keySet());
+  }
+
+  // No action needed on vertex completion.
+  @Override
+  public void onVertexCompleted() {
+  }
+
+  // Source-task completions are ignored: everything was scheduled up front.
+  @Override
+  public void onSourceTaskCompleted(TezTaskAttemptID attemptId) {
+  }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java
------------------------------------------------------------------------------
svn:eol-style = native