You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC
svn commit: r1469642 [33/36] - in /incubator/tez/branches/TEZ-1: ./
example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/
example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/
tez-common/src/main/ tez-common/src/main/java/ t...
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+/**
+ * The context that is given to the {@link Mapper}.
+ * @param <KEYIN> the key input type to the Mapper
+ * @param <VALUEIN> the value input type to the Mapper
+ * @param <KEYOUT> the key output type from the Mapper
+ * @param <VALUEOUT> the value output type from the Mapper
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class MapContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
+    extends TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
+    implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+
+  /** Reader that every key/value accessor of this context delegates to. */
+  private final RecordReader<KEYIN,VALUEIN> reader;
+
+  /** Input split handed in at construction time. */
+  private final InputSplit split;
+
+  /**
+   * Creates a map context.
+   *
+   * @param conf      configuration, forwarded to the parent context
+   * @param taskid    task attempt id, forwarded to the parent context
+   * @param reader    reader supplying the input key/value pairs
+   * @param writer    record writer, forwarded to the parent context
+   * @param committer output committer, forwarded to the parent context
+   * @param reporter  task reporter, forwarded to the parent context
+   * @param split     input split returned by {@link #getInputSplit()}
+   */
+  public MapContextImpl(Configuration conf, TaskAttemptID taskid,
+      RecordReader<KEYIN,VALUEIN> reader,
+      RecordWriter<KEYOUT,VALUEOUT> writer,
+      OutputCommitter committer,
+      MRTaskReporter reporter,
+      InputSplit split) {
+    super(conf, taskid, writer, committer, reporter);
+    this.split = split;
+    this.reader = reader;
+  }
+
+  /**
+   * Returns the input split this context was constructed with.
+   *
+   * @return the input split for this map
+   */
+  @Override
+  public InputSplit getInputSplit() {
+    return this.split;
+  }
+
+  /**
+   * Returns whatever the underlying reader reports as its current key.
+   */
+  @Override
+  public KEYIN getCurrentKey() throws IOException, InterruptedException {
+    final KEYIN currentKey = this.reader.getCurrentKey();
+    return currentKey;
+  }
+
+  /**
+   * Returns whatever the underlying reader reports as its current value.
+   */
+  @Override
+  public VALUEIN getCurrentValue() throws IOException, InterruptedException {
+    final VALUEIN currentValue = this.reader.getCurrentValue();
+    return currentValue;
+  }
+
+  /**
+   * Asks the underlying reader to advance to the next key/value pair.
+   *
+   * @return the result reported by the reader
+   */
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    final boolean advanced = this.reader.nextKeyValue();
+    return advanced;
+  }
+
+}
+
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/ReduceContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/ReduceContextImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/ReduceContextImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/ReduceContextImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapreduce;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapred.BackupStore;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.ReduceContext;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+/**
+ * The context passed to the {@link Reducer}.
+ * @param <KEYIN> the class of the input keys
+ * @param <VALUEIN> the class of the input values
+ * @param <KEYOUT> the class of the output keys
+ * @param <VALUEOUT> the class of the output values
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ReduceContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
+    extends TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
+    implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+  private RawKeyValueIterator input;
+  private Counter inputValueCounter;
+  private Counter inputKeyCounter;
+  private RawComparator<KEYIN> comparator;
+  private KEYIN key;                                  // current key
+  private VALUEIN value;                              // current value
+  private boolean firstValue = false;                 // first value in key
+  private boolean nextKeyIsSame = false;              // more w/ this key
+  private boolean hasMore;                            // more in file
+  // NOTE(review): this field is never assigned anywhere in this class, so it
+  // stays null unless a subclass sets it -- confirm whether it is still needed.
+  protected Progressable reporter;
+  private Deserializer<KEYIN> keyDeserializer;
+  private Deserializer<VALUEIN> valueDeserializer;
+  // Shared scratch buffer: both deserializers are opened on it, and it is
+  // re-pointed at the key bytes or the value bytes before each deserialize.
+  private DataInputBuffer buffer = new DataInputBuffer();
+  // Private copy of the current key's raw bytes, kept so it can be compared
+  // with the following key after the input iterator has advanced.
+  private BytesWritable currentRawKey = new BytesWritable();
+  private ValueIterable iterable = new ValueIterable();
+  private boolean isMarked = false;
+  private BackupStore<KEYIN,VALUEIN> backupStore;
+  private final SerializationFactory serializationFactory;
+  private final Class<KEYIN> keyClass;
+  private final Class<VALUEIN> valueClass;
+  private final Configuration conf;
+  private final TaskAttemptID taskid;
+  // Serialized lengths of the most recently read record; -1 until one is read.
+  private int currentKeyLength = -1;
+  private int currentValueLength = -1;
+
+  /**
+   * Creates the reduce context, opens the key and value deserializers on the
+   * shared buffer and primes the input iterator by reading one record ahead.
+   *
+   * @param conf              configuration, also used for the serialization
+   *                          factory and for a lazily created backup store
+   * @param taskid            task attempt id
+   * @param input             raw key/value iterator to read from
+   * @param inputKeyCounter   counter incremented once per distinct key; may
+   *                          be null
+   * @param inputValueCounter counter incremented once per record read; may
+   *                          be null
+   * @param output            record writer, forwarded to the parent context
+   * @param committer         output committer, forwarded to the parent context
+   * @param reporter          task reporter, forwarded to the parent context
+   * @param comparator        raw comparator used to detect key boundaries
+   * @param keyClass          class of the input keys
+   * @param valueClass        class of the input values
+   * @throws IOException if opening a deserializer or reading the first record
+   *                     fails
+   * @throws InterruptedException declared for callers; not raised by the
+   *                              visible code
+   */
+  public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,
+      RawKeyValueIterator input,
+      Counter inputKeyCounter,
+      Counter inputValueCounter,
+      RecordWriter<KEYOUT,VALUEOUT> output,
+      OutputCommitter committer,
+      MRTaskReporter reporter,
+      RawComparator<KEYIN> comparator,
+      Class<KEYIN> keyClass,
+      Class<VALUEIN> valueClass
+      ) throws InterruptedException, IOException{
+    super(conf, taskid, output, committer, reporter);
+    this.input = input;
+    this.inputKeyCounter = inputKeyCounter;
+    this.inputValueCounter = inputValueCounter;
+    this.comparator = comparator;
+    this.serializationFactory = new SerializationFactory(conf);
+    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+    this.keyDeserializer.open(buffer);
+    this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
+    this.valueDeserializer.open(buffer);
+    hasMore = input.next();
+    this.keyClass = keyClass;
+    this.valueClass = valueClass;
+    this.conf = conf;
+    this.taskid = taskid;
+  }
+
+  /**
+   * Start processing next unique key. Any values of the current key that
+   * were not consumed are skipped first.
+   *
+   * @return true if a record for a new key was read, false if the input is
+   *         exhausted
+   */
+  public boolean nextKey() throws IOException,InterruptedException {
+    while (hasMore && nextKeyIsSame) {
+      nextKeyValue();
+    }
+    if (hasMore) {
+      if (inputKeyCounter != null) {
+        inputKeyCounter.increment(1);
+      }
+      return nextKeyValue();
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Advance to the next key/value pair.
+   *
+   * @return true if a pair was read; false (with the current key and value
+   *         cleared to null) if the input is exhausted
+   */
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (!hasMore) {
+      key = null;
+      value = null;
+      return false;
+    }
+    firstValue = !nextKeyIsSame;
+
+    // Copy the raw key so it survives the input.next() call below.
+    DataInputBuffer nextKey = input.getKey();
+    int keyLength = nextKey.getLength() - nextKey.getPosition();
+    currentRawKey.set(nextKey.getData(), nextKey.getPosition(), keyLength);
+    buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
+    key = keyDeserializer.deserialize(key);
+
+    // DataInputBuffer.getLength() is the end index of the valid data, not the
+    // number of remaining bytes, so the length handed to reset() has to be
+    // end - position (exactly as done for the key above). Passing the end
+    // index itself would expose bytes beyond the value to the deserializer.
+    DataInputBuffer nextVal = input.getValue();
+    int valueLength = nextVal.getLength() - nextVal.getPosition();
+    buffer.reset(nextVal.getData(), nextVal.getPosition(), valueLength);
+    value = valueDeserializer.deserialize(value);
+
+    currentKeyLength = keyLength;
+    currentValueLength = valueLength;
+
+    if (isMarked) {
+      backupStore.write(nextKey, nextVal);
+    }
+
+    // Read one record ahead to learn whether the next record has the same key.
+    hasMore = input.next();
+    if (hasMore) {
+      nextKey = input.getKey();
+      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
+                                         currentRawKey.getLength(),
+                                         nextKey.getData(),
+                                         nextKey.getPosition(),
+                                         nextKey.getLength() - nextKey.getPosition()
+                                         ) == 0;
+    } else {
+      nextKeyIsSame = false;
+    }
+    // Guarded like inputKeyCounter in nextKey(): the counter may be absent.
+    if (inputValueCounter != null) {
+      inputValueCounter.increment(1);
+    }
+    return true;
+  }
+
+  /**
+   * @return the most recently deserialized key, or null if the input is
+   *         exhausted or nothing has been read yet
+   */
+  public KEYIN getCurrentKey() {
+    return key;
+  }
+
+  /**
+   * @return the most recently deserialized value, or null if the input is
+   *         exhausted or nothing has been read yet
+   */
+  @Override
+  public VALUEIN getCurrentValue() {
+    return value;
+  }
+
+  /**
+   * @return the backup store, or null if mark() has never been called
+   */
+  BackupStore<KEYIN,VALUEIN> getBackupStore() {
+    return backupStore;
+  }
+
+  /**
+   * Iterator over the values of the current key. It supports mark/reset by
+   * copying records into a {@link BackupStore} while a mark is active and
+   * replaying them from there after a reset.
+   */
+  protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> {
+
+    // True while values are being replayed from the backup store.
+    private boolean inReset = false;
+    // Set when clearMark() is called during a replay; the mark is actually
+    // dropped once the replay has been drained (see next()).
+    private boolean clearMarkFlag = false;
+
+    @Override
+    public boolean hasNext() {
+      try {
+        if (inReset && backupStore.hasNext()) {
+          return true;
+        }
+      } catch (Exception e) {
+        // The cause travels with the rethrown exception; no separate dump
+        // to stderr is needed.
+        throw new RuntimeException("hasNext failed", e);
+      }
+      return firstValue || nextKeyIsSame;
+    }
+
+    @Override
+    public VALUEIN next() {
+      if (inReset) {
+        try {
+          if (backupStore.hasNext()) {
+            backupStore.next();
+            DataInputBuffer next = backupStore.nextValue();
+            // getLength() is an end index, so the byte count passed to
+            // reset() is end - position (see nextKeyValue()).
+            buffer.reset(next.getData(), next.getPosition(),
+                next.getLength() - next.getPosition());
+            value = valueDeserializer.deserialize(value);
+            return value;
+          } else {
+            // Replay is drained: fall through to the live input below.
+            inReset = false;
+            backupStore.exitResetMode();
+            if (clearMarkFlag) {
+              clearMarkFlag = false;
+              isMarked = false;
+            }
+          }
+        } catch (IOException e) {
+          throw new RuntimeException("next value iterator failed", e);
+        }
+      }
+
+      // if this is the first record, we don't need to advance
+      if (firstValue) {
+        firstValue = false;
+        return value;
+      }
+      // if this isn't the first record and the next key is different, they
+      // can't advance it here.
+      if (!nextKeyIsSame) {
+        throw new NoSuchElementException("iterate past last value");
+      }
+      // otherwise, go to the next key/value pair
+      try {
+        nextKeyValue();
+        return value;
+      } catch (IOException ie) {
+        throw new RuntimeException("next value iterator failed", ie);
+      } catch (InterruptedException ie) {
+        // this is bad, but we can't modify the exception list of java.util.
+        // Restore the interrupt flag so code further up the stack can still
+        // observe the interruption.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("next value iterator interrupted", ie);
+      }
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("remove not implemented");
+    }
+
+    /**
+     * Marks the current position. The backup store is created on first use.
+     * Outside of a replay the store is reinitialized and, if a record has
+     * already been read, that record is written to it as its first entry.
+     * During a replay the mark is delegated to the backup store.
+     *
+     * @throws IOException if the backup store fails
+     */
+    public void mark() throws IOException {
+      if (getBackupStore() == null) {
+        backupStore = new BackupStore<KEYIN,VALUEIN>(conf, taskid);
+      }
+      isMarked = true;
+      if (!inReset) {
+        backupStore.reinitialize();
+        if (currentKeyLength == -1) {
+          // The user has not called next() for this iterator yet, so
+          // there is no current record to mark and copy to backup store.
+          return;
+        }
+        assert (currentValueLength != -1);
+        int requestedSize = currentKeyLength + currentValueLength +
+          WritableUtils.getVIntSize(currentKeyLength) +
+          WritableUtils.getVIntSize(currentValueLength);
+        DataOutputStream out = backupStore.getOutputStream(requestedSize);
+        writeFirstKeyValueBytes(out);
+        backupStore.updateCounters(requestedSize);
+      } else {
+        backupStore.mark();
+      }
+    }
+
+    /**
+     * Starts replaying values from the last mark.
+     *
+     * @throws IOException if there is no active mark, or if the backup store
+     *                     fails
+     */
+    public void reset() throws IOException {
+      // We reached the end of an iteration and user calls a
+      // reset, but a clearMark was called before, just throw
+      // an exception
+      if (clearMarkFlag) {
+        clearMarkFlag = false;
+        backupStore.clearMark();
+        throw new IOException("Reset called without a previous mark");
+      }
+
+      if (!isMarked) {
+        throw new IOException("Reset called without a previous mark");
+      }
+      inReset = true;
+      backupStore.reset();
+    }
+
+    /**
+     * Clears the mark. Does nothing if no backup store exists. During a
+     * replay the clearing is recorded and completed once the replay has been
+     * drained; otherwise the store is reinitialized immediately.
+     *
+     * @throws IOException if the backup store fails
+     */
+    public void clearMark() throws IOException {
+      if (getBackupStore() == null) {
+        return;
+      }
+      if (inReset) {
+        clearMarkFlag = true;
+        backupStore.clearMark();
+      } else {
+        inReset = isMarked = false;
+        backupStore.reinitialize();
+      }
+    }
+
+    /**
+     * This method is called when the reducer moves from one key to
+     * another.
+     * @throws IOException
+     */
+    public void resetBackupStore() throws IOException {
+      if (getBackupStore() == null) {
+        return;
+      }
+      inReset = isMarked = false;
+      backupStore.reinitialize();
+      currentKeyLength = -1;
+    }
+
+    /**
+     * This method is called to write the record that was most recently
+     * served (before a call to the mark). Since the framework reads one
+     * record in advance, to get this record, we serialize the current key
+     * and value
+     * @param out stream obtained from the backup store
+     * @throws IOException
+     */
+    private void writeFirstKeyValueBytes(DataOutputStream out)
+        throws IOException {
+      assert (getCurrentKey() != null && getCurrentValue() != null);
+      WritableUtils.writeVInt(out, currentKeyLength);
+      WritableUtils.writeVInt(out, currentValueLength);
+      Serializer<KEYIN> keySerializer =
+        serializationFactory.getSerializer(keyClass);
+      keySerializer.open(out);
+      keySerializer.serialize(getCurrentKey());
+
+      Serializer<VALUEIN> valueSerializer =
+        serializationFactory.getSerializer(valueClass);
+      valueSerializer.open(out);
+      valueSerializer.serialize(getCurrentValue());
+    }
+  }
+
+  /**
+   * Iterable wrapper that always hands out the same {@link ValueIterator}
+   * instance.
+   */
+  protected class ValueIterable implements Iterable<VALUEIN> {
+    private ValueIterator iterator = new ValueIterator();
+    @Override
+    public Iterator<VALUEIN> iterator() {
+      return iterator;
+    }
+  }
+
+  /**
+   * Iterate through the values for the current key, reusing the same value
+   * object, which is stored in the context.
+   * @return the series of values associated with the current key. All of the
+   * objects returned directly and indirectly from this method are reused.
+   */
+  public
+  Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
+    return iterable;
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/ReduceContextImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapreduce;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+/**
+ * The context for task attempts.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TaskAttemptContextImpl extends JobContextImpl
+    implements TaskAttemptContext {
+
+  /**
+   * Stand-in used whenever no reporter was supplied, so that the reporting
+   * methods of a context built with the two-argument constructor do not
+   * fail with a NullPointerException.
+   */
+  private static final DummyReporter NO_OP_REPORTER = new DummyReporter();
+
+  private final TaskAttemptID taskId;
+  private String status = "";
+  // May be null: the two-argument constructor does not supply a reporter.
+  private MRTaskReporter reporter;
+
+  /**
+   * Creates a context without a reporter. Progress and status updates are
+   * then no-ops and counters are detached throw-away instances.
+   *
+   * @param conf   the configuration
+   * @param taskId the task attempt id
+   */
+  public TaskAttemptContextImpl(Configuration conf,
+      TaskAttemptID taskId) {
+    this(conf, taskId, null);
+  }
+
+  /**
+   * Creates a context that reports through the given reporter.
+   *
+   * @param conf     the configuration
+   * @param taskId   the task attempt id; its job id is converted with
+   *                 {@link IDConverter#fromMRJobId} for the parent context
+   * @param reporter the reporter to delegate to; may be null
+   */
+  public TaskAttemptContextImpl(Configuration conf,
+      TaskAttemptID taskId, MRTaskReporter reporter) {
+    super(conf, IDConverter.fromMRJobId(taskId.getJobID()));
+    this.taskId = taskId;
+    this.reporter = reporter;
+  }
+
+  /**
+   * Get the unique name for this task attempt.
+   */
+  public TaskAttemptID getTaskAttemptID() {
+    return taskId;
+  }
+
+  /**
+   * Get the last set status message.
+   * @return the current status message
+   */
+  public String getStatus() {
+    return status;
+  }
+
+  /**
+   * Looks up a counter by enum name.
+   *
+   * @param counterName the counter to look up
+   * @return the reporter's counter, or a detached counter when this context
+   *         has no reporter
+   */
+  public Counter getCounter(Enum<?> counterName) {
+    if (reporter == null) {
+      return NO_OP_REPORTER.getCounter(counterName);
+    }
+    return (Counter) reporter.getCounter(counterName);
+  }
+
+  /**
+   * Looks up a counter by group and name.
+   *
+   * @param groupName   the counter group
+   * @param counterName the counter name within the group
+   * @return the reporter's counter, or a detached counter when this context
+   *         has no reporter
+   */
+  public Counter getCounter(String groupName, String counterName) {
+    if (reporter == null) {
+      return NO_OP_REPORTER.getCounter(groupName, counterName);
+    }
+    return (Counter) reporter.getCounter(groupName, counterName);
+  }
+
+  /**
+   * Report progress. Does nothing when this context has no reporter.
+   */
+  public void progress() {
+    if (reporter != null) {
+      reporter.progress();
+    }
+  }
+
+  /**
+   * Stores the status string locally without notifying the reporter.
+   */
+  protected void setStatusString(String status) {
+    this.status = status;
+  }
+
+  /**
+   * Set the current status of the task to the given string. The string is
+   * normalized with {@link Task#normalizeStatus} before it is stored and,
+   * if a reporter is present, forwarded to it.
+   */
+  public void setStatus(String status) {
+    String normalizedStatus = Task.normalizeStatus(status, conf);
+    setStatusString(normalizedStatus);
+    if (reporter != null) {
+      reporter.setStatus(normalizedStatus);
+    }
+  }
+
+  /**
+   * Reporter that ignores status and progress updates, always reports a
+   * progress of zero, and hands out counters from a freshly created
+   * {@link Counters} instance on every call.
+   */
+  public static class DummyReporter extends StatusReporter {
+    public void setStatus(String s) {
+    }
+    public void progress() {
+    }
+    public Counter getCounter(Enum<?> name) {
+      return new Counters().findCounter(name);
+    }
+    public Counter getCounter(String group, String name) {
+      return new Counters().findCounter(group, name);
+    }
+    public float getProgress() {
+      return 0f;
+    }
+  }
+
+  /**
+   * @return the progress reported by the reporter, or 0 when this context
+   *         has no reporter
+   */
+  public float getProgress() {
+    if (reporter == null) {
+      return NO_OP_REPORTER.getProgress();
+    }
+    return reporter.getProgress();
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+/**
+ * A context object that allows input and output from the task. It is only
+ * supplied to the {@link Mapper} or {@link Reducer}.
+ * @param <KEYIN> the input key type for the task
+ * @param <VALUEIN> the input value type for the task
+ * @param <KEYOUT> the output key type for the task
+ * @param <VALUEOUT> the output value type for the task
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
+    extends TaskAttemptContextImpl
+    implements TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+
+  /** Destination of every pair passed to {@link #write}. */
+  private final RecordWriter<KEYOUT,VALUEOUT> output;
+
+  /** Committer returned by {@link #getOutputCommitter()}. */
+  private final OutputCommitter committer;
+
+  /**
+   * Creates the context.
+   *
+   * @param conf      configuration, forwarded to the parent context
+   * @param taskid    task attempt id, forwarded to the parent context
+   * @param output    record writer that receives the task's output
+   * @param committer output committer exposed to the task
+   * @param reporter  task reporter, forwarded to the parent context
+   */
+  public TaskInputOutputContextImpl(Configuration conf, TaskAttemptID taskid,
+      RecordWriter<KEYOUT,VALUEOUT> output,
+      OutputCommitter committer,
+      MRTaskReporter reporter) {
+    super(conf, taskid, reporter);
+    this.committer = committer;
+    this.output = output;
+  }
+
+  /**
+   * Advance to the next key/value pair.
+   * @return whether a pair was read, as defined by the implementing subclass
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract boolean nextKeyValue()
+      throws IOException, InterruptedException;
+
+  /**
+   * Get the current key.
+   * @return the current key object or null if there isn't one
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract KEYIN getCurrentKey()
+      throws IOException, InterruptedException;
+
+  /**
+   * Get the current value.
+   * @return the value object that was read into
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract VALUEIN getCurrentValue()
+      throws IOException, InterruptedException;
+
+  /**
+   * Emits one output pair by passing it to the record writer.
+   *
+   * @param key   the output key
+   * @param value the output value
+   */
+  public void write(KEYOUT key, VALUEOUT value)
+      throws IOException, InterruptedException {
+    this.output.write(key, value);
+  }
+
+  /**
+   * @return the output committer supplied at construction time
+   */
+  public OutputCommitter getOutputCommitter() {
+    return this.committer;
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/records/ProceedToCompletionResponse.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/records/ProceedToCompletionResponse.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/records/ProceedToCompletionResponse.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/records/ProceedToCompletionResponse.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.records;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+
+/**
+ * Writable record made of two boolean flags, serialized in the order
+ * {@code shouldDie}, {@code readyToProceed}.
+ */
+public class ProceedToCompletionResponse implements Writable {
+
+  private boolean shouldDie;
+  private boolean readyToProceed;
+
+  /**
+   * No-argument constructor; both flags start out false until
+   * {@link #readFields(DataInput)} fills them in.
+   */
+  public ProceedToCompletionResponse() {
+  }
+
+  /**
+   * @param shouldDie      value returned by {@link #shouldDie()}
+   * @param readyToProceed value returned by {@link #readyToProceed()}
+   */
+  public ProceedToCompletionResponse(boolean shouldDie, boolean readyToProceed) {
+    this.readyToProceed = readyToProceed;
+    this.shouldDie = shouldDie;
+  }
+
+  /**
+   * Indicates whether the task is required to proceed to completion, or should
+   * terminate.
+   *
+   * @return the {@code shouldDie} flag; going by its name, {@code true}
+   *         presumably means the task should terminate
+   */
+  public boolean shouldDie() {
+    return shouldDie;
+  }
+
+  /**
+   * Indicates whether the task is ready to proceed. Valid only if shouldDie is
+   * false.
+   *
+   * @return the {@code readyToProceed} flag
+   */
+  public boolean readyToProceed() {
+    return readyToProceed;
+  }
+
+  /** Writes {@code shouldDie} followed by {@code readyToProceed}. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(this.shouldDie);
+    out.writeBoolean(this.readyToProceed);
+  }
+
+  /** Reads the two flags back in the order {@link #write} emitted them. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.shouldDie = in.readBoolean();
+    this.readyToProceed = in.readBoolean();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder();
+    text.append("shouldDie: ").append(shouldDie);
+    text.append(", readyToProceed: ").append(readyToProceed);
+    return text.toString();
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/records/ProceedToCompletionResponse.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.input;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.mapreduce.processor.MRTask;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+/**
+ * {@link SimpleInput} is an {@link Input} which provides key/values pairs
+ * for the consumer.
+ *
+ * It is compatible with all standard Apache Hadoop MapReduce
+ * {@link InputFormat} implementations.
+ *
+ * The owning {@link MRTask} has to be supplied through
+ * {@link #setTask(MRTask)} before
+ * {@link #initialize(Configuration, Master)} is called; without a task,
+ * initialization returns without setting anything up.
+ */
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class SimpleInput implements Input {
+
+  private static final Log LOG = LogFactory.getLog(SimpleInput.class);
+
+  MRTask task;
+
+  boolean useNewApi;
+
+  JobConf jobConf;
+
+  org.apache.hadoop.mapreduce.TaskAttemptContext taskAttemptContext;
+
+  org.apache.hadoop.mapreduce.InputFormat newInputFormat;
+  org.apache.hadoop.mapreduce.RecordReader newRecordReader;
+
+  org.apache.hadoop.mapred.InputFormat oldInputFormat;
+  org.apache.hadoop.mapred.RecordReader oldRecordReader;
+
+  Object key;
+  Object value;
+
+  private TezCounter inputRecordCounter;
+  private TezCounter fileInputByteCounter;
+  private List<Statistics> fsStats;
+  private MRTaskReporter reporter;
+
+  private SimpleValueIterator vIter = new SimpleValueIterator();
+  private SimpleIterable valuesIterable =
+      new SimpleIterable(vIter);
+
+  private org.apache.hadoop.mapreduce.InputSplit newInputSplit;
+
+  /**
+   * The injected task is not retained here; the task actually used is the
+   * one passed to {@link #setTask(MRTask)}.
+   */
+  @Inject
+  public SimpleInput(
+      @Assisted TezTask task
+      )
+  {}
+
+  public void setTask(MRTask task) {
+    this.task = task;
+  }
+
+  /**
+   * Reads the split assigned to the task and creates the matching record
+   * reader, using the new or the old MapReduce API depending on
+   * {@code jobConf.getUseNewMapper()}.
+   *
+   * Does nothing when no task has been set.
+   *
+   * @param conf job configuration; wrapped in a {@link JobConf} if it is
+   *             not one already
+   * @param master not used by this implementation
+   */
+  public void initialize(Configuration conf, Master master) throws IOException,
+      InterruptedException {
+    if (task == null) {
+      return;
+    }
+
+    if (conf instanceof JobConf) {
+      jobConf = (JobConf)conf;
+    } else {
+      jobConf = new JobConf(conf);
+    }
+
+    useNewApi = jobConf.getUseNewMapper();
+    taskAttemptContext = task.getTaskAttemptContext();
+
+    inputRecordCounter = task.getInputRecordsCounter();
+    fileInputByteCounter = task.getFileInputBytesCounter();
+
+    reporter = task.getMRReporter();
+
+    if (useNewApi) {
+      try {
+        newInputFormat =
+            ReflectionUtils.newInstance(
+                taskAttemptContext.getInputFormatClass(), jobConf);
+      } catch (ClassNotFoundException cnfe) {
+        throw new IOException(cnfe);
+      }
+
+      newInputSplit = getNewSplitDetails(task.getSplitIndex());
+      List<Statistics> matchedStats = null;
+      if (newInputSplit instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
+        matchedStats = MRTask.getFsStatistics(
+            ((org.apache.hadoop.mapreduce.lib.input.FileSplit)
+                newInputSplit).getPath(), jobConf);
+      }
+      fsStats = matchedStats;
+      newRecordReader =
+          newInputFormat.createRecordReader(newInputSplit, taskAttemptContext);
+    } else {
+      oldInputFormat = jobConf.getInputFormat();
+      org.apache.hadoop.mapred.InputSplit oldInputSplit =
+          getOldSplitDetails(task.getSplitIndex());
+
+      List<Statistics> matchedStats = null;
+      if (oldInputSplit instanceof FileSplit) {
+        matchedStats =
+            MRTask.getFsStatistics(
+                ((FileSplit)oldInputSplit).getPath(), jobConf);
+      }
+      fsStats = matchedStats;
+
+      // Bytes read while the reader is being opened are charged to the
+      // file-input-bytes counter.
+      long bytesInPrev = getInputBytes();
+      // Reuse the format instance stored above instead of asking the
+      // JobConf for a second one.
+      oldRecordReader =
+          oldInputFormat.getRecordReader(
+              oldInputSplit, jobConf, reporter);
+      long bytesInCurr = getInputBytes();
+      fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
+
+      updateJobWithSplit(jobConf, oldInputSplit);
+    }
+  }
+
+  /**
+   * Advances the active record reader by one record. The change in
+   * filesystem bytes-read is added to the file-input-bytes counter, the
+   * reader's progress is reported, and the input record counter is
+   * incremented when a record was read.
+   *
+   * Under the old API the objects supplied through {@link #setKey(Object)}
+   * and {@link #setValue(Object)} are passed to the reader.
+   *
+   * @return {@code true} if a record was read
+   */
+  public boolean hasNext() throws IOException, InterruptedException {
+    boolean hasNext = false;
+    long bytesInPrev = getInputBytes();
+
+    if (useNewApi) {
+      hasNext = newRecordReader.nextKeyValue();
+    } else {
+      hasNext = oldRecordReader.next(key, value);
+    }
+
+    long bytesInCurr = getInputBytes();
+    fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
+    reporter.setProgress(getProgress());
+
+    if (hasNext) {
+      inputRecordCounter.increment(1);
+    }
+
+    return hasNext;
+  }
+
+  public void setKey(Object key) {
+    this.key = key;
+  }
+
+  public void setValue(Object value) {
+    this.value = value;
+  }
+
+  /**
+   * @return the current key of the new-API reader, or the key object set
+   *         through {@link #setKey(Object)} under the old API
+   */
+  public Object getNextKey() throws IOException, InterruptedException {
+    if (useNewApi) {
+      return newRecordReader.getCurrentKey();
+    } else {
+      return key;
+    }
+  }
+
+  /**
+   * Returns an iterable over the single value of the current record.
+   *
+   * Under the new API the value is fetched from the record reader; under
+   * the old API the value object handed to the reader by
+   * {@link #hasNext()} is used, since no new-API reader exists then.
+   *
+   * The same iterable and iterator instances are returned on every call.
+   */
+  public Iterable getNextValues() throws IOException,
+      InterruptedException {
+    if (useNewApi) {
+      value = newRecordReader.getCurrentValue();
+    }
+    vIter.setValue(value);
+    return valuesIterable;
+  }
+
+  /** @return the progress reported by the active record reader */
+  public float getProgress() throws IOException, InterruptedException {
+    if (useNewApi) {
+      return newRecordReader.getProgress();
+    } else {
+      return oldRecordReader.getProgress();
+    }
+  }
+
+  /**
+   * Closes the active record reader and charges any bytes read during the
+   * close to the file-input-bytes counter.
+   */
+  public void close() throws IOException {
+    long bytesInPrev = getInputBytes();
+    if (useNewApi) {
+      newRecordReader.close();
+    } else {
+      oldRecordReader.close();
+    }
+    long bytesInCurr = getInputBytes();
+    fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
+  }
+
+  /**
+   * Iterator over at most one value: the value most recently passed to
+   * {@link #setValue(Object)} and not yet returned by {@link #next()}.
+   */
+  static class SimpleValueIterator implements Iterator {
+
+    private Object value;
+
+    public void setValue(Object value) {
+      this.value = value;
+    }
+
+    /** @return {@code true} while a value is pending */
+    public boolean hasNext() {
+      return value != null;
+    }
+
+    /** Hands out the pending value and clears it. */
+    public Object next() {
+      Object value = this.value;
+      this.value = null;
+      return value;
+    }
+
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /** Iterable that always hands back the iterator it was built with. */
+  static class SimpleIterable implements Iterable {
+    private final Iterator iterator;
+    public SimpleIterable(Iterator iterator) {
+      this.iterator = iterator;
+    }
+
+    public Iterator iterator() {
+      return iterator;
+    }
+  }
+
+
+  public RecordReader getOldRecordReader() {
+    return oldRecordReader;
+  }
+
+  public org.apache.hadoop.mapreduce.RecordReader getNewRecordReader() {
+    return newRecordReader;
+  }
+
+  /**
+   * Reads the old-API split described by {@code splitMetaInfo} from the
+   * local filesystem.
+   *
+   * @throws IOException if the split file cannot be read or the split
+   *                     class cannot be loaded
+   */
+  public org.apache.hadoop.mapred.InputSplit
+      getOldSplitDetails(TaskSplitIndex splitMetaInfo)
+      throws IOException {
+    return (org.apache.hadoop.mapred.InputSplit)
+        readSplitDetails(splitMetaInfo);
+  }
+
+  /**
+   * Reads the new-API split described by {@code splitMetaInfo} from the
+   * local filesystem.
+   *
+   * @throws IOException if the split file cannot be read or the split
+   *                     class cannot be loaded
+   */
+  public org.apache.hadoop.mapreduce.InputSplit
+      getNewSplitDetails(TaskSplitIndex splitMetaInfo)
+      throws IOException {
+    return (org.apache.hadoop.mapreduce.InputSplit)
+        readSplitDetails(splitMetaInfo);
+  }
+
+  /**
+   * Opens the split file on the local filesystem, seeks to the split's
+   * start offset, reads the split class name and deserializes one split.
+   * The number of bytes consumed is added to the SPLIT_RAW_BYTES counter.
+   * The stream is closed whether or not reading succeeds.
+   */
+  private Object readSplitDetails(TaskSplitIndex splitMetaInfo)
+      throws IOException {
+    Path file = new Path(splitMetaInfo.getSplitLocation());
+    long offset = splitMetaInfo.getStartOffset();
+
+    // Split information read from local filesystem.
+    FileSystem fs = FileSystem.getLocal(jobConf);
+    file = fs.makeQualified(file);
+    LOG.info("Reading input split file from : " + file);
+
+    FSDataInputStream inFile = fs.open(file);
+    try {
+      inFile.seek(offset);
+      String className = Text.readString(inFile);
+      Class cls;
+      try {
+        cls = jobConf.getClassByName(className);
+      } catch (ClassNotFoundException ce) {
+        IOException wrap = new IOException("Split class " + className +
+            " not found");
+        wrap.initCause(ce);
+        throw wrap;
+      }
+      SerializationFactory factory = new SerializationFactory(jobConf);
+      Deserializer deserializer = factory.getDeserializer(cls);
+      deserializer.open(inFile);
+      Object split = deserializer.deserialize(null);
+      long pos = inFile.getPos();
+      reporter.getCounter(TaskCounter.SPLIT_RAW_BYTES).increment(pos - offset);
+      return split;
+    } finally {
+      inFile.close();
+    }
+  }
+
+  /**
+   * Publishes the file, start and length of a file split into the job
+   * configuration, then logs the split being processed.
+   */
+  private void updateJobWithSplit(final JobConf job, InputSplit inputSplit) {
+    if (inputSplit instanceof FileSplit) {
+      FileSplit fileSplit = (FileSplit) inputSplit;
+      job.set(JobContext.MAP_INPUT_FILE, fileSplit.getPath().toString());
+      job.setLong(JobContext.MAP_INPUT_START, fileSplit.getStart());
+      // NOTE(review): the split length is stored under MAP_INPUT_PATH;
+      // presumably that constant names the input-length key despite its
+      // name -- confirm against MRJobConfig.
+      job.setLong(JobContext.MAP_INPUT_PATH, fileSplit.getLength());
+    }
+    LOG.info("Processing split: " + inputSplit);
+  }
+
+  /**
+   * @return total bytes read according to the matched filesystem
+   *         statistics, or 0 when no statistics were matched
+   */
+  private long getInputBytes() {
+    if (fsStats == null) return 0;
+    long bytesRead = 0;
+    for (Statistics stat: fsStats) {
+      bytesRead = bytesRead + stat.getBytesRead();
+    }
+    return bytesRead;
+  }
+
+  /** Initializes the new-API record reader with the given split and context. */
+  public void initializeNewRecordReader(
+      org.apache.hadoop.mapreduce.InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    newRecordReader.initialize(split, context);
+  }
+
+  public org.apache.hadoop.mapreduce.InputSplit getNewInputSplit() {
+    return newInputSplit;
+  }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.output;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.mapreduce.processor.MRTask;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+/**
+ * {@link SimpleOutput} is an {@link Output} which persists key/value pairs
+ * written to it.
+ *
+ * It is compatible with all standard Apache Hadoop MapReduce
+ * {@link org.apache.hadoop.mapreduce.OutputFormat} implementations.
+ *
+ * The owning {@link MRTask} has to be supplied through
+ * {@link #setTask(MRTask)} before
+ * {@link #initialize(Configuration, Master)} is called; without a task,
+ * initialization returns without setting anything up.
+ */
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class SimpleOutput implements Output {
+
+  private MRTask task;
+
+  boolean useNewApi;
+  JobConf jobConf;
+
+  org.apache.hadoop.mapreduce.TaskAttemptContext taskAttemptContext;
+  org.apache.hadoop.mapreduce.OutputFormat newOutputFormat;
+  org.apache.hadoop.mapreduce.RecordWriter newRecordWriter;
+
+  org.apache.hadoop.mapred.OutputFormat oldOutputFormat;
+  org.apache.hadoop.mapred.RecordWriter oldRecordWriter;
+
+  private TezCounter outputRecordCounter;
+  private TezCounter fileOutputByteCounter;
+  private List<Statistics> fsStats;
+  private MRTaskReporter reporter;
+
+  /**
+   * The injected task is not retained here; the task actually used is the
+   * one passed to {@link #setTask(MRTask)}.
+   */
+  @Inject
+  public SimpleOutput(
+      @Assisted TezTask task
+      ) {
+  }
+
+  public void setTask(MRTask task) {
+    this.task = task;
+  }
+
+  /**
+   * Creates the output format and record writer, using the new or the old
+   * MapReduce API, and charges bytes written while the writer is opened to
+   * the file-output-bytes counter.
+   *
+   * Does nothing when no task has been set.
+   *
+   * @param conf job configuration; wrapped in a {@link JobConf} if it is
+   *             not one already
+   * @param master not used by this implementation
+   */
+  public void initialize(Configuration conf, Master master) throws IOException,
+      InterruptedException {
+
+    if (task == null) {
+      return;
+    }
+
+    if (conf instanceof JobConf) {
+      jobConf = (JobConf)conf;
+    } else {
+      jobConf = new JobConf(conf);
+    }
+
+    // NOTE(review): the API flavour is taken from the mapper setting; if
+    // this output can also sit behind a reducer, getUseNewReducer() may be
+    // the right switch there -- confirm against the callers.
+    useNewApi = jobConf.getUseNewMapper();
+    taskAttemptContext = task.getTaskAttemptContext();
+
+    outputRecordCounter = task.getOutputRecordsCounter();
+    fileOutputByteCounter = task.getFileOutputBytesCounter();
+
+    reporter = task.getMRReporter();
+
+    if (useNewApi) {
+      try {
+        newOutputFormat =
+            ReflectionUtils.newInstance(
+                taskAttemptContext.getOutputFormatClass(), jobConf);
+      } catch (ClassNotFoundException cnfe) {
+        throw new IOException(cnfe);
+      }
+
+      List<Statistics> matchedStats = null;
+      if (newOutputFormat instanceof
+          org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
+        matchedStats =
+            MRTask.getFsStatistics(
+                org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+                    .getOutputPath(taskAttemptContext),
+                jobConf);
+      }
+      fsStats = matchedStats;
+
+      long bytesOutPrev = getOutputBytes();
+      newRecordWriter =
+          newOutputFormat.getRecordWriter(this.taskAttemptContext);
+      long bytesOutCurr = getOutputBytes();
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+    } else {
+      oldOutputFormat = jobConf.getOutputFormat();
+
+      List<Statistics> matchedStats = null;
+      if (oldOutputFormat instanceof org.apache.hadoop.mapred.FileOutputFormat) {
+        matchedStats =
+            MRTask.getFsStatistics(
+                org.apache.hadoop.mapred.FileOutputFormat.getOutputPath(
+                    jobConf),
+                jobConf);
+      }
+      fsStats = matchedStats;
+
+      FileSystem fs = FileSystem.get(jobConf);
+      String finalName = task.getOutputName();
+
+      long bytesOutPrev = getOutputBytes();
+      oldRecordWriter =
+          oldOutputFormat.getRecordWriter(
+              fs, jobConf, finalName, reporter);
+      long bytesOutCurr = getOutputBytes();
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+    }
+  }
+
+  /**
+   * Writes one key/value pair through the active record writer, reporting
+   * progress first. The change in filesystem bytes-written is added to the
+   * file-output-bytes counter and the output record counter is incremented.
+   */
+  public void write(Object key, Object value)
+      throws IOException, InterruptedException {
+
+    reporter.progress();
+    long bytesOutPrev = getOutputBytes();
+
+    if (useNewApi) {
+      newRecordWriter.write(key, value);
+    } else {
+      oldRecordWriter.write(key, value);
+    }
+
+    long bytesOutCurr = getOutputBytes();
+    fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+    outputRecordCounter.increment(1);
+
+  }
+
+  /**
+   * Closes the active record writer and charges bytes written during the
+   * close to the file-output-bytes counter.
+   */
+  public void close() throws IOException, InterruptedException {
+    reporter.progress();
+    long bytesOutPrev = getOutputBytes();
+    if (useNewApi) {
+      newRecordWriter.close(taskAttemptContext);
+    } else {
+      // Hand the task reporter to the old-API writer rather than null, so
+      // a writer that uses its reporter while closing does not fail.
+      oldRecordWriter.close(reporter);
+    }
+    long bytesOutCurr = getOutputBytes();
+    fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+  }
+
+  public org.apache.hadoop.mapreduce.OutputFormat getNewOutputFormat() {
+    return newOutputFormat;
+  }
+
+  public org.apache.hadoop.mapred.OutputFormat getOldOutputFormat() {
+    return oldOutputFormat;
+  }
+
+  /**
+   * @return total bytes written according to the matched filesystem
+   *         statistics, or 0 when no statistics were matched
+   */
+  private long getOutputBytes() {
+    if (fsStats == null) return 0;
+    long bytesWritten = 0;
+    for (Statistics stat: fsStats) {
+      bytesWritten = bytesWritten + stat.getBytesWritten();
+    }
+    return bytesWritten;
+  }
+
+  /** @return always {@code null}; this output has no output context */
+  @Override
+  public OutputContext getOutputContext() {
+    return null;
+  }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,104 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.partition;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.mapreduce.processor.MRTask;
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class MRPartitioner implements org.apache.tez.engine.api.Partitioner {
+
+ static final Log LOG = LogFactory.getLog(MRPartitioner.class);
+ private final MRTask task;
+
+ JobConf jobConf;
+ boolean useNewApi;
+
+ org.apache.hadoop.mapred.Partitioner oldPartitioner;
+ org.apache.hadoop.mapreduce.Partitioner newPartitioner;
+
+ public MRPartitioner(MRTask task) {
+ this.task = task;
+ }
+
+ public void initialize(Configuration conf, Master master)
+ throws IOException, InterruptedException {
+ if (conf instanceof JobConf) {
+ jobConf = (JobConf)conf;
+ } else {
+ jobConf = new JobConf(conf);
+ }
+
+ useNewApi = jobConf.getUseNewMapper();
+ final int partitions = jobConf.getNumReduceTasks();
+ if (useNewApi) {
+ if (partitions > 1) {
+ try {
+ newPartitioner = (org.apache.hadoop.mapreduce.Partitioner)
+ ReflectionUtils.newInstance(
+ task.getJobContext().getPartitionerClass(), jobConf);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException(cnfe);
+ }
+ } else {
+ newPartitioner = new org.apache.hadoop.mapreduce.Partitioner() {
+ @Override
+ public int getPartition(Object key, Object value, int numPartitions) {
+ return numPartitions - 1;
+ }
+ };
+ }
+ } else {
+ if (partitions > 1) {
+ oldPartitioner = (Partitioner)
+ ReflectionUtils.newInstance(jobConf.getPartitionerClass(), jobConf);
+ } else {
+ oldPartitioner = new Partitioner() {
+ @Override
+ public void configure(JobConf job) {}
+
+ @Override
+ public int getPartition(Object key, Object value, int numPartitions) {
+ return numPartitions - 1;
+ }
+ };
+ }
+
+ }
+
+ }
+
+ @Override
+ public int getPartition(Object key, Object value, int numPartitions) {
+ if (useNewApi) {
+ return newPartitioner.getPartition(key, value, numPartitions);
+ } else {
+ return oldPartitioner.getPartition(key, value, numPartitions);
+ }
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/FileSystemStatisticsUpdater.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/FileSystemStatisticsUpdater.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/FileSystemStatisticsUpdater.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/FileSystemStatisticsUpdater.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.mapreduce.processor;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+
+/**
+ * Copies the summed statistics of one file system scheme into that
+ * scheme's counters. The counters are looked up on the first call to
+ * {@link #updateCounters()} and reused afterwards.
+ */
+class FileSystemStatisticUpdater {
+  private List<FileSystem.Statistics> stats;
+  private TezCounter readBytesCounter, writeBytesCounter,
+      readOpsCounter, largeReadOpsCounter, writeOpsCounter;
+  private String scheme;
+  private TezCounters counters;
+
+  FileSystemStatisticUpdater(TezCounters counters, List<FileSystem.Statistics> stats, String scheme) {
+    this.counters = counters;
+    this.scheme = scheme;
+    this.stats = stats;
+  }
+
+  /**
+   * Sums every statistic over all tracked {@link FileSystem.Statistics}
+   * and sets each counter to the corresponding total.
+   */
+  void updateCounters() {
+    // Look up each counter only the first time it is needed.
+    if (readBytesCounter == null) {
+      readBytesCounter =
+          counters.findCounter(scheme, FileSystemCounter.BYTES_READ);
+    }
+    if (writeBytesCounter == null) {
+      writeBytesCounter =
+          counters.findCounter(scheme, FileSystemCounter.BYTES_WRITTEN);
+    }
+    if (readOpsCounter == null) {
+      readOpsCounter =
+          counters.findCounter(scheme, FileSystemCounter.READ_OPS);
+    }
+    if (largeReadOpsCounter == null) {
+      largeReadOpsCounter =
+          counters.findCounter(scheme, FileSystemCounter.LARGE_READ_OPS);
+    }
+    if (writeOpsCounter == null) {
+      writeOpsCounter =
+          counters.findCounter(scheme, FileSystemCounter.WRITE_OPS);
+    }
+
+    long totalReadBytes = 0;
+    long totalWriteBytes = 0;
+    long totalReadOps = 0;
+    long totalLargeReadOps = 0;
+    long totalWriteOps = 0;
+    for (FileSystem.Statistics fsStat : stats) {
+      totalReadBytes += fsStat.getBytesRead();
+      totalWriteBytes += fsStat.getBytesWritten();
+      totalReadOps += fsStat.getReadOps();
+      totalLargeReadOps += fsStat.getLargeReadOps();
+      totalWriteOps += fsStat.getWriteOps();
+    }
+
+    readBytesCounter.setValue(totalReadBytes);
+    writeBytesCounter.setValue(totalWriteBytes);
+    readOpsCounter.setValue(totalReadOps);
+    largeReadOpsCounter.setValue(totalLargeReadOps);
+    writeOpsCounter.setValue(totalWriteOps);
+  }
+}
+
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/FileSystemStatisticsUpdater.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/GcTimeUpdater.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/GcTimeUpdater.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/GcTimeUpdater.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/GcTimeUpdater.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.mapreduce.processor;
+
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.util.List;
+
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.common.counters.TaskCounter;
+
+/**
+ * Tracks the garbage-collection time reported by the JVM's collector
+ * beans and adds the time elapsed between calls to the GC_TIME_MILLIS
+ * counter.
+ */
+class GcTimeUpdater {
+  private long lastGcMillis = 0;
+  private List<GarbageCollectorMXBean> gcBeans = null;
+  TezCounters counters;
+
+  /**
+   * @param counters counters holding the GC time counter; may be
+   *                 {@code null}, in which case nothing is ever recorded
+   */
+  public GcTimeUpdater(TezCounters counters) {
+    this.gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
+    // Establish the baseline so the first increment only covers GC time
+    // spent after construction.
+    getElapsedGc();
+    this.counters = counters;
+  }
+
+  /**
+   * @return the collection time, in milliseconds, accumulated across all
+   *         collector beans since the previous call to this method
+   */
+  protected long getElapsedGc() {
+    long totalGcMillis = 0;
+    for (int i = 0; i < gcBeans.size(); i++) {
+      totalGcMillis += gcBeans.get(i).getCollectionTime();
+    }
+
+    final long elapsed = totalGcMillis - lastGcMillis;
+    lastGcMillis = totalGcMillis;
+    return elapsed;
+  }
+
+  /**
+   * Adds the GC time elapsed since the last measurement to the
+   * gc-elapsed-time counter. No measurement is taken when there are no
+   * counters or the counter cannot be found.
+   */
+  public void incrementGcCounter() {
+    if (counters != null) {
+      final TezCounter gcCounter =
+          counters.findCounter(TaskCounter.GC_TIME_MILLIS);
+      if (gcCounter != null) {
+        gcCounter.increment(getElapsedGc());
+      }
+    }
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/GcTimeUpdater.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,716 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.processor;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
+import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
+import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
+import org.apache.tez.common.Constants;
+import org.apache.tez.common.TezEngineTask;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.common.TezTaskStatus.Phase;
+import org.apache.tez.common.TezTaskStatus.State;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.mapreduce.hadoop.MRTaskStatus;
+import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
+import org.apache.tez.mapreduce.hadoop.TezTypeConverters;
+import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.tez.mapreduce.hadoop.mapreduce.JobContextImpl;
+import org.apache.tez.mapreduce.hadoop.records.ProceedToCompletionResponse;
+import org.apache.tez.mapreduce.partition.MRPartitioner;
+
+public abstract class MRTask
+extends org.apache.tez.common.TezTask {
+
+ static final Log LOG = LogFactory.getLog(MRTask.class);
+
+ // Job configuration; assigned by initialize(), initTask() and setConf().
+ protected JobConf jobConf;
+ // Both contexts are built in initTask().
+ protected JobContext jobContext;
+ protected TaskAttemptContext taskAttemptContext;
+ // Output committer; stored through setCommitter(), which initTask() calls.
+ protected OutputCommitter committer;
+
+ // Current counters
+ transient TezCounters counters = new TezCounters();
+ // Feeds GC time into 'counters'; created in the constructor.
+ protected GcTimeUpdater gcUpdater;
+ // Process-tree resource calculator set up by initResourceCalculatorPlugin();
+ // may be null (updateResourceCounters() checks for that).
+ private ResourceCalculatorProcessTree pTree;
+ // Cumulative CPU time read from pTree at initialization; subtracted from
+ // later readings in updateResourceCounters().
+ private long initCpuCumulativeTime = 0;
+ // The constructor's context argument, cast to TezEngineTask.
+ protected TezEngineTask tezTaskContext;
+
+ /* flag to track whether task is done */
+ AtomicBoolean taskDone = new AtomicBoolean(false);
+
+ /** Construct output file names so that, when an output directory listing is
+ * sorted lexicographically, positions correspond to output partitions.*/
+ private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+ static {
+ // At least five digits and no grouping separators.
+ NUMBER_FORMAT.setMinimumIntegerDigits(5);
+ NUMBER_FORMAT.setGroupingUsed(false);
+ }
+
+ // Retry budget shared by the umbilical retry loops in this class.
+ private final static int MAX_RETRIES = 10;
+
+ /** The number of milliseconds between progress reports. */
+ public static final int PROGRESS_INTERVAL = 3000;
+
+ // Reporter handed to initTask(); exposed through getMRReporter().
+ private MRTaskReporter mrReporter;
+
+ // Returned by getSplitIndex().
+ protected TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
+
+ /**
+ * A Map where Key-> URIScheme and value->FileSystemStatisticUpdater
+ */
+ private Map<String, FileSystemStatisticUpdater> statisticUpdaters =
+ new HashMap<String, FileSystemStatisticUpdater>();
+
+ public MRTask(TezTask context) {
+ super(context.getTaskAttemptId(), context.getUser(), context.getJobName(),
+ ((TezEngineTask)context).getTaskModuleClassName());
+
+ tezTaskContext = (TezEngineTask) context;
+ // TODO TEZAM4 Figure out initialization / run sequence of Input, Process,
+ // Output. Phase is MR specific.
+ status =
+ new MRTaskStatus(
+ getTaskAttemptId(),
+ counters,
+ (getTaskAttemptId().getTaskID().getVertexID().getId() == 0 ?
+ Phase.MAP : Phase.SHUFFLE)
+ );
+ gcUpdater = new GcTimeUpdater(counters);
+ }
+
+ /**
+  * Prepares the task for execution: adopts the configuration, creates the
+  * task reporter and starts its communication thread, records the task
+  * attempt id in the job configuration and initializes resource tracking.
+  *
+  * @param conf configuration; used directly when it already is a
+  *          {@link JobConf}, otherwise wrapped in a new one
+  * @param master cast to {@link TezTaskUmbilicalProtocol} and handed to the
+  *          reporter
+  */
+ public void initialize(Configuration conf, Master master) throws IOException,
+     InterruptedException {
+
+   this.jobConf = (conf instanceof JobConf) ? (JobConf) conf : new JobConf(conf);
+
+   final TezTaskReporterImpl taskReporter =
+       new TezTaskReporterImpl(this, (TezTaskUmbilicalProtocol) master);
+   reporter = taskReporter;
+   taskReporter.startCommunicationThread();
+
+   final String attemptId = getTaskAttemptId().toString();
+   jobConf.set(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID, attemptId);
+
+   initResourceCalculatorPlugin();
+
+   LOG.info("MRTask.inited: taskAttemptId = " + attemptId);
+ }
+
+ private void initResourceCalculatorPlugin() {
+ Class<? extends ResourceCalculatorProcessTree> clazz =
+ this.jobConf.getClass(MRConfig.RESOURCE_CALCULATOR_PROCESS_TREE,
+ null, ResourceCalculatorProcessTree.class);
+ pTree = ResourceCalculatorProcessTree
+ .getResourceCalculatorProcessTree(System.getenv().get("JVM_PID"), clazz, this.jobConf);
+ LOG.info(" Using ResourceCalculatorProcessTree : " + pTree);
+ if (pTree != null) {
+ pTree.updateProcessTree();
+ initCpuCumulativeTime = pTree.getCumulativeCpuTime();
+ }
+ }
+
+ /**
+  * @return the umbilical held by the task reporter; the reporter is cast to
+  *         {@code TezTaskReporterImpl}
+  */
+ public TezTaskUmbilicalProtocol getUmbilical() {
+   final TezTaskReporterImpl reporterImpl = (TezTaskReporterImpl) reporter;
+   return reporterImpl.getUmbilical();
+ }
+
+ public void initTask(JobConf job, TezDAGID dagId,
+ MRTaskReporter mrReporter,
+ boolean useNewApi) throws IOException,
+ InterruptedException {
+ this.jobConf = job;
+ this.jobContext = new JobContextImpl(job, dagId, mrReporter);
+ this.taskAttemptContext =
+ new TaskAttemptContextImpl(job, getTaskAttemptId(), mrReporter);
+ this.mrReporter = mrReporter;
+
+ if (getState() == State.UNASSIGNED) {
+ setState(State.RUNNING);
+ }
+
+ // Save the committer
+ if (useNewApi) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("using new api for output committer");
+ }
+ OutputFormat<?, ?> outputFormat = null;
+ try {
+ outputFormat =
+ ReflectionUtils.newInstance(
+ taskAttemptContext.getOutputFormatClass(), job);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException("Unknown OutputFormat", cnfe);
+ }
+ setCommitter(outputFormat.getOutputCommitter(taskAttemptContext));
+ } else {
+ setCommitter(job.getOutputCommitter());
+ }
+
+ Path outputPath = FileOutputFormat.getOutputPath(job);
+ if (outputPath != null) {
+ if ((getCommitter() instanceof FileOutputCommitter)) {
+ FileOutputFormat.setWorkOutputPath(job,
+ ((FileOutputCommitter)getCommitter()).getTaskAttemptPath(taskAttemptContext));
+ } else {
+ FileOutputFormat.setWorkOutputPath(job, outputPath);
+ }
+ }
+ getCommitter().setupTask(taskAttemptContext);
+
+ partitioner = new MRPartitioner(this);
+ ((MRPartitioner)partitioner).initialize(job, getTaskReporter());
+
+ localizeConfiguration(jobConf);
+ }
+
+ /** @return the reporter that was passed to {@code initTask}; null before that call */
+ public MRTaskReporter getMRReporter() {
+   return this.mrReporter;
+ }
+
+ /**
+ * Placeholder: the body is empty, so {@code state} is currently ignored.
+ * NOTE(review): initTask() and done() call this as if the state were
+ * recorded -- confirm the intended implementation.
+ *
+ * @param state requested task state (unused)
+ */
+ public void setState(State state) {
+ // TODO Auto-generated method stub
+
+ }
+
+ /**
+ * Placeholder: always returns null.
+ * NOTE(review): because of this the {@code State.UNASSIGNED} comparison in
+ * initTask() can never be true -- confirm the intended implementation.
+ *
+ * @return null
+ */
+ public State getState() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /** @return the output committer currently stored on this task; may be null if none was set */
+ public OutputCommitter getCommitter() {
+   return this.committer;
+ }
+
+ /**
+  * Stores the output committer used by this task.
+  *
+  * @param committer the committer to use from now on
+  */
+ public void setCommitter(OutputCommitter committer) {
+   this.committer = committer;
+ }
+
+ public TezCounters getCounters() { return counters; }
+
+ /**
+  * Returns the phase currently recorded in the task status.
+  * Synchronized on this task, like {@link #setPhase}; the original note says
+  * the communication thread reads the phase periodically.
+  *
+  * @return the current phase of the task
+  */
+ public synchronized TezTaskStatus.Phase getPhase(){
+   final TezTaskStatus.Phase currentPhase = status.getPhase();
+   return currentPhase;
+ }
+
+ /**
+  * Records a new phase in the task status. Synchronized on this task, like
+  * {@link #getPhase()}.
+  *
+  * @param phase task phase
+  */
+ protected synchronized void setPhase(TezTaskStatus.Phase phase){
+   this.status.setPhase(phase);
+ }
+
+ /**
+  * Replaces the job configuration held by this task.
+  *
+  * @param jobConf the configuration to use from now on
+  */
+ public void setConf(JobConf jobConf) {
+   this.jobConf = jobConf;
+ }
+
+ /** @return the job configuration currently held by this task */
+ public JobConf getConf() {
+   return jobConf;
+ }
+
+ /**
+  * Collects every {@link Statistics} instance whose scheme equals the scheme
+  * of {@code path} after it has been qualified against its file system.
+  *
+  * @param path the path.
+  * @param conf the configuration used to resolve the path's file system.
+  * @return the matching Statistics instances; an empty list when none match.
+  * @throws IOException if the file system for the path cannot be obtained
+  */
+ @Private
+ public static List<Statistics> getFsStatistics(Path path, Configuration conf) throws IOException {
+   final List<Statistics> sameScheme = new ArrayList<FileSystem.Statistics>();
+   final Path qualified = path.getFileSystem(conf).makeQualified(path);
+   final String wantedScheme = qualified.toUri().getScheme();
+   for (Statistics candidate : FileSystem.getAllStatistics()) {
+     if (!candidate.getScheme().equals(wantedScheme)) {
+       continue;
+     }
+     sameScheme.add(candidate);
+   }
+   return sameScheme;
+ }
+
+ /**
+  * @return {@code "part-"} followed by this task's id formatted with
+  *         {@code NUMBER_FORMAT} (at least five digits, no grouping)
+  */
+ @Private
+ public synchronized String getOutputName() {
+   final String paddedTaskId =
+       NUMBER_FORMAT.format(getTaskAttemptId().getTaskID().getId());
+   return "part-" + paddedTaskId;
+ }
+
+ /**
+  * Blocks until the AM signals that this task may proceed to completion.
+  *
+  * <p>Polls {@code proceedToCompletion} on the umbilical, waiting one second
+  * after each poll. A poll that fails with an {@link IOException} is logged
+  * and retried; once {@code MAX_RETRIES} polls have failed in total the last
+  * failure is rethrown. A response asking the task to die ends the wait
+  * immediately with an {@link IOException}; it is not counted as a retryable
+  * RPC failure.
+  *
+  * @param reporter not used by this method
+  * @throws IOException if the AM asks the task to die, or polling keeps
+  *           failing
+  * @throws InterruptedException if interrupted while waiting between polls
+  */
+ public void waitBeforeCompletion(MRTaskReporter reporter) throws IOException,
+     InterruptedException {
+   TezTaskUmbilicalProtocol umbilical = getUmbilical();
+   int retries = MAX_RETRIES;
+   boolean readyToProceed = false;
+   while (!readyToProceed) {
+     ProceedToCompletionResponse response = null;
+     try {
+       response = umbilical.proceedToCompletion(getTaskAttemptId());
+       if (LOG.isDebugEnabled()) {
+         LOG.debug("Got readyToProceed: " + response);
+       }
+     } catch (IOException ie) {
+       LOG.warn("Failure waiting for exit signal: " +
+           StringUtils.stringifyException(ie));
+       if (--retries == 0) {
+         throw ie;
+       }
+     }
+     if (response != null) {
+       // Evaluated outside the try block on purpose: the die request must
+       // propagate to the caller instead of being caught by the retry
+       // handler above and polled again.
+       if (response.shouldDie()) {
+         // TODO EVENTUALLY Figure out a good way for a graceful exit, instead
+         // of an exit via an Exception. This isn't necessarily an error.
+         throw new IOException("Task was asked to die by the AM");
+       }
+       readyToProceed = response.readyToProceed();
+     }
+     synchronized (this) {
+       wait(1000L); // Check if ready to exit every second.
+     }
+   }
+ }
+
+ /**
+  * Refreshes counters, sends a status update and then tells the AM that this
+  * task's output is ready, retrying the notification on I/O failure.
+  *
+  * @param reporter not used by this method
+  * @param outputContext output description forwarded to the umbilical
+  * @throws IOException once {@code MAX_RETRIES} notification attempts have
+  *           failed, or from the status update
+  */
+ public void outputReady(MRTaskReporter reporter, OutputContext outputContext)
+     throws IOException,
+     InterruptedException {
+   LOG.info("Task: " + getTaskAttemptId() + " reporting outputReady");
+   updateCounters();
+   statusUpdate();
+
+   final TezTaskUmbilicalProtocol amUmbilical = getUmbilical();
+   int attemptsLeft = MAX_RETRIES;
+   for (;;) {
+     try {
+       amUmbilical.outputReady(getTaskAttemptId(), outputContext);
+       LOG.info("Task '" + getTaskAttemptId() + "' reported outputReady.");
+       return;
+     } catch (IOException ioe) {
+       LOG.warn("Failure signalling outputReady: " +
+           StringUtils.stringifyException(ioe));
+       attemptsLeft--;
+       if (attemptsLeft == 0) {
+         throw ioe;
+       }
+     }
+   }
+ }
+
+ /**
+ * Finishes the task: publishes the output context (when there is one) and
+ * waits for the go-ahead, commits the output if the committer requires it,
+ * stops the reporter's communication thread, sends a final status update
+ * and finally signals completion over the umbilical.
+ *
+ * @param outputContext output description to publish; when null the
+ *          outputReady / waitBeforeCompletion steps are skipped
+ * @param reporter reporter whose communication thread is stopped here
+ */
+ public void done(
+ OutputContext outputContext,
+ MRTaskReporter reporter
+ ) throws IOException, InterruptedException {
+ updateCounters();
+ if (outputContext != null) {
+ LOG.info("Task: "
+ + getTaskAttemptId()
+ + " is done."
+ + " And is in the process of sending output-context with shuffle port: "
+ + outputContext.getShufflePort());
+ outputReady(reporter, outputContext);
+ waitBeforeCompletion(reporter);
+ }
+
+ LOG.info("Task:" + getTaskAttemptId() + " is done."
+ + " And is in the process of committing");
+ TezTaskUmbilicalProtocol umbilical = getUmbilical();
+ // TODO TEZ Interaction between Commit and OutputReady. Merge ?
+ if (isCommitRequired()) {
+ int retries = MAX_RETRIES;
+ setState(TezTaskStatus.State.COMMIT_PENDING);
+ // tell the task tracker that the task is commit pending
+ // TODO TEZAM2 - Why is the commitRequired check missing ?
+ while (true) {
+ try {
+ umbilical.commitPending(getTaskAttemptId(), status);
+ break;
+ } catch (InterruptedException ie) {
+ // ignore: the loop simply tries again, without using up a retry
+ } catch (IOException ie) {
+ LOG.warn("Failure sending commit pending: " +
+ StringUtils.stringifyException(ie));
+ // Out of retries: the whole JVM exits with status 67.
+ if (--retries == 0) {
+ System.exit(67);
+ }
+ }
+ }
+ //wait for commit approval and commit
+ // TODO EVENTUALLY - Commit is not required for map tasks. skip a couple of RPCs before exiting.
+ commit(umbilical, reporter, committer);
+ }
+ taskDone.set(true);
+ reporter.stopCommunicationThread();
+ // Make sure we send at least one set of counter increments. It's
+ // ok to call updateCounters() in this thread after comm thread stopped.
+ updateCounters();
+ sendLastUpdate();
+ //signal the tasktracker that we are done
+ sendDone(umbilical);
+ }
+
+
+ private boolean isCommitRequired() throws IOException {
+ return committer.needsTaskCommit(taskAttemptContext);
+ }
+
+ /**
+  * Send a status update to the task tracker, retrying on I/O failure.
+  * If the umbilical answers {@code false} the JVM exits with status 66. After
+  * a successful update the accumulated status is cleared.
+  *
+  * <p>An {@link InterruptedException} from the umbilical only restores the
+  * thread's interrupt flag; the loop then tries again without consuming a
+  * retry.
+  *
+  * @throws IOException once {@code MAX_RETRIES} attempts have failed
+  */
+ public void statusUpdate() throws IOException, InterruptedException {
+   int attemptsLeft = MAX_RETRIES;
+   for (;;) {
+     try {
+       final boolean parentAlive =
+           getUmbilical().statusUpdate(getTaskAttemptId(), status);
+       if (!parentAlive) {
+         LOG.warn("Parent died. Exiting " + getTaskAttemptId());
+         System.exit(66);
+       }
+       status.clearStatus();
+       return;
+     } catch (InterruptedException interrupted) {
+       Thread.currentThread().interrupt(); // interrupt ourself
+     } catch (IOException ioe) {
+       LOG.warn("Failure sending status update: " +
+           StringUtils.stringifyException(ioe));
+       attemptsLeft--;
+       if (attemptsLeft == 0) {
+         throw ioe;
+       }
+     }
+   }
+ }
+
+ /**
+  * Sends the last status update before {@code umbilical.done()} is called:
+  * sets the output size to -1, folds the current progress and counters into
+  * the status and pushes it through {@link #statusUpdate()}.
+  */
+ private void sendLastUpdate()
+     throws IOException, InterruptedException {
+   status.setOutputSize(-1L);
+   // send a final status report
+   status.statusUpdate(getProgress().get(), getProgress().toString(), counters);
+   statusUpdate();
+ }
+
+ /**
+ * Waits until the umbilical allows this task attempt to commit, then
+ * commits its output.
+ *
+ * @param umbilical asked once per second whether the attempt can commit
+ * @param reporter its progress flag is set after every wait
+ * @param committer committer used for commitTask; note that the abort path
+ *          goes through discardOutput(), which uses the 'committer' field
+ *          rather than this parameter
+ * @throws IOException if committing fails (the output is discarded first)
+ */
+ private void commit(TezTaskUmbilicalProtocol umbilical,
+ MRTaskReporter reporter,
+ org.apache.hadoop.mapreduce.OutputCommitter committer
+ ) throws IOException {
+ int retries = MAX_RETRIES;
+ while (true) {
+ try {
+ while (!umbilical.canCommit(getTaskAttemptId())) {
+ // This will loop till the AM asks for the task to be killed. As
+ // against, the AM sending a signal to the task to kill itself
+ // gracefully.
+ try {
+ Thread.sleep(1000);
+ } catch(InterruptedException ie) {
+ //ignore
+ }
+ reporter.setProgressFlag();
+ }
+ break;
+ } catch (IOException ie) {
+ LOG.warn("Failure asking whether task can commit: " +
+ StringUtils.stringifyException(ie));
+ if (--retries == 0) {
+ //if it couldn't query successfully then delete the output
+ // and exit the JVM with status 68
+ discardOutput(taskAttemptContext);
+ System.exit(68);
+ }
+ }
+ }
+
+ // task can Commit now
+ try {
+ LOG.info("Task " + getTaskAttemptId() + " is allowed to commit now");
+ committer.commitTask(taskAttemptContext);
+ return;
+ } catch (IOException iee) {
+ LOG.warn("Failure committing: " +
+ StringUtils.stringifyException(iee));
+ //if it couldn't commit successfully then delete the output
+ discardOutput(taskAttemptContext);
+ throw iee;
+ }
+ }
+
+ /**
+  * Asks the committer to abort the given task attempt. A failure is logged
+  * and otherwise ignored.
+  *
+  * @param taskContext the attempt whose output is to be discarded
+  */
+ private void discardOutput(TaskAttemptContext taskContext) {
+   try {
+     this.committer.abortTask(taskContext);
+   } catch (IOException abortFailure) {
+     LOG.warn("Failure cleaning up: " +
+         StringUtils.stringifyException(abortFailure));
+   }
+ }
+
+
+ /**
+  * Signals task completion over the umbilical, retrying on I/O failure.
+  *
+  * @param umbilical the protocol instance to notify
+  * @throws IOException once {@code MAX_RETRIES} attempts have failed
+  */
+ private void sendDone(TezTaskUmbilicalProtocol umbilical) throws IOException {
+   int attemptsLeft = MAX_RETRIES;
+   for (;;) {
+     try {
+       umbilical.done(getTaskAttemptId());
+       LOG.info("Task '" + getTaskAttemptId() + "' done.");
+       return;
+     } catch (IOException ioe) {
+       LOG.warn("Failure signalling completion: " +
+           StringUtils.stringifyException(ioe));
+       attemptsLeft--;
+       if (attemptsLeft == 0) {
+         throw ioe;
+       }
+     }
+   }
+ }
+
+ /**
+  * Refreshes the task counters: file-system statistics (grouped by URI
+  * scheme, one updater per scheme), GC time and resource usage.
+  */
+ public void updateCounters() {
+   // TODO Auto-generated method stub
+   // TODO TEZAM Implement.
+   // Group the known FileSystem statistics by URI scheme.
+   final Map<String, List<FileSystem.Statistics>> statsByScheme =
+       new HashMap<String, List<FileSystem.Statistics>>();
+   for (Statistics fsStats : FileSystem.getAllStatistics()) {
+     final String scheme = fsStats.getScheme();
+     List<FileSystem.Statistics> bucket = statsByScheme.get(scheme);
+     if (bucket == null) {
+       bucket = new ArrayList<FileSystem.Statistics>();
+       statsByScheme.put(scheme, bucket);
+     }
+     bucket.add(fsStats);
+   }
+
+   // Reuse the updater registered for a scheme, creating one on first sight.
+   for (Map.Entry<String, List<FileSystem.Statistics>> schemeStats
+       : statsByScheme.entrySet()) {
+     final String scheme = schemeStats.getKey();
+     FileSystemStatisticUpdater schemeUpdater = statisticUpdaters.get(scheme);
+     if (schemeUpdater == null) { // new FileSystem has been found in the cache
+       schemeUpdater = new FileSystemStatisticUpdater(
+           counters, schemeStats.getValue(), scheme);
+       statisticUpdaters.put(scheme, schemeUpdater);
+     }
+     schemeUpdater.updateCounters();
+   }
+
+   gcUpdater.incrementGcCounter();
+   updateResourceCounters();
+ }
+
+ /**
+ * Updates the {@link TaskCounter#COMMITTED_HEAP_BYTES} counter to reflect the
+ * current total committed heap space usage of this JVM.
+ */
+ private void updateHeapUsageCounter() {
+ long currentHeapUsage = Runtime.getRuntime().totalMemory();
+ counters.findCounter(TaskCounter.COMMITTED_HEAP_BYTES)
+ .setValue(currentHeapUsage);
+ }
+
+ /**
+ * Update resource information counters
+ */
+ void updateResourceCounters() {
+ // Update generic resource counters
+ updateHeapUsageCounter();
+
+ // Updating resources specified in ResourceCalculatorPlugin
+ if (pTree == null) {
+ return;
+ }
+ pTree.updateProcessTree();
+ long cpuTime = pTree.getCumulativeCpuTime();
+ long pMem = pTree.getCumulativeRssmem();
+ long vMem = pTree.getCumulativeVmem();
+ // Remove the CPU time consumed previously by JVM reuse
+ cpuTime -= initCpuCumulativeTime;
+ counters.findCounter(TaskCounter.CPU_MILLISECONDS).setValue(cpuTime);
+ counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).setValue(pMem);
+ counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
+ }
+
+
+ /**
+  * Truncates a status string that exceeds the configured length limit.
+  *
+  * @param status the status text to check
+  * @param conf source of {@code MRConfig.PROGRESS_STATUS_LEN_LIMIT_KEY}
+  * @return {@code status} unchanged when it fits, otherwise its leading
+  *         characters up to the limit (a warning is logged in that case)
+  */
+ public static String normalizeStatus(String status, Configuration conf) {
+   final int maxLength = conf.getInt(
+       MRConfig.PROGRESS_STATUS_LEN_LIMIT_KEY,
+       MRConfig.PROGRESS_STATUS_LEN_LIMIT_DEFAULT);
+   if (status.length() <= maxLength) {
+     return status;
+   }
+   // Too long: warn and cut it down to the limit.
+   LOG.warn("Task status: \"" + status + "\" truncated to max limit ("
+       + maxLength + " characters)");
+   return status.substring(0, maxLength);
+ }
+
+ protected static <INKEY,INVALUE,OUTKEY,OUTVALUE>
+ org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
+ createReduceContext(org.apache.hadoop.mapreduce.Reducer
+ <INKEY,INVALUE,OUTKEY,OUTVALUE> reducer,
+ Configuration job,
+ TezTaskAttemptID taskId,
+ final TezRawKeyValueIterator rIter,
+ org.apache.hadoop.mapreduce.Counter inputKeyCounter,
+ org.apache.hadoop.mapreduce.Counter inputValueCounter,
+ org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output,
+ org.apache.hadoop.mapreduce.OutputCommitter committer,
+ org.apache.hadoop.mapreduce.StatusReporter reporter,
+ RawComparator<INKEY> comparator,
+ Class<INKEY> keyClass, Class<INVALUE> valueClass
+ ) throws IOException, InterruptedException {
+ RawKeyValueIterator r =
+ new RawKeyValueIterator() {
+
+ @Override
+ public boolean next() throws IOException {
+ return rIter.next();
+ }
+
+ @Override
+ public DataInputBuffer getValue() throws IOException {
+ return rIter.getValue();
+ }
+
+ @Override
+ public Progress getProgress() {
+ return rIter.getProgress();
+ }
+
+ @Override
+ public DataInputBuffer getKey() throws IOException {
+ return rIter.getKey();
+ }
+
+ @Override
+ public void close() throws IOException {
+ rIter.close();
+ }
+ };
+ org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
+ reduceContext =
+ new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(
+ job,
+ IDConverter.toMRTaskAttemptId(taskId),
+ r,
+ inputKeyCounter,
+ inputValueCounter,
+ output,
+ committer,
+ reporter,
+ comparator,
+ keyClass,
+ valueClass);
+ LOG.info("DEBUG: Using key class: " + keyClass + ", valueClass: " + valueClass);
+
+ org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
+ reducerContext =
+ new WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>().getReducerContext(
+ reduceContext);
+
+ return reducerContext;
+ }
+
+ /**
+  * Moves the task to the CLEANUP phase, reports that through a status update
+  * and aborts the task attempt on the committer.
+  *
+  * @param umbilical not used by this method
+  */
+ public void taskCleanup(TezTaskUmbilicalProtocol umbilical)
+     throws IOException, InterruptedException {
+   // Switch to the cleanup phase and report it.
+   setPhase(TezTaskStatus.Phase.CLEANUP);
+   getProgress().setStatus("cleanup");
+   statusUpdate();
+
+   LOG.info("Runnning cleanup for the task");
+   // The cleanup itself: abort this attempt's output.
+   this.committer.abortTask(this.taskAttemptContext);
+ }
+
+ /**
+  * Writes this task's identity into the given configuration: task id, task
+  * attempt id, partition (the task id number) and the DAG id as the job id.
+  *
+  * @param jobConf the configuration to populate
+  */
+ public void localizeConfiguration(JobConf jobConf)
+     throws IOException, InterruptedException {
+   final String taskIdText = getTaskAttemptId().getTaskID().toString();
+   final String attemptIdText = getTaskAttemptId().toString();
+   final int partition = getTaskAttemptId().getTaskID().getId();
+   final String dagIdText =
+       getTaskAttemptId().getTaskID().getVertexID().getDAGId().toString();
+
+   jobConf.set(JobContext.TASK_ID, taskIdText);
+   jobConf.set(JobContext.TASK_ATTEMPT_ID, attemptIdText);
+   jobConf.setInt(JobContext.TASK_PARTITION, partition);
+   jobConf.set(JobContext.ID, dagIdText);
+ }
+
+ /**
+ * Supplied by each concrete task type.
+ *
+ * @return the counter for output records (presumably records written by
+ *         this task -- confirm against the subclasses)
+ */
+ public abstract TezCounter getOutputRecordsCounter();
+
+ /**
+ * Supplied by each concrete task type.
+ *
+ * @return the counter for input records (presumably records read by this
+ *         task -- confirm against the subclasses)
+ */
+ public abstract TezCounter getInputRecordsCounter();
+
+ /** @return the reporter's counter for {@code FileOutputFormatCounter.BYTES_WRITTEN} */
+ public TezCounter getFileOutputBytesCounter() {
+   final TezCounter bytesWritten =
+       reporter.getCounter(FileOutputFormatCounter.BYTES_WRITTEN);
+   return bytesWritten;
+ }
+
+ /** @return the task attempt context created in {@code initTask}; null before that call */
+ public org.apache.hadoop.mapreduce.TaskAttemptContext getTaskAttemptContext() {
+   return this.taskAttemptContext;
+ }
+
+ /** @return the reporter's counter for {@code FileInputFormatCounter.BYTES_READ} */
+ public TezCounter getFileInputBytesCounter() {
+   final TezCounter bytesRead =
+       reporter.getCounter(FileInputFormatCounter.BYTES_READ);
+   return bytesRead;
+ }
+
+ /** @return the split index object held by this task */
+ public TaskSplitIndex getSplitIndex() {
+   return this.splitMetaInfo;
+ }
+
+ /** @return the job context created in {@code initTask}; null before that call */
+ public JobContext getJobContext() {
+   return this.jobContext;
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
------------------------------------------------------------------------------
svn:eol-style = native