You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ac...@apache.org on 2013/03/15 22:26:48 UTC
svn commit: r1457129 [14/38] - in /incubator/tez: ./ tez-ampool/
tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/
tez-ampool/src/main/conf/ tez-ampool/src/main/java/
tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-am...
Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskStatus.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskStatus.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskStatus.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskStatus.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,299 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.records.TezTaskAttemptID;
+
+public class MRTaskStatus implements TezTaskStatus {
+
+ static final Log LOG =
+ LogFactory.getLog(TaskStatus.class.getName());
+ // max task-status string size
+ static final int MAX_STRING_SIZE = 1024;
+
+ private TezTaskAttemptID taskAttemptId;
+ private State state = State.UNASSIGNED;
+ private float progress = 0.0f;
+ private String diagnostics = "";
+ private String userStatusInfo = "";
+ private Phase phase;
+ private TezCounters counters;
+
+ private long localOutputSize;
+ List<TezTaskAttemptID> failedTaskDependencies =
+ new ArrayList<TezTaskAttemptID>();
+
+ private long startTime;
+ private long finishTime;
+ private long sortFinishTime;
+ private long mapFinishTime;
+ private long shuffleFinishTime;
+
+ // For serialization.
+ public MRTaskStatus() {
+ }
+
+ public MRTaskStatus(
+ TezTaskAttemptID taskAttemptId,
+ TezCounters counters, Phase phase) {
+ this.taskAttemptId = taskAttemptId;
+ this.counters = counters;
+ this.phase = phase;
+ }
+
+ @Override
+ public TezTaskAttemptID getTaskAttemptId() {
+ return taskAttemptId;
+ }
+
+ @Override
+ public float getProgress() {
+ return progress;
+ }
+
+ @Override
+ public void setProgress(float progress) {
+ this.progress = progress;
+ }
+
+ @Override
+ public State getRunState() {
+ return state;
+ }
+
+ @Override
+ public void setRunState(State state) {
+ this.state = state;
+ }
+
+ @Override
+ public String getDiagnosticInfo() {
+ return diagnostics;
+ }
+
+ @Override
+ public void setDiagnosticInfo(String info) {
+ this.diagnostics = info;
+ }
+
+ @Override
+ public String getStateString() {
+ return userStatusInfo;
+ }
+
+ @Override
+ public void setStateString(String userStatusInfo) {
+ this.userStatusInfo = userStatusInfo;
+ }
+
+ @Override
+ public long getFinishTime() {
+ return finishTime;
+ }
+
+ @Override
+ public long getShuffleFinishTime() {
+ return shuffleFinishTime;
+ }
+
+ @Override
+ public long getMapFinishTime() {
+ return mapFinishTime;
+ }
+
+ @Override
+ public long getSortFinishTime() {
+ return sortFinishTime;
+ }
+
+ @Override
+ public long getStartTime() {
+ return startTime;
+ }
+
+ @Override
+ public Phase getPhase() {
+ return phase;
+ }
+
+ @Override
+ public void setPhase(Phase phase) {
+ Phase oldPhase = getPhase();
+ if (oldPhase != phase) {
+ // sort phase started
+ if (phase == Phase.SORT){
+ if (oldPhase == Phase.MAP) {
+ setMapFinishTime(System.currentTimeMillis());
+ } else {
+ setShuffleFinishTime(System.currentTimeMillis());
+ }
+ } else if (phase == Phase.REDUCE) {
+ setSortFinishTime(System.currentTimeMillis());
+ }
+ this.phase = phase;
+ }
+ }
+
+ @Override
+ public TezCounters getCounters() {
+ return counters;
+ }
+
+ @Override
+ public void setCounters(TezCounters counters) {
+ this.counters = counters;
+ }
+
+ @Override
+ public long getLocalOutputSize() {
+ return localOutputSize;
+ }
+
+ @Override
+ public List<TezTaskAttemptID> getFailedDependencies() {
+ return failedTaskDependencies;
+ }
+
+ @Override
+ public void addFailedDependency(TezTaskAttemptID taskAttemptId) {
+ failedTaskDependencies.add(taskAttemptId);
+ }
+
+ @Override
+ synchronized public void clearStatus() {
+ userStatusInfo = "";
+ failedTaskDependencies.clear();
+ }
+
+ @Override
+ synchronized public void statusUpdate(
+ float progress, String userDiagnosticInfo, TezCounters counters) {
+ setProgress(progress);
+ setDiagnosticInfo(userDiagnosticInfo);
+ setCounters(counters);
+ }
+
+ @Override
+ public void setOutputSize(long localOutputSize) {
+ this.localOutputSize = localOutputSize;
+ }
+
+ @Override
+ public void setFinishTime(long finishTime) {
+ if(this.getStartTime() > 0 && finishTime > 0) {
+ if (getShuffleFinishTime() == 0) {
+ setShuffleFinishTime(finishTime);
+ }
+ if (getSortFinishTime() == 0){
+ setSortFinishTime(finishTime);
+ }
+ if (getMapFinishTime() == 0) {
+ setMapFinishTime(finishTime);
+ }
+ this.finishTime = finishTime;
+ }
+ }
+
+ @Override
+ public void setShuffleFinishTime(long shuffleFinishTime) {
+ this.shuffleFinishTime = shuffleFinishTime;
+ }
+
+ @Override
+ public void setMapFinishTime(long mapFinishTime) {
+ this.mapFinishTime = mapFinishTime;
+ }
+
+ @Override
+ public void setSortFinishTime(long sortFinishTime) {
+ this.sortFinishTime = sortFinishTime;
+ if (getShuffleFinishTime() == this.shuffleFinishTime ){
+ setShuffleFinishTime(sortFinishTime);
+ }
+ }
+
+ @Override
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ taskAttemptId.write(out);
+ WritableUtils.writeEnum(out, state);
+ out.writeFloat(progress);
+ WritableUtils.writeString(out, diagnostics);
+ WritableUtils.writeString(out, userStatusInfo);
+ WritableUtils.writeEnum(out, phase);
+
+ counters.write(out);
+
+ out.writeLong(localOutputSize);
+ out.writeLong(startTime);
+ out.writeLong(finishTime);
+ out.writeLong(sortFinishTime);
+ out.writeLong(mapFinishTime);
+ out.writeLong(shuffleFinishTime);
+
+ out.writeInt(failedTaskDependencies.size());
+ for(TezTaskAttemptID taskAttemptId : failedTaskDependencies) {
+ taskAttemptId.write(out);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ taskAttemptId = TezTaskAttemptID.read(in);
+ state = WritableUtils.readEnum(in, State.class);
+ progress = in.readFloat();
+ diagnostics = WritableUtils.readString(in);
+ userStatusInfo = WritableUtils.readString(in);
+ phase = WritableUtils.readEnum(in, Phase.class);
+ counters = new TezCounters();
+
+ counters.readFields(in);
+
+ localOutputSize = in.readLong();
+ startTime = in.readLong();
+ finishTime = in.readLong();
+ sortFinishTime = in.readLong();
+ mapFinishTime = in.readLong();
+ shuffleFinishTime = in.readLong();
+
+ int numFailedDependencies = in.readInt();
+ for (int i = 0 ; i < numFailedDependencies ; i++) {
+ TezTaskAttemptID taskAttemptId = TezTaskAttemptID.read(in);
+ failedTaskDependencies.add(taskAttemptId);
+ }
+
+ }
+
+}
Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskType.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskType.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskType.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskType.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.hadoop;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Enum for map, reduce, job-setup, job-cleanup, task-cleanup task types.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public enum MRTaskType {
+
+ MAP, REDUCE, JOB_SETUP, JOB_CLEANUP, TASK_CLEANUP;
+
+ public String toString() {
+ switch (this) {
+ case MAP:
+ return "m";
+ case REDUCE:
+ return "r";
+ default:
+ return this.name();
+ }
+ }
+
+ public static MRTaskType fromString(String taskTypeString) {
+ if (taskTypeString.equals("m") || taskTypeString.equals(MRTaskType.MAP.toString())) {
+ return MRTaskType.MAP;
+ } else if (taskTypeString.equals("r") || taskTypeString.equals(MRTaskType.REDUCE.toString())) {
+ return MRTaskType.REDUCE;
+ } else {
+ return MRTaskType.valueOf(taskTypeString);
+ }
+ }
+
+ public String toSerializedString() {
+ return this.name();
+ }
+}
Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTaskUmbilicalProtocol.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTaskUmbilicalProtocol.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTaskUmbilicalProtocol.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tez.api.Master;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.mapreduce.hadoop.records.ProceedToCompletionResponse;
+import org.apache.tez.records.TezTaskAttemptID;
+import org.apache.tez.records.OutputContext;
+
+/** Protocol that task child process uses to contact its parent process. The
+ * parent is a daemon which which polls the central master for a new map or
+ * reduce task and runs it as a child process. All communication between child
+ * and parent is via this protocol. */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public interface TezTaskUmbilicalProtocol extends Master {
+
+ public static final long versionID = 19L;
+
+ ContainerTask getTask(ContainerContext containerContext) throws IOException;
+
+ boolean statusUpdate(TezTaskAttemptID taskId, TezTaskStatus taskStatus)
+ throws IOException, InterruptedException;
+
+ void reportDiagnosticInfo(TezTaskAttemptID taskid, String trace) throws IOException;
+
+ boolean ping(TezTaskAttemptID taskid) throws IOException;
+
+ void done(TezTaskAttemptID taskid) throws IOException;
+
+ void commitPending(TezTaskAttemptID taskId, TezTaskStatus taskStatus)
+ throws IOException, InterruptedException;
+
+ boolean canCommit(TezTaskAttemptID taskid) throws IOException;
+
+ void shuffleError(TezTaskAttemptID taskId, String message) throws IOException;
+
+ void fsError(TezTaskAttemptID taskId, String message) throws IOException;
+
+ void fatalError(TezTaskAttemptID taskId, String message) throws IOException;
+
+ // TODO TEZAM5 Can commitPending and outputReady be collapsed into a single
+ // call.
+ // IAC outputReady followed by commit is a little confusing - since the output
+ // isn't really in place till a commit is called. Maybe rename to
+ // processingComplete or some such.
+
+ // TODO EVENTUALLY This is not the most useful API. Once there's some kind of
+ // support for the Task handing output over to the Container, this won't rally
+ // be required. i.e. InMemShuffle running as a service in the Container, or
+ // the second task in getTask(). ContainerUmbilical would include getTask and
+ // getServices...
+
+ void outputReady(TezTaskAttemptID taskAttemptId, OutputContext outputContext)
+ throws IOException;
+
+ ProceedToCompletionResponse
+ proceedToCompletion(TezTaskAttemptID taskAttemptId) throws IOException;
+}
Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop;
+
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.tez.common.TezTaskStatus.Phase;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.records.TezDependentTaskCompletionEvent;
+import org.apache.tez.records.TezTaskAttemptID;
+
+public class TezTypeConverters {
+
+ // Tez objects will be imported. Others will use the fully qualified name when
+ // required.
+ // All public methods named toYarn / toTez / toMapReduce
+
+ public static org.apache.hadoop.mapreduce.v2.api.records.Phase toYarn(
+ Phase phase) {
+ return org.apache.hadoop.mapreduce.v2.api.records.Phase.valueOf(phase
+ .name());
+ }
+
+ public static TaskAttemptId toYarn(TezTaskAttemptID taskAttemptId) {
+ TaskAttemptID mrTaskAttemptId = IDConverter
+ .toMRTaskAttemptId(taskAttemptId);
+ TaskAttemptId mrv2TaskAttemptId = TypeConverter.toYarn(mrTaskAttemptId);
+ return mrv2TaskAttemptId;
+ }
+
+ public static TezTaskAttemptID toTez(TaskAttemptId taskAttemptId) {
+ TaskAttemptID mrTaskAttemptId = TypeConverter.fromYarn(taskAttemptId);
+ TezTaskAttemptID tezTaskAttemptId = IDConverter
+ .fromMRTaskAttemptId(mrTaskAttemptId);
+ return tezTaskAttemptId;
+ }
+
+ public static TezDependentTaskCompletionEvent.Status toTez(
+ TaskAttemptCompletionEventStatus status) {
+ return TezDependentTaskCompletionEvent.Status.valueOf(status.toString());
+ }
+
+ public static TezDependentTaskCompletionEvent toTez(
+ TaskAttemptCompletionEvent event) {
+ return new TezDependentTaskCompletionEvent(event.getEventId(),
+ toTez(event.getAttemptId()), event.getAttemptId().getTaskId()
+ .getTaskType() == TaskType.MAP, toTez(event.getStatus()),
+ event.getMapOutputServerAddress());
+ }
+
+ public static Counters fromTez(TezCounters tezCounters) {
+ if (tezCounters == null) {
+ return null;
+ }
+ Counters counters = new Counters();
+ for (CounterGroup xGrp : tezCounters) {
+ counters.addGroup(xGrp.getName(), xGrp.getDisplayName());
+ for (TezCounter xCounter : xGrp) {
+ Counter counter =
+ counters.findCounter(xGrp.getName(), xCounter.getName());
+ counter.setValue(xCounter.getValue());
+
+ }
+ }
+ return counters;
+ }
+}
Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/JobContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/JobContextImpl.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/JobContextImpl.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/JobContextImpl.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,65 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapred;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.records.TezJobID;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class JobContextImpl
+ extends org.apache.hadoop.mapreduce.task.JobContextImpl
+ implements JobContext {
+ private JobConf job;
+ private Progressable progress;
+
+ public JobContextImpl(JobConf conf, TezJobID jobId,
+ Progressable progress) {
+ super(conf, IDConverter.toMRJobId(jobId));
+ this.job = conf;
+ this.progress = progress;
+ }
+
+ public JobContextImpl(JobConf conf, TezJobID jobId) {
+ this(conf, jobId, Reporter.NULL);
+ }
+
+ /**
+ * Get the job Configuration
+ *
+ * @return JobConf
+ */
+ public JobConf getJobConf() {
+ return job;
+ }
+
+ /**
+ * Get the progress mechanism for reporting progress.
+ *
+ * @return progress mechanism
+ */
+ public Progressable getProgressible() {
+ return progress;
+ }
+}
Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRCounters.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRCounters.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRCounters.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRCounters.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,312 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.hadoop.mapred;
+
+import static org.apache.hadoop.mapreduce.util.CountersStrings.toEscapedCompactString;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.logging.Log;
+
+public class MRCounters extends org.apache.hadoop.mapred.Counters {
+ private final org.apache.tez.common.counters.TezCounters raw;
+
+ public MRCounters(org.apache.tez.common.counters.TezCounters raw) {
+ this.raw = raw;
+ }
+
+ @Override
+ public synchronized org.apache.hadoop.mapred.Counters.Group getGroup(String groupName) {
+ return new MRCounterGroup(raw.getGroup(groupName));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public synchronized Collection<String> getGroupNames() {
+ return IteratorUtils.toList(raw.getGroupNames().iterator()); }
+
+ @Override
+ public synchronized String makeCompactString() {
+ StringBuilder builder = new StringBuilder();
+ boolean first = true;
+ for(Group group: this){
+ for(Counter counter: group) {
+ if (first) {
+ first = false;
+ } else {
+ builder.append(',');
+ }
+ builder.append(group.getDisplayName());
+ builder.append('.');
+ builder.append(counter.getDisplayName());
+ builder.append(':');
+ builder.append(counter.getCounter());
+ }
+ }
+ return builder.toString();
+ }
+
+ @Override
+ public synchronized Counter findCounter(String group, String name) {
+ return new MRCounter(raw.findCounter(group, name));
+ }
+
+ @Override
+ public Counter findCounter(String group, int id, String name) {
+ return new MRCounter(raw.findCounter(group, name));
+ }
+
+ @Override
+ public void incrCounter(Enum<?> key, long amount) {
+ raw.findCounter(key).increment(amount);
+ }
+
+ @Override
+ public void incrCounter(String group, String counter, long amount) {
+ raw.findCounter(group, counter).increment(amount);
+ }
+
+ @Override
+ public synchronized long getCounter(Enum<?> key) {
+ return raw.findCounter(key).getValue();
+ }
+
+ @Override
+ public synchronized void incrAllCounters(
+ org.apache.hadoop.mapred.Counters other) {
+ for (Group otherGroup: other) {
+ Group group = getGroup(otherGroup.getName());
+ group.setDisplayName(otherGroup.getDisplayName());
+ for (Counter otherCounter : otherGroup) {
+ Counter counter = group.getCounterForName(otherCounter.getName());
+ counter.setDisplayName(otherCounter.getDisplayName());
+ counter.increment(otherCounter.getValue());
+ }
+ }
+ }
+
+ @Override
+ public int size() {
+ return countCounters();
+ }
+
+ @Override
+ public void log(Log log) {
+ log.info("Counters: " + size());
+ for(Group group: this) {
+ log.info(" " + group.getDisplayName());
+ for (Counter counter: group) {
+ log.info(" " + counter.getDisplayName() + "=" +
+ counter.getCounter());
+ }
+ }
+ }
+
+ @Override
+ public String makeEscapedCompactString() {
+ return toEscapedCompactString(this);
+ }
+
+ public static class MRCounterGroup extends org.apache.hadoop.mapred.Counters.Group {
+ private final org.apache.tez.common.counters.CounterGroup group;
+ public MRCounterGroup(org.apache.tez.common.counters.CounterGroup group) {
+ this.group = group;
+ }
+ @Override
+ public String getName() {
+ return group.getName();
+ }
+ @Override
+ public String getDisplayName() {
+ return group.getDisplayName();
+ }
+ @Override
+ public void setDisplayName(String displayName) {
+ group.setDisplayName(displayName);
+ }
+ @Override
+ public void addCounter(org.apache.hadoop.mapred.Counters.Counter counter) {
+ group.addCounter(convert(counter));
+ }
+ @Override
+ public org.apache.hadoop.mapred.Counters.Counter addCounter(String name,
+ String displayName, long value) {
+ return new MRCounter(group.addCounter(name, displayName, value));
+ }
+ @Override
+ public org.apache.hadoop.mapred.Counters.Counter findCounter(
+ String counterName, String displayName) {
+ return new MRCounter(group.findCounter(counterName, displayName));
+ }
+ @Override
+ public int size() {
+ return group.size();
+ }
+ @Override
+ public void incrAllCounters(
+ org.apache.hadoop.mapreduce.counters.CounterGroupBase rightGroup) {
+ new MRCounterGroup(group).incrAllCounters(rightGroup);
+ }
+ @Override
+ public org.apache.hadoop.mapreduce.counters.CounterGroupBase
+ getUnderlyingGroup() {
+ return new MRCounterGroup(group).getUnderlyingGroup();
+ }
+ @Override
+ public void readFields(DataInput arg0) throws IOException {
+ }
+ @Override
+ public void write(DataOutput arg0) throws IOException {
+ }
+ @Override
+ public Iterator iterator() {
+ // FIXME?
+ return group.iterator();
+ }
+ }
+
+ public static class MRCounter extends Counter {
+ private final org.apache.tez.common.counters.TezCounter raw;
+
+ public MRCounter(org.apache.tez.common.counters.TezCounter raw) {
+ this.raw = raw;
+ }
+
+ @Override
+ public void setDisplayName(String displayName) {
+ // TODO Auto-generated method stub
+ raw.setDisplayName(displayName);
+ }
+
+ @Override
+ public String getName() {
+ return raw.getName();
+ }
+
+ @Override
+ public String getDisplayName() {
+ return raw.getDisplayName();
+ }
+
+ @Override
+ public long getValue() {
+ return raw.getValue();
+ }
+
+ @Override
+ public void setValue(long value) {
+ raw.setValue(value);
+ }
+
+ @Override
+ public void increment(long incr) {
+ raw.increment(incr);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ raw.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ raw.readFields(in);
+ }
+
+ @Override
+ public String makeEscapedCompactString() {
+ return toEscapedCompactString(new MRCounter(raw));
+ }
+
+ @Deprecated
+ public boolean contentEquals(Counter counter) {
+ MRCounter c = new MRCounter(raw);
+ return c.equals(counter.getUnderlyingCounter());
+ }
+
+
+ @Override
+ public long getCounter() {
+ return raw.getValue();
+ }
+
+ @Override
+ public org.apache.hadoop.mapreduce.Counter getUnderlyingCounter() {
+ return new MRCounter(raw).getUnderlyingCounter();
+ }
+
+ @Override
+ public synchronized boolean equals(Object genericRight) {
+ return raw.equals(genericRight);
+ }
+
+ @Override
+ public int hashCode() {
+ // TODO Auto-generated method stub
+ return raw.hashCode();
+ }
+ }
+
+ static org.apache.tez.common.counters.TezCounter convert(
+ org.apache.hadoop.mapred.Counters.Counter counter) {
+ org.apache.hadoop.mapreduce.Counter underlyingCounter =
+ counter.getUnderlyingCounter();
+ if (underlyingCounter instanceof org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup.FrameworkCounter) {
+ org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup.FrameworkCounter
+ real =
+ (org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup.FrameworkCounter)underlyingCounter;
+ return new org.apache.tez.common.counters.FrameworkCounterGroup.FrameworkCounter(
+ real.getKey(), real.getGroupName());
+ } else if (underlyingCounter instanceof org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup.FSCounter) {
+ org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup.FSCounter real =
+ (org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup.FSCounter)underlyingCounter;
+ return new org.apache.tez.common.counters.FileSystemCounterGroup.FSCounter(
+ real.getScheme(), convert(real.getFileSystemCounter()));
+ } else {
+ return new org.apache.tez.common.counters.GenericCounter(
+ underlyingCounter.getName(),
+ underlyingCounter.getDisplayName(),
+ underlyingCounter.getValue());
+ }
+ }
+
+ static org.apache.tez.common.counters.FileSystemCounter convert(
+ org.apache.hadoop.mapreduce.FileSystemCounter c) {
+ switch (c) {
+ case BYTES_READ:
+ return org.apache.tez.common.counters.FileSystemCounter.BYTES_READ;
+ case BYTES_WRITTEN:
+ return org.apache.tez.common.counters.FileSystemCounter.BYTES_WRITTEN;
+ case READ_OPS:
+ return org.apache.tez.common.counters.FileSystemCounter.READ_OPS;
+ case LARGE_READ_OPS:
+ return org.apache.tez.common.counters.FileSystemCounter.LARGE_READ_OPS;
+ case WRITE_OPS:
+ return org.apache.tez.common.counters.FileSystemCounter.WRITE_OPS;
+ default:
+ throw new IllegalArgumentException("Unknow FileSystemCounter: " + c);
+ }
+
+ }
+}
Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/TaskAttemptContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/TaskAttemptContextImpl.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/TaskAttemptContextImpl.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/TaskAttemptContextImpl.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,107 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapred;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.mapreduce.hadoop.MRTaskType;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.records.TezTaskAttemptID;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TaskAttemptContextImpl
+ extends org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+ implements TaskAttemptContext {
+ private MRTaskReporter reporter;
+
+ public TaskAttemptContextImpl(JobConf conf, TezTaskAttemptID taskid) {
+ this(conf, taskid, null);
+ }
+
+ @SuppressWarnings("deprecation")
+ public TaskAttemptContextImpl(JobConf conf, TezTaskAttemptID taskAttemptId,
+ MRTaskReporter reporter) {
+ super(conf,
+ new TaskAttemptID(
+ new TaskID(
+ taskAttemptId.getJobID().getJtIdentifier(),
+ taskAttemptId.getJobID().getId(),
+ taskAttemptId.getTaskType().equals(MRTaskType.MAP.toString()),
+ taskAttemptId.getTaskID().getId()),
+ taskAttemptId.getId()));
+ this.reporter = reporter;
+ }
+
+ /**
+ * Get the taskAttemptID.
+ *
+ * @return TaskAttemptID
+ */
+ public TaskAttemptID getTaskAttemptID() {
+ return (TaskAttemptID) super.getTaskAttemptID();
+ }
+
+ public Progressable getProgressible() {
+ return reporter;
+ }
+
+ public JobConf getJobConf() {
+ return (JobConf) getConfiguration();
+ }
+
+ @Override
+ public float getProgress() {
+ return reporter.getProgress();
+ }
+
+ @Override
+ public Counter getCounter(Enum<?> counterName) {
+ return (Counter) reporter.getCounter(counterName);
+ }
+
+ @Override
+ public Counter getCounter(String groupName, String counterName) {
+ return (Counter) reporter.getCounter(groupName, counterName);
+ }
+
+ /**
+ * Report progress.
+ */
+ @Override
+ public void progress() {
+ reporter.progress();
+ }
+
+ /**
+ * Set the current status of the task to the given string.
+ */
+ @Override
+ public void setStatus(String status) {
+ setStatusString(status);
+ reporter.setStatus(status);
+ }
+
+
+}
Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/JobContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/JobContextImpl.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/JobContextImpl.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/JobContextImpl.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,450 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.records.TezJobID;
+
+/**
+ * A read-only view of the job that is provided to the tasks while they
+ * are running.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class JobContextImpl implements JobContext {
+
+ protected final org.apache.hadoop.mapred.JobConf conf;
+ private TezJobID jobId;
+ /**
+ * The UserGroupInformation object that has a reference to the current user
+ */
+ protected UserGroupInformation ugi;
+ protected final Credentials credentials;
+ private Progressable progress;
+
+ public JobContextImpl(Configuration conf, TezJobID jobId) {
+ this(conf, jobId, MRTaskReporter.NULL);
+ }
+
+ public JobContextImpl(Configuration conf, TezJobID jobId, Progressable progress) {
+ if (conf instanceof JobConf) {
+ this.conf = (JobConf)conf;
+ } else {
+ this.conf = new JobConf(conf);
+ }
+ this.jobId = jobId;
+ this.credentials = this.conf.getCredentials();
+ try {
+ this.ugi = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ this.progress = progress;
+ }
+
+ /**
+ * Return the configuration for the job.
+ * @return the shared configuration object
+ */
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ /**
+ * Get the unique ID for the job.
+ * @return the object with the job id
+ */
+ public JobID getJobID() {
+ return IDConverter.toMRJobId(jobId);
+ }
+
+ /**
+ * Set the JobID.
+ */
+ public void setJobID(JobID jobId) {
+ this.jobId = IDConverter.fromMRJobId(jobId);
+ }
+
+ /**
+ * Get configured the number of reduce tasks for this job. Defaults to
+ * <code>1</code>.
+ * @return the number of reduce tasks for this job.
+ */
+ public int getNumReduceTasks() {
+ return conf.getNumReduceTasks();
+ }
+
+ /**
+ * Get the current working directory for the default file system.
+ *
+ * @return the directory name.
+ */
+ public Path getWorkingDirectory() throws IOException {
+ return conf.getWorkingDirectory();
+ }
+
+ /**
+ * Get the key class for the job output data.
+ * @return the key class for the job output data.
+ */
+ public Class<?> getOutputKeyClass() {
+ return conf.getOutputKeyClass();
+ }
+
+ /**
+ * Get the value class for job outputs.
+ * @return the value class for job outputs.
+ */
+ public Class<?> getOutputValueClass() {
+ return conf.getOutputValueClass();
+ }
+
+ /**
+ * Get the key class for the map output data. If it is not set, use the
+ * (final) output key class. This allows the map output key class to be
+ * different than the final output key class.
+ * @return the map output key class.
+ */
+ public Class<?> getMapOutputKeyClass() {
+ return conf.getMapOutputKeyClass();
+ }
+
+ /**
+ * Get the value class for the map output data. If it is not set, use the
+ * (final) output value class This allows the map output value class to be
+ * different than the final output value class.
+ *
+ * @return the map output value class.
+ */
+ public Class<?> getMapOutputValueClass() {
+ return conf.getMapOutputValueClass();
+ }
+
+ /**
+ * Get the user-specified job name. This is only used to identify the
+ * job to the user.
+ *
+ * @return the job's name, defaulting to "".
+ */
+ public String getJobName() {
+ return conf.getJobName();
+ }
+
+ /**
+ * Get the {@link InputFormat} class for the job.
+ *
+ * @return the {@link InputFormat} class for the job.
+ */
+ @SuppressWarnings("unchecked")
+ public Class<? extends InputFormat<?,?>> getInputFormatClass()
+ throws ClassNotFoundException {
+ return (Class<? extends InputFormat<?,?>>)
+ conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
+ }
+
+ /**
+ * Get the {@link Mapper} class for the job.
+ *
+ * @return the {@link Mapper} class for the job.
+ */
+ @SuppressWarnings("unchecked")
+ public Class<? extends Mapper<?,?,?,?>> getMapperClass()
+ throws ClassNotFoundException {
+ return (Class<? extends Mapper<?,?,?,?>>)
+ conf.getClass(MAP_CLASS_ATTR, Mapper.class);
+ }
+
+ /**
+ * Get the combiner class for the job.
+ *
+ * @return the combiner class for the job.
+ */
+ @SuppressWarnings("unchecked")
+ public Class<? extends Reducer<?,?,?,?>> getCombinerClass()
+ throws ClassNotFoundException {
+ return (Class<? extends Reducer<?,?,?,?>>)
+ conf.getClass(COMBINE_CLASS_ATTR, null);
+ }
+
+ /**
+ * Get the {@link Reducer} class for the job.
+ *
+ * @return the {@link Reducer} class for the job.
+ */
+ @SuppressWarnings("unchecked")
+ public Class<? extends Reducer<?,?,?,?>> getReducerClass()
+ throws ClassNotFoundException {
+ return (Class<? extends Reducer<?,?,?,?>>)
+ conf.getClass(REDUCE_CLASS_ATTR, Reducer.class);
+ }
+
+ /**
+ * Get the {@link OutputFormat} class for the job.
+ *
+ * @return the {@link OutputFormat} class for the job.
+ */
+ @SuppressWarnings("unchecked")
+ public Class<? extends OutputFormat<?,?>> getOutputFormatClass()
+ throws ClassNotFoundException {
+ return (Class<? extends OutputFormat<?,?>>)
+ conf.getClass(OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class);
+ }
+
+ /**
+ * Get the {@link Partitioner} class for the job.
+ *
+ * @return the {@link Partitioner} class for the job.
+ */
+ @SuppressWarnings("unchecked")
+ public Class<? extends Partitioner<?,?>> getPartitionerClass()
+ throws ClassNotFoundException {
+ return (Class<? extends Partitioner<?,?>>)
+ conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
+ }
+
+ /**
+ * Get the {@link RawComparator} comparator used to compare keys.
+ *
+ * @return the {@link RawComparator} comparator used to compare keys.
+ */
+ public RawComparator<?> getSortComparator() {
+ return conf.getOutputKeyComparator();
+ }
+
+ /**
+ * Get the pathname of the job's jar.
+ * @return the pathname
+ */
+ public String getJar() {
+ return conf.getJar();
+ }
+
+ /**
+ * Get the user defined {@link RawComparator} comparator for
+ * grouping keys of inputs to the reduce.
+ *
+ * @return comparator set by the user for grouping values.
+ * @see Job#setGroupingComparatorClass(Class) for details.
+ */
+ public RawComparator<?> getGroupingComparator() {
+ return conf.getOutputValueGroupingComparator();
+ }
+
+ /**
+ * Get whether job-setup and job-cleanup is needed for the job
+ *
+ * @return boolean
+ */
+ public boolean getJobSetupCleanupNeeded() {
+ return conf.getBoolean(MRJobConfig.SETUP_CLEANUP_NEEDED, true);
+ }
+
+ /**
+ * Get whether task-cleanup is needed for the job
+ *
+ * @return boolean
+ */
+ public boolean getTaskCleanupNeeded() {
+ return conf.getBoolean(MRJobConfig.TASK_CLEANUP_NEEDED, true);
+ }
+
+ /**
+ * This method checks to see if symlinks are to be create for the
+ * localized cache files in the current working directory
+ * @return true if symlinks are to be created- else return false
+ */
+ public boolean getSymlink() {
+ return DistributedCache.getSymlink(conf);
+ }
+
+ /**
+ * Get the archive entries in classpath as an array of Path
+ */
+ public Path[] getArchiveClassPaths() {
+ return DistributedCache.getArchiveClassPaths(conf);
+ }
+
+ /**
+ * Get cache archives set in the Configuration
+ * @return A URI array of the caches set in the Configuration
+ * @throws IOException
+ */
+ public URI[] getCacheArchives() throws IOException {
+ return DistributedCache.getCacheArchives(conf);
+ }
+
+ /**
+ * Get cache files set in the Configuration
+ * @return A URI array of the files set in the Configuration
+ * @throws IOException
+ */
+
+ public URI[] getCacheFiles() throws IOException {
+ return DistributedCache.getCacheFiles(conf);
+ }
+
+ /**
+ * Return the path array of the localized caches
+ * @return A path array of localized caches
+ * @throws IOException
+ */
+ public Path[] getLocalCacheArchives()
+ throws IOException {
+ return DistributedCache.getLocalCacheArchives(conf);
+ }
+
+ /**
+ * Return the path array of the localized files
+ * @return A path array of localized files
+ * @throws IOException
+ */
+ public Path[] getLocalCacheFiles()
+ throws IOException {
+ return DistributedCache.getLocalCacheFiles(conf);
+ }
+
+ /**
+ * Get the file entries in classpath as an array of Path
+ */
+ public Path[] getFileClassPaths() {
+ return DistributedCache.getFileClassPaths(conf);
+ }
+
+ /**
+ * Get the timestamps of the archives. Used by internal
+ * DistributedCache and MapReduce code.
+ * @return a string array of timestamps
+ * @throws IOException
+ */
+ public String[] getArchiveTimestamps() {
+ return DistributedCache.getArchiveTimestamps(conf);
+ }
+
+ /**
+ * Get the timestamps of the files. Used by internal
+ * DistributedCache and MapReduce code.
+ * @return a string array of timestamps
+ * @throws IOException
+ */
+ public String[] getFileTimestamps() {
+ return DistributedCache.getFileTimestamps(conf);
+ }
+
+ /**
+ * Get the configured number of maximum attempts that will be made to run a
+ * map task, as specified by the <code>mapred.map.max.attempts</code>
+ * property. If this property is not already set, the default is 4 attempts.
+ *
+ * @return the max number of attempts per map task.
+ */
+ public int getMaxMapAttempts() {
+ return conf.getMaxMapAttempts();
+ }
+
+ /**
+ * Get the configured number of maximum attempts that will be made to run a
+ * reduce task, as specified by the <code>mapred.reduce.max.attempts</code>
+ * property. If this property is not already set, the default is 4 attempts.
+ *
+ * @return the max number of attempts per reduce task.
+ */
+ public int getMaxReduceAttempts() {
+ return conf.getMaxReduceAttempts();
+ }
+
+ /**
+ * Get whether the task profiling is enabled.
+ * @return true if some tasks will be profiled
+ */
+ public boolean getProfileEnabled() {
+ return conf.getProfileEnabled();
+ }
+
+ /**
+ * Get the profiler configuration arguments.
+ *
+ * The default value for this property is
+ * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
+ *
+ * @return the parameters to pass to the task child to configure profiling
+ */
+ public String getProfileParams() {
+ return conf.getProfileParams();
+ }
+
+ /**
+ * Get the range of maps or reduces to profile.
+ * @param isMap is the task a map?
+ * @return the task ranges
+ */
+ public IntegerRanges getProfileTaskRange(boolean isMap) {
+ return conf.getProfileTaskRange(isMap);
+ }
+
+ /**
+ * Get the reported username for this job.
+ *
+ * @return the username
+ */
+ public String getUser() {
+ return conf.getUser();
+ }
+
+ public Credentials getCredentials() {
+ return credentials;
+ }
+
+ @Override
+ public JobConf getJobConf() {
+ return conf;
+ }
+
+ @Override
+ public Progressable getProgressible() {
+ return progress;
+ }
+
+}
Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+/**
+ * The context that is given to the {@link Mapper}.
+ * @param <KEYIN> the key input type to the Mapper
+ * @param <VALUEIN> the value input type to the Mapper
+ * @param <KEYOUT> the key output type from the Mapper
+ * @param <VALUEOUT> the value output type from the Mapper
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class MapContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
+ extends TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
+ implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+ private RecordReader<KEYIN,VALUEIN> reader;
+ private InputSplit split;
+
+ public MapContextImpl(Configuration conf, TaskAttemptID taskid,
+ RecordReader<KEYIN,VALUEIN> reader,
+ RecordWriter<KEYOUT,VALUEOUT> writer,
+ OutputCommitter committer,
+ MRTaskReporter reporter,
+ InputSplit split) {
+ super(conf, taskid, writer, committer, reporter);
+ this.reader = reader;
+ this.split = split;
+ }
+
+ /**
+ * Get the input split for this map.
+ */
+ public InputSplit getInputSplit() {
+ return split;
+ }
+
+ @Override
+ public KEYIN getCurrentKey() throws IOException, InterruptedException {
+ return reader.getCurrentKey();
+ }
+
+ @Override
+ public VALUEIN getCurrentValue() throws IOException, InterruptedException {
+ return reader.getCurrentValue();
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ return reader.nextKeyValue();
+ }
+
+}
+
Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/ReduceContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/ReduceContextImpl.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/ReduceContextImpl.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/ReduceContextImpl.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapreduce;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapred.BackupStore;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.ReduceContext;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+/**
+ * The context passed to the {@link Reducer}.
+ * @param <KEYIN> the class of the input keys
+ * @param <VALUEIN> the class of the input values
+ * @param <KEYOUT> the class of the output keys
+ * @param <VALUEOUT> the class of the output values
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ReduceContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
+ extends TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
+ implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+ private RawKeyValueIterator input;
+ private Counter inputValueCounter;
+ private Counter inputKeyCounter;
+ private RawComparator<KEYIN> comparator;
+ private KEYIN key; // current key
+ private VALUEIN value; // current value
+ private boolean firstValue = false; // first value in key
+ private boolean nextKeyIsSame = false; // more w/ this key
+ private boolean hasMore; // more in file
+ protected Progressable reporter;
+ private Deserializer<KEYIN> keyDeserializer;
+ private Deserializer<VALUEIN> valueDeserializer;
+ private DataInputBuffer buffer = new DataInputBuffer();
+ private BytesWritable currentRawKey = new BytesWritable();
+ private ValueIterable iterable = new ValueIterable();
+ private boolean isMarked = false;
+ private BackupStore<KEYIN,VALUEIN> backupStore;
+ private final SerializationFactory serializationFactory;
+ private final Class<KEYIN> keyClass;
+ private final Class<VALUEIN> valueClass;
+ private final Configuration conf;
+ private final TaskAttemptID taskid;
+ private int currentKeyLength = -1;
+ private int currentValueLength = -1;
+
+ public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,
+ RawKeyValueIterator input,
+ Counter inputKeyCounter,
+ Counter inputValueCounter,
+ RecordWriter<KEYOUT,VALUEOUT> output,
+ OutputCommitter committer,
+ MRTaskReporter reporter,
+ RawComparator<KEYIN> comparator,
+ Class<KEYIN> keyClass,
+ Class<VALUEIN> valueClass
+ ) throws InterruptedException, IOException{
+ super(conf, taskid, output, committer, reporter);
+ this.input = input;
+ this.inputKeyCounter = inputKeyCounter;
+ this.inputValueCounter = inputValueCounter;
+ this.comparator = comparator;
+ this.serializationFactory = new SerializationFactory(conf);
+ this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+ this.keyDeserializer.open(buffer);
+ this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
+ this.valueDeserializer.open(buffer);
+ hasMore = input.next();
+ this.keyClass = keyClass;
+ this.valueClass = valueClass;
+ this.conf = conf;
+ this.taskid = taskid;
+ }
+
+ /** Start processing next unique key. */
+ public boolean nextKey() throws IOException,InterruptedException {
+ while (hasMore && nextKeyIsSame) {
+ nextKeyValue();
+ }
+ if (hasMore) {
+ if (inputKeyCounter != null) {
+ inputKeyCounter.increment(1);
+ }
+ return nextKeyValue();
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Advance to the next key/value pair.
+ */
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (!hasMore) {
+ key = null;
+ value = null;
+ return false;
+ }
+ firstValue = !nextKeyIsSame;
+ DataInputBuffer nextKey = input.getKey();
+ currentRawKey.set(nextKey.getData(), nextKey.getPosition(),
+ nextKey.getLength() - nextKey.getPosition());
+ buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
+ key = keyDeserializer.deserialize(key);
+ DataInputBuffer nextVal = input.getValue();
+ buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength());
+ value = valueDeserializer.deserialize(value);
+
+ currentKeyLength = nextKey.getLength() - nextKey.getPosition();
+ currentValueLength = nextVal.getLength() - nextVal.getPosition();
+
+ if (isMarked) {
+ backupStore.write(nextKey, nextVal);
+ }
+
+ hasMore = input.next();
+ if (hasMore) {
+ nextKey = input.getKey();
+ nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
+ currentRawKey.getLength(),
+ nextKey.getData(),
+ nextKey.getPosition(),
+ nextKey.getLength() - nextKey.getPosition()
+ ) == 0;
+ } else {
+ nextKeyIsSame = false;
+ }
+ inputValueCounter.increment(1);
+ return true;
+ }
+
+ public KEYIN getCurrentKey() {
+ return key;
+ }
+
+ @Override
+ public VALUEIN getCurrentValue() {
+ return value;
+ }
+
+ BackupStore<KEYIN,VALUEIN> getBackupStore() {
+ return backupStore;
+ }
+
+ protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> {
+
+ private boolean inReset = false;
+ private boolean clearMarkFlag = false;
+
+ public boolean hasNext() {
+ try {
+ if (inReset && backupStore.hasNext()) {
+ return true;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException("hasNext failed", e);
+ }
+ return firstValue || nextKeyIsSame;
+ }
+
+ public VALUEIN next() {
+ if (inReset) {
+ try {
+ if (backupStore.hasNext()) {
+ backupStore.next();
+ DataInputBuffer next = backupStore.nextValue();
+ buffer.reset(next.getData(), next.getPosition(), next.getLength());
+ value = valueDeserializer.deserialize(value);
+ return value;
+ } else {
+ inReset = false;
+ backupStore.exitResetMode();
+ if (clearMarkFlag) {
+ clearMarkFlag = false;
+ isMarked = false;
+ }
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw new RuntimeException("next value iterator failed", e);
+ }
+ }
+
+ // if this is the first record, we don't need to advance
+ if (firstValue) {
+ firstValue = false;
+ return value;
+ }
+ // if this isn't the first record and the next key is different, they
+ // can't advance it here.
+ if (!nextKeyIsSame) {
+ throw new NoSuchElementException("iterate past last value");
+ }
+ // otherwise, go to the next key/value pair
+ try {
+ nextKeyValue();
+ return value;
+ } catch (IOException ie) {
+ throw new RuntimeException("next value iterator failed", ie);
+ } catch (InterruptedException ie) {
+ // this is bad, but we can't modify the exception list of java.util
+ throw new RuntimeException("next value iterator interrupted", ie);
+ }
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException("remove not implemented");
+ }
+
+ public void mark() throws IOException {
+ if (getBackupStore() == null) {
+ backupStore = new BackupStore<KEYIN,VALUEIN>(conf, taskid);
+ }
+ isMarked = true;
+ if (!inReset) {
+ backupStore.reinitialize();
+ if (currentKeyLength == -1) {
+ // The user has not called next() for this iterator yet, so
+ // there is no current record to mark and copy to backup store.
+ return;
+ }
+ assert (currentValueLength != -1);
+ int requestedSize = currentKeyLength + currentValueLength +
+ WritableUtils.getVIntSize(currentKeyLength) +
+ WritableUtils.getVIntSize(currentValueLength);
+ DataOutputStream out = backupStore.getOutputStream(requestedSize);
+ writeFirstKeyValueBytes(out);
+ backupStore.updateCounters(requestedSize);
+ } else {
+ backupStore.mark();
+ }
+ }
+
+ public void reset() throws IOException {
+ // We reached the end of an iteration and user calls a
+ // reset, but a clearMark was called before, just throw
+ // an exception
+ if (clearMarkFlag) {
+ clearMarkFlag = false;
+ backupStore.clearMark();
+ throw new IOException("Reset called without a previous mark");
+ }
+
+ if (!isMarked) {
+ throw new IOException("Reset called without a previous mark");
+ }
+ inReset = true;
+ backupStore.reset();
+ }
+
+ public void clearMark() throws IOException {
+ if (getBackupStore() == null) {
+ return;
+ }
+ if (inReset) {
+ clearMarkFlag = true;
+ backupStore.clearMark();
+ } else {
+ inReset = isMarked = false;
+ backupStore.reinitialize();
+ }
+ }
+
+ /**
+ * This method is called when the reducer moves from one key to
+ * another.
+ * @throws IOException
+ */
+ public void resetBackupStore() throws IOException {
+ if (getBackupStore() == null) {
+ return;
+ }
+ inReset = isMarked = false;
+ backupStore.reinitialize();
+ currentKeyLength = -1;
+ }
+
+ /**
+ * This method is called to write the record that was most recently
+ * served (before a call to the mark). Since the framework reads one
+ * record in advance, to get this record, we serialize the current key
+ * and value
+ * @param out
+ * @throws IOException
+ */
+ private void writeFirstKeyValueBytes(DataOutputStream out)
+ throws IOException {
+ assert (getCurrentKey() != null && getCurrentValue() != null);
+ WritableUtils.writeVInt(out, currentKeyLength);
+ WritableUtils.writeVInt(out, currentValueLength);
+ Serializer<KEYIN> keySerializer =
+ serializationFactory.getSerializer(keyClass);
+ keySerializer.open(out);
+ keySerializer.serialize(getCurrentKey());
+
+ Serializer<VALUEIN> valueSerializer =
+ serializationFactory.getSerializer(valueClass);
+ valueSerializer.open(out);
+ valueSerializer.serialize(getCurrentValue());
+ }
+ }
+
+ protected class ValueIterable implements Iterable<VALUEIN> {
+ private ValueIterator iterator = new ValueIterator();
+ public Iterator<VALUEIN> iterator() {
+ return iterator;
+ }
+ }
+
+ /**
+ * Iterate through the values for the current key, reusing the same value
+ * object, which is stored in the context.
+ * @return the series of values associated with the current key. All of the
+ * objects returned directly and indirectly from this method are reused.
+ */
+ public
+ Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
+ return iterable;
+ }
+}
Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapreduce;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+/**
+ * The context for task attempts.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TaskAttemptContextImpl extends JobContextImpl
+ implements TaskAttemptContext {
+ private final TaskAttemptID taskId;
+ private String status = "";
+ private MRTaskReporter reporter;
+
+ public TaskAttemptContextImpl(Configuration conf,
+ TaskAttemptID taskId) {
+ this(conf, taskId, null);
+ }
+
+ public TaskAttemptContextImpl(Configuration conf,
+ TaskAttemptID taskId, MRTaskReporter reporter) {
+ super(conf, IDConverter.fromMRJobId(taskId.getJobID()));
+ this.taskId = taskId;
+ this.reporter = reporter;
+ }
+
+ /**
+ * Get the unique name for this task attempt.
+ */
+ public TaskAttemptID getTaskAttemptID() {
+ return taskId;
+ }
+
+ /**
+ * Get the last set status message.
+ * @return the current status message
+ */
+ public String getStatus() {
+ return status;
+ }
+
+ public Counter getCounter(Enum<?> counterName) {
+ return (Counter) reporter.getCounter(counterName);
+ }
+
+ public Counter getCounter(String groupName, String counterName) {
+ return (Counter) reporter.getCounter(groupName, counterName);
+ }
+
+ /**
+ * Report progress.
+ */
+ public void progress() {
+ reporter.progress();
+ }
+
+ protected void setStatusString(String status) {
+ this.status = status;
+ }
+
+ /**
+ * Set the current status of the task to the given string.
+ */
+ public void setStatus(String status) {
+ String normalizedStatus = Task.normalizeStatus(status, conf);
+ setStatusString(normalizedStatus);
+ reporter.setStatus(normalizedStatus);
+ }
+
+ public static class DummyReporter extends StatusReporter {
+ public void setStatus(String s) {
+ }
+ public void progress() {
+ }
+ public Counter getCounter(Enum<?> name) {
+ return new Counters().findCounter(name);
+ }
+ public Counter getCounter(String group, String name) {
+ return new Counters().findCounter(group, name);
+ }
+ public float getProgress() {
+ return 0f;
+ }
+ }
+
+ public float getProgress() {
+ return reporter.getProgress();
+ }
+}
Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+/**
+ * A context object that allows input and output from the task. It is only
+ * supplied to the {@link Mapper} or {@link Reducer}.
+ * @param <KEYIN> the input key type for the task
+ * @param <VALUEIN> the input value type for the task
+ * @param <KEYOUT> the output key type for the task
+ * @param <VALUEOUT> the output value type for the task
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
+ extends TaskAttemptContextImpl
+ implements TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+ private RecordWriter<KEYOUT,VALUEOUT> output;
+ private OutputCommitter committer;
+
+ public TaskInputOutputContextImpl(Configuration conf, TaskAttemptID taskid,
+ RecordWriter<KEYOUT,VALUEOUT> output,
+ OutputCommitter committer,
+ MRTaskReporter reporter) {
+ super(conf, taskid, reporter);
+ this.output = output;
+ this.committer = committer;
+ }
+
+ /**
+ * Advance to the next key, value pair, returning null if at end.
+ * @return the key object that was read into, or null if no more
+ */
+ public abstract
+ boolean nextKeyValue() throws IOException, InterruptedException;
+
+ /**
+ * Get the current key.
+ * @return the current key object or null if there isn't one
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract
+ KEYIN getCurrentKey() throws IOException, InterruptedException;
+
+ /**
+ * Get the current value.
+ * @return the value object that was read into
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract VALUEIN getCurrentValue() throws IOException,
+ InterruptedException;
+
+ /**
+ * Generate an output key/value pair.
+ */
+ public void write(KEYOUT key, VALUEOUT value
+ ) throws IOException, InterruptedException {
+ output.write(key, value);
+ }
+
+ public OutputCommitter getOutputCommitter() {
+ return committer;
+ }
+}
Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/records/ProceedToCompletionResponse.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/records/ProceedToCompletionResponse.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/records/ProceedToCompletionResponse.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/records/ProceedToCompletionResponse.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.records;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+
+public class ProceedToCompletionResponse implements Writable{
+
+ private boolean shouldDie;
+ private boolean readyToProceed;
+
+ public ProceedToCompletionResponse() {
+ }
+
+ public ProceedToCompletionResponse(boolean shouldDie, boolean readyToProceed) {
+ this.shouldDie = shouldDie;
+ this.readyToProceed = readyToProceed;
+ }
+
+ /**
+ * Indicates whether the task is required to proceed to completion, or should
+ * terminate.
+ *
+ * @return
+ */
+ public boolean shouldDie() {
+ return this.shouldDie;
+ }
+
+ /**
+ * Indicates whether the task is ready to proceed. Valid only if shouldDie is
+ * false.
+ *
+ * @return
+ */
+ public boolean readyToProceed() {
+ return this.readyToProceed;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeBoolean(shouldDie);
+ out.writeBoolean(readyToProceed);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ shouldDie = in.readBoolean();
+ readyToProceed = in.readBoolean();
+ }
+
+ @Override
+ public String toString() {
+ return "shouldDie: " + shouldDie + ", readyToProceed: " + readyToProceed;
+ }
+}