You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/25 00:44:11 UTC
[02/20] Rename tez-engine-api to tez-runtime-api and tez-engine is
split into 2: - tez-engine-library for user-visible Input/Output/Processor
implementations - tez-engine-internals for framework internals
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/SortBufferInputStream.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/SortBufferInputStream.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/SortBufferInputStream.java
new file mode 100644
index 0000000..b7b1e82
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/SortBufferInputStream.java
@@ -0,0 +1,271 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.common.sort.impl.dflt;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.IntBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryWriter;
+import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter.InMemValBytes;
+
+/**
+ * An {@link InputStream} over a single partition of an
+ * {@link InMemoryShuffleSorter}'s sort buffer.
+ *
+ * Records of the requested partition are read directly out of the sorter's
+ * {@code kvbuffer}/{@code kvmeta} and appended to an in-memory IFile
+ * ({@link InMemoryWriter}) whose output is written straight into the caller's
+ * buffer on each {@link #read(byte[], int, int)}. When a record does not fit,
+ * the excess bytes are parked in {@code dualBuf} and handed out first on the
+ * next read. The total stream length is taken from the partition's shuffle
+ * header ({@code getCompressedLength()}).
+ *
+ * NOTE(review): an overflow larger than {@code dualBuf} (8192 bytes) cannot be
+ * buffered and fails with an {@link IndexOutOfBoundsException}.
+ */
+public class SortBufferInputStream extends InputStream {
+
+  private static final Log LOG = LogFactory.getLog(SortBufferInputStream.class);
+
+  private final InMemoryShuffleSorter sorter;
+  private InMemoryWriter sortOutput;
+
+  /** Exclusive upper bound for record indices (from sorter.getMetaEnd()). */
+  private int mend;
+  /** Index of the next record to serialize. */
+  private int recIndex;
+  private final byte[] kvbuffer;
+  private final IntBuffer kvmeta;
+  /** Total number of bytes this stream produces for the partition. */
+  private final int partitionBytes;
+  private final int partition;
+
+  /** Overflow buffer for bytes that did not fit in the caller's buffer. */
+  byte[] dualBuf = new byte[8192];
+  DualBufferOutputStream out;
+  /** Bytes returned to callers so far. */
+  private int readBytes = 0;
+
+  public SortBufferInputStream(
+      InMemoryShuffleSorter sorter, int partition) {
+    this.sorter = sorter;
+    this.partitionBytes =
+        (int)sorter.getShuffleHeader(partition).getCompressedLength();
+    this.partition = partition;
+    this.mend = sorter.getMetaEnd();
+    this.recIndex = sorter.getSpillIndex(partition);
+    this.kvbuffer = sorter.kvbuffer;
+    this.kvmeta = sorter.kvmeta;
+    // Start with an empty, non-null placeholder target; the caller's buffer is
+    // installed through out.reset(...) on every read.
+    out = new DualBufferOutputStream(new byte[0], 0, 0, dualBuf);
+    sortOutput = new InMemoryWriter(out);
+  }
+
+  /** Scratch buffer for the single-byte {@link #read()}. */
+  byte[] one = new byte[1];
+
+  /**
+   * Reads a single byte.
+   *
+   * @return the next byte as an unsigned value, or -1 at end of stream
+   */
+  @Override
+  public int read() throws IOException {
+    int n = read(one, 0, 1);
+    // Mask to an unsigned value: returning the raw signed byte yields negative
+    // results, and 0xFF would be indistinguishable from end of stream.
+    return (n == -1) ? -1 : (one[0] & 0xFF);
+  }
+
+  @Override
+  public int read(byte[] b) throws IOException {
+    return read(b, 0, b.length);
+  }
+
+  /**
+   * Fills {@code b} with up to {@code len} bytes of the partition's IFile image:
+   * first any bytes left over in {@code dualBuf} by the previous call, then
+   * freshly serialized records of this partition. Once the last record of the
+   * partition has been written and there is still room, the IFile is closed and
+   * its EOF marker and checksum are accounted for.
+   *
+   * @return the number of bytes reported as read, or -1 once
+   *         {@link #available()} reports nothing left
+   * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not describe
+   *         a valid range of {@code b}
+   */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException("off=" + off + " len=" + len +
+          " b.length=" + b.length);
+    }
+    if (available() == 0) {
+      return -1;
+    }
+
+    int currentOffset = off;
+    int currentLength = len;
+    int currentReadBytes = 0;
+
+    // Hand out residual bytes left in dualBuf by the previous call first.
+    int residualLen = out.getCurrent();
+    if (residualLen > 0) {
+      int readable = Math.min(currentLength, residualLen);
+      System.arraycopy(dualBuf, 0, b, currentOffset, readable);
+      // If the caller could not take all of the residual, move the unread tail
+      // to the front of dualBuf so the next call resumes at the first unread
+      // byte instead of re-reading the bytes just returned.
+      int unread = residualLen - readable;
+      if (unread > 0) {
+        System.arraycopy(dualBuf, readable, dualBuf, 0, unread);
+      }
+      out.setCurrentPointer(-readable);
+
+      currentOffset += readable;
+      currentReadBytes += readable;
+      // buffer has less capacity
+      currentLength -= readable;
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("XXX read_residual:" +
+            " readable=" + readable +
+            " readBytes=" + readBytes);
+      }
+    }
+
+    // Now, use the provided buffer
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("XXX read: out.reset" +
+          " b=" + b +
+          " currentOffset=" + currentOffset +
+          " currentLength=" + currentLength +
+          " recIndex=" + recIndex);
+    }
+    out.reset(b, currentOffset, currentLength);
+
+    // Read from sort-buffer into the provided buffer, space permitting
+    DataInputBuffer key = new DataInputBuffer();
+    final InMemValBytes value = sorter.createInMemValBytes();
+
+    int kvPartition = 0;
+    int numRec = 0;
+    // Keep appending while there is room and the next record still belongs to
+    // this partition. The last record may overrun the room; its tail is then
+    // diverted into dualBuf by DualBufferOutputStream.
+    for (;
+        currentLength > 0 && recIndex < mend &&
+        (kvPartition = getKVPartition(recIndex)) == partition;
+        ++recIndex) {
+
+      final int kvoff = sorter.offsetFor(recIndex);
+      final int keyStart = kvmeta.get(kvoff + InMemoryShuffleSorter.KEYSTART);
+      final int keyLen =
+          kvmeta.get(kvoff + InMemoryShuffleSorter.VALSTART) - keyStart;
+      key.reset(kvbuffer, keyStart, keyLen);
+
+      final int valLen = sorter.getVBytesForOffset(kvoff, value);
+
+      // Bytes accounted for this record: key and value, each with its vint
+      // length prefix.
+      final int recLen =
+          (keyLen + WritableUtils.getVIntSize(keyLen)) +
+          (valLen + WritableUtils.getVIntSize(valLen));
+
+      currentReadBytes += recLen;
+      currentLength -= recLen;
+      ++numRec;
+
+      // Write out key/value into the in-mem ifile
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("XXX read: sortOutput.append" +
+            " #rec=" + numRec +
+            " recIndex=" + recIndex + " kvoff=" + kvoff +
+            " keyLen=" + keyLen + " valLen=" + valLen + " recLen=" + recLen +
+            " readBytes=" + readBytes +
+            " currentReadBytes=" + currentReadBytes +
+            " currentLength=" + currentLength);
+      }
+      sortOutput.append(key, value);
+    }
+
+    // If we are at the end of the segment, close the ifile
+    if (currentLength > 0 &&
+        (recIndex == mend || kvPartition != partition)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("XXX About to call close:" +
+            " currentLength=" + currentLength +
+            " recIndex=" + recIndex + " mend=" + mend +
+            " kvPartition=" + kvPartition + " partition=" + partition);
+      }
+      sortOutput.close();
+      currentReadBytes +=
+          (InMemoryShuffleSorter.IFILE_EOF_LENGTH +
+           InMemoryShuffleSorter.IFILE_CHECKSUM_LENGTH);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("XXX Hmm..." +
+            " currentLength=" + currentLength +
+            " recIndex=" + recIndex + " mend=" + mend +
+            " kvPartition=" + kvPartition + " partition=" + partition);
+      }
+    }
+
+    int retVal = Math.min(currentReadBytes, len);
+    readBytes += retVal;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("XXX read: done" +
+          " retVal=" + retVal +
+          " currentReadBytes=" + currentReadBytes +
+          " len=" + len +
+          " readBytes=" + readBytes +
+          " partitionBytes=" + partitionBytes +
+          " residualBytes=" + out.getCurrent());
+    }
+    return retVal;
+  }
+
+  /** Returns the partition number stored in the metadata of record {@code recIndex}. */
+  private int getKVPartition(int recIndex) {
+    return kvmeta.get(
+        sorter.offsetFor(recIndex) + InMemoryShuffleSorter.PARTITION);
+  }
+
+  /**
+   * @return bytes of the partition not yet returned to callers; clamped at 0 so
+   *         the {@link InputStream#available()} contract (never negative) holds
+   */
+  @Override
+  public int available() throws IOException {
+    return Math.max(0, partitionBytes - readBytes);
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+  }
+
+  @Override
+  public boolean markSupported() {
+    return false;
+  }
+
+  /**
+   * A bounded output stream that writes into a caller-supplied region and, once
+   * that region is full, diverts the remaining bytes into a secondary
+   * ("dual") buffer. {@link #getCurrent()} reports how many bytes the dual
+   * buffer currently holds.
+   */
+  static class DualBufferOutputStream extends BoundedByteArrayOutputStream {
+
+    byte[] dualBuf;
+    /** Number of valid bytes at the front of {@code dualBuf}. */
+    int currentPointer = 0;
+    byte[] one = new byte[1];
+
+    public DualBufferOutputStream(
+        byte[] buf, int offset, int length,
+        byte[] altBuf) {
+      super(buf, offset, length);
+      this.dualBuf = altBuf;
+    }
+
+    /** Redirects subsequent primary writes to {@code b[off, off+len)}. */
+    public void reset(byte[] b, int off, int len) {
+      super.resetBuffer(b, off, len);
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      one[0] = (byte)b;
+      write(one, 0, 1);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+      write(b, 0, b.length);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      int available = super.available();
+      if (available >= len) {
+        super.write(b, off, len);
+        return;
+      }
+      int overflow = len - available;
+      if (currentPointer + overflow > dualBuf.length) {
+        throw new IndexOutOfBoundsException("Overflow of " + overflow +
+            " bytes does not fit in dualBuf: current=" + currentPointer +
+            " dualBuf.length=" + dualBuf.length);
+      }
+      super.write(b, off, available);
+      System.arraycopy(b, off + available, dualBuf, currentPointer, overflow);
+      currentPointer += overflow;
+    }
+
+    int getCurrent() {
+      return currentPointer;
+    }
+
+    /**
+     * Moves the fill marker of {@code dualBuf} by {@code delta}.
+     *
+     * @throws IndexOutOfBoundsException if the result would fall outside
+     *         {@code [0, dualBuf.length]}
+     */
+    void setCurrentPointer(int delta) {
+      int newPointer = currentPointer + delta;
+      if (newPointer < 0 || newPointer > dualBuf.length) {
+        throw new IndexOutOfBoundsException("Trying to set dualBuf 'current'" +
+            " marker to " + newPointer + " when " +
+            " dualBuf.length is " + dualBuf.length);
+      }
+      // Assign directly: wrapping modulo the length would make a completely
+      // full buffer look empty.
+      currentPointer = newPointer;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/impl/ValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/impl/ValuesIterator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/impl/ValuesIterator.java
new file mode 100644
index 0000000..88cb750
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/impl/ValuesIterator.java
@@ -0,0 +1,149 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.common.task.impl;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+
+
+/**
+ * Iterates values while keys match in sorted input.
+ *
+ * Usage: Call moveToNext to move to the next k, v pair. This returns true if another exists,
+ * followed by getKey() and getValues() to get the current key and list of values.
+ *
+ */
+public class ValuesIterator<KEY,VALUE> implements Iterator<VALUE> {
+ protected TezRawKeyValueIterator in; //input iterator
+ private KEY key; // current key
+ private KEY nextKey;
+ private VALUE value; // current value
+ private boolean hasNext; // more w/ this key
+ private boolean more; // more in file
+ private RawComparator<KEY> comparator;
+ protected Progressable reporter;
+ private Deserializer<KEY> keyDeserializer;
+ private Deserializer<VALUE> valDeserializer;
+ private DataInputBuffer keyIn = new DataInputBuffer();
+ private DataInputBuffer valueIn = new DataInputBuffer();
+
+ public ValuesIterator (TezRawKeyValueIterator in,
+ RawComparator<KEY> comparator,
+ Class<KEY> keyClass,
+ Class<VALUE> valClass, Configuration conf,
+ Progressable reporter)
+ throws IOException {
+ this.in = in;
+ this.comparator = comparator;
+ this.reporter = reporter;
+ SerializationFactory serializationFactory = new SerializationFactory(conf);
+ this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+ this.keyDeserializer.open(keyIn);
+ this.valDeserializer = serializationFactory.getDeserializer(valClass);
+ this.valDeserializer.open(this.valueIn);
+ readNextKey();
+ key = nextKey;
+ nextKey = null; // force new instance creation
+ hasNext = more;
+ }
+
+ TezRawKeyValueIterator getRawIterator() { return in; }
+
+ /// Iterator methods
+
+ public boolean hasNext() { return hasNext; }
+
+ private int ctr = 0;
+ public VALUE next() {
+ if (!hasNext) {
+ throw new NoSuchElementException("iterate past last value");
+ }
+ try {
+ readNextValue();
+ readNextKey();
+ } catch (IOException ie) {
+ throw new RuntimeException("problem advancing post rec#"+ctr, ie);
+ }
+ reporter.progress();
+ return value;
+ }
+
+ public void remove() { throw new RuntimeException("not implemented"); }
+
+ /// Auxiliary methods
+
+ /** Start processing next unique key. */
+ public void nextKey() throws IOException {
+ // read until we find a new key
+ while (hasNext) {
+ readNextKey();
+ }
+ ++ctr;
+
+ // move the next key to the current one
+ KEY tmpKey = key;
+ key = nextKey;
+ nextKey = tmpKey;
+ hasNext = more;
+ }
+
+ /** True iff more keys remain. */
+ public boolean more() {
+ return more;
+ }
+
+ /** The current key. */
+ public KEY getKey() {
+ return key;
+ }
+
+ /**
+ * read the next key
+ */
+ private void readNextKey() throws IOException {
+ more = in.next();
+ if (more) {
+ DataInputBuffer nextKeyBytes = in.getKey();
+ keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength());
+ nextKey = keyDeserializer.deserialize(nextKey);
+ hasNext = key != null && (comparator.compare(key, nextKey) == 0);
+ } else {
+ hasNext = false;
+ }
+ }
+
+ /**
+ * Read the next value
+ * @throws IOException
+ */
+ private void readNextValue() throws IOException {
+ DataInputBuffer nextValueBytes = in.getValue();
+ valueIn.reset(nextValueBytes.getData(), nextValueBytes.getPosition(), nextValueBytes.getLength());
+ value = valDeserializer.deserialize(value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java
new file mode 100644
index 0000000..30d28f0
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.task.local.output;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+
+/**
+ * Manipulate the working area for the transient store for maps and reduces.
+ *
+ * This class is used by map and reduce tasks to identify the directories that
+ * they need to write to/read from for intermediate files. The callers of
+ * these methods are from the Child running the Task.
+ *
+ * Paths are resolved against the directories configured under
+ * {@link TezJobConfig#LOCAL_DIRS} through a {@link LocalDirAllocator}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TezLocalTaskOutputFiles extends TezTaskOutput {
+
+  private final LocalDirAllocator lDirAlloc =
+      new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
+
+  public TezLocalTaskOutputFiles(Configuration conf, String uniqueId) {
+    super(conf, uniqueId);
+  }
+
+  /** Relative path of the task's output file. */
+  private static String outputFilePath() {
+    return Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR
+        + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING;
+  }
+
+  /** Relative path of the index file that accompanies the output file. */
+  private static String outputIndexFilePath() {
+    return outputFilePath() + Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING;
+  }
+
+  /** Relative path of spill file {@code spillNumber}. */
+  private static String spillFilePath(int spillNumber) {
+    return Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/spill" + spillNumber + ".out";
+  }
+
+  /** Relative path of the index file for spill {@code spillNumber}. */
+  private static String spillIndexFilePath(int spillNumber) {
+    return spillFilePath(spillNumber) + ".index";
+  }
+
+  /**
+   * Return the path to local map output file created earlier
+   *
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getOutputFile() throws IOException {
+    return lDirAlloc.getLocalPathToRead(outputFilePath(), conf);
+  }
+
+  /**
+   * Create a local map output file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getOutputFileForWrite(long size) throws IOException {
+    return lDirAlloc.getLocalPathForWrite(outputFilePath(), size, conf);
+  }
+
+  /**
+   * Create a local map output file name. This should *only* be used if the size
+   * of the file is not known. Otherwise use the equivalent which accepts a size
+   * parameter.
+   *
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getOutputFileForWrite() throws IOException {
+    return lDirAlloc.getLocalPathForWrite(outputFilePath(), conf);
+  }
+
+  /**
+   * Create a local map output file name on the same volume.
+   */
+  @Override
+  public Path getOutputFileForWriteInVolume(Path existing) {
+    return new Path(existing.getParent(), Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
+  }
+
+  /**
+   * Return the path to a local map output index file created earlier
+   *
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getOutputIndexFile() throws IOException {
+    return lDirAlloc.getLocalPathToRead(outputIndexFilePath(), conf);
+  }
+
+  /**
+   * Create a local map output index file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getOutputIndexFileForWrite(long size) throws IOException {
+    return lDirAlloc.getLocalPathForWrite(outputIndexFilePath(), size, conf);
+  }
+
+  /**
+   * Create a local map output index file name on the same volume.
+   */
+  @Override
+  public Path getOutputIndexFileForWriteInVolume(Path existing) {
+    return new Path(existing.getParent(),
+        Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+  }
+
+  /**
+   * Return a local map spill file created earlier.
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getSpillFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(spillFilePath(spillNumber), conf);
+  }
+
+  /**
+   * Create a local map spill file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(spillFilePath(spillNumber), size, conf);
+  }
+
+  /**
+   * Return a local map spill index file created earlier
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getSpillIndexFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(spillIndexFilePath(spillNumber), conf);
+  }
+
+  /**
+   * Create a local map spill index file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(spillIndexFilePath(spillNumber), size, conf);
+  }
+
+  /**
+   * Return a local reduce input file created earlier
+   *
+   * @param mapId identifies the source task attempt; only its source task
+   *        index is used to build the file name
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getInputFile(InputAttemptIdentifier mapId)
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(String.format(
+        Constants.TEZ_RUNTIME_TASK_INPUT_FILE_FORMAT_STRING,
+        Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR, Integer.valueOf(mapId.getInputIdentifier().getSrcTaskIndex())), conf);
+  }
+
+  /**
+   * Create a local reduce input file name.
+   *
+   * @param taskId the source task index used to build the file name
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getInputFileForWrite(int taskId,
+                                   long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(String.format(
+        Constants.TEZ_RUNTIME_TASK_INPUT_FILE_FORMAT_STRING, Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR, taskId),
+        size, conf);
+  }
+
+  /** Removes all of the files related to a task. */
+  @Override
+  public void removeAll()
+      throws IOException {
+    deleteLocalFiles(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR);
+  }
+
+  /** Returns the configured local dirs, or null if none are configured. */
+  private String[] getLocalDirs() throws IOException {
+    return conf.getStrings(TezJobConfig.LOCAL_DIRS);
+  }
+
+  /**
+   * Recursively deletes {@code subdir} under every configured local dir.
+   * Does nothing if no local dirs are configured.
+   */
+  private void deleteLocalFiles(String subdir) throws IOException {
+    String[] localDirs = getLocalDirs();
+    if (localDirs == null) {
+      // Configuration.getStrings returns null when the key is unset.
+      return;
+    }
+    FileSystem localFs = FileSystem.getLocal(conf);
+    for (String localDir : localDirs) {
+      localFs.delete(new Path(localDir, subdir), true);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
new file mode 100644
index 0000000..d3e7d27
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.task.local.output;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+
+/**
+ * Manipulate the working area for the transient store for maps and reduces.
+ *
+ * Implementations are used by map and reduce tasks to identify the directories
+ * that they need to write to/read from for intermediate files: the task's
+ * output and index files, spill files and their indexes, and per-source input
+ * files.
+ *
+ * NOTE(review): this contract was inherited from MapReduce, where callers run
+ * in child space and see mapreduce.cluster.local.dir as
+ * taskTracker/jobCache/jobId/attemptId, and the class is not meant to be used
+ * from TaskTracker space. Confirm how this maps onto Tez before relying on it.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class TezTaskOutput {
+
+  /** Configuration supplied at construction. */
+  protected Configuration conf;
+  /** Identifier supplied at construction. */
+  protected String uniqueId;
+
+  /**
+   * @param conf configuration used by implementations to resolve paths
+   * @param uniqueId identifier stored for use by implementations
+   */
+  public TezTaskOutput(Configuration conf, String uniqueId) {
+    this.conf = conf;
+    this.uniqueId = uniqueId;
+  }
+
+  /**
+   * Return the path to local map output file created earlier
+   *
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getOutputFile() throws IOException;
+
+  /**
+   * Create a local map output file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getOutputFileForWrite(long size) throws IOException;
+
+  /**
+   * Create a local output file name. This method is meant to be used *only* if
+   * the size of the file is not known up front.
+   *
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getOutputFileForWrite() throws IOException;
+
+  /**
+   * Create a local map output file name on the same volume as {@code existing}.
+   *
+   * @param existing a path whose volume the new file should share
+   * @return path
+   */
+  public abstract Path getOutputFileForWriteInVolume(Path existing);
+
+  /**
+   * Return the path to a local map output index file created earlier
+   *
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getOutputIndexFile() throws IOException;
+
+  /**
+   * Create a local map output index file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getOutputIndexFileForWrite(long size) throws IOException;
+
+  /**
+   * Create a local map output index file name on the same volume as
+   * {@code existing}.
+   *
+   * @param existing a path whose volume the new file should share
+   * @return path
+   */
+  public abstract Path getOutputIndexFileForWriteInVolume(Path existing);
+
+  /**
+   * Return a local map spill file created earlier.
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getSpillFile(int spillNumber) throws IOException;
+
+  /**
+   * Create a local map spill file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException;
+
+  /**
+   * Return a local map spill index file created earlier
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getSpillIndexFile(int spillNumber) throws IOException;
+
+  /**
+   * Create a local map spill index file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException;
+
+  /**
+   * Return a local reduce input file created earlier
+   *
+   * @param attemptIdentifier identifies the source task attempt
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getInputFile(InputAttemptIdentifier attemptIdentifier) throws IOException;
+
+  /**
+   * Create a local reduce input file name.
+   *
+   * @param taskIdentifier the index of the source task
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getInputFileForWrite(
+      int taskIdentifier, long size) throws IOException;
+
+  /** Removes all of the files related to a task. */
+  public abstract void removeAll() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
new file mode 100644
index 0000000..2c18b4e
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.task.local.output;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+
+/**
+ * Manipulate the working area for the transient store for maps and reduces.
+ *
+ * This class is used by map and reduce tasks to identify the directories that
+ * they need to write to/read from for intermediate files. All paths are
+ * resolved with a {@link LocalDirAllocator} over the directories configured
+ * under {@link TezJobConfig#LOCAL_DIRS}.
+ *
+ * {@link #getInputFile(InputAttemptIdentifier)} and {@link #removeAll()} are
+ * not supported by this implementation and throw
+ * {@link UnsupportedOperationException}.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TezTaskOutputFiles extends TezTaskOutput {
+
+  private static final Log LOG = LogFactory.getLog(TezTaskOutputFiles.class);
+
+  /** Spill file name relative to a local dir: {@code <uniqueId>_spill_<n>.out}. */
+  private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out";
+  /** Spill index file name: the spill file name plus an {@code .index} suffix. */
+  private static final String SPILL_INDEX_FILE_PATTERN = SPILL_FILE_PATTERN
+      + ".index";
+  /**
+   * Input file name relative to a local dir:
+   * {@code <attempt output dir>/map_<srcTaskId>.out}.
+   */
+  private static final String INPUT_FILE_PATTERN = "%s/map_%d.out";
+
+  // assume configured to $localdir/usercache/$user/appcache/$appId
+  private final LocalDirAllocator lDirAlloc =
+      new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
+
+  /**
+   * @param conf configuration from which the local dirs are read
+   * @param uniqueId identifier embedded in every path this class produces
+   */
+  public TezTaskOutputFiles(Configuration conf, String uniqueId) {
+    super(conf, uniqueId);
+  }
+
+  /** Relative directory holding this task's output: {@code <output dir>/<uniqueId>}. */
+  private Path getAttemptOutputDir() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getAttemptOutputDir: "
+          + Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/"
+          + uniqueId);
+    }
+    return new Path(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR, uniqueId);
+  }
+
+  /** Relative path of the final output file inside the attempt output dir. */
+  private Path attemptOutputFilePath() {
+    return new Path(getAttemptOutputDir(),
+        Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
+  }
+
+  /** Relative path of the final output index file inside the attempt output dir. */
+  private Path attemptOutputIndexFilePath() {
+    return new Path(getAttemptOutputDir(),
+        Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING
+            + Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+  }
+
+  /**
+   * Return the path to local map output file created earlier.
+   *
+   * @return path of the existing output file
+   * @throws IOException if the file is not found under any configured local dir
+   */
+  public Path getOutputFile() throws IOException {
+    return lDirAlloc.getLocalPathToRead(attemptOutputFilePath().toString(), conf);
+  }
+
+  /**
+   * Create a local map output file name.
+   *
+   * @param size the size of the file
+   * @return path to which the output file can be written
+   * @throws IOException if no configured local dir can accommodate the file
+   */
+  public Path getOutputFileForWrite(long size) throws IOException {
+    return lDirAlloc.getLocalPathForWrite(attemptOutputFilePath().toString(), size, conf);
+  }
+
+  /**
+   * Create a local map output file name. This should *only* be used if the size
+   * of the file is not known. Otherwise use the equivalent which accepts a size
+   * parameter.
+   *
+   * @return path to which the output file can be written
+   * @throws IOException if no configured local dir can be used
+   */
+  public Path getOutputFileForWrite() throws IOException {
+    return lDirAlloc.getLocalPathForWrite(attemptOutputFilePath().toString(), conf);
+  }
+
+  /**
+   * Create a local map output file name on the same volume.
+   *
+   * @param existing an existing path; the result is placed under its parent
+   * @return {@code <parent of existing>/<output dir>/<uniqueId>/<output file>}
+   */
+  public Path getOutputFileForWriteInVolume(Path existing) {
+    Path outputDir = new Path(existing.getParent(), Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR);
+    Path attemptOutputDir = new Path(outputDir, uniqueId);
+    return new Path(attemptOutputDir, Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
+  }
+
+  /**
+   * Return the path to a local map output index file created earlier.
+   *
+   * @return path of the existing output index file
+   * @throws IOException if the file is not found under any configured local dir
+   */
+  public Path getOutputIndexFile() throws IOException {
+    return lDirAlloc.getLocalPathToRead(attemptOutputIndexFilePath().toString(), conf);
+  }
+
+  /**
+   * Create a local map output index file name.
+   *
+   * @param size the size of the file
+   * @return path to which the output index file can be written
+   * @throws IOException if no configured local dir can accommodate the file
+   */
+  public Path getOutputIndexFileForWrite(long size) throws IOException {
+    return lDirAlloc.getLocalPathForWrite(attemptOutputIndexFilePath().toString(),
+        size, conf);
+  }
+
+  /**
+   * Create a local map output index file name on the same volume.
+   *
+   * @param existing an existing path; the result is placed under its parent
+   * @return {@code <parent of existing>/<output dir>/<uniqueId>/<output index file>}
+   */
+  public Path getOutputIndexFileForWriteInVolume(Path existing) {
+    Path outputDir = new Path(existing.getParent(), Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR);
+    Path attemptOutputDir = new Path(outputDir, uniqueId);
+    return new Path(attemptOutputDir, Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING +
+        Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+  }
+
+  /**
+   * Return a local map spill file created earlier.
+   *
+   * @param spillNumber the number of the spill
+   * @return path of the existing spill file
+   * @throws IOException if the file is not found under any configured local dir
+   */
+  public Path getSpillFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(
+        String.format(SPILL_FILE_PATTERN, uniqueId, spillNumber), conf);
+  }
+
+  /**
+   * Create a local map spill file name.
+   *
+   * @param spillNumber the number of the spill
+   * @param size the size of the file
+   * @return path to which the spill file can be written
+   * @throws IOException if no configured local dir can accommodate the file
+   */
+  public Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException {
+    // Format exactly once: re-using the formatted name as a format string
+    // would misinterpret any '%' contained in uniqueId.
+    return lDirAlloc.getLocalPathForWrite(
+        String.format(SPILL_FILE_PATTERN, uniqueId, spillNumber), size, conf);
+  }
+
+  /**
+   * Return a local map spill index file created earlier.
+   *
+   * @param spillNumber the number of the spill
+   * @return path of the existing spill index file
+   * @throws IOException if the file is not found under any configured local dir
+   */
+  public Path getSpillIndexFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(
+        String.format(SPILL_INDEX_FILE_PATTERN, uniqueId, spillNumber), conf);
+  }
+
+  /**
+   * Create a local map spill index file name.
+   *
+   * @param spillNumber the number of the spill
+   * @param size the size of the file
+   * @return path to which the spill index file can be written
+   * @throws IOException if no configured local dir can accommodate the file
+   */
+  public Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(
+        String.format(SPILL_INDEX_FILE_PATTERN, uniqueId, spillNumber), size, conf);
+  }
+
+  /**
+   * Return a local reduce input file created earlier. Not supported by this
+   * implementation.
+   *
+   * @param attemptIdentifier an identifier for a task. The attempt information is ignored.
+   * @return never returns normally
+   * @throws UnsupportedOperationException always
+   */
+  public Path getInputFile(InputAttemptIdentifier attemptIdentifier) throws IOException {
+    throw new UnsupportedOperationException("Incompatible with LocalRunner");
+  }
+
+  /**
+   * Create a local reduce input file name.
+   *
+   * @param srcTaskId the index of the source task whose data will be written
+   * @param size the size of the file
+   * @return path {@code <attempt output dir>/map_<srcTaskId>.out} under a local dir
+   * @throws IOException if no configured local dir can accommodate the file
+   */
+  public Path getInputFileForWrite(int srcTaskId,
+      long size) throws IOException {
+    // Both the attempt directory and the source task index must be part of the
+    // name so that inputs from different source tasks do not share one path.
+    return lDirAlloc.getLocalPathForWrite(
+        String.format(INPUT_FILE_PATTERN, getAttemptOutputDir().toString(), srcTaskId),
+        size, conf);
+  }
+
+  /**
+   * Removes all of the files related to a task. Not supported by this
+   * implementation.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  public void removeAll() throws IOException {
+    throw new UnsupportedOperationException("Incompatible with LocalRunner");
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/hadoop/compat/NullProgressable.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/hadoop/compat/NullProgressable.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/hadoop/compat/NullProgressable.java
new file mode 100644
index 0000000..eb8d176
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/hadoop/compat/NullProgressable.java
@@ -0,0 +1,33 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.hadoop.compat;
+
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * A {@link Progressable} whose {@link #progress()} does nothing. Useful where an
+ * API demands a Progressable but no progress reporting is wanted.
+ */
+public class NullProgressable implements Progressable {
+
+  /** Creates a no-op progressable. */
+  public NullProgressable() {
+  }
+
+  /** Intentionally a no-op. */
+  @Override
+  public void progress() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
new file mode 100644
index 0000000..3aec247
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.library.common.localshuffle.LocalShuffle;
+
+/**
+ * <code>LocalMergedInput</code> is a {@link LogicalInput} which shuffles
+ * intermediate sorted data, merges them and provides key/&lt;values&gt; to the
+ * consumer.
+ *
+ * Unlike {@link ShuffledMergedInput}, the merged iterator is obtained directly
+ * from a {@link LocalShuffle} during {@link #initialize(TezInputContext)}; no
+ * remote shuffle is ever created, so the readiness methods that would consult
+ * one are overridden.
+ */
+public class LocalMergedInput extends ShuffledMergedInputLegacy {
+
+  /**
+   * Reads the configuration from the user payload, runs the local shuffle and
+   * prepares the values iterator.
+   *
+   * @param inputContext the context for this input
+   * @return an empty list; no events are generated on initialization
+   * @throws IOException propagated from reading the payload or the local shuffle
+   */
+  @Override
+  public List<Event> initialize(TezInputContext inputContext) throws IOException {
+    this.inputContext = inputContext;
+    this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
+
+    LocalShuffle localShuffle = new LocalShuffle(inputContext, conf, numInputs);
+    rawIter = localShuffle.run();
+    createValuesIterator();
+    return Collections.emptyList();
+  }
+
+  /**
+   * The local merge finishes inside initialize(), so the input is always ready
+   * afterwards. The inherited version would dereference the (never created)
+   * remote shuffle.
+   *
+   * @return true
+   */
+  @Override
+  public boolean isInputReady() {
+    return true;
+  }
+
+  /**
+   * Nothing to wait for: the raw iterator and values iterator were built in
+   * initialize(). The inherited version would dereference the (never created)
+   * remote shuffle.
+   */
+  @Override
+  public void waitForInputReady() throws IOException, InterruptedException {
+    // Intentionally empty.
+  }
+
+  /**
+   * Closes the merged iterator, if one was produced.
+   *
+   * @return an empty list; no events are generated on close
+   * @throws IOException propagated from closing the iterator
+   */
+  @Override
+  public List<Event> close() throws IOException {
+    // rawIter stays null if initialize() did not complete.
+    if (rawIter != null) {
+      rawIter.close();
+    }
+    return Collections.emptyList();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
new file mode 100644
index 0000000..771ac1b
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.library.api.KVReader;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.ValuesIterator;
+import org.apache.tez.runtime.library.common.shuffle.impl.Shuffle;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+
+/**
+ * <code>ShuffledMergedInput</code> is a {@link LogicalInput} which shuffles
+ * intermediate sorted data, merges them and provides key/&lt;values&gt; to the
+ * consumer.
+ *
+ * The Copy and Merge will be triggered by the initialization - which is handled
+ * by the Tez framework. Input is not consumable until the Copy and Merge are
+ * complete. Methods are provided to check for this, as well as to wait for
+ * completion. Attempting to get a reader on a non-complete input will block.
+ *
+ */
+public class ShuffledMergedInput implements LogicalInput {
+
+  static final Log LOG = LogFactory.getLog(ShuffledMergedInput.class);
+
+  protected TezInputContext inputContext;
+  /** Merged key/value iterator; null until the merged input has been obtained. */
+  protected TezRawKeyValueIterator rawIter = null;
+  protected Configuration conf;
+  /** Number of physical inputs, set via {@link #setNumPhysicalInputs(int)}. */
+  protected int numInputs = 0;
+  protected Shuffle shuffle;
+  @SuppressWarnings("rawtypes")
+  protected ValuesIterator vIter;
+
+  private TezCounter inputKeyCounter;
+  private TezCounter inputValueCounter;
+
+  /**
+   * Reads the configuration from the user payload, points the local dirs at
+   * the task's work dirs and starts the shuffle (copy and merge).
+   *
+   * @param inputContext the context for this input
+   * @return an empty list; no events are generated on initialization
+   * @throws IOException propagated from reading the payload or starting the shuffle
+   */
+  @Override
+  public List<Event> initialize(TezInputContext inputContext) throws IOException {
+    this.inputContext = inputContext;
+    this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
+
+    this.inputKeyCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
+    this.inputValueCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS);
+    this.conf.setStrings(TezJobConfig.LOCAL_DIRS,
+        inputContext.getWorkDirs());
+
+    // Start the shuffle - copy and merge.
+    shuffle = new Shuffle(inputContext, this.conf, numInputs);
+    shuffle.run();
+
+    return Collections.emptyList();
+  }
+
+  /**
+   * Check if the input is ready for consumption.
+   *
+   * @return true if the input is ready for consumption, or if an error occurred
+   *         while fetching the input; false if the shuffle and merge are
+   *         still in progress
+   */
+  public boolean isInputReady() {
+    return shuffle.isInputReady();
+  }
+
+  /**
+   * Waits for the input to become ready for consumption, then builds the values
+   * iterator over the merged data.
+   *
+   * @throws IOException propagated from the shuffle or iterator setup
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public void waitForInputReady() throws IOException, InterruptedException {
+    rawIter = shuffle.waitForInput();
+    createValuesIterator();
+  }
+
+  /**
+   * Closes the merged iterator, if one was obtained.
+   *
+   * @return an empty list; no events are generated on close
+   * @throws IOException propagated from closing the iterator
+   */
+  @Override
+  public List<Event> close() throws IOException {
+    // rawIter is still null if the consumer never asked for a reader.
+    if (rawIter != null) {
+      rawIter.close();
+    }
+    return Collections.emptyList();
+  }
+
+  /**
+   * Get a KVReader for the Input.
+   * <p>
+   * This method will block until the input is ready - i.e. the copy and merge
+   * stages are complete. Users can use the isInputReady method to check if the
+   * input is ready, which gives an indication of whether this method will block
+   * or not.
+   * <p>
+   * NOTE: All values for the current K-V pair must be read prior to invoking
+   * moveToNext. Once moveToNext() is called, the valueIterator from the
+   * previous K-V pair will throw an Exception
+   *
+   * @return a KVReader over the sorted input.
+   * @throws IOException if waiting for the input fails or is interrupted
+   */
+  @Override
+  public KVReader getReader() throws IOException {
+    if (rawIter == null) {
+      try {
+        waitForInputReady();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException("Interrupted while waiting for input ready", e);
+      }
+    }
+    return new KVReader() {
+
+      @Override
+      public boolean next() throws IOException {
+        return vIter.moveToNext();
+      }
+
+      @SuppressWarnings("unchecked")
+      @Override
+      public KVRecord getCurrentKV() {
+        return new KVRecord(vIter.getKey(), vIter.getValues());
+      }
+    };
+  }
+
+  @Override
+  public void handleEvents(List<Event> inputEvents) {
+    shuffle.handleEvents(inputEvents);
+  }
+
+  @Override
+  public void setNumPhysicalInputs(int numInputs) {
+    this.numInputs = numInputs;
+  }
+
+  /** Builds {@link #vIter} over {@link #rawIter} using the configured key/value types. */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  protected void createValuesIterator()
+      throws IOException {
+    vIter = new ValuesIterator(rawIter,
+        (RawComparator) ConfigUtils.getIntermediateInputKeyComparator(conf),
+        ConfigUtils.getIntermediateInputKeyClass(conf),
+        ConfigUtils.getIntermediateInputValueClass(conf), conf, inputKeyCounter, inputValueCounter);
+  }
+
+  // This functionality is currently broken. If there's inputs which need to be
+  // written to disk, there's a possibility that inputs from the different
+  // sources could clobber each others' output. Also the current structures do
+  // not have adequate information to de-dupe these (vertex name)
+//  public void mergeWith(ShuffledMergedInput other) {
+//    this.numInputs += other.getNumPhysicalInputs();
+//  }
+//
+//  public int getNumPhysicalInputs() {
+//    return this.numInputs;
+//  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
new file mode 100644
index 0000000..97e19d8
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
@@ -0,0 +1,30 @@
+/**
+ * <code>ShuffledMergedInput</code> is a {@link LogicalInput} which shuffles
+ * intermediate sorted data, merges them and provides key/<values> to the
+ * consumer.
+ *
+ * The Copy and Merge will be triggered by the initialization - which is handled
+ * by the Tez framework. Input is not consumable until the Copy and Merge are
+ * complete. Methods are provided to check for this, as well as to wait for
+ * completion. Attempting to get a reader on a non-complete input will block.
+ *
+ */
+
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+
+/**
+ * A {@link ShuffledMergedInput} that additionally exposes the raw merged
+ * key/value iterator, for use by the MapReduce compatibility layer.
+ */
+@LimitedPrivate("mapreduce")
+public class ShuffledMergedInputLegacy extends ShuffledMergedInput {
+
+  /**
+   * Returns the raw iterator over the merged input, waiting for the input to
+   * become ready if the iterator has not been obtained yet.
+   *
+   * @return the merged key/value iterator
+   * @throws IOException propagated from waiting for the input
+   * @throws InterruptedException if interrupted while waiting for the input
+   */
+  @Private
+  public TezRawKeyValueIterator getIterator() throws IOException, InterruptedException {
+    // Wait only when the iterator is not available yet, mirroring getReader();
+    // subclasses that build rawIter during initialize() never need the shuffle.
+    if (rawIter == null) {
+      waitForInputReady();
+    }
+    return rawIter;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
new file mode 100644
index 0000000..42b2e00
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -0,0 +1,76 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.input;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.library.broadcast.input.BroadcastShuffleManager;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * {@link LogicalInput} that fetches unordered key/value data through a
+ * {@link BroadcastShuffleManager}.
+ *
+ * NOTE(review): {@link #getReader()} is not implemented yet and returns null.
+ */
+public class ShuffledUnorderedKVInput implements LogicalInput {
+
+  private Configuration conf;
+  /** Number of physical inputs; -1 until {@link #setNumPhysicalInputs(int)} is called. */
+  private int numInputs = -1;
+  private BroadcastShuffleManager shuffleManager;
+
+  public ShuffledUnorderedKVInput() {
+  }
+
+  /**
+   * Reads the configuration from the user payload, points the local dirs at
+   * the task's work dirs and creates the shuffle manager.
+   *
+   * @param inputContext the context for this input
+   * @return an empty list; no events are generated on initialization
+   * @throws IllegalArgumentException if the number of inputs has not been set
+   */
+  @Override
+  public List<Event> initialize(TezInputContext inputContext) throws Exception {
+    Preconditions.checkArgument(numInputs != -1, "Number of Inputs has not been set");
+    this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
+    this.conf.setStrings(TezJobConfig.LOCAL_DIRS, inputContext.getWorkDirs());
+
+    this.shuffleManager = new BroadcastShuffleManager(inputContext, conf, numInputs);
+    // Empty list rather than null, consistent with the other inputs.
+    return Collections.emptyList();
+  }
+
+  @Override
+  public Reader getReader() throws Exception {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public void handleEvents(List<Event> inputEvents) {
+    shuffleManager.handleEvents(inputEvents);
+  }
+
+  /**
+   * Shuts down the shuffle manager, if it was created.
+   *
+   * @return an empty list; no events are generated on close
+   */
+  @Override
+  public List<Event> close() throws Exception {
+    // shuffleManager is null if initialize() never completed.
+    if (this.shuffleManager != null) {
+      this.shuffleManager.shutdown();
+    }
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void setNumPhysicalInputs(int numInputs) {
+    this.numInputs = numInputs;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/InMemorySortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/InMemorySortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/InMemorySortedOutput.java
new file mode 100644
index 0000000..2ec6b2a
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/InMemorySortedOutput.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.output;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.Output;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.Writer;
+import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.common.sort.impl.dflt.InMemoryShuffleSorter;
+
+/**
+ * {@link InMemorySortedOutput} is an {@link Output} which sorts key/value pairs
+ * written to it using an {@link InMemoryShuffleSorter}.
+ *
+ * NOTE(review): an earlier version of this comment said the pairs are persisted
+ * to a file (the same wording as OnFileSortedOutput); confirm against
+ * InMemoryShuffleSorter.
+ */
+public class InMemorySortedOutput implements LogicalOutput {
+
+  protected InMemoryShuffleSorter sorter;
+  /** Number of physical outputs, set via {@link #setNumPhysicalOutputs(int)}. */
+  protected int numTasks;
+  protected TezOutputContext outputContext;
+
+  /**
+   * Creates and initializes the sorter from the user payload configuration.
+   *
+   * @param outputContext the context for this output
+   * @return an empty list; no events are generated on initialization
+   * @throws IOException propagated from reading the payload or sorter setup
+   */
+  @Override
+  public List<Event> initialize(TezOutputContext outputContext)
+      throws IOException {
+    this.outputContext = outputContext;
+    this.sorter = new InMemoryShuffleSorter();
+    sorter.initialize(outputContext, TezUtils.createConfFromUserPayload(outputContext.getUserPayload()), numTasks);
+    return Collections.emptyList();
+  }
+
+  /** Returns a writer that forwards every key/value pair to the sorter. */
+  @Override
+  public Writer getWriter() throws IOException {
+    return new KVWriter() {
+
+      @Override
+      public void write(Object key, Object value) throws IOException {
+        sorter.write(key, value);
+      }
+    };
+  }
+
+  @Override
+  public void handleEvents(List<Event> outputEvents) {
+    // No events expected.
+  }
+
+  @Override
+  public void setNumPhysicalOutputs(int numOutputs) {
+    this.numTasks = numOutputs;
+  }
+
+  /**
+   * Flushes and closes the sorter.
+   *
+   * @return an empty list (event generation is not implemented yet)
+   * @throws IOException propagated from the sorter
+   */
+  @Override
+  public List<Event> close() throws IOException {
+    sorter.flush();
+    sorter.close();
+    // TODO NEWTEZ Event generation
+    // Empty list rather than null, consistent with initialize().
+    return Collections.emptyList();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
new file mode 100644
index 0000000..a19d5e1
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
@@ -0,0 +1,63 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.output;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
+
+/**
+ * {@link OnFileSortedOutput} for local execution: on close, the sorted output
+ * file is renamed to the local input path for this task's index (obtained from
+ * {@link TezTaskOutput#getInputFileForWrite(int, long)}), and no data movement
+ * events are generated.
+ */
+public class LocalOnFileSorterOutput extends OnFileSortedOutput {
+
+  private static final Log LOG = LogFactory.getLog(LocalOnFileSorterOutput.class);
+
+  /**
+   * Closes the sorter output, then moves the output file to the input path.
+   *
+   * @return null; this output generates no events
+   * @throws IOException if closing fails, the paths cannot be resolved, or the
+   *           rename does not succeed
+   */
+  @Override
+  public List<Event> close() throws IOException {
+    LOG.debug("Closing LocalOnFileSorterOutput");
+    super.close();
+
+    TezTaskOutput mapOutputFile = sorter.getMapOutput();
+    FileSystem localFs = FileSystem.getLocal(conf);
+
+    Path src = mapOutputFile.getOutputFile();
+    Path dst =
+        mapOutputFile.getInputFileForWrite(
+            outputContext.getTaskIndex(),
+            localFs.getFileStatus(src).getLen());
+
+    LOG.info("Renaming src = " + src + ", dst = " + dst);
+    // FileSystem#rename can report failure through its return value instead of
+    // an exception; fail loudly rather than continue without the file at dst.
+    if (!localFs.rename(src, dst)) {
+      throw new IOException("Failed to rename " + src + " to " + dst);
+    }
+    return null;
+  }
+
+  /** Local execution needs no data movement events. */
+  @Override
+  protected List<Event> generateDataMovementEventsOnClose() throws IOException {
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
new file mode 100644
index 0000000..42e1eeb
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.output;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
+import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+
+import com.google.common.collect.Lists;
+
+/**
+ * <code>OnFileSortedOutput</code> is an {@link LogicalOutput} which sorts key/value pairs
+ * written to it and persists it to a file.
+ */
+public class OnFileSortedOutput implements LogicalOutput {
+
+ protected ExternalSorter sorter;
+ protected Configuration conf;
+ protected int numOutputs;
+ protected TezOutputContext outputContext;
+ private long startTime;
+ private long endTime;
+
+
+ @Override
+ public List<Event> initialize(TezOutputContext outputContext)
+ throws IOException {
+ this.startTime = System.nanoTime();
+ this.outputContext = outputContext;
+ sorter = new DefaultSorter();
+ this.conf = TezUtils.createConfFromUserPayload(outputContext.getUserPayload());
+ // Initializing this parametr in this conf since it is used in multiple
+ // places (wherever LocalDirAllocator is used) - TezTaskOutputFiles,
+ // TezMerger, etc.
+ this.conf.setStrings(TezJobConfig.LOCAL_DIRS, outputContext.getWorkDirs());
+ sorter.initialize(outputContext, conf, numOutputs);
+ return Collections.emptyList();
+ }
+
+ @Override
+ public KVWriter getWriter() throws IOException {
+ return new KVWriter() {
+ @Override
+ public void write(Object key, Object value) throws IOException {
+ sorter.write(key, value);
+ }
+ };
+ }
+
+ @Override
+ public void handleEvents(List<Event> outputEvents) {
+ // Not expecting any events.
+ }
+
+ @Override
+ public void setNumPhysicalOutputs(int numOutputs) {
+ this.numOutputs = numOutputs;
+ }
+
+ @Override
+ public List<Event> close() throws IOException {
+ sorter.flush();
+ sorter.close();
+ this.endTime = System.nanoTime();
+
+ return generateDataMovementEventsOnClose();
+ }
+
+ protected List<Event> generateDataMovementEventsOnClose() throws IOException {
+ String host = System.getenv(ApplicationConstants.Environment.NM_HOST
+ .toString());
+ ByteBuffer shuffleMetadata = outputContext
+ .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+ int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
+
+ DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
+ .newBuilder();
+ payloadBuilder.setHost(host);
+ payloadBuilder.setPort(shufflePort);
+ payloadBuilder.setPathComponent(outputContext.getUniqueIdentifier());
+ payloadBuilder.setRunDuration((int) ((endTime - startTime) / 1000));
+ DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
+ byte[] payloadBytes = payloadProto.toByteArray();
+
+ List<Event> events = Lists.newArrayListWithCapacity(numOutputs);
+
+ for (int i = 0; i < numOutputs; i++) {
+ DataMovementEvent event = new DataMovementEvent(i, payloadBytes);
+ events.add(event);
+ }
+ return events;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
new file mode 100644
index 0000000..dd18149
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -0,0 +1,98 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.output;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.broadcast.output.FileBasedKVWriter;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class OnFileUnorderedKVOutput implements LogicalOutput {
+
+ private TezOutputContext outputContext;
+ private FileBasedKVWriter kvWriter;
+
+ public OnFileUnorderedKVOutput() {
+ }
+
+ @Override
+ public List<Event> initialize(TezOutputContext outputContext)
+ throws Exception {
+ this.outputContext = outputContext;
+ this.kvWriter = new FileBasedKVWriter(outputContext);
+ return Collections.emptyList();
+ }
+
+ @Override
+ public KVWriter getWriter() throws Exception {
+ return kvWriter;
+ }
+
+ @Override
+ public void handleEvents(List<Event> outputEvents) {
+ throw new TezUncheckedException("Not expecting any events");
+ }
+
+ @Override
+ public List<Event> close() throws Exception {
+ boolean outputGenerated = this.kvWriter.close();
+ DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
+ .newBuilder();
+
+ String host = System.getenv(ApplicationConstants.Environment.NM_HOST
+ .toString());
+ ByteBuffer shuffleMetadata = outputContext
+ .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+ int shufflePort = ShuffleUtils
+ .deserializeShuffleProviderMetaData(shuffleMetadata);
+ payloadBuilder.setOutputGenerated(outputGenerated);
+ if (outputGenerated) {
+ payloadBuilder.setHost(host);
+ payloadBuilder.setPort(shufflePort);
+ payloadBuilder.setPathComponent(outputContext.getUniqueIdentifier());
+ }
+ DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
+
+ DataMovementEvent dmEvent = new DataMovementEvent(0,
+ payloadProto.toByteArray());
+ List<Event> events = Lists.newArrayListWithCapacity(1);
+ events.add(dmEvent);
+ return events;
+ }
+
+ @Override
+ public void setNumPhysicalOutputs(int numOutputs) {
+ Preconditions.checkArgument(numOutputs == 1,
+ "Number of outputs can only be 1 for " + this.getClass().getName());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
new file mode 100644
index 0000000..a98ce63
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.shuffle.common;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
+
+import com.google.common.base.Preconditions;
+
+public class DiskFetchedInput extends FetchedInput {
+
+ private static final Log LOG = LogFactory.getLog(DiskFetchedInput.class);
+
+ private final FileSystem localFS;
+ private final Path tmpOutputPath;
+ private final Path outputPath;
+
+ public DiskFetchedInput(long size,
+ InputAttemptIdentifier inputAttemptIdentifier,
+ FetchedInputCallback callbackHandler, Configuration conf,
+ LocalDirAllocator localDirAllocator, TezTaskOutputFiles filenameAllocator)
+ throws IOException {
+ super(Type.DISK, size, inputAttemptIdentifier, callbackHandler);
+
+ this.localFS = FileSystem.getLocal(conf);
+ this.outputPath = filenameAllocator.getInputFileForWrite(
+ this.inputAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(), size);
+ this.tmpOutputPath = outputPath.suffix(String.valueOf(id));
+ }
+
+ @Override
+ public OutputStream getOutputStream() throws IOException {
+ return localFS.create(tmpOutputPath);
+ }
+
+ @Override
+ public InputStream getInputStream() throws IOException {
+ return localFS.open(outputPath);
+ }
+
+ @Override
+ public void commit() throws IOException {
+ if (state == State.PENDING) {
+ state = State.COMMITTED;
+ localFS.rename(tmpOutputPath, outputPath);
+ notifyFetchComplete();
+ }
+ }
+
+ @Override
+ public void abort() throws IOException {
+ if (state == State.PENDING) {
+ state = State.ABORTED;
+ // TODO NEWTEZ Maybe defer this to container cleanup
+ localFS.delete(tmpOutputPath, false);
+ notifyFetchFailure();
+ }
+ }
+
+ @Override
+ public void free() {
+ Preconditions.checkState(
+ state == State.COMMITTED || state == State.ABORTED,
+ "FetchedInput can only be freed after it is committed or aborted");
+ if (state == State.COMMITTED) {
+ state = State.FREED;
+ try {
+ // TODO NEWTEZ Maybe defer this to container cleanup
+ localFS.delete(outputPath, false);
+ } catch (IOException e) {
+ // Ignoring the exception, will eventually be cleaned by container
+ // cleanup.
+ LOG.warn("Failed to remvoe file : " + outputPath.toString());
+ }
+ notifyFreedResource();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "DiskFetchedInput [outputPath=" + outputPath
+ + ", inputAttemptIdentifier=" + inputAttemptIdentifier + ", size="
+ + size + ", type=" + type + ", id=" + id + ", state=" + state + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchResult.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchResult.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchResult.java
new file mode 100644
index 0000000..df38b07
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchResult.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.shuffle.common;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+
+public class FetchResult {
+
+ private final String host;
+ private final int port;
+ private final int partition;
+ private final Iterable<InputAttemptIdentifier> pendingInputs;
+
+ public FetchResult(String host, int port, int partition,
+ Iterable<InputAttemptIdentifier> pendingInputs) {
+ this.host = host;
+ this.port = port;
+ this.partition = partition;
+ this.pendingInputs = pendingInputs;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ public Iterable<InputAttemptIdentifier> getPendingInputs() {
+ return pendingInputs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
new file mode 100644
index 0000000..8f3c407
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.shuffle.common;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+
+@Private
+public abstract class FetchedInput {
+
+ public static enum Type {
+ WAIT, // TODO NEWTEZ Implement this, only if required.
+ MEMORY,
+ DISK,
+ }
+
+ protected static enum State {
+ PENDING, COMMITTED, ABORTED, FREED
+ }
+
+ private static AtomicInteger ID_GEN = new AtomicInteger(0);
+
+ protected InputAttemptIdentifier inputAttemptIdentifier;
+ protected final long size;
+ protected final Type type;
+ protected final FetchedInputCallback callback;
+ protected final int id;
+ protected State state;
+
+ public FetchedInput(Type type, long size,
+ InputAttemptIdentifier inputAttemptIdentifier,
+ FetchedInputCallback callbackHandler) {
+ this.type = type;
+ this.size = size;
+ this.inputAttemptIdentifier = inputAttemptIdentifier;
+ this.callback = callbackHandler;
+ this.id = ID_GEN.getAndIncrement();
+ this.state = State.PENDING;
+ }
+
+ public Type getType() {
+ return this.type;
+ }
+
+ public long getSize() {
+ return this.size;
+ }
+
+ public InputAttemptIdentifier getInputAttemptIdentifier() {
+ return this.inputAttemptIdentifier;
+ }
+
+ /**
+ * Inform the Allocator about a committed resource.
+ * This should be called by commit
+ */
+ public void notifyFetchComplete() {
+ this.callback.fetchComplete(this);
+ }
+
+ /**
+ * Inform the Allocator about a failed resource.
+ * This should be called by abort
+ */
+ public void notifyFetchFailure() {
+ this.callback.fetchFailed(this);
+ }
+
+ /**
+ * Inform the Allocator about a completed resource being released.
+ * This should be called by free
+ */
+ public void notifyFreedResource() {
+ this.callback.freeResources(this);
+ }
+
+ /**
+ * Returns the output stream to be used to write fetched data. Users are
+ * expected to close the OutputStream when they're done
+ */
+ public abstract OutputStream getOutputStream() throws IOException;
+
+ /**
+ * Return an input stream to be used to read the previously fetched data.
+ * Users are expected to close the InputStream when they're done
+ */
+ public abstract InputStream getInputStream() throws IOException;
+
+ /**
+ * Commit the output. Should be idempotent
+ */
+ public abstract void commit() throws IOException;
+
+ /**
+ * Abort the output. Should be idempotent
+ */
+ public abstract void abort() throws IOException;
+
+ /**
+ * Called when this input has been consumed, so that resources can be
+ * reclaimed.
+ */
+ public abstract void free();
+
+ @Override
+ public int hashCode() {
+ return id;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ FetchedInput other = (FetchedInput) obj;
+ if (id != other.id)
+ return false;
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java
new file mode 100644
index 0000000..1d60b68
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.shuffle.common;
+
+import java.io.IOException;
+
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+
+public interface FetchedInputAllocator {
+
+ public FetchedInput allocate(long size,
+ InputAttemptIdentifier inputAttemptIdentifier) throws IOException;
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputCallback.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputCallback.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputCallback.java
new file mode 100644
index 0000000..a22ce45
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputCallback.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.shuffle.common;
+
+public interface FetchedInputCallback {
+
+ public void fetchComplete(FetchedInput fetchedInput);
+
+ public void fetchFailed(FetchedInput fetchedInput);
+
+ public void freeResources(FetchedInput fetchedInput);
+
+}