You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC
svn commit: r1469642 [10/36] - in /incubator/tez/branches/TEZ-1: ./
example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/
example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/
tez-common/src/main/ tez-common/src/main/java/ t...
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,556 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputConsumable;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.dag.app.rm.container.AMContainerImpl;
+import org.apache.tez.dag.app.rm.container.AMContainerTask;
+import org.apache.tez.dag.app.security.authorize.MRAMPolicyProvider;
+import org.apache.tez.engine.common.security.JobTokenSecretManager;
+import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
+import org.apache.tez.mapreduce.hadoop.ContainerContext;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
+import org.apache.tez.mapreduce.hadoop.records.ProceedToCompletionResponse;
+
+@SuppressWarnings("unchecked")
+public class TaskAttemptListenerImpTezDag extends AbstractService implements
+ TezTaskUmbilicalProtocol, TaskAttemptListener {
+
+ private static final ContainerTask TASK_FOR_INVALID_JVM = new ContainerTask(
+ null, true);
+
+ private static ProceedToCompletionResponse COMPLETION_RESPONSE_NO_WAIT =
+ new ProceedToCompletionResponse(true, true);
+
+ private static final Log LOG = LogFactory
+ .getLog(TaskAttemptListenerImpTezDag.class);
+
+ private final AppContext context;
+
+ protected final TaskHeartbeatHandler taskHeartbeatHandler;
+ protected final ContainerHeartbeatHandler containerHeartbeatHandler;
+ private final JobTokenSecretManager jobTokenSecretManager;
+ private InetSocketAddress address;
+ private Server server;
+
+
+ // TODO Use this to figure out whether an incoming ping is valid.
+ private ConcurrentMap<TezTaskAttemptID, ContainerId> attemptToContainerIdMap =
+ new ConcurrentHashMap<TezTaskAttemptID, ContainerId>();
+
+ private Set<ContainerId> registeredContainers = Collections
+ .newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
+
+ public TaskAttemptListenerImpTezDag(AppContext context,
+ TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh,
+ JobTokenSecretManager jobTokenSecretManager) {
+ super(TaskAttemptListenerImpTezDag.class.getName());
+ this.context = context;
+ this.jobTokenSecretManager = jobTokenSecretManager;
+ this.taskHeartbeatHandler = thh;
+ this.containerHeartbeatHandler = chh;
+ }
+
+ @Override
+ public void start() {
+ startRpcServer();
+ super.start();
+ }
+
+ protected void startRpcServer() {
+ Configuration conf = getConfig();
+ try {
+ server = new RPC.Builder(conf)
+ .setProtocol(TezTaskUmbilicalProtocol.class)
+ .setBindAddress("0.0.0.0")
+ .setPort(0)
+ .setInstance(this)
+ .setNumHandlers(
+ conf.getInt(MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT,
+ MRJobConfig.DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT))
+ .setSecretManager(jobTokenSecretManager).build();
+
+ // Enable service authorization?
+ if (conf.getBoolean(
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
+ false)) {
+ refreshServiceAcls(conf, new MRAMPolicyProvider());
+ }
+
+ server.start();
+ this.address = NetUtils.getConnectAddress(server);
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+ }
+
+ void refreshServiceAcls(Configuration configuration,
+ PolicyProvider policyProvider) {
+ this.server.refreshServiceAcl(configuration, policyProvider);
+ }
+
+ @Override
+ public void stop() {
+ stopRpcServer();
+ super.stop();
+ }
+
+ protected void stopRpcServer() {
+ if (server != null) {
+ server.stop();
+ }
+ }
+
+ public InetSocketAddress getAddress() {
+ return address;
+ }
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return versionID;
+ }
+
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ return ProtocolSignature.getProtocolSignature(this, protocol,
+ clientVersion, clientMethodsHash);
+ }
+
+ @Override
+ public TezTaskDependencyCompletionEventsUpdate getDependentTasksCompletionEvents(
+ int fromEventIdx, int maxEvents,
+ TezTaskAttemptID taskAttemptID) {
+
+ LOG.info("Dependency Completion Events request from " + taskAttemptID
+ + ". fromEventID " + fromEventIdx + " maxEvents " + maxEvents);
+
+ // TODO: shouldReset is never used. See TT. Ask for Removal.
+ boolean shouldReset = false;
+ TezDependentTaskCompletionEvent[] events =
+ context.getDAG().
+ getVertex(taskAttemptID.getTaskID().getVertexID()).
+ getTaskAttemptCompletionEvents(fromEventIdx, maxEvents);
+
+ taskHeartbeatHandler.progressing(taskAttemptID);
+ pingContainerHeartbeatHandler(taskAttemptID);
+
+ // No filters for now. Only required events stored in a vertex.
+
+ return new TezTaskDependencyCompletionEventsUpdate(events,shouldReset);
+ }
+
+ @Override
+ public ContainerTask getTask(ContainerContext containerContext)
+ throws IOException {
+
+ ContainerTask task = null;
+
+ if (containerContext == null || containerContext.getContainerId() == null) {
+ LOG.info("Invalid task request with an empty containerContext or containerId");
+ task = TASK_FOR_INVALID_JVM;
+ } else {
+ ContainerId containerId = containerContext.getContainerId();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Container with id: " + containerId + " asked for a task");
+ }
+ if (!registeredContainers.contains(containerId)) {
+ LOG.info("Container with id: " + containerId
+ + " is invalid and will be killed");
+ task = TASK_FOR_INVALID_JVM;
+ } else {
+ pingContainerHeartbeatHandler(containerId);
+ AMContainerTask taskContext = pullTaskAttemptContext(containerId);
+ if (taskContext.shouldDie()) {
+ LOG.info("No more tasks for container with id : " + containerId
+ + ". Asking it to die");
+ task = TASK_FOR_INVALID_JVM; // i.e. ask the child to die.
+ } else {
+ if (taskContext.getTask() == null) {
+ LOG.info("No task currently assigned to Container with id: "
+ + containerId);
+ } else {
+ task = new ContainerTask(taskContext.getTask(), false);
+ context.getEventHandler().handle(
+ new TaskAttemptEventStartedRemotely(taskContext.getTask()
+ .getTaskAttemptId(), containerId, context
+ .getApplicationACLs(), context.getAllContainers()
+ .get(containerId).getShufflePort()));
+ LOG.info("Container with id: " + containerId + " given task: "
+ + taskContext.getTask().getTaskAttemptId());
+ }
+ }
+ }
+ }
+ LOG.info("DEBUG: getTask returning task: " + task);
+ return task;
+ }
+
+ @Override
+ public boolean statusUpdate(TezTaskAttemptID taskAttemptId,
+ TezTaskStatus taskStatus) throws IOException, InterruptedException {
+ LOG.info("DEBUG: " + "Status update from: " + taskAttemptId);
+ taskHeartbeatHandler.progressing(taskAttemptId);
+ pingContainerHeartbeatHandler(taskAttemptId);
+ TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
+ taskAttemptStatus.id = taskAttemptId;
+ // Task sends the updated progress to the TT.
+ taskAttemptStatus.progress = taskStatus.getProgress();
+ LOG.info("DEBUG: " + "Progress of TaskAttempt " + taskAttemptId + " is : "
+ + taskStatus.getProgress());
+
+ // Task sends the updated state-string to the TT.
+ taskAttemptStatus.stateString = taskStatus.getStateString();
+
+ // Set the output-size when map-task finishes. Set by the task itself.
+ // outputSize is never used.
+ taskAttemptStatus.outputSize = taskStatus.getLocalOutputSize();
+
+ // TODO Phase
+ // Task sends the updated phase to the TT.
+ //taskAttemptStatus.phase = MRxTypeConverters.toYarn(taskStatus.getPhase());
+
+ // TODO MRXAM3 - AVoid the 10 layers of convresion.
+ // Counters are updated by the task. Convert counters into new format as
+ // that is the primary storage format inside the AM to avoid multiple
+ // conversions and unnecessary heap usage.
+ taskAttemptStatus.counters = taskStatus.getCounters();
+
+
+ // Map Finish time set by the task (map only)
+ // TODO CLEANMRXAM - maybe differentiate between map / reduce / types
+ if (taskStatus.getMapFinishTime() != 0) {
+ taskAttemptStatus.mapFinishTime = taskStatus.getMapFinishTime();
+ }
+
+ // Shuffle Finish time set by the task (reduce only).
+ if (taskStatus.getShuffleFinishTime() != 0) {
+ taskAttemptStatus.shuffleFinishTime = taskStatus.getShuffleFinishTime();
+ }
+
+ // Sort finish time set by the task (reduce only).
+ if (taskStatus.getSortFinishTime() != 0) {
+ taskAttemptStatus.sortFinishTime = taskStatus.getSortFinishTime();
+ }
+
+ // Not Setting the task state. Used by speculation - will be set in
+ // TaskAttemptImpl
+ // taskAttemptStatus.taskState =
+ // TypeConverter.toYarn(taskStatus.getRunState());
+
+ // set the fetch failures
+ if (taskStatus.getFailedDependencies() != null
+ && taskStatus.getFailedDependencies().size() > 0) {
+ LOG.warn("Failed dependencies are not handled at the moment." +
+ " The job is likely to fail / hang");
+ taskAttemptStatus.fetchFailedMaps = new ArrayList<TezTaskAttemptID>();
+ for (TezTaskAttemptID failedAttemptId : taskStatus
+ .getFailedDependencies()) {
+ taskAttemptStatus.fetchFailedMaps.add(failedAttemptId);
+ }
+ }
+
+ // Task sends the information about the nextRecordRange to the TT
+
+ // TODO: The following are not needed here, but needed to be set somewhere
+ // inside AppMaster.
+ // taskStatus.getRunState(); // Set by the TT/JT. Transform into a state
+ // TODO
+ // taskStatus.getStartTime(); // Used to be set by the TaskTracker. This
+ // should be set by getTask().
+ // taskStatus.getFinishTime(); // Used to be set by TT/JT. Should be set
+ // when task finishes
+ // // This was used by TT to do counter updates only once every minute. So
+ // this
+ // // isn't ever changed by the Task itself.
+ // taskStatus.getIncludeCounters();
+
+ context.getEventHandler().handle(
+ new TaskAttemptEventStatusUpdate(taskAttemptStatus.id,
+ taskAttemptStatus));
+ return true;
+ }
+
+ @Override
+ public void reportDiagnosticInfo(TezTaskAttemptID taskAttemptId, String trace)
+ throws IOException {
+ LOG.info("Diagnostics report from " + taskAttemptId.toString() + ": "
+ + trace);
+
+ taskHeartbeatHandler.progressing(taskAttemptId);
+ pingContainerHeartbeatHandler(taskAttemptId);
+
+ // This is mainly used for cases where we want to propagate exception traces
+ // of tasks that fail.
+
+ // This call exists as a hadoop mapreduce legacy wherein all changes in
+ // counters/progress/phase/output-size are reported through statusUpdate()
+ // call but not diagnosticInformation.
+ context.getEventHandler().handle(
+ new TaskAttemptEventDiagnosticsUpdate(taskAttemptId, trace));
+
+ }
+
+ @Override
+ public boolean ping(TezTaskAttemptID taskAttemptId) throws IOException {
+ LOG.info("Ping from " + taskAttemptId.toString());
+ taskHeartbeatHandler.pinged(taskAttemptId);
+ pingContainerHeartbeatHandler(taskAttemptId);
+ return true;
+ }
+
+ @Override
+ public void done(TezTaskAttemptID taskAttemptId) throws IOException {
+ LOG.info("Done acknowledgement from " + taskAttemptId.toString());
+
+ taskHeartbeatHandler.progressing(taskAttemptId);
+ pingContainerHeartbeatHandler(taskAttemptId);
+
+ context.getEventHandler().handle(
+ new TaskAttemptEvent(taskAttemptId, TaskAttemptEventType.TA_DONE));
+
+ }
+
+ /**
+ * TaskAttempt is reporting that it is in commit_pending and it is waiting for
+ * the commit Response
+ *
+ * <br/>
+ * Commit it a two-phased protocol. First the attempt informs the
+ * ApplicationMaster that it is
+ * {@link #commitPending(TaskAttemptID, TaskStatus)}. Then it repeatedly polls
+ * the ApplicationMaster whether it {@link #canCommit(TaskAttemptID)} This is
+ * a legacy from the centralized commit protocol handling by the JobTracker.
+ */
+ @Override
+ public void commitPending(TezTaskAttemptID taskAttemptId, TezTaskStatus taskStatus)
+ throws IOException, InterruptedException {
+ LOG.info("Commit-pending state update from " + taskAttemptId.toString());
+ // An attempt is asking if it can commit its output. This can be decided
+ // only by the task which is managing the multiple attempts. So redirect the
+ // request there.
+ taskHeartbeatHandler.progressing(taskAttemptId);
+ pingContainerHeartbeatHandler(taskAttemptId);
+ //Ignorable TaskStatus? - since a task will send a LastStatusUpdate
+ context.getEventHandler().handle(
+ new TaskAttemptEvent(
+ taskAttemptId,
+ TaskAttemptEventType.TA_COMMIT_PENDING)
+ );
+ }
+
+ /**
+ * Child checking whether it can commit.
+ *
+ * <br/>
+ * Commit is a two-phased protocol. First the attempt informs the
+ * ApplicationMaster that it is
+ * {@link #commitPending(TaskAttemptID, TaskStatus)}. Then it repeatedly polls
+ * the ApplicationMaster whether it {@link #canCommit(TaskAttemptID)} This is
+ * a legacy from the centralized commit protocol handling by the JobTracker.
+ */
+ @Override
+ public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {
+ LOG.info("Commit go/no-go request from " + taskAttemptId.toString());
+ // An attempt is asking if it can commit its output. This can be decided
+ // only by the task which is managing the multiple attempts. So redirect the
+ // request there.
+ taskHeartbeatHandler.progressing(taskAttemptId);
+ pingContainerHeartbeatHandler(taskAttemptId);
+
+ DAG job = context.getDAG();
+ Task task =
+ job.getVertex(taskAttemptId.getTaskID().getVertexID()).
+ getTask(taskAttemptId.getTaskID());
+ return task.canCommit(taskAttemptId);
+ }
+
+ @Override
+ public void shuffleError(TezTaskAttemptID taskId, String message)
+ throws IOException {
+ // TODO: This isn't really used in any MR code. Ask for removal.
+ }
+
+ @Override
+ public void fsError(TezTaskAttemptID taskAttemptId, String message)
+ throws IOException {
+ // This happens only in Child.
+ LOG.fatal("Task: " + taskAttemptId + " - failed due to FSError: " + message);
+ reportDiagnosticInfo(taskAttemptId, "FSError: " + message);
+
+ context.getEventHandler().handle(
+ new TaskAttemptEvent(taskAttemptId, TaskAttemptEventType.TA_FAILED));
+ }
+
+ @Override
+ public void fatalError(TezTaskAttemptID taskAttemptId, String message)
+ throws IOException {
+ // This happens only in Child and in the Task.
+ LOG.fatal("Task: " + taskAttemptId + " - exited : " + message);
+ reportDiagnosticInfo(taskAttemptId, "Error: " + message);
+
+ context.getEventHandler().handle(
+ new TaskAttemptEvent(taskAttemptId, TaskAttemptEventType.TA_FAILED));
+ }
+
+ @Override
+ public void outputReady(TezTaskAttemptID taskAttemptId,
+ OutputContext outputContext) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("AttemptId: " + taskAttemptId + " reported output context: "
+ + outputContext);
+ }
+ context.getEventHandler().handle(
+ new TaskAttemptEventOutputConsumable(taskAttemptId, outputContext));
+ }
+
+ @Override
+ public ProceedToCompletionResponse
+ proceedToCompletion(TezTaskAttemptID taskAttemptId) throws IOException {
+
+ // The async nature of the processing combined with the 1 second interval
+ // between polls (MRTask) implies tasks end up wasting upto 1 second doing
+ // nothing. Similarly for CA_COMMIT.
+
+ DAG job = context.getDAG();
+ Task task =
+ job.getVertex(taskAttemptId.getTaskID().getVertexID()).
+ getTask(taskAttemptId.getTaskID());
+
+ // TODO In-Memory Shuffle
+ /*
+ if (task.needsWaitAfterOutputConsumable()) {
+ TezTaskAttemptID outputReadyAttempt = task.getOutputConsumableAttempt();
+ if (outputReadyAttempt != null) {
+ if (!outputReadyAttempt.equals(taskAttemptId)) {
+ LOG.info("Telling taksAttemptId: "
+ + taskAttemptId
+ + " to die, since the outputReady atempt for this task is different: "
+ + outputReadyAttempt);
+ return new ProceedToCompletionResponse(true, true);
+ }
+ }
+ boolean reducesDone = true;
+ for (Task rTask : job.getTasks(TaskType.REDUCE).values()) {
+ if (rTask.getState() != TaskState.SUCCEEDED) {
+ // TODO EVENTUALLY - could let the map tasks exit after reduces are
+ // done with the shuffle phase, instead of waiting for the reduces to
+ // complete.
+ reducesDone = false;
+ break;
+ }
+ }
+ if (reducesDone) {
+ return new ProceedToCompletionResponse(false, true);
+ } else {
+ return new ProceedToCompletionResponse(false, false);
+ }
+ } else {
+ return COMPLETION_RESPONSE_NO_WAIT;
+ }
+ */
+ return COMPLETION_RESPONSE_NO_WAIT;
+ }
+
+
+
+ // TODO EVENTUALLY remove all mrv2 ids.
+ @Override
+ public void unregisterTaskAttempt(TezTaskAttemptID attemptId) {
+ attemptToContainerIdMap.remove(attemptId);
+ }
+
+ public AMContainerTask pullTaskAttemptContext(ContainerId containerId) {
+ AMContainerImpl container = (AMContainerImpl) context.getAllContainers()
+ .get(containerId);
+ return container.pullTaskContext();
+ }
+
+ @Override
+ public void registerRunningContainer(ContainerId containerId) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ContainerId: " + containerId
+ + " registered with TaskAttemptListener");
+ }
+ registeredContainers.add(containerId);
+ }
+
+ @Override
+ public void registerTaskAttempt(TezTaskAttemptID attemptId,
+ ContainerId containerId) {
+ attemptToContainerIdMap.put(attemptId, containerId);
+ }
+
+ @Override
+ public void unregisterRunningContainer(ContainerId containerId) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unregistering Container from TaskAttemptListener: "
+ + containerId);
+ }
+ registeredContainers.remove(containerId);
+ }
+
+ private void pingContainerHeartbeatHandler(ContainerId containerId) {
+ containerHeartbeatHandler.pinged(containerId);
+ }
+
+ private void pingContainerHeartbeatHandler(TezTaskAttemptID taskAttemptId) {
+ ContainerId containerId = attemptToContainerIdMap.get(taskAttemptId);
+ if (containerId != null) {
+ containerHeartbeatHandler.pinged(containerId);
+ } else {
+ LOG.warn("Handling communication from attempt: " + taskAttemptId
+ + ", ContainerId not known for this attempt");
+ }
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,68 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+
+/**
+ * This class keeps track of tasks that have already been launched. It
+ * determines if a task is alive and running or marks a task as dead if it does
+ * not hear from it for a long time.
+ *
+ */
+@SuppressWarnings({"unchecked"})
+public class TaskHeartbeatHandler extends HeartbeatHandlerBase<TezTaskAttemptID> {
+
+ public TaskHeartbeatHandler(AppContext context, int numThreads) {
+ super(context, numThreads, "TaskHeartbeatHandler");
+ }
+
+ @Override
+ protected int getConfiguredTimeout(Configuration conf) {
+ return conf.getInt(MRJobConfig.TASK_TIMEOUT, 5 * 60 * 1000);
+ }
+
+ @Override
+ protected int getConfiguredTimeoutCheckInterval(Configuration conf) {
+ return conf.getInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 30 * 1000);
+ }
+
+ @Override
+ protected boolean hasTimedOut(
+ HeartbeatHandlerBase.ReportTime report,
+ long currentTime) {
+ return (timeOut > 0) && (currentTime > report.getLastPing() + timeOut);
+ }
+
+ @Override
+ protected void handleTimeOut(TezTaskAttemptID attemptId) {
+ eventHandler.handle(new TaskAttemptEventDiagnosticsUpdate(attemptId,
+ "AttemptID:" + attemptId.toString()
+ + " Timed out after " + timeOut / 1000 + " secs"));
+ eventHandler.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_TIMED_OUT));
+ }
+}
\ No newline at end of file
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/ClientService.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/ClientService.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/ClientService.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/ClientService.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,29 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.client;
+
+import java.net.InetSocketAddress;
+
+// TODONOTES - RPC service for clients
+public interface ClientService {
+
+ InetSocketAddress getBindAddress();
+
+ int getHttpPort();
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/ClientService.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/impl/TezClientService.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/impl/TezClientService.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/impl/TezClientService.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/impl/TezClientService.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,43 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.client.impl;
+
+import java.net.InetSocketAddress;
+
+import org.apache.tez.dag.app.client.ClientService;
+
+public class TezClientService implements ClientService {
+
+ // TODO remove dummy client service
+ private final InetSocketAddress dummySocketAddress =
+ new InetSocketAddress(0);
+
+ @Override
+ public InetSocketAddress getBindAddress() {
+ // TODO Auto-generated method stub
+ return dummySocketAddress;
+ }
+
+ @Override
+ public int getHttpPort() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/impl/TezClientService.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/package-info.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/package-info.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/package-info.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/package-info.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.tez.dag.app.client;
+import org.apache.hadoop.classification.InterfaceAudience;
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/package-info.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,83 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.engine.records.TezVertexID;
+
+/**
+ * Main interface to interact with the job. Provides only getters.
+ */
+public interface DAG {
+
+ TezDAGID getID();
+ String getName();
+ DAGState getState();
+ DAGReport getReport();
+
+ /**
+ * Get all the counters of this DAG. This includes job-counters aggregated
+ * together with the counters of each task. This creates a clone of the
+ * Counters, so use this judiciously.
+ * @return job-counters and aggregate task-counters
+ */
+ TezCounters getAllCounters();
+
+ Map<TezVertexID,Vertex> getVertices();
+ Vertex getVertex(TezVertexID vertexId);
+ List<String> getDiagnostics();
+ int getTotalVertices();
+ int getCompletedVertices();
+ float getProgress();
+ boolean isUber();
+ String getUserName();
+ String getQueueName();
+
+ Configuration getConf();
+
+ /**
+ * @return a path to where the config file for this job is located.
+ */
+ Path getConfFile();
+
+ /**
+ * @return the ACLs for this job for each type of JobACL given.
+ */
+ Map<JobACL, AccessControlList> getJobACLs();
+
+ /**
+ * @return information for MR AppMasters (previously failed and current)
+ */
+ // TODO Recovery
+ //List<AMInfo> getAMInfos();
+
+ boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation);
+
+ VertexLocationHint getVertexLocationHint(TezVertexID vertexId);
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGReport.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGReport.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGReport.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGReport.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,60 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tez.dag.app.dag;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.api.records.AMInfo;
+
+public abstract class DAGReport {
+ public abstract ApplicationId getDAGId();
+ public abstract DAGState getDAGState();
+ public abstract float getMapProgress();
+ public abstract float getReduceProgress();
+ public abstract float getCleanupProgress();
+ public abstract float getSetupProgress();
+ public abstract long getSubmitTime();
+ public abstract long getStartTime();
+ public abstract long getFinishTime();
+ public abstract String getUser();
+ public abstract String getJobName();
+ public abstract String getTrackingUrl();
+ public abstract String getDiagnostics();
+ public abstract String getJobFile();
+ public abstract List<AMInfo> getAMInfos();
+ public abstract boolean isUber();
+
+ public abstract void setDAGId(ApplicationId dagId);
+ public abstract void setJobState(DAGState dagState);
+ public abstract void setMapProgress(float progress);
+ public abstract void setReduceProgress(float progress);
+ public abstract void setCleanupProgress(float progress);
+ public abstract void setSetupProgress(float progress);
+ public abstract void setSubmitTime(long submitTime);
+ public abstract void setStartTime(long startTime);
+ public abstract void setFinishTime(long finishTime);
+ public abstract void setUser(String user);
+ public abstract void setJobName(String jobName);
+ public abstract void setTrackingUrl(String trackingUrl);
+ public abstract void setDiagnostics(String diagnostics);
+ public abstract void setJobFile(String jobFile);
+ public abstract void setAMInfos(List<AMInfo> amInfos);
+ public abstract void setIsUber(boolean isUber);
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGReport.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,29 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag;
+
+import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
+
+public interface DAGScheduler {
+
+ public void vertexCompleted(Vertex vertex);
+
+ public void scheduleTask(DAGEventSchedulerUpdate event);
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGState.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGState.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGState.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGState.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,29 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tez.dag.app.dag;
+
+public enum DAGState {
+ NEW,
+ INITED,
+ RUNNING,
+ SUCCEEDED,
+ FAILED,
+ KILLED,
+ ERROR,
+ KILL_WAIT
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGState.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/JobStateInternal.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/JobStateInternal.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/JobStateInternal.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/JobStateInternal.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag;
+
+public enum JobStateInternal {
+ NEW,
+ INITED,
+ RUNNING,
+ SUCCEEDED,
+ FAILED,
+ KILL_WAIT,
+ KILLED,
+ ERROR
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/JobStateInternal.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,74 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag;
+
+import java.util.Map;
+
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.records.TaskReport;
+import org.apache.tez.dag.api.records.TaskState;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+
+/**
+ * Read only view of Task.
+ */
+public interface Task {
+ TezTaskID getTaskId();
+ TaskReport getReport();
+ TaskState getState();
+ TezCounters getCounters();
+ float getProgress();
+ Map<TezTaskAttemptID, TaskAttempt> getAttempts();
+ TaskAttempt getAttempt(TezTaskAttemptID attemptID);
+
+ /** Has Task reached the final state or not.
+ */
+ boolean isFinished();
+
+ /**
+ * Can the output of the taskAttempt be committed. Note that once the task
+ * gives a go for a commit, further canCommit requests from any other attempts
+ * should return false.
+ *
+ * @param taskAttemptID
+ * @return whether the attempt's output can be committed or not.
+ */
+ boolean canCommit(TezTaskAttemptID taskAttemptID);
+
+
+ /**
+ * Do the running tasks need to stick around after they're done processing and
+ * generating output. Required for tasks which have custom output handling
+ * such as in-memory shuffle.
+ *
+ * @return whether the task needs to stick around.
+ */
+ boolean needsWaitAfterOutputConsumable();
+
+ /**
+ * Get the attempt id which has reported in as output ready. null if not
+ * applicable.
+ *
+ * @return
+ */
+ TezTaskAttemptID getOutputConsumableAttempt();
+
+ public Vertex getVertex();
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,111 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.records.TaskAttemptReport;
+import org.apache.tez.dag.api.records.TaskAttemptState;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+/**
+ * Read only view of TaskAttempt.
+ */
+public interface TaskAttempt {
+ TezTaskAttemptID getID();
+ TaskAttemptReport getReport();
+ List<String> getDiagnostics();
+ TezCounters getCounters();
+ float getProgress();
+ TaskAttemptState getState();
+
+ /**
+ * Has attempt reached the final state or not.
+ * @return true if it has finished, else false
+ */
+ boolean isFinished();
+
+ /**
+ * @return the container ID if a container is assigned, otherwise null.
+ */
+ ContainerId getAssignedContainerID();
+
+ /**
+ * @return container mgr address if a container is assigned, otherwise null.
+ */
+ String getAssignedContainerMgrAddress();
+
+ /**
+ * @return node's id if a container is assigned, otherwise null.
+ */
+ NodeId getNodeId();
+
+ /**
+ * @return node's http address if a container is assigned, otherwise null.
+ */
+ String getNodeHttpAddress();
+
+ /**
+ * @return node's rack name if a container is assigned, otherwise null.
+ */
+ String getNodeRackName();
+
+ /**
+ * @return time at which container is launched. If container is not launched
+ * yet, returns 0.
+ */
+ long getLaunchTime();
+
+ /**
+ * @return attempt's finish time. If attempt is not finished
+ * yet, returns 0.
+ */
+ long getFinishTime();
+
+ /**
+ * @return The attempt's input ready time. If
+ * attempt's input is not ready yet, returns 0.
+ */
+ long getInputReadyTime();
+
+ /**
+ * @return The attempt's output ready time. If attempt's output is not
+ * ready yet, returns 0.
+ */
+ long getOutputReadyTime();
+
+ // TODO TEZDAG - remove all references to ShufflePort
+ /**
+ * @return the port shuffle is on.
+ */
+ public int getShufflePort();
+
+ public Task getTask();
+
+ public boolean getIsRescheduled();
+
+ public Map<String, LocalResource> getLocalResources();
+
+ public Map<String, String> getEnvironment();
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,39 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+/**
+* TaskAttemptImpl internal state machine states.
+*
+*/
+@Private
+public enum TaskAttemptStateInternal {
+ NEW,
+ START_WAIT,
+ RUNNING,
+ OUTPUT_CONSUMABLE,
+ COMMIT_PENDING,
+ KILL_IN_PROGRESS,
+ FAIL_IN_PROGRESS,
+ KILLED,
+ FAILED,
+ SUCCEEDED
+}
\ No newline at end of file
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskStateInternal.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskStateInternal.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskStateInternal.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskStateInternal.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag;
+
+public enum TaskStateInternal {
+ NEW, SCHEDULED, RUNNING, SUCCEEDED, FAILED, KILL_WAIT, KILLED
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskStateInternal.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,78 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.InputSpec;
+import org.apache.tez.common.OutputSpec;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
+import org.apache.tez.engine.records.TezTaskID;
+import org.apache.tez.engine.records.TezVertexID;
+
+
+/**
+ * Main interface to interact with the job. Provides only getters.
+ */
+public interface Vertex extends Comparable<Vertex> {
+
+ TezVertexID getVertexId();
+ int getDistanceFromRoot();
+ String getName();
+ VertexState getState();
+ Configuration getConf();
+
+ /**
+ * Get all the counters of this vertex.
+ * @return aggregate task-counters
+ */
+ TezCounters getAllCounters();
+
+ Map<TezTaskID, Task> getTasks();
+ Task getTask(TezTaskID taskID);
+ List<String> getDiagnostics();
+ int getTotalTasks();
+ int getCompletedTasks();
+ float getProgress();
+
+ TezDependentTaskCompletionEvent[]
+ getTaskAttemptCompletionEvents(int fromEventId, int maxEvents);
+
+ void setInputVertices(Map<Vertex, EdgeProperty> inVertices);
+ void setOutputVertices(Map<Vertex, EdgeProperty> outVertices);
+
+ Map<Vertex, EdgeProperty> getInputVertices();
+ Map<Vertex, EdgeProperty> getOutputVertices();
+
+ List<InputSpec> getInputSpecList();
+ List<OutputSpec> getOutputSpecList();
+
+ int getInputVerticesCount();
+ int getOutputVerticesCount();
+ void scheduleTasks(Collection<TezTaskID> taskIDs);
+ Resource getTaskResource();
+
+ public DAG getDAG();
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,27 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag;
+
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+public interface VertexScheduler {
+ void onVertexStarted();
+ void onVertexCompleted();
+ void onSourceTaskCompleted(TezTaskAttemptID attemptId);
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.app.dag;
+
+public enum VertexState {
+ NEW,
+ INITED,
+ RUNNING,
+ SUCCEEDED,
+ FAILED,
+ KILLED,
+ ERROR,
+ KILL_WAIT,
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEvent.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEvent.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEvent.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,41 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tez.engine.records.TezDAGID;
+
+/**
+ * This class encapsulates job related events.
+ *
+ */
+public class DAGEvent extends AbstractEvent<DAGEventType> {
+
+ private TezDAGID dagId;
+
+ public DAGEvent(TezDAGID dagId, DAGEventType type) {
+ super(type);
+ this.dagId = dagId;
+ }
+
+ public TezDAGID getDAGId() {
+ return dagId;
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEvent.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventCounterUpdate.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventCounterUpdate.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventCounterUpdate.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventCounterUpdate.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.tez.engine.records.TezDAGID;
+
+public class DAGEventCounterUpdate extends DAGEvent {
+
+ List<CounterIncrementalUpdate> counterUpdates = null;
+
+ public DAGEventCounterUpdate(TezDAGID dagId) {
+ super(dagId, DAGEventType.DAG_COUNTER_UPDATE);
+ counterUpdates = new ArrayList<DAGEventCounterUpdate.CounterIncrementalUpdate>();
+ }
+
+ public void addCounterUpdate(Enum<?> key, long incrValue) {
+ counterUpdates.add(new CounterIncrementalUpdate(key, incrValue));
+ }
+
+ public List<CounterIncrementalUpdate> getCounterUpdates() {
+ return counterUpdates;
+ }
+
+ public static class CounterIncrementalUpdate {
+ Enum<?> key;
+ long incrValue;
+
+ public CounterIncrementalUpdate(Enum<?> key, long incrValue) {
+ this.key = key;
+ this.incrValue = incrValue;
+ }
+
+ public Enum<?> getCounterKey() {
+ return key;
+ }
+
+ public long getIncrementValue() {
+ return incrValue;
+ }
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventCounterUpdate.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventDiagnosticsUpdate.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventDiagnosticsUpdate.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventDiagnosticsUpdate.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventDiagnosticsUpdate.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,35 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.engine.records.TezDAGID;
+
+public class DAGEventDiagnosticsUpdate extends DAGEvent {
+
+ private String diagnosticUpdate;
+
+ public DAGEventDiagnosticsUpdate(TezDAGID dagId, String diagnostic) {
+ super(dagId, DAGEventType.DAG_DIAGNOSTIC_UPDATE);
+ this.diagnosticUpdate = diagnostic;
+ }
+
+ public String getDiagnosticUpdate() {
+ return this.diagnosticUpdate;
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventDiagnosticsUpdate.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,46 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.app.dag.TaskAttempt;
+
+public class DAGEventSchedulerUpdate extends DAGEvent {
+
+ public enum UpdateType {
+ TA_SCHEDULE
+ }
+
+ private final TaskAttempt attempt;
+ private final UpdateType updateType;
+
+ public DAGEventSchedulerUpdate(UpdateType updateType, TaskAttempt attempt) {
+ super(attempt.getID().getTaskID().getVertexID().getDAGId(),
+ DAGEventType.DAG_SCHEDULER_UPDATE);
+ this.attempt = attempt;
+ this.updateType = updateType;
+ }
+
+ public UpdateType getUpdateType() {
+ return updateType;
+ }
+
+ public TaskAttempt getAttempt() {
+ return attempt;
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,53 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+/**
+ * Event types handled by Job.
+ */
+public enum DAGEventType {
+
+ //Producer:Client
+ DAG_KILL,
+
+ //Producer:MRAppMaster
+ DAG_INIT,
+ DAG_START,
+
+ //Producer:Task
+ /*
+ JOB_TASK_COMPLETED,
+ JOB_MAP_TASK_RESCHEDULED,
+ JOB_TASK_ATTEMPT_COMPLETED,
+ */
+
+ //Producer: Vertex
+ DAG_VERTEX_INITED,
+ DAG_VERTEX_STARTED,
+ DAG_VERTEX_COMPLETED,
+ DAG_SCHEDULER_UPDATE,
+
+ //Producer:Job
+ DAG_COMPLETED,
+
+ //Producer:Any component
+ DAG_DIAGNOSTIC_UPDATE,
+ INTERNAL_ERROR,
+ DAG_COUNTER_UPDATE,
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexCompleted.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexCompleted.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexCompleted.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexCompleted.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,42 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.engine.records.TezVertexID;
+
+public class DAGEventVertexCompleted extends DAGEvent {
+
+ private TezVertexID vertexId;
+ private VertexState vertexState;
+
+ public DAGEventVertexCompleted(TezVertexID vertexId, VertexState vertexState) {
+ super(vertexId.getDAGId(), DAGEventType.DAG_VERTEX_COMPLETED);
+ this.vertexId = vertexId;
+ this.vertexState = vertexState;
+ }
+
+ public TezVertexID getVertexId() {
+ return vertexId;
+ }
+
+ public VertexState getVertexState() {
+ return vertexState;
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexCompleted.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGFinishEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGFinishEvent.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGFinishEvent.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGFinishEvent.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,41 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tez.engine.records.TezDAGID;
+
+public class DAGFinishEvent extends AbstractEvent<DAGFinishEvent.Type> {
+
+ public enum Type {
+ STATE_CHANGED
+ }
+
+ private TezDAGID dagId;
+
+ public DAGFinishEvent(TezDAGID dagId) {
+ super(Type.STATE_CHANGED);
+ this.dagId = dagId;
+ }
+
+ public TezDAGID getDagId() {
+ return dagId;
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGFinishEvent.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DiagnosableEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DiagnosableEvent.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DiagnosableEvent.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DiagnosableEvent.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+public interface DiagnosableEvent {
+
+ public String getDiagnosticInfo();
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DiagnosableEvent.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,45 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+/**
+ * This class encapsulates task attempt related events.
+ *
+ */
+public class TaskAttemptEvent extends AbstractEvent<TaskAttemptEventType> {
+
+ private TezTaskAttemptID attemptID;
+
+ /**
+ * Create a new TaskAttemptEvent.
+ * @param id the id of the task attempt
+ * @param type the type of event that happened.
+ */
+ public TaskAttemptEvent(TezTaskAttemptID id, TaskAttemptEventType type) {
+ super(type);
+ this.attemptID = id;
+ }
+
+ public TezTaskAttemptID getTaskAttemptID() {
+ return attemptID;
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+public class TaskAttemptEventContainerTerminated extends TaskAttemptEvent
+ implements DiagnosableEvent {
+
+ private final String message;
+
+ public TaskAttemptEventContainerTerminated(TezTaskAttemptID id, String message) {
+ super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+ this.message = message;
+ }
+
+ @Override
+ public String getDiagnosticInfo() {
+ return message;
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+public class TaskAttemptEventContainerTerminating extends TaskAttemptEvent
+ implements DiagnosableEvent {
+
+ private final String message;
+
+ public TaskAttemptEventContainerTerminating(TezTaskAttemptID id,
+ String diagMessage) {
+ super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATING);
+ this.message = diagMessage;
+ }
+
+ @Override
+ public String getDiagnosticInfo() {
+ return this.message;
+ }
+
+}
\ No newline at end of file
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventDiagnosticsUpdate.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventDiagnosticsUpdate.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventDiagnosticsUpdate.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventDiagnosticsUpdate.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+public class TaskAttemptEventDiagnosticsUpdate extends TaskAttemptEvent {
+
+ private String diagnosticInfo;
+
+ public TaskAttemptEventDiagnosticsUpdate(TezTaskAttemptID attemptID,
+ String diagnosticInfo) {
+ super(attemptID, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE);
+ this.diagnosticInfo = diagnosticInfo;
+ }
+
+ public String getDiagnosticInfo() {
+ return diagnosticInfo;
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventDiagnosticsUpdate.java
------------------------------------------------------------------------------
svn:eol-style = native