You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/04/22 20:29:00 UTC
svn commit: r1470653 [1/2] - in /incubator/tez/branches/TEZ-1:
tez-common/src/main/java/org/apache/tez/common/
tez-dag/src/main/java/org/apache/hadoop/mapred/
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/
tez-dag/src/main/java/org/apache/tez/d...
Author: sseth
Date: Mon Apr 22 18:28:58 2013
New Revision: 1470653
URL: http://svn.apache.org/r1470653
Log:
TEZ-49. Split TezTask into a piece for static data, and one for Runtime access. (sseth)
Added:
incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezTaskContext.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/RunningTaskContext.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
Removed:
incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezTask.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/TezEngineTask.java
Modified:
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/SortingOutput.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/InputFactory.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/OutputFactory.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/ProcessorFactory.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactory.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactoryImpl.java
incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java
incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java
incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java
incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java
incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
Added: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezTaskContext.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezTaskContext.java?rev=1470653&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezTaskContext.java (added)
+++ incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezTaskContext.java Mon Apr 22 18:28:58 2013
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+public abstract class TezTaskContext implements Writable {
+
+ // Serialized Fields
+ private TezTaskAttemptID taskAttemptId;
+ private String user;
+ private String jobName;
+ private String vertexName;
+
+ public TezTaskContext() {
+ }
+
+ public TezTaskContext(TezTaskAttemptID taskAttemptID, String user, String jobName,
+ String vertexName) {
+ this.taskAttemptId = taskAttemptID;
+ this.user = user;
+ this.jobName = jobName;
+ this.vertexName = vertexName;
+ }
+
+ public TezTaskAttemptID getTaskAttemptId() {
+ return taskAttemptId;
+ }
+
+
+
+ public TezDAGID getDAGID() {
+ return taskAttemptId.getTaskID().getVertexID().getDAGId();
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public String getJobName() {
+ return jobName;
+ }
+
+ public String getVertexName() {
+ return this.vertexName;
+ }
+
+ public void statusUpdate() throws IOException, InterruptedException {
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ taskAttemptId.write(out);
+ Text.writeString(out, user);
+ Text.writeString(out, jobName);
+ Text.writeString(out, vertexName);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ taskAttemptId = TezTaskAttemptID.read(in);
+ user = Text.readString(in);
+ jobName = Text.readString(in);
+ vertexName = Text.readString(in);
+ }
+
+}
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java Mon Apr 22 18:28:58 2013
@@ -65,7 +65,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.LogManager;
import org.apache.tez.common.ContainerTask;
-import org.apache.tez.common.TezEngineTask;
+import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.engine.api.Task;
import org.apache.tez.engine.records.TezTaskAttemptID;
@@ -139,7 +139,7 @@ public class YarnTezDagChild {
if (LOG.isDebugEnabled()) {
LOG.debug("PID, containerId: " + pid + ", " + containerId);
}
- TezEngineTask taskContext = null;
+ TezEngineTaskContext taskContext = null;
ContainerTask containerTask = null;
UserGroupInformation childUGI = null;
TezTaskAttemptID taskAttemptId = null;
@@ -402,7 +402,7 @@ public class YarnTezDagChild {
}
private static Task createAndConfigureTezTask(
- TezEngineTask taskContext,
+ TezEngineTaskContext taskContext,
TezTaskUmbilicalProtocol master,
Credentials credentials, Token<JobTokenIdentifier> jt)
throws IOException, InterruptedException {
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java Mon Apr 22 18:28:58 2013
@@ -56,8 +56,8 @@ import org.apache.hadoop.yarn.state.Stat
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.hadoop.yarn.util.Records;
-import org.apache.tez.common.TezEngineTask;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezEngineTaskContext;
+import org.apache.tez.common.TezTaskContext;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
@@ -300,11 +300,11 @@ public class TaskAttemptImpl implements
return attemptId;
}
- TezTask createRemoteTask() {
+ TezTaskContext createRemoteTask() {
Vertex vertex = getTask().getVertex();
// TODO TEZ-50 user and jobname
- return new TezEngineTask(getID(), "user", "jobname", getTask()
+ return new TezEngineTaskContext(getID(), "user", "jobname", getTask()
.getVertex().getName(), mrxModuleClassName,
vertex.getInputSpecList(), vertex.getOutputSpecList());
}
@@ -866,7 +866,7 @@ public class TaskAttemptImpl implements
// recovery.
// Create the remote task.
- TezTask remoteTaskContext = ta.createRemoteTask();
+ TezTaskContext remoteTaskContext = ta.createRemoteTask();
// Create startTaskRequest
String[] hostArray = new String[0];
@@ -1285,4 +1285,4 @@ public class TaskAttemptImpl implements
public Map<String, String> getEnvironment() {
return this.environment;
}
-}
\ No newline at end of file
+}
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java Mon Apr 22 18:28:58 2013
@@ -24,7 +24,7 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezTaskContext;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.engine.common.security.JobTokenIdentifier;
import org.apache.tez.engine.records.TezTaskAttemptID;
@@ -37,7 +37,7 @@ public class AMSchedulerEventTALaunchReq
private final TezTaskAttemptID attemptId;
private final Resource capability;
private final Map<String, LocalResource> localResources;
- private final TezTask remoteTaskContext;
+ private final TezTaskContext remoteTaskContext;
private final TaskAttempt taskAttempt;
private final Credentials credentials;
private Token<JobTokenIdentifier> jobToken;
@@ -50,7 +50,7 @@ public class AMSchedulerEventTALaunchReq
public AMSchedulerEventTALaunchRequest(TezTaskAttemptID attemptId,
Resource capability,
Map<String, LocalResource> localResources,
- TezTask remoteTaskContext, TaskAttempt ta,
+ TezTaskContext remoteTaskContext, TaskAttempt ta,
Credentials credentials, Token<JobTokenIdentifier> jobToken,
String[] hosts, String[] racks, Priority priority,
Map<String, String> environment) {
@@ -88,7 +88,7 @@ public class AMSchedulerEventTALaunchReq
return priority;
}
- public TezTask getRemoteTaskContext() {
+ public TezTaskContext getRemoteTaskContext() {
return remoteTaskContext;
}
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java Mon Apr 22 18:28:58 2013
@@ -18,23 +18,23 @@
package org.apache.tez.dag.app.rm.container;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezTaskContext;
import org.apache.tez.engine.records.TezTaskAttemptID;
public class AMContainerEventAssignTA extends AMContainerEvent {
private final TezTaskAttemptID attemptId;
// TODO Maybe have tht TAL pull the remoteTask from the TaskAttempt itself ?
- private final TezTask remoteTaskContext;
+ private final TezTaskContext remoteTaskContext;
public AMContainerEventAssignTA(ContainerId containerId,
TezTaskAttemptID attemptId, Object remoteTaskContext) {
super(containerId, AMContainerEventType.C_ASSIGN_TA);
this.attemptId = attemptId;
- this.remoteTaskContext = (TezTask)remoteTaskContext;
+ this.remoteTaskContext = (TezTaskContext)remoteTaskContext;
}
- public TezTask getRemoteTaskContext() {
+ public TezTaskContext getRemoteTaskContext() {
return this.remoteTaskContext;
}
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java Mon Apr 22 18:28:58 2013
@@ -42,7 +42,7 @@ import org.apache.hadoop.yarn.state.Mult
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezTaskContext;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.TaskAttemptListener;
@@ -75,8 +75,8 @@ public class AMContainerImpl implements
private final List<TezTaskAttemptID> completedAttempts = new LinkedList<TezTaskAttemptID>();
// TODO Maybe this should be pulled from the TaskAttempt.s
- private final Map<TezTaskAttemptID, TezTask> remoteTaskMap =
- new HashMap<TezTaskAttemptID, TezTask>();
+ private final Map<TezTaskAttemptID, TezTaskContext> remoteTaskMap =
+ new HashMap<TezTaskAttemptID, TezTaskContext>();
// TODO ?? Convert to list and hash.
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java Mon Apr 22 18:28:58 2013
@@ -18,13 +18,13 @@
package org.apache.tez.dag.app.rm.container;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezTaskContext;
public class AMContainerTask {
private final boolean shouldDie;
- private final TezTask tezTask;
+ private final TezTaskContext tezTask;
- public AMContainerTask(boolean shouldDie, TezTask tezTask) {
+ public AMContainerTask(boolean shouldDie, TezTaskContext tezTask) {
this.shouldDie = shouldDie;
this.tezTask = tezTask;
}
@@ -33,7 +33,7 @@ public class AMContainerTask {
return this.shouldDie;
}
- public TezTask getTask() {
+ public TezTaskContext getTask() {
return this.tezTask;
}
}
Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java Mon Apr 22 18:28:58 2013
@@ -25,18 +25,18 @@ import org.apache.hadoop.io.Writable;
public class ContainerTask implements Writable {
- TezEngineTask tezEngineTask;
+ TezEngineTaskContext tezEngineTask;
boolean shouldDie;
public ContainerTask() {
}
- public ContainerTask(TezTask tezTaskContext, boolean shouldDie) {
- this.tezEngineTask = (TezEngineTask)tezTaskContext;
+ public ContainerTask(TezTaskContext tezTaskContext, boolean shouldDie) {
+ this.tezEngineTask = (TezEngineTaskContext)tezTaskContext;
this.shouldDie = shouldDie;
}
- public TezEngineTask getTezEngineTaskContext() {
+ public TezEngineTaskContext getTezEngineTaskContext() {
return tezEngineTask;
}
@@ -60,7 +60,7 @@ public class ContainerTask implements Wr
shouldDie = in.readBoolean();
boolean taskComing = in.readBoolean();
if (taskComing) {
- tezEngineTask = new TezEngineTask();
+ tezEngineTask = new TezEngineTaskContext();
tezEngineTask.readFields(in);
}
}
Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/RunningTaskContext.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/RunningTaskContext.java?rev=1470653&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/RunningTaskContext.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/RunningTaskContext.java Mon Apr 22 18:28:58 2013
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.common;
+
+import java.io.IOException;
+
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.util.Progress;
+import org.apache.tez.engine.api.Partitioner;
+import org.apache.tez.engine.api.Processor;
+
+public class RunningTaskContext {
+
+ protected SecretKey jobTokenSecret;
+ protected TezTaskReporter reporter;
+ protected Partitioner partitioner;
+ protected Processor combineProcessor;
+ protected TezTaskStatus status;
+ protected Progress progress = new Progress();
+
+ public Progress getProgress() {
+ return progress;
+ }
+
+ public void setJobTokenSecret(SecretKey jobTokenSecret) {
+ this.jobTokenSecret = jobTokenSecret;
+ }
+
+ public TezTaskStatus getStatus() {
+ return status;
+ }
+
+ public TezTaskReporter getTaskReporter() {
+ return reporter;
+ }
+
+ // TODO Doesn't belong here.
+ public Processor getCombineProcessor() {
+ return combineProcessor;
+ }
+
+ // TODO Doesn't belong here.
+ public Partitioner getPartitioner() {
+ return partitioner;
+ }
+
+ // TODO Doesn't belong here.
+ public SecretKey getJobTokenSecret() {
+ return jobTokenSecret;
+ }
+
+ public void statusUpdate() throws IOException, InterruptedException {
+ }
+}
Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java?rev=1470653&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java Mon Apr 22 18:28:58 2013
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+public class TezEngineTaskContext extends TezTaskContext {
+
+ // These two could be replaced by a TezConfiguration / DagSpec.
+ private List<InputSpec> inputSpecList;
+ private List<OutputSpec> outputSpecList;
+ private String taskModuleClassName;
+
+ public TezEngineTaskContext() {
+ super();
+ }
+
+ public TezEngineTaskContext(TezTaskAttemptID taskAttemptID, String user,
+ String jobName, String vertexName, String moduleClassName,
+ List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList) {
+ super(taskAttemptID, user, jobName, vertexName);
+ this.inputSpecList = inputSpecList;
+ this.outputSpecList = outputSpecList;
+ if (this.inputSpecList == null) {
+ inputSpecList = new ArrayList<InputSpec>(0);
+ }
+ if (this.outputSpecList == null) {
+ outputSpecList = new ArrayList<OutputSpec>(0);
+ }
+ this.inputSpecList = inputSpecList;
+ this.outputSpecList = outputSpecList;
+ this.taskModuleClassName = moduleClassName;
+ }
+
+ public String getTaskModuleClassName() {
+ return taskModuleClassName;
+ }
+
+ public List<InputSpec> getInputSpecList() {
+ return this.inputSpecList;
+ }
+
+ public List<OutputSpec> getOutputSpecList() {
+ return this.outputSpecList;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ Text.writeString(out, taskModuleClassName);
+ out.writeInt(inputSpecList.size());
+ for (InputSpec inputSpec : inputSpecList) {
+ inputSpec.write(out);
+ }
+ out.writeInt(outputSpecList.size());
+ for (OutputSpec outputSpec : outputSpecList) {
+ outputSpec.write(out);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+
+ taskModuleClassName = Text.readString(in);
+ int numInputSpecs = in.readInt();
+ inputSpecList = new ArrayList<InputSpec>(numInputSpecs);
+ for (int i = 0; i < numInputSpecs; i++) {
+ InputSpec inputSpec = new InputSpec();
+ inputSpec.readFields(in);
+ inputSpecList.add(inputSpec);
+ }
+ int numOutputSpecs = in.readInt();
+ outputSpecList = new ArrayList<OutputSpec>(numOutputSpecs);
+ for (int i = 0; i < numOutputSpecs; i++) {
+ OutputSpec outputSpec = new OutputSpec();
+ outputSpec.readFields(in);
+ outputSpecList.add(outputSpec);
+ }
+ }
+}
Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java Mon Apr 22 18:28:58 2013
@@ -29,11 +29,12 @@ import org.apache.hadoop.io.RawComparato
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.RunningTaskContext;
+import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTask;
import org.apache.tez.common.TezTaskReporter;
-import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.engine.common.ConfigUtils;
import org.apache.tez.engine.common.sort.impl.TezMerger;
import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
@@ -43,7 +44,8 @@ import org.apache.tez.engine.common.task
@SuppressWarnings({"rawtypes"})
public class LocalShuffle {
- private final TezTask task;
+ private final TezEngineTaskContext taskContext;
+ private final RunningTaskContext runningTaskContext;
private final Configuration conf;
private final int tasksInDegree;
@@ -58,11 +60,13 @@ public class LocalShuffle {
private final CompressionCodec codec;
private final TezTaskOutput mapOutputFile;
- public LocalShuffle(TezTask task,
+ public LocalShuffle(TezEngineTaskContext taskContext,
+ RunningTaskContext runningTaskContext,
Configuration conf,
TezTaskReporter reporter
) throws IOException {
- this.task = task;
+ this.taskContext = taskContext;
+ this.runningTaskContext = runningTaskContext;
this.conf = conf;
this.keyClass = ConfigUtils.getMapOutputKeyClass(conf);
this.valClass = ConfigUtils.getMapOutputValueClass(conf);
@@ -87,10 +91,7 @@ public class LocalShuffle {
this.codec = null;
}
- this.tasksInDegree =
- conf.getInt(
- TezJobConfig.TEZ_ENGINE_TASK_INDEGREE,
- TezJobConfig.DEFAULT_TEZ_ENGINE_TASK_INDEGREE);
+ this.tasksInDegree = taskContext.getInputSpecList().get(0).getNumInputs();
// Always local
this.mapOutputFile = new TezLocalTaskOutputFiles();
@@ -100,7 +101,7 @@ public class LocalShuffle {
public TezRawKeyValueIterator run() throws IOException {
// Copy is complete, obviously!
- this.task.getProgress().addPhase("copy", 0.33f).complete();
+ this.runningTaskContext.getProgress().addPhase("copy", 0.33f).complete();
// Merge
return TezMerger.merge(conf, rfs,
@@ -109,9 +110,9 @@ public class LocalShuffle {
getMapFiles(),
false,
sortFactor,
- new Path(task.getTaskAttemptId().toString()),
+ new Path(taskContext.getTaskAttemptId().toString()),
comparator,
- task.getTaskReporter(), spilledRecordsCounter, null, null);
+ runningTaskContext.getTaskReporter(), spilledRecordsCounter, null, null);
}
private Path[] getMapFiles()
Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java Mon Apr 22 18:28:58 2013
@@ -27,8 +27,9 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.util.Progress;
+import org.apache.tez.common.RunningTaskContext;
+import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTask;
import org.apache.tez.common.TezTaskReporter;
import org.apache.tez.common.TezTaskStatus;
import org.apache.tez.common.counters.TezCounter;
@@ -46,7 +47,8 @@ public class Shuffle implements Exceptio
private static final int MIN_EVENTS_TO_FETCH = 100;
private static final int MAX_RPC_OUTSTANDING_EVENTS = 3000000;
- private final TezTask task;
+ private final TezEngineTaskContext taskContext;
+ private final RunningTaskContext runningTaskContext;
private final Configuration conf;
private final TezTaskReporter reporter;
private final ShuffleClientMetrics metrics;
@@ -59,28 +61,31 @@ public class Shuffle implements Exceptio
private final Progress mergePhase;
private final int tasksInDegree;
- public Shuffle(TezTask task,
+ public Shuffle(TezEngineTaskContext taskContext,
+ RunningTaskContext runningTaskContext,
Configuration conf,
int tasksInDegree,
TezTaskReporter reporter,
Processor combineProcessor
) throws IOException {
- this.task = task;
+ this.taskContext = taskContext;
+ this.runningTaskContext = runningTaskContext;
this.conf = conf;
this.reporter = reporter;
this.metrics =
new ShuffleClientMetrics(
- task.getTaskAttemptId(), this.conf,
- this.task.getUser(), this.task.getJobName());
+ taskContext.getTaskAttemptId(), this.conf,
+ this.taskContext.getUser(), this.taskContext.getJobName());
this.tasksInDegree = tasksInDegree;
FileSystem localFS = FileSystem.getLocal(this.conf);
LocalDirAllocator localDirAllocator =
new LocalDirAllocator(TezJobConfig.LOCAL_DIR);
- copyPhase = this.task.getProgress().addPhase("copy", 0.33f);
- mergePhase = this.task.getProgress().addPhase("merge", 0.66f);
+ copyPhase = this.runningTaskContext.getProgress().addPhase("copy", 0.33f);
+ mergePhase = this.runningTaskContext.getProgress().addPhase("merge", 0.66f);
+ // TODO TEZ Get rid of Map / Reduce references.
TezCounter shuffledMapsCounter =
reporter.getCounter(TaskCounter.SHUFFLED_MAPS);
TezCounter reduceShuffleBytes =
@@ -95,12 +100,12 @@ public class Shuffle implements Exceptio
reporter.getCounter(TaskCounter.MERGED_MAP_OUTPUTS);
scheduler =
- new ShuffleScheduler(this.conf, tasksInDegree, task.getStatus(),
+ new ShuffleScheduler(this.conf, tasksInDegree, runningTaskContext.getStatus(),
this, copyPhase,
shuffledMapsCounter,
reduceShuffleBytes,
failedShuffleCounter);
- merger = new MergeManager(this.task.getTaskAttemptId(),
+ merger = new MergeManager(this.taskContext.getTaskAttemptId(),
this.conf, localFS,
localDirAllocator, reporter,
combineProcessor,
@@ -120,7 +125,7 @@ public class Shuffle implements Exceptio
// Start the map-completion events fetcher thread
final EventFetcher eventFetcher =
- new EventFetcher(task.getTaskAttemptId(), reporter, scheduler, this,
+ new EventFetcher(taskContext.getTaskAttemptId(), reporter, scheduler, this,
maxEventsToFetch);
eventFetcher.start();
@@ -131,10 +136,10 @@ public class Shuffle implements Exceptio
TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
Fetcher[] fetchers = new Fetcher[numFetchers];
for (int i=0; i < numFetchers; ++i) {
- fetchers[i] = new Fetcher(conf, task.getTaskAttemptId(),
+ fetchers[i] = new Fetcher(conf, taskContext.getTaskAttemptId(),
scheduler, merger,
reporter, metrics, this,
- task.getJobTokenSecret());
+ runningTaskContext.getJobTokenSecret());
fetchers[i].start();
}
@@ -163,9 +168,9 @@ public class Shuffle implements Exceptio
scheduler.close();
copyPhase.complete(); // copy is already complete
- task.getStatus().setPhase(TezTaskStatus.Phase.SORT);
+ runningTaskContext.getStatus().setPhase(TezTaskStatus.Phase.SORT);
- task.statusUpdate();
+ runningTaskContext.statusUpdate();
// Finish the on-going merges...
TezRawKeyValueIterator kvIter = null;
Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java Mon Apr 22 18:28:58 2013
@@ -62,8 +62,8 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices;
import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tez.common.RunningTaskContext;
import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTask;
import org.apache.tez.engine.common.security.JobTokenIdentifier;
import org.apache.tez.engine.common.security.JobTokenSecretManager;
import org.apache.tez.engine.common.security.SecureShuffleUtils;
@@ -241,7 +241,7 @@ public class ShuffleHandler extends Abst
userRsrc.remove(appId.toString());
}
- public synchronized void init(Configuration conf, TezTask task) {
+ public synchronized void init(Configuration conf, RunningTaskContext task) {
this.init(conf);
tokenSecret = task.getJobTokenSecret();
}
Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/SortingOutput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/SortingOutput.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/SortingOutput.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/SortingOutput.java Mon Apr 22 18:28:58 2013
@@ -17,7 +17,7 @@
*/
package org.apache.tez.engine.common.sort;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.RunningTaskContext;
import org.apache.tez.engine.api.Output;
/**
@@ -26,6 +26,7 @@ import org.apache.tez.engine.api.Output;
*/
public interface SortingOutput extends Output {
- public void setTask(TezTask task);
+ // TODO PreCommit rename
+ public void setTask(RunningTaskContext runningTaskContext);
}
Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java Mon Apr 22 18:28:58 2013
@@ -40,10 +40,10 @@ import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.QuickSort;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.Constants;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTask;
-import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.RunningTaskContext;
+import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.engine.api.Master;
import org.apache.tez.engine.api.Partitioner;
import org.apache.tez.engine.api.Processor;
@@ -71,7 +71,8 @@ public abstract class ExternalSorter {
protected Processor combineProcessor;
protected Partitioner partitioner;
- protected TezTask task;
+ protected TezEngineTaskContext task;
+ protected RunningTaskContext runningTaskContext;
protected Configuration job;
protected FileSystem rfs;
protected TezTaskOutput mapOutputFile;
@@ -102,10 +103,11 @@ public abstract class ExternalSorter {
LOG.info("TEZ_ENGINE_TASK_ATTEMPT_ID: " +
job.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
- partitions =
- job.getInt(
- TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE,
- TezJobConfig.DEFAULT_TEZ_ENGINE_TASK_OUTDEGREE);
+ partitions = task.getOutputSpecList().get(0).getNumOutputs();
+// partitions =
+// job.getInt(
+// TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE,
+// TezJobConfig.DEFAULT_TEZ_ENGINE_TASK_OUTDEGREE);
rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
// sorter
@@ -123,14 +125,14 @@ public abstract class ExternalSorter {
// counters
mapOutputByteCounter =
- task.getTaskReporter().getCounter(TaskCounter.MAP_OUTPUT_BYTES);
+ runningTaskContext.getTaskReporter().getCounter(TaskCounter.MAP_OUTPUT_BYTES);
mapOutputRecordCounter =
- task.getTaskReporter().getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
+ runningTaskContext.getTaskReporter().getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
fileOutputByteCounter =
- task.getTaskReporter().
+ runningTaskContext.getTaskReporter().
getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
spilledRecordsCounter =
- task.getTaskReporter().getCounter(TaskCounter.SPILLED_RECORDS);
+ runningTaskContext.getTaskReporter().getCounter(TaskCounter.SPILLED_RECORDS);
// compression
if (ConfigUtils.getCompressMapOutput(job)) {
Class<? extends CompressionCodec> codecClass =
@@ -149,7 +151,7 @@ public abstract class ExternalSorter {
// LOG.info("XXX mapOutputFile: " + mapOutputFile.getClass());
// sortPhase
- sortPhase = task.getProgress().addPhase("sort", 0.333f);
+ sortPhase = runningTaskContext.getProgress().addPhase("sort", 0.333f);
}
/**
@@ -163,8 +165,8 @@ public abstract class ExternalSorter {
}
}
- public void setTask(TezTask task) {
- this.task = task;
+ public void setTask(RunningTaskContext task) {
+ this.runningTaskContext = task;
this.combineProcessor = task.getCombineProcessor();
this.partitioner = task.getPartitioner();
}
@@ -182,10 +184,10 @@ public abstract class ExternalSorter {
Writer writer) throws IOException, InterruptedException {
CombineInput combineIn = new CombineInput(kvIter);
- combineIn.initialize(job, task.getTaskReporter());
+ combineIn.initialize(job, runningTaskContext.getTaskReporter());
CombineOutput combineOut = new CombineOutput(writer);
- combineOut.initialize(job, task.getTaskReporter());
+ combineOut.initialize(job, runningTaskContext.getTaskReporter());
combineProcessor.process(combineIn, combineOut);
@@ -216,8 +218,12 @@ public abstract class ExternalSorter {
// LOG.info("XXX sameVolRename src=" + src + ", dst=" + dst);
}
- public ExternalSorter() {
- super();
+// public ExternalSorter() {
+// super();
+// }
+
+ public ExternalSorter(TezEngineTaskContext tezEngineTask) {
+ this.task = tezEngineTask;
}
public InputStream getSortedStream(int partition) {
Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java Mon Apr 22 18:28:58 2013
@@ -43,8 +43,8 @@ import org.apache.hadoop.io.RawComparato
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.IndexedSorter;
import org.apache.hadoop.util.Progress;
+import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTask;
import org.apache.tez.engine.api.Master;
import org.apache.tez.engine.common.ConfigUtils;
import org.apache.tez.engine.common.sort.SortingOutput;
@@ -98,8 +98,9 @@ public class PipelinedSorter extends Ext
@Inject
public PipelinedSorter(
- @Assisted TezTask task
+ @Assisted TezEngineTaskContext task
) throws IOException {
+ super(task);
}
public void initialize(Configuration conf, Master master)
@@ -211,7 +212,7 @@ public class PipelinedSorter extends Ext
*/
synchronized void collect(Object key, Object value, final int partition
) throws IOException {
- task.getTaskReporter().progress();
+ runningTaskContext.getTaskReporter().progress();
if (key.getClass() != keyClass) {
throw new IOException("Type mismatch in key from map: expected "
+ keyClass.getName() + ", received "
@@ -267,7 +268,7 @@ public class PipelinedSorter extends Ext
}
mapOutputRecordCounter.increment(1);
mapOutputByteCounter.increment(valend - keystart);
- task.getTaskReporter().progress();
+ runningTaskContext.getTaskReporter().progress();
}
public void spill() throws IOException {
@@ -392,7 +393,7 @@ public class PipelinedSorter extends Ext
segmentList, mergeFactor,
new Path(mapId.toString()),
(RawComparator)ConfigUtils.getOutputKeyComparator(job),
- task.getTaskReporter(), sortSegments,
+ runningTaskContext.getTaskReporter(), sortSegments,
null, spilledRecordsCounter, sortPhase.phase());
//write merged output to disk
@@ -402,7 +403,7 @@ public class PipelinedSorter extends Ext
spilledRecordsCounter);
writer.setRLE(merger.needsRLE());
if (combineProcessor == null || numSpills < minSpillsForCombine) {
- TezMerger.writeFile(kvIter, writer, task.getTaskReporter(), job);
+ TezMerger.writeFile(kvIter, writer, runningTaskContext.getTaskReporter(), job);
} else {
runCombineProcessor(kvIter, writer);
}
@@ -525,7 +526,7 @@ public class PipelinedSorter extends Ext
kj = new byte[keymax];
LOG.info("begin sorting Span"+index + " ("+length()+")");
if(length() > 1) {
- sorter.sort(this, 0, length(), task.getTaskReporter());
+ sorter.sort(this, 0, length(), runningTaskContext.getTaskReporter());
}
LOG.info("done sorting Span"+index);
return new SpanIterator(this);
@@ -958,4 +959,5 @@ public class PipelinedSorter extends Ext
public OutputContext getOutputContext() {
return null;
}
+
}
Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java Mon Apr 22 18:28:58 2013
@@ -39,8 +39,9 @@ import org.apache.hadoop.io.RawComparato
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezTaskContext;
import org.apache.tez.engine.api.Master;
import org.apache.tez.engine.common.ConfigUtils;
import org.apache.tez.engine.common.sort.impl.ExternalSorter;
@@ -116,8 +117,10 @@ public class DefaultSorter extends Exter
@Inject
public DefaultSorter(
- @Assisted TezTask task
+ @Assisted TezTaskContext task
) throws IOException {
+ // Does this assisted inject work ?
+ super((TezEngineTaskContext)task);
}
@Override
@@ -209,7 +212,7 @@ public class DefaultSorter extends Exter
*/
synchronized void collect(Object key, Object value, final int partition
) throws IOException {
- task.getTaskReporter().progress();
+ runningTaskContext.getTaskReporter().progress();
if (key.getClass() != keyClass) {
throw new IOException("Type mismatch in key from map: expected "
+ keyClass.getName() + ", received "
@@ -574,7 +577,7 @@ public class DefaultSorter extends Exter
// wait for spill
try {
while (spillInProgress) {
- task.getTaskReporter().progress();
+ runningTaskContext.getTaskReporter().progress();
spillDone.await();
}
} catch (InterruptedException e) {
@@ -606,7 +609,7 @@ public class DefaultSorter extends Exter
spillLock.lock();
try {
while (spillInProgress) {
- task.getTaskReporter().progress();
+ runningTaskContext.getTaskReporter().progress();
spillDone.await();
}
checkSpillException();
@@ -702,7 +705,7 @@ public class DefaultSorter extends Exter
if (lspillException instanceof Error) {
final String logMsg = "Task " + task.getTaskAttemptId() + " failed : " +
StringUtils.stringifyException(lspillException);
- task.getTaskReporter().reportFatalError(
+ runningTaskContext.getTaskReporter().reportFatalError(
task.getTaskAttemptId(), lspillException, logMsg);
}
throw new IOException("Spill failed", lspillException);
@@ -741,7 +744,7 @@ public class DefaultSorter extends Exter
throws IOException, InterruptedException {
final int mstart = getMetaStart();
final int mend = getMetaEnd();
- sorter.sort(this, mstart, mend, task.getTaskReporter());
+ sorter.sort(this, mstart, mend, runningTaskContext.getTaskReporter());
spill(mstart, mend);
}
@@ -1091,7 +1094,7 @@ public class DefaultSorter extends Exter
segmentList, mergeFactor,
new Path(mapId.toString()),
(RawComparator)ConfigUtils.getOutputKeyComparator(job),
- task.getTaskReporter(), sortSegments,
+ runningTaskContext.getTaskReporter(), sortSegments,
null, spilledRecordsCounter,
sortPhase.phase());
@@ -1101,7 +1104,7 @@ public class DefaultSorter extends Exter
new Writer(job, finalOut, keyClass, valClass, codec,
spilledRecordsCounter);
if (combineProcessor == null || numSpills < minSpillsForCombine) {
- TezMerger.writeFile(kvIter, writer, task.getTaskReporter(), job);
+ TezMerger.writeFile(kvIter, writer, runningTaskContext.getTaskReporter(), job);
writer.close();
} else {
runCombineProcessor(kvIter, writer);
Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java Mon Apr 22 18:28:58 2013
@@ -29,7 +29,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.DataChecksum;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezTaskContext;
import org.apache.tez.engine.api.Master;
import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader;
import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
@@ -57,7 +57,7 @@ public class InMemoryShuffleSorter exten
@Inject
public InMemoryShuffleSorter(
- @Assisted TezTask task
+ @Assisted TezTaskContext task
) throws IOException {
super(task);
}
@@ -66,7 +66,7 @@ public class InMemoryShuffleSorter exten
public void initialize(Configuration conf, Master master) throws IOException,
InterruptedException {
super.initialize(conf, master);
- shuffleHandler.init(conf, task);
+ shuffleHandler.init(conf, runningTaskContext);
}
@Override
Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java Mon Apr 22 18:28:58 2013
@@ -20,7 +20,7 @@ package org.apache.tez.engine.lib.input;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.common.TezTaskReporter;
import org.apache.tez.engine.api.Input;
import org.apache.tez.engine.api.Master;
@@ -38,18 +38,15 @@ import com.google.inject.assistedinject.
public class LocalMergedInput extends ShuffledMergedInput {
TezRawKeyValueIterator rIter = null;
-
- private final TezTask task;
private Configuration conf;
private CombineInput raw;
@Inject
public LocalMergedInput(
- @Assisted TezTask task
+ @Assisted TezEngineTaskContext task
) {
super(task);
- this.task = task;
}
public void initialize(Configuration conf, Master master) throws IOException,
@@ -57,7 +54,7 @@ public class LocalMergedInput extends Sh
this.conf = conf;
LocalShuffle shuffle =
- new LocalShuffle(task, this.conf, (TezTaskReporter)master);
+ new LocalShuffle(task, runningTaskContext, this.conf, (TezTaskReporter)master);
rIter = shuffle.run();
raw = new CombineInput(rIter);
}
Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java Mon Apr 22 18:28:58 2013
@@ -22,8 +22,8 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.RunningTaskContext;
+import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.common.TezTaskReporter;
import org.apache.tez.engine.api.Input;
import org.apache.tez.engine.api.Master;
@@ -43,19 +43,21 @@ public class ShuffledMergedInput impleme
static final Log LOG = LogFactory.getLog(ShuffledMergedInput.class);
TezRawKeyValueIterator rIter = null;
- private TezTask task;
+ protected TezEngineTaskContext task;
+ protected RunningTaskContext runningTaskContext;
private Configuration conf;
private CombineInput raw;
@Inject
public ShuffledMergedInput(
- @Assisted TezTask task
+ @Assisted TezEngineTaskContext task
) {
+ this.task = task;
}
- public void setTask(TezTask task) {
- this.task = task;
+ public void setTask(RunningTaskContext runningTaskContext) {
+ this.runningTaskContext = runningTaskContext;
}
public void initialize(Configuration conf, Master master) throws IOException,
@@ -64,12 +66,10 @@ public class ShuffledMergedInput impleme
Shuffle shuffle =
new Shuffle(
- task, this.conf,
- this.conf.getInt(
- TezJobConfig.TEZ_ENGINE_TASK_INDEGREE,
- TezJobConfig.DEFAULT_TEZ_ENGINE_TASK_INDEGREE),
+ task, runningTaskContext, this.conf,
+ task.getInputSpecList().get(0).getNumInputs(),
(TezTaskReporter)master,
- task.getCombineProcessor());
+ runningTaskContext.getCombineProcessor());
rIter = shuffle.run();
raw = new CombineInput(rIter);
Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java Mon Apr 22 18:28:58 2013
@@ -20,7 +20,8 @@ package org.apache.tez.engine.lib.output
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.RunningTaskContext;
+import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.engine.api.Master;
import org.apache.tez.engine.api.Output;
import org.apache.tez.engine.common.sort.SortingOutput;
@@ -40,7 +41,7 @@ public class InMemorySortedOutput implem
@Inject
public InMemorySortedOutput(
- @Assisted TezTask task
+ @Assisted TezEngineTaskContext task
) throws IOException {
sorter = new InMemoryShuffleSorter(task);
}
@@ -50,7 +51,7 @@ public class InMemorySortedOutput implem
sorter.initialize(conf, master);
}
- public void setTask(TezTask task) {
+ public void setTask(RunningTaskContext task) {
sorter.setTask(task);
}
Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java Mon Apr 22 18:28:58 2013
@@ -20,12 +20,11 @@ package org.apache.tez.engine.lib.output
import java.io.IOException;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
import com.google.inject.Inject;
@@ -37,7 +36,7 @@ public class LocalOnFileSorterOutput ext
@Inject
public LocalOnFileSorterOutput(
- @Assisted TezTask task
+ @Assisted TezEngineTaskContext task
) throws IOException {
super(task);
}
Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java Mon Apr 22 18:28:58 2013
@@ -20,7 +20,8 @@ package org.apache.tez.engine.lib.output
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.RunningTaskContext;
+import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.engine.api.Master;
import org.apache.tez.engine.api.Output;
import org.apache.tez.engine.common.sort.SortingOutput;
@@ -41,7 +42,7 @@ public class OnFileSortedOutput implemen
@Inject
public OnFileSortedOutput(
- @Assisted TezTask task
+ @Assisted TezEngineTaskContext task
) throws IOException {
sorter = new DefaultSorter(task);
}
@@ -51,7 +52,8 @@ public class OnFileSortedOutput implemen
sorter.initialize(conf, master);
}
- public void setTask(TezTask task) {
+ @Override
+ public void setTask(RunningTaskContext task) {
sorter.setTask(task);
}
Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/InputFactory.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/InputFactory.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/InputFactory.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/InputFactory.java Mon Apr 22 18:28:58 2013
@@ -18,11 +18,11 @@
package org.apache.tez.engine.runtime;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.engine.api.Input;
public interface InputFactory {
- Input create(TezTask task);
+ Input create(TezEngineTaskContext task);
}
Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/OutputFactory.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/OutputFactory.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/OutputFactory.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/OutputFactory.java Mon Apr 22 18:28:58 2013
@@ -18,11 +18,11 @@
package org.apache.tez.engine.runtime;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.engine.api.Output;
public interface OutputFactory {
- Output create(TezTask task);
+ Output create(TezEngineTaskContext task);
}
Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/ProcessorFactory.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/ProcessorFactory.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/ProcessorFactory.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/ProcessorFactory.java Mon Apr 22 18:28:58 2013
@@ -18,11 +18,11 @@
package org.apache.tez.engine.runtime;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.engine.api.Processor;
public interface ProcessorFactory {
- Processor create(TezTask task);
+ Processor create(TezEngineTaskContext task);
}
Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactory.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactory.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactory.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactory.java Mon Apr 22 18:28:58 2013
@@ -18,11 +18,11 @@
package org.apache.tez.engine.runtime;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.engine.api.Task;
public interface TezEngineFactory {
- public Task createTask(TezTask taskContext);
+ public Task createTask(TezEngineTaskContext taskContext);
}
Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactoryImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactoryImpl.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactoryImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactoryImpl.java Mon Apr 22 18:28:58 2013
@@ -18,7 +18,7 @@
package org.apache.tez.engine.runtime;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.engine.api.Input;
import org.apache.tez.engine.api.Output;
import org.apache.tez.engine.api.Processor;
@@ -47,11 +47,10 @@ implements TezEngineFactory {
this.taskFactory = taskFactory;
}
- public Task createTask(TezTask taskContext) {
+ public Task createTask(TezEngineTaskContext taskContext) {
Input in = inputFactory.create(taskContext);
Output out = outputFactory.create(taskContext);
Processor processor = processorFactory.create(taskContext);
return taskFactory.create(in, processor, out);
- }
-
+ }
}
Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java Mon Apr 22 18:28:58 2013
@@ -63,7 +63,7 @@ import org.apache.hadoop.util.Reflection
import org.apache.tez.common.Constants;
import org.apache.tez.common.ContainerTask;
import org.apache.tez.common.InputSpec;
-import org.apache.tez.common.TezEngineTask;
+import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezTaskStatus;
import org.apache.tez.common.counters.TezCounters;
@@ -240,8 +240,8 @@ public class LocalJobRunner implements C
TezTaskAttemptID tezMapId =
IDConverter.fromMRTaskAttemptId(mapId);
mapIds.add(mapId);
- TezEngineTask taskContext =
- new TezEngineTask(
+ TezEngineTaskContext taskContext =
+ new TezEngineTaskContext(
tezMapId, user, localConf.getJobName(), "TODO_vertexName",
InitialTask.class.getName(), null, null);
Injector injector = Guice.createInjector(new InitialTask());
@@ -444,7 +444,7 @@ public class LocalJobRunner implements C
- TezEngineTask taskContext = new TezEngineTask(
+ TezEngineTaskContext taskContext = new TezEngineTaskContext(
IDConverter.fromMRTaskAttemptId(reduceId), user,
localConf.getJobName(), "TODO_vertexName", LocalFinalTask.class.getName(),
Collections.singletonList(new InputSpec("TODO_srcVertexName",
Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java Mon Apr 22 18:28:58 2013
@@ -40,9 +40,9 @@ import org.apache.hadoop.mapreduce.Input
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezTask;
-import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.engine.api.Input;
import org.apache.tez.engine.api.Master;
import org.apache.tez.mapreduce.processor.MRTask;
@@ -87,7 +87,7 @@ public class SimpleInput implements Inpu
@Inject
public SimpleInput(
- @Assisted TezTask task
+ @Assisted TezEngineTaskContext task
)
{}
Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java Mon Apr 22 18:28:58 2013
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.engine.api.Master;
import org.apache.tez.engine.api.Output;
@@ -65,7 +65,7 @@ public class SimpleOutput implements Out
@Inject
public SimpleOutput(
- @Assisted TezTask task
+ @Assisted TezEngineTaskContext task
) {
}
Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java Mon Apr 22 18:28:58 2013
@@ -41,24 +41,20 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
-import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
-import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.apache.tez.common.Constants;
-import org.apache.tez.common.TezEngineTask;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.RunningTaskContext;
+import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.common.TezTaskStatus;
import org.apache.tez.common.TezTaskStatus.Phase;
import org.apache.tez.common.TezTaskStatus.State;
@@ -74,14 +70,13 @@ import org.apache.tez.mapreduce.hadoop.I
import org.apache.tez.mapreduce.hadoop.MRConfig;
import org.apache.tez.mapreduce.hadoop.MRTaskStatus;
import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
-import org.apache.tez.mapreduce.hadoop.TezTypeConverters;
import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl;
import org.apache.tez.mapreduce.hadoop.mapreduce.JobContextImpl;
import org.apache.tez.mapreduce.hadoop.records.ProceedToCompletionResponse;
import org.apache.tez.mapreduce.partition.MRPartitioner;
public abstract class MRTask
-extends org.apache.tez.common.TezTask {
+extends RunningTaskContext {
static final Log LOG = LogFactory.getLog(MRTask.class);
@@ -95,7 +90,8 @@ extends org.apache.tez.common.TezTask {
protected GcTimeUpdater gcUpdater;
private ResourceCalculatorProcessTree pTree;
private long initCpuCumulativeTime = 0;
- protected TezEngineTask tezTaskContext;
+ protected TezEngineTaskContext tezTaskContext;
+ protected TezTaskAttemptID taskAttemptId;
/* flag to track whether task is done */
AtomicBoolean taskDone = new AtomicBoolean(false);
@@ -123,18 +119,16 @@ extends org.apache.tez.common.TezTask {
private Map<String, FileSystemStatisticUpdater> statisticUpdaters =
new HashMap<String, FileSystemStatisticUpdater>();
- public MRTask(TezTask context) {
- super(context.getTaskAttemptId(), context.getUser(), context.getJobName(),
- ((TezEngineTask)context).getTaskModuleClassName());
-
- tezTaskContext = (TezEngineTask) context;
+ public MRTask(TezEngineTaskContext context) {
+ tezTaskContext = context;
+ this.taskAttemptId = context.getTaskAttemptId();
// TODO TEZAM4 Figure out initialization / run sequence of Input, Process,
// Output. Phase is MR specific.
status =
new MRTaskStatus(
- getTaskAttemptId(),
+ taskAttemptId,
counters,
- (getTaskAttemptId().getTaskID().getVertexID().getId() == 0 ?
+ (taskAttemptId.getTaskID().getVertexID().getId() == 0 ?
Phase.MAP : Phase.SHUFFLE)
);
gcUpdater = new GcTimeUpdater(counters);
@@ -153,11 +147,11 @@ extends org.apache.tez.common.TezTask {
((TezTaskReporterImpl)reporter).startCommunicationThread();
jobConf.set(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID,
- getTaskAttemptId().toString());
+ taskAttemptId.toString());
initResourceCalculatorPlugin();
- LOG.info("MRTask.inited: taskAttemptId = " + getTaskAttemptId().toString());
+ LOG.info("MRTask.inited: taskAttemptId = " + taskAttemptId.toString());
}
private void initResourceCalculatorPlugin() {
@@ -184,7 +178,7 @@ extends org.apache.tez.common.TezTask {
this.jobConf = job;
this.jobContext = new JobContextImpl(job, dagId, mrReporter);
this.taskAttemptContext =
- new TaskAttemptContextImpl(job, getTaskAttemptId(), mrReporter);
+ new TaskAttemptContextImpl(job, taskAttemptId, mrReporter);
this.mrReporter = mrReporter;
if (getState() == State.UNASSIGNED) {
@@ -299,7 +293,7 @@ extends org.apache.tez.common.TezTask {
@Private
public synchronized String getOutputName() {
- return "part-" + NUMBER_FORMAT.format(getTaskAttemptId().getTaskID().getId());
+ return "part-" + NUMBER_FORMAT.format(taskAttemptId.getTaskID().getId());
}
public void waitBeforeCompletion(MRTaskReporter reporter) throws IOException,
@@ -310,7 +304,7 @@ extends org.apache.tez.common.TezTask {
while (!readyToProceed) {
try {
ProceedToCompletionResponse response =
- umbilical.proceedToCompletion(getTaskAttemptId());
+ umbilical.proceedToCompletion(taskAttemptId);
if (LOG.isDebugEnabled()) {
LOG.debug("Got readyToProceed: " + response);
}
@@ -336,7 +330,7 @@ extends org.apache.tez.common.TezTask {
public void outputReady(MRTaskReporter reporter, OutputContext outputContext)
throws IOException,
InterruptedException {
- LOG.info("Task: " + getTaskAttemptId() + " reporting outputReady");
+ LOG.info("Task: " + taskAttemptId + " reporting outputReady");
updateCounters();
statusUpdate();
@@ -344,8 +338,8 @@ extends org.apache.tez.common.TezTask {
int retries = MAX_RETRIES;
while (true) {
try {
- umbilical.outputReady(getTaskAttemptId(), outputContext);
- LOG.info("Task '" + getTaskAttemptId() + "' reported outputReady.");
+ umbilical.outputReady(taskAttemptId, outputContext);
+ LOG.info("Task '" + taskAttemptId + "' reported outputReady.");
return;
} catch (IOException ie) {
LOG.warn("Failure signalling outputReady: " +
@@ -364,7 +358,7 @@ extends org.apache.tez.common.TezTask {
updateCounters();
if (outputContext != null) {
LOG.info("Task: "
- + getTaskAttemptId()
+ + taskAttemptId
+ " is done."
+ " And is in the process of sending output-context with shuffle port: "
+ outputContext.getShufflePort());
@@ -372,7 +366,7 @@ extends org.apache.tez.common.TezTask {
waitBeforeCompletion(reporter);
}
- LOG.info("Task:" + getTaskAttemptId() + " is done."
+ LOG.info("Task:" + taskAttemptId + " is done."
+ " And is in the process of committing");
TezTaskUmbilicalProtocol umbilical = getUmbilical();
// TODO TEZ Interaciton between Commit and OutputReady. Merge ?
@@ -383,7 +377,7 @@ extends org.apache.tez.common.TezTask {
// TODO TEZAM2 - Why is the commitRequired check missing ?
while (true) {
try {
- umbilical.commitPending(getTaskAttemptId(), status);
+ umbilical.commitPending(taskAttemptId, status);
break;
} catch (InterruptedException ie) {
// ignore
@@ -423,8 +417,8 @@ extends org.apache.tez.common.TezTask {
int retries = MAX_RETRIES;
while (true) {
try {
- if (!getUmbilical().statusUpdate(getTaskAttemptId(), status)) {
- LOG.warn("Parent died. Exiting " + getTaskAttemptId());
+ if (!getUmbilical().statusUpdate(taskAttemptId, status)) {
+ LOG.warn("Parent died. Exiting " + taskAttemptId);
System.exit(66);
}
status.clearStatus();
@@ -460,7 +454,7 @@ extends org.apache.tez.common.TezTask {
int retries = MAX_RETRIES;
while (true) {
try {
- while (!umbilical.canCommit(getTaskAttemptId())) {
+ while (!umbilical.canCommit(taskAttemptId)) {
// This will loop till the AM asks for the task to be killed. As
// against, the AM sending a signal to the task to kill itself
// gracefully.
@@ -485,7 +479,7 @@ extends org.apache.tez.common.TezTask {
// task can Commit now
try {
- LOG.info("Task " + getTaskAttemptId() + " is allowed to commit now");
+ LOG.info("Task " + taskAttemptId + " is allowed to commit now");
committer.commitTask(taskAttemptContext);
return;
} catch (IOException iee) {
@@ -512,8 +506,8 @@ extends org.apache.tez.common.TezTask {
int retries = MAX_RETRIES;
while (true) {
try {
- umbilical.done(getTaskAttemptId());
- LOG.info("Task '" + getTaskAttemptId() + "' done.");
+ umbilical.done(taskAttemptId);
+ LOG.info("Task '" + taskAttemptId + "' done.");
return;
} catch (IOException ie) {
LOG.warn("Failure signalling completion: " +
@@ -683,11 +677,11 @@ extends org.apache.tez.common.TezTask {
public void localizeConfiguration(JobConf jobConf)
throws IOException, InterruptedException {
- jobConf.set(JobContext.TASK_ID, getTaskAttemptId().getTaskID().toString());
- jobConf.set(JobContext.TASK_ATTEMPT_ID, getTaskAttemptId().toString());
+ jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
+ jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
jobConf.setInt(JobContext.TASK_PARTITION,
- getTaskAttemptId().getTaskID().getId());
- jobConf.set(JobContext.ID, getTaskAttemptId().getTaskID().getVertexID().getDAGId().toString());
+ taskAttemptId.getTaskID().getId());
+ jobConf.set(JobContext.ID, taskAttemptId.getTaskID().getVertexID().getDAGId().toString());
}
public abstract TezCounter getOutputRecordsCounter();
@@ -713,4 +707,8 @@ extends org.apache.tez.common.TezTask {
public JobContext getJobContext() {
return jobContext;
}
+
+ public TezTaskAttemptID getTaskAttemptId() {
+ return taskAttemptId;
+ }
}
Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java Mon Apr 22 18:28:58 2013
@@ -29,7 +29,6 @@ import org.apache.hadoop.ipc.ProtocolSig
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.engine.records.TezTaskAttemptID;
import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java Mon Apr 22 18:28:58 2013
@@ -36,7 +36,7 @@ import org.apache.hadoop.mapreduce.split
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.common.TezTaskStatus;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
@@ -64,7 +64,7 @@ public class MapProcessor extends MRTask
@Inject
public MapProcessor(
- @Assisted TezTask context
+ @Assisted TezEngineTaskContext context
) throws IOException {
super(context);
}
@@ -89,7 +89,8 @@ public class MapProcessor extends MRTask
throws IOException, InterruptedException {
MRTaskReporter reporter = new MRTaskReporter(getTaskReporter());
boolean useNewApi = jobConf.getUseNewMapper();
- initTask(jobConf, getDAGID(), reporter, useNewApi);
+ initTask(jobConf, taskAttemptId.getTaskID().getVertexID().getDAGId(),
+ reporter, useNewApi);
if (in instanceof SimpleInput) {
((SimpleInput)in).setTask(this);
@@ -176,7 +177,7 @@ public class MapProcessor extends MRTask
) throws IOException, InterruptedException {
// make a task context so we can get the classes
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
- new TaskAttemptContextImpl(job, getTaskAttemptId(), reporter);
+ new TaskAttemptContextImpl(job, taskAttemptId, reporter);
// make a mapper
org.apache.hadoop.mapreduce.Mapper mapper;
@@ -202,7 +203,7 @@ public class MapProcessor extends MRTask
org.apache.hadoop.mapreduce.MapContext
mapContext =
new org.apache.tez.mapreduce.hadoop.mapreduce.MapContextImpl(
- job, IDConverter.toMRTaskAttemptId(getTaskAttemptId()),
+ job, IDConverter.toMRTaskAttemptId(taskAttemptId),
input, output,
getCommitter(),
reporter, split);