You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC
svn commit: r1469642 [26/36] - in /incubator/tez/branches/TEZ-1: ./
example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/
example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/
tez-common/src/main/ tez-common/src/main/java/ t...
Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,1133 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.common.sort.impl.dflt;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.util.IndexedSortable;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.sort.impl.ExternalSorter;
+import org.apache.tez.engine.common.sort.impl.IFile;
+import org.apache.tez.engine.common.sort.impl.TezIndexRecord;
+import org.apache.tez.engine.common.sort.impl.TezMerger;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.engine.common.sort.impl.TezSpillRecord;
+import org.apache.tez.engine.common.sort.impl.IFile.Writer;
+import org.apache.tez.engine.common.sort.impl.TezMerger.Segment;
+import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class DefaultSorter extends ExternalSorter implements IndexedSortable {
+
+ private static final Log LOG = LogFactory.getLog(DefaultSorter.class);
+
+ /**
+ * The size of each record in the index file for the map-outputs.
+ */
+ public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
+
+ // per-partition allowance added when estimating the size of a spill file
+ private final static int APPROX_HEADER_LENGTH = 150;
+
+ // k/v accounting
+ // the kv* fields are int positions in kvmeta; the code multiplies them by 4
+ // to obtain the matching byte position in kvbuffer
+ IntBuffer kvmeta; // metadata overlay on backing store
+ int kvstart; // marks origin of spill metadata
+ int kvend; // marks end of spill metadata
+ int kvindex; // marks end of fully serialized records
+
+ int equator; // marks origin of meta/serialization
+ int bufstart; // marks beginning of spill
+ int bufend; // marks beginning of collectable
+ int bufmark; // marks end of record
+ int bufindex; // marks end of collected
+ int bufvoid; // marks the point where we should stop
+ // reading at the end of the buffer
+
+ byte[] kvbuffer; // main output buffer
+ private final byte[] b0 = new byte[0]; // empty source for collect's zero-length write
+
+ // layout of one metadata record inside kvmeta
+ protected static final int INDEX = 0; // index offset in acct
+ protected static final int VALSTART = 1; // val offset in acct
+ protected static final int KEYSTART = 2; // key offset in acct
+ protected static final int PARTITION = 3; // partition offset in acct
+ protected static final int NMETA = 4; // num meta ints
+ protected static final int METASIZE = NMETA * 4; // size in bytes
+
+ // spill accounting
+ int maxRec; // kvmeta.capacity() / NMETA, set in initialize
+ int softLimit; // kvbuffer.length scaled by the configured spill percent
+ boolean spillInProgress; // set by startSpill, cleared by the spill thread
+ int bufferRemaining; // bytes that may be written before spillLock must be taken and limits rechecked
+ volatile Throwable sortSpillException = null; // failure recorded by the spill thread
+
+ int numSpills = 0; // incremented after each spill file is written
+ int minSpillsForCombine; // read from the job configuration in initialize
+ final ReentrantLock spillLock = new ReentrantLock();
+ final Condition spillDone = spillLock.newCondition(); // signalled by the spill thread, awaited by writers
+ final Condition spillReady = spillLock.newCondition(); // signalled by startSpill, awaited by the spill thread
+ final BlockingBuffer bb = new BlockingBuffer(); // stream the key/value serializers write into
+ volatile boolean spillThreadRunning = false;
+ final SpillThread spillThread = new SpillThread();
+
+ // spill index records kept in memory while totalIndexCacheMemory stays
+ // below indexCacheMemoryLimit; later ones are written to index files
+ final ArrayList<TezSpillRecord> indexCacheList =
+ new ArrayList<TezSpillRecord>();
+ private int totalIndexCacheMemory;
+ private int indexCacheMemoryLimit;
+
+ /**
+ * Creates the sorter. The constructor body is empty; the sort buffer, the
+ * serializers and the spill thread are set up later by
+ * {@link #initialize(Configuration, Master)}.
+ *
+ * @param task the task this sorter is created for (assisted-injection
+ * argument). NOTE(review): it is not stored by this constructor --
+ * confirm how the {@code task} field read by the other methods is set.
+ * @throws IOException declared by the signature; nothing in this body
+ * throws it
+ */
+ @Inject
+ public DefaultSorter(
+ @Assisted TezTask task
+ ) throws IOException {
+ }
+
+ /**
+ * Validates the sort configuration, allocates the in-memory sort buffer and
+ * its metadata overlay, opens the key/value serializers on the blocking
+ * buffer, and starts the spill thread, returning once that thread has
+ * flagged itself as running.
+ *
+ * @param conf passed through to the superclass initializer
+ * @param master passed through to the superclass initializer
+ * @throws IOException if the spill percent or sort MB setting is out of
+ * range, if this thread is interrupted while waiting for the spill
+ * thread, or if the spill thread recorded a failure
+ * @throws InterruptedException declared by the signature; the wait on the
+ * spill thread below converts an interrupt into an IOException
+ */
+ @Override
+ public void initialize(Configuration conf, Master master)
+ throws IOException, InterruptedException {
+ if (task == null) {
+ // no task: log with a stack trace and leave the sorter uninitialized
+ LOG.info("Bailing!", new IOException());
+ return;
+ }
+
+ super.initialize(conf, master);
+
+ // sanity checks
+ final float spillper = job.getFloat(
+ TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT,
+ TezJobConfig.DEFAULT_TEZ_ENGINE_SORT_SPILL_PERCENT);
+ final int sortmb = job.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_MB,
+ TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_MB);
+ if (spillper > (float) 1.0 || spillper <= (float) 0.0) {
+ throw new IOException("Invalid \""
+ + TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT + "\": " + spillper);
+ }
+ // accept only values that fit in 11 bits (0..2047) so that sortmb << 20
+ // below stays within the int range
+ if ((sortmb & 0x7FF) != sortmb) {
+ throw new IOException("Invalid \"" + TezJobConfig.TEZ_ENGINE_IO_SORT_MB
+ + "\": " + sortmb);
+ }
+
+ indexCacheMemoryLimit = job.getInt(TezJobConfig.TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES,
+ TezJobConfig.DEFAULT_TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES);
+
+ // buffers and accounting
+ int maxMemUsage = sortmb << 20;
+ // round down to a whole number of metadata records
+ maxMemUsage -= maxMemUsage % METASIZE;
+ kvbuffer = new byte[maxMemUsage];
+ bufvoid = kvbuffer.length;
+ // kvmeta is an int view over the same bytes as kvbuffer
+ kvmeta = ByteBuffer.wrap(kvbuffer)
+ .order(ByteOrder.nativeOrder())
+ .asIntBuffer();
+ setEquator(0);
+ bufstart = bufend = bufindex = equator;
+ kvstart = kvend = kvindex;
+
+ maxRec = kvmeta.capacity() / NMETA;
+ softLimit = (int)(kvbuffer.length * spillper);
+ bufferRemaining = softLimit;
+ if (LOG.isInfoEnabled()) {
+ LOG.info(TezJobConfig.TEZ_ENGINE_IO_SORT_MB + ": " + sortmb);
+ LOG.info("soft limit at " + softLimit);
+ LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
+ LOG.info("kvstart = " + kvstart + "; length = " + maxRec);
+ }
+
+ // k/v serialization
+ valSerializer.open(bb);
+ keySerializer.open(bb);
+
+ spillInProgress = false;
+ minSpillsForCombine = job.getInt(TezJobConfig.TEZ_ENGINE_COMBINE_MIN_SPILLS, 3);
+ spillThread.setDaemon(true);
+ spillThread.setName("SpillThread");
+ // start the spill thread while holding the lock and wait until it has set
+ // spillThreadRunning; it signals spillDone once it owns the lock
+ spillLock.lock();
+ try {
+ spillThread.start();
+ while (!spillThreadRunning) {
+ spillDone.await();
+ }
+ } catch (InterruptedException e) {
+ // NOTE(review): the interrupt status is not restored before wrapping
+ throw new IOException("Spill thread failed to initialize", e);
+ } finally {
+ spillLock.unlock();
+ }
+ if (sortSpillException != null) {
+ throw new IOException("Spill thread failed to initialize",
+ sortSpillException);
+ }
+ }
+
+ /**
+  * Asks the partitioner which partition the pair belongs to and passes the
+  * pair on to {@link #collect(Object, Object, int)}.
+  *
+  * @param key   the record key
+  * @param value the record value
+  * @throws IOException if collecting the record fails
+  * @throws InterruptedException declared by the signature
+  */
+ @Override
+ public void write(Object key, Object value)
+     throws IOException, InterruptedException {
+   final int targetPartition =
+       partitioner.getPartition(key, value, partitions);
+   collect(key, value, targetPartition);
+ }
+
+ /**
+ * Serialize the key, value to intermediate storage.
+ * When this method returns, kvindex must refer to sufficient unused
+ * storage to store one METADATA.
+ *
+ * The record is first validated (key class, value class, partition range),
+ * then space for its metadata is reserved from bufferRemaining. When that
+ * budget is exhausted the method takes spillLock and either reclaims the
+ * space of a finished spill or starts a new one. A record that does not fit
+ * in the buffer at all is written straight to its own spill file.
+ *
+ * @param key the key; its class must be exactly keyClass
+ * @param value the value; its class must be exactly valClass
+ * @param partition target partition, 0 <= partition < partitions
+ * @throws IOException on a type or partition mismatch, when a previous
+ * spill has failed, or when serialization or spilling fails
+ */
+ synchronized void collect(Object key, Object value, final int partition
+ ) throws IOException {
+ task.getTaskReporter().progress();
+ if (key.getClass() != keyClass) {
+ throw new IOException("Type mismatch in key from map: expected "
+ + keyClass.getName() + ", received "
+ + key.getClass().getName());
+ }
+ if (value.getClass() != valClass) {
+ throw new IOException("Type mismatch in value from map: expected "
+ + valClass.getName() + ", received "
+ + value.getClass().getName());
+ }
+ if (partition < 0 || partition >= partitions) {
+ throw new IOException("Illegal partition for " + key + " (" +
+ partition + ")");
+ }
+ checkSpillException();
+ // reserve room for this record's metadata
+ bufferRemaining -= METASIZE;
+ if (bufferRemaining <= 0) {
+ // start spill if the thread is not running and the soft limit has been
+ // reached
+ spillLock.lock();
+ try {
+ // single-pass block: the loop condition is false, so the "continue"
+ // below simply leaves it
+ do {
+ if (!spillInProgress) {
+ final int kvbidx = 4 * kvindex;
+ final int kvbend = 4 * kvend;
+ // serialized, unspilled bytes always lie between kvindex and
+ // bufindex, crossing the equator. Note that any void space
+ // created by a reset must be included in "used" bytes
+ final int bUsed = distanceTo(kvbidx, bufindex);
+ final boolean bufsoftlimit = bUsed >= softLimit;
+ if ((kvbend + METASIZE) % kvbuffer.length !=
+ equator - (equator % METASIZE)) {
+ // spill finished, reclaim space
+ resetSpill();
+ bufferRemaining = Math.min(
+ distanceTo(bufindex, kvbidx) - 2 * METASIZE,
+ softLimit - bUsed) - METASIZE;
+ continue;
+ } else if (bufsoftlimit && kvindex != kvend) {
+ // spill records, if any collected; check latter, as it may
+ // be possible for metadata alignment to hit spill pcnt
+ startSpill();
+ final int avgRec = (int)
+ (mapOutputByteCounter.getValue() /
+ mapOutputRecordCounter.getValue());
+ // leave at least half the split buffer for serialization data
+ // ensure that kvindex >= bufindex
+ final int distkvi = distanceTo(bufindex, kvbidx);
+ final int newPos = (bufindex +
+ Math.max(2 * METASIZE - 1,
+ Math.min(distkvi / 2,
+ distkvi / (METASIZE + avgRec) * METASIZE)))
+ % kvbuffer.length;
+ setEquator(newPos);
+ bufmark = bufindex = newPos;
+ final int serBound = 4 * kvend;
+ // bytes remaining before the lock must be held and limits
+ // checked is the minimum of three arcs: the metadata space, the
+ // serialization space, and the soft limit
+ bufferRemaining = Math.min(
+ // metadata max
+ distanceTo(bufend, newPos),
+ Math.min(
+ // serialization max
+ distanceTo(newPos, serBound),
+ // soft limit
+ softLimit)) - 2 * METASIZE;
+ }
+ }
+ } while (false);
+ } finally {
+ spillLock.unlock();
+ }
+ }
+
+ try {
+ // serialize key bytes into buffer
+ int keystart = bufindex;
+ keySerializer.serialize(key);
+ if (bufindex < keystart) {
+ // wrapped the key; must make contiguous
+ bb.shiftBufferedKey();
+ keystart = 0;
+ }
+ // serialize value bytes into buffer
+ final int valstart = bufindex;
+ valSerializer.serialize(value);
+ // It's possible for records to have zero length, i.e. the serializer
+ // will perform no writes. To ensure that the boundary conditions are
+ // checked and that the kvindex invariant is maintained, perform a
+ // zero-length write into the buffer. The logic monitoring this could be
+ // moved into collect, but this is cleaner and inexpensive. For now, it
+ // is acceptable.
+ bb.write(b0, 0, 0);
+
+ // the record must be marked after the preceding write, as the metadata
+ // for this record are not yet written
+ int valend = bb.markRecord();
+
+ mapOutputRecordCounter.increment(1);
+ mapOutputByteCounter.increment(
+ distanceTo(keystart, valend, bufvoid));
+
+ // write accounting info
+ kvmeta.put(kvindex + INDEX, kvindex);
+ kvmeta.put(kvindex + PARTITION, partition);
+ kvmeta.put(kvindex + KEYSTART, keystart);
+ kvmeta.put(kvindex + VALSTART, valstart);
+ // advance kvindex
+ // (metadata grows downwards: step back one record, wrapping at capacity)
+ kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
+ } catch (MapBufferTooSmallException e) {
+ // thrown by the buffer when nothing is buffered and this record alone
+ // does not fit; write it to a spill file of its own
+ LOG.info("Record too large for in-memory buffer: " + e.getMessage());
+ spillSingleRecord(key, value, partition);
+ mapOutputRecordCounter.increment(1);
+ return;
+ }
+ }
+
+ /**
+  * Moves the equator, i.e. the position from which serialized data and
+  * metadata grow in opposite directions, and re-derives kvindex from it.
+  * kvindex is placed one metadata record before the equator rounded down to
+  * a METASIZE boundary, wrapping around the buffer if necessary, so that a
+  * metadata record never straddles the end of the circular buffer.
+  *
+  * @param pos new equator, as a byte position in kvbuffer
+  */
+ private void setEquator(int pos) {
+   equator = pos;
+   final int bufferLength = kvbuffer.length;
+   // round down to a metadata boundary, then step back one record
+   final int metaBoundary = pos - (pos % METASIZE);
+   final int firstMetaByte =
+       (metaBoundary - METASIZE + bufferLength) % bufferLength;
+   kvindex = firstMetaByte / 4;
+   if (!LOG.isInfoEnabled()) {
+     return;
+   }
+   final int kvindexBytes = kvindex * 4;
+   LOG.info("(EQUATOR) " + pos + " kvi " + kvindex +
+       "(" + kvindexBytes + ")");
+ }
+
+ /**
+  * Called once a spill has completed: collapses the spill window onto the
+  * current equator so the spilled space can be reused. bufstart and bufend
+  * are set to the equator; kvstart and kvend are set to the metadata slot
+  * just before the aligned equator. When kvindex == kvend == kvstart the
+  * buffer is empty. bufindex and kvindex are not modified.
+  */
+ private void resetSpill() {
+   final int origin = equator;
+   bufend = origin;
+   bufstart = origin;
+   final int bufferLength = kvbuffer.length;
+   final int metaBoundary = origin - (origin % METASIZE);
+   // both ends of the spill window point at the first meta record
+   final int firstMeta =
+       ((metaBoundary - METASIZE + bufferLength) % bufferLength) / 4;
+   kvend = firstMeta;
+   kvstart = firstMeta;
+   if (!LOG.isInfoEnabled()) {
+     return;
+   }
+   LOG.info("(RESET) equator " + origin + " kv " + kvstart + "(" +
+       (kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");
+ }
+
+ /**
+  * Forward distance in bytes from {@code i} to {@code j} in the
+  * serialization buffer, using the full length of kvbuffer as the modulus.
+  *
+  * @param i starting byte position
+  * @param j ending byte position
+  * @return number of bytes from i forward to j, wrapping at the buffer end
+  * @see #distanceTo(int,int,int)
+  */
+ final int distanceTo(final int i, final int j) {
+   final int ringSize = kvbuffer.length;
+   return distanceTo(i, j, ringSize);
+ }
+
+ /**
+  * Forward distance from {@code i} to {@code j} in a circular range of size
+  * {@code mod}.
+  *
+  * @param i   starting position
+  * @param j   ending position
+  * @param mod size of the circular range
+  * @return {@code j - i} when i does not lie past j, otherwise the distance
+  *         obtained by wrapping around {@code mod}
+  */
+ int distanceTo(final int i, final int j, final int mod) {
+   if (i > j) {
+     // j lies behind i: go to the end of the range and wrap
+     return mod - i + j;
+   }
+   return j - i;
+ }
+
+ /**
+ * For the given meta position, return the dereferenced position in the
+ * integer array. Each meta block contains several integers describing
+ * record data in its serialized form, but the INDEX is not necessarily
+ * related to the proximate metadata. The index value at the referenced int
+ * position is the start offset of the associated metadata block. So the
+ * metadata INDEX at metapos may point to the metadata described by the
+ * metadata block at metapos + k, which contains information about that
+ * serialized record.
+ */
+ int offsetFor(int metapos) {
+ return kvmeta.get((metapos % maxRec) * NMETA + INDEX);
+ }
+
+ /**
+  * Orders two logical metadata positions (taken modulo the record capacity):
+  * first by partition number, then, within a partition, by the raw key
+  * bytes using the configured comparator.
+  *
+  * @param mi first logical position
+  * @param mj second logical position
+  * @return negative, zero or positive as the first record sorts before,
+  *         with, or after the second
+  * @see IndexedSortable#compare
+  */
+ public int compare(final int mi, final int mj) {
+   final int left = offsetFor(mi);
+   final int right = offsetFor(mj);
+   final int leftPartition = kvmeta.get(left + PARTITION);
+   final int rightPartition = kvmeta.get(right + PARTITION);
+   if (leftPartition == rightPartition) {
+     // same partition: fall back to the key bytes, which run from KEYSTART
+     // up to VALSTART
+     final int leftKeyStart = kvmeta.get(left + KEYSTART);
+     final int leftKeyLength = kvmeta.get(left + VALSTART) - leftKeyStart;
+     final int rightKeyStart = kvmeta.get(right + KEYSTART);
+     final int rightKeyLength = kvmeta.get(right + VALSTART) - rightKeyStart;
+     return comparator.compare(
+         kvbuffer, leftKeyStart, leftKeyLength,
+         kvbuffer, rightKeyStart, rightKeyLength);
+   }
+   return leftPartition - rightPartition;
+ }
+
+ /**
+  * Exchanges two logical metadata positions (taken modulo the record
+  * capacity). Only the INDEX cells are exchanged; the metadata blocks and
+  * the serialized bytes stay where they are.
+  *
+  * @param mi first logical position
+  * @param mj second logical position
+  * @see IndexedSortable#swap
+  */
+ public void swap(final int mi, final int mj) {
+   final int cellA = (mi % maxRec) * NMETA + INDEX;
+   final int cellB = (mj % maxRec) * NMETA + INDEX;
+   final int valueA = kvmeta.get(cellA);
+   final int valueB = kvmeta.get(cellB);
+   kvmeta.put(cellA, valueB);
+   kvmeta.put(cellB, valueA);
+ }
+
+ /**
+ * Inner class managing the spill of serialized records to disk.
+ * It is the DataOutputStream the key and value serializers are opened on;
+ * the bytes themselves are stored by the wrapped {@link Buffer}.
+ */
+ protected class BlockingBuffer extends DataOutputStream {
+
+ /** Wraps a new {@link Buffer} that writes into the enclosing kvbuffer. */
+ public BlockingBuffer() {
+ super(new Buffer());
+ }
+
+ /**
+ * Mark end of record. Note that this is required if the buffer is to
+ * cut the spill in the proper place.
+ * @return the current bufindex, which is also stored in bufmark
+ */
+ public int markRecord() {
+ bufmark = bufindex;
+ return bufindex;
+ }
+
+ /**
+ * Set position from last mark to end of writable buffer, then rewrite
+ * the data between last mark and kvindex.
+ * This handles a special case where the key wraps around the buffer.
+ * If the key is to be passed to a RawComparator, then it must be
+ * contiguous in the buffer. This recopies the data in the buffer back
+ * into itself, but starting at the beginning of the buffer. Note that
+ * this method should <b>only</b> be called immediately after detecting
+ * this condition. To call it at any other time is undefined and would
+ * likely result in data loss or corruption.
+ * @throws IOException if rewriting the key through the stream fails
+ * @see #markRecord()
+ */
+ protected void shiftBufferedKey() throws IOException {
+ // spillLock unnecessary; both kvend and kvindex are current
+ // headbytelen: key bytes that were written between the last mark and
+ // the end of the buffer before the key wrapped
+ int headbytelen = bufvoid - bufmark;
+ // stop reading at the mark; the bytes beyond it are now void space
+ bufvoid = bufmark;
+ final int kvbidx = 4 * kvindex;
+ final int kvbend = 4 * kvend;
+ // room at the front of the buffer before the nearest metadata
+ final int avail =
+ Math.min(distanceTo(0, kvbidx), distanceTo(0, kvbend));
+ if (bufindex + headbytelen < avail) {
+ // fits: slide the wrapped tail of the key forward, then copy the head
+ // of the key in front of it
+ System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
+ System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
+ bufindex += headbytelen;
+ bufferRemaining -= kvbuffer.length - bufvoid;
+ } else {
+ // does not fit: save the wrapped tail, rewind to 0 and rewrite the
+ // whole key through the underlying stream
+ byte[] keytmp = new byte[bufindex];
+ System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);
+ bufindex = 0;
+ out.write(kvbuffer, bufmark, headbytelen);
+ out.write(keytmp);
+ }
+ }
+ }
+
+ /**
+ * OutputStream that stores bytes in the enclosing sorter's circular
+ * kvbuffer. Before copying, a write checks the remaining byte budget and,
+ * when that is exhausted, takes spillLock to reclaim a finished spill,
+ * start a new one, or wait for the running one.
+ */
+ public class Buffer extends OutputStream {
+ // one-byte staging array for write(int)
+ private final byte[] scratch = new byte[1];
+
+ /**
+ * Writes a single byte by delegating to the array form.
+ */
+ @Override
+ public void write(int v)
+ throws IOException {
+ scratch[0] = (byte)v;
+ write(scratch, 0, 1);
+ }
+
+ /**
+ * Attempt to write a sequence of bytes to the collection buffer.
+ * This method will block if the spill thread is running and it
+ * cannot write.
+ * @throws MapBufferTooSmallException if record is too large to
+ * deserialize into the collection buffer.
+ * @throws IOException if a previous spill failed or the wait for the
+ * spill thread is interrupted
+ */
+ @Override
+ public void write(byte b[], int off, int len)
+ throws IOException {
+ // must always verify the invariant that at least METASIZE bytes are
+ // available beyond kvindex, even when len == 0
+ bufferRemaining -= len;
+ if (bufferRemaining <= 0) {
+ // writing these bytes could exhaust available buffer space or fill
+ // the buffer to soft limit. check if spill or blocking are necessary
+ boolean blockwrite = false;
+ spillLock.lock();
+ try {
+ // repeat until the write no longer needs to block
+ do {
+ checkSpillException();
+
+ final int kvbidx = 4 * kvindex;
+ final int kvbend = 4 * kvend;
+ // ser distance to key index
+ final int distkvi = distanceTo(bufindex, kvbidx);
+ // ser distance to spill end index
+ final int distkve = distanceTo(bufindex, kvbend);
+
+ // if kvindex is closer than kvend, then a spill is neither in
+ // progress nor complete and reset since the lock was held. The
+ // write should block only if there is insufficient space to
+ // complete the current write, write the metadata for this record,
+ // and write the metadata for the next record. If kvend is closer,
+ // then the write should block if there is too little space for
+ // either the metadata or the current write. Note that collect
+ // ensures its metadata requirement with a zero-length write
+ blockwrite = distkvi <= distkve
+ ? distkvi <= len + 2 * METASIZE
+ : distkve <= len || distanceTo(bufend, kvbidx) < 2 * METASIZE;
+
+ if (!spillInProgress) {
+ if (blockwrite) {
+ if ((kvbend + METASIZE) % kvbuffer.length !=
+ equator - (equator % METASIZE)) {
+ // spill finished, reclaim space
+ // need to use meta exclusively; zero-len rec & 100% spill
+ // pcnt would fail
+ resetSpill(); // resetSpill doesn't move bufindex, kvindex
+ bufferRemaining = Math.min(
+ distkvi - 2 * METASIZE,
+ softLimit - distanceTo(kvbidx, bufindex)) - len;
+ // re-evaluate blockwrite with the reclaimed space
+ continue;
+ }
+ // we have records we can spill; only spill if blocked
+ if (kvindex != kvend) {
+ startSpill();
+ // Blocked on this write, waiting for the spill just
+ // initiated to finish. Instead of repositioning the marker
+ // and copying the partial record, we set the record start
+ // to be the new equator
+ setEquator(bufmark);
+ } else {
+ // We have no buffered records, and this record is too large
+ // to write into kvbuffer. We must spill it directly from
+ // collect
+ final int size = distanceTo(bufstart, bufindex) + len;
+ // return the buffer to its initial, empty state before
+ // handing the record back to collect
+ setEquator(0);
+ bufstart = bufend = bufindex = equator;
+ kvstart = kvend = kvindex;
+ bufvoid = kvbuffer.length;
+ throw new MapBufferTooSmallException(size + " bytes");
+ }
+ }
+ }
+
+ if (blockwrite) {
+ // wait for spill
+ try {
+ while (spillInProgress) {
+ task.getTaskReporter().progress();
+ spillDone.await();
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(
+ "Buffer interrupted while waiting for the writer", e);
+ }
+ }
+ } while (blockwrite);
+ } finally {
+ spillLock.unlock();
+ }
+ }
+ // here, we know that we have sufficient space to write
+ if (bufindex + len > bufvoid) {
+ // the write crosses the end of the buffer: fill the tail first, then
+ // continue from position 0
+ final int gaplen = bufvoid - bufindex;
+ System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
+ len -= gaplen;
+ off += gaplen;
+ bufindex = 0;
+ }
+ System.arraycopy(b, off, kvbuffer, bufindex, len);
+ bufindex += len;
+ }
+ }
+
+ /**
+ * Finishes collection: waits for any spill in progress, sorts and spills
+ * whatever is still buffered from the calling thread, stops the spill
+ * thread, merges the spill files, and adds the length of the final output
+ * file to the file-output byte counter.
+ *
+ * @throws IOException if a spill failed, or if this thread is interrupted
+ * while waiting for the spill thread (the interrupt is wrapped)
+ * @throws InterruptedException declared by the signature; may come from
+ * the final sort-and-spill or the merge
+ */
+ @Override
+ public void flush() throws IOException, InterruptedException {
+ LOG.info("Starting flush of map output");
+ spillLock.lock();
+ try {
+ while (spillInProgress) {
+ task.getTaskReporter().progress();
+ spillDone.await();
+ }
+ checkSpillException();
+
+ final int kvbend = 4 * kvend;
+ if ((kvbend + METASIZE) % kvbuffer.length !=
+ equator - (equator % METASIZE)) {
+ // spill finished
+ resetSpill();
+ }
+ if (kvindex != kvend) {
+ // records remain: mark them as the spill window (same bookkeeping as
+ // startSpill) and spill them on this thread
+ kvend = (kvindex + NMETA) % kvmeta.capacity();
+ bufend = bufmark;
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Sorting & Spilling map output");
+ LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
+ "; bufvoid = " + bufvoid);
+ LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
+ "); kvend = " + kvend + "(" + (kvend * 4) +
+ "); length = " + (distanceTo(kvend, kvstart,
+ kvmeta.capacity()) + 1) + "/" + maxRec);
+ }
+ sortAndSpill();
+ }
+ } catch (InterruptedException e) {
+ // NOTE(review): the interrupt status is not restored before wrapping
+ throw new IOException("Interrupted while waiting for the writer", e);
+ } finally {
+ spillLock.unlock();
+ }
+ assert !spillLock.isHeldByCurrentThread();
+ // shut down spill thread and wait for it to exit. Since the preceding
+ // ensures that it is finished with its work (and sortAndSpill did not
+ // throw), we elect to use an interrupt instead of setting a flag.
+ // Spilling simultaneously from this thread while the spill thread
+ // finishes its work might be both a useful way to extend this and also
+ // sufficient motivation for the latter approach.
+ try {
+ spillThread.interrupt();
+ spillThread.join();
+ } catch (InterruptedException e) {
+ throw new IOException("Spill failed", e);
+ }
+ // release sort buffer before the merge
+ //FIXME
+ //kvbuffer = null;
+ mergeParts();
+ Path outputPath = mapOutputFile.getOutputFile();
+ fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
+ }
+
+ /**
+ * Does nothing; the body is empty. The spill thread is stopped and the
+ * spills are merged in {@link #flush()}.
+ */
+ @Override
+ public void close() throws IOException, InterruptedException { }
+
+ /**
+ * Background thread that performs spills. It waits on spillReady until
+ * spillInProgress is set, runs sortAndSpill without holding spillLock,
+ * then retakes the lock, advances the spill window and clears
+ * spillInProgress. A failure in sortAndSpill is stored in
+ * sortSpillException rather than propagated. The thread exits when it is
+ * interrupted while waiting on spillReady.
+ */
+ protected class SpillThread extends Thread {
+
+ @Override
+ public void run() {
+ spillLock.lock();
+ spillThreadRunning = true;
+ try {
+ while (true) {
+ // wake a thread waiting for start-up or for the previous spill
+ spillDone.signal();
+ while (!spillInProgress) {
+ spillReady.await();
+ }
+ try {
+ // release the lock for the duration of the sort and spill
+ spillLock.unlock();
+ sortAndSpill();
+ } catch (Throwable t) {
+ // remembered here; checkSpillException rethrows it to the writer
+ sortSpillException = t;
+ } finally {
+ spillLock.lock();
+ if (bufend < bufstart) {
+ // the spilled range wrapped, so the void marker can be reset
+ bufvoid = kvbuffer.length;
+ }
+ kvstart = kvend;
+ bufstart = bufend;
+ spillInProgress = false;
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ spillLock.unlock();
+ spillThreadRunning = false;
+ }
+ }
+ }
+
+ /**
+  * Surfaces a failure recorded by the spill thread. Does nothing when no
+  * failure has been recorded. If the recorded throwable is an
+  * {@link Error}, it is first reported to the task reporter as a fatal
+  * error; in every case it is then rethrown wrapped in an IOException.
+  *
+  * @throws IOException wrapping the recorded spill failure
+  */
+ private void checkSpillException() throws IOException {
+   final Throwable failure = sortSpillException;
+   if (failure == null) {
+     return;
+   }
+   if (failure instanceof Error) {
+     final String logMsg = "Task " + task.getTaskAttemptId() + " failed : " +
+         StringUtils.stringifyException(failure);
+     task.getTaskReporter().reportFatalError(
+         task.getTaskAttemptId(), failure, logMsg);
+   }
+   throw new IOException("Spill failed", failure);
+ }
+
+ private void startSpill() {
+ assert !spillInProgress;
+ kvend = (kvindex + NMETA) % kvmeta.capacity();
+ bufend = bufmark;
+ spillInProgress = true;
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Spilling map output");
+ LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
+ "; bufvoid = " + bufvoid);
+ LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
+ "); kvend = " + kvend + "(" + (kvend * 4) +
+ "); length = " + (distanceTo(kvend, kvstart,
+ kvmeta.capacity()) + 1) + "/" + maxRec);
+ }
+ spillReady.signal();
+ }
+
+ int getMetaStart() {
+ return kvend / NMETA;
+ }
+
+ int getMetaEnd() {
+ return 1 + // kvend is a valid record
+ (kvstart >= kvend
+ ? kvstart
+ : kvmeta.capacity() + kvstart) / NMETA;
+ }
+
+ /**
+  * Sorts the metadata records of the current spill window with the
+  * configured sorter and writes them out through {@link #spill(int, int)}.
+  *
+  * @throws IOException if writing the spill fails
+  * @throws InterruptedException declared by the signature
+  */
+ protected void sortAndSpill()
+     throws IOException, InterruptedException {
+   final int firstRecord = getMetaStart();
+   final int endRecord = getMetaEnd();
+   sorter.sort(this, firstRecord, endRecord, task.getTaskReporter());
+   spill(firstRecord, endRecord);
+ }
+
+ /**
+  * Writes the sorted metadata range {@code [mstart, mend)} to a new spill
+  * file. One IFile segment is written per partition, in partition order,
+  * and the start offset, raw length and compressed length of each segment
+  * are recorded in a {@link TezSpillRecord}. When a combine processor is
+  * configured, each non-empty partition is passed through it; otherwise the
+  * records are appended directly. The spill record is kept in memory while
+  * the index cache is below its limit and written to an index file
+  * otherwise. numSpills is incremented on success.
+  *
+  * @param mstart first logical metadata position to spill (inclusive)
+  * @param mend   end logical metadata position (exclusive)
+  * @throws IOException if creating or writing the spill or index file fails
+  * @throws InterruptedException declared by the signature
+  */
+ protected void spill(int mstart, int mend)
+     throws IOException, InterruptedException {
+
+   // Approximate the length of the output file as the number of serialized
+   // bytes being spilled plus header lengths for the partitions. The spilled
+   // bytes are the forward arc from bufstart to bufend, wrapping at bufvoid.
+   // The sum is computed in long so that a buffer close to 2GB plus the
+   // header allowance cannot overflow int.
+   final long size = (long) distanceTo(bufstart, bufend, bufvoid)
+       + (long) partitions * APPROX_HEADER_LENGTH;
+   FSDataOutputStream out = null;
+   try {
+     // create spill file
+     final TezSpillRecord spillRec = new TezSpillRecord(partitions);
+     final Path filename =
+         mapOutputFile.getSpillFileForWrite(numSpills, size);
+     out = rfs.create(filename);
+
+     // spindex walks the sorted metadata once; records are ordered by
+     // partition, so each partition consumes a contiguous run
+     int spindex = mstart;
+     final InMemValBytes value = createInMemValBytes();
+     for (int i = 0; i < partitions; ++i) {
+       IFile.Writer writer = null;
+       try {
+         long segmentStart = out.getPos();
+         writer = new Writer(job, out, keyClass, valClass, codec,
+             spilledRecordsCounter);
+         if (combineProcessor == null) {
+           // spill directly
+           DataInputBuffer key = new DataInputBuffer();
+           while (spindex < mend &&
+               kvmeta.get(offsetFor(spindex) + PARTITION) == i) {
+             final int kvoff = offsetFor(spindex);
+             key.reset(
+                 kvbuffer,
+                 kvmeta.get(kvoff + KEYSTART),
+                 (kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART))
+                 );
+             getVBytesForOffset(kvoff, value);
+             writer.append(key, value);
+             ++spindex;
+           }
+         } else {
+           int spstart = spindex;
+           while (spindex < mend &&
+               kvmeta.get(offsetFor(spindex)
+                   + PARTITION) == i) {
+             ++spindex;
+           }
+           // Note: we would like to avoid the combiner if we've fewer
+           // than some threshold of records for a partition
+           if (spstart != spindex) {
+             TezRawKeyValueIterator kvIter =
+                 new MRResultIterator(spstart, spindex);
+             runCombineProcessor(kvIter, writer);
+           }
+         }
+
+         // close the writer
+         writer.close();
+
+         // record offsets
+         final TezIndexRecord rec =
+             new TezIndexRecord(
+                 segmentStart,
+                 writer.getRawLength(),
+                 writer.getCompressedLength());
+         spillRec.putIndex(rec, i);
+
+         // closed normally; nothing left for the finally block to do
+         writer = null;
+       } finally {
+         if (null != writer) writer.close();
+       }
+     }
+
+     if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
+       // create spill index file
+       Path indexFilename =
+           mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+               * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+       spillRec.writeToFile(indexFilename, job);
+     } else {
+       indexCacheList.add(spillRec);
+       totalIndexCacheMemory +=
+           spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
+     }
+     LOG.info("Finished spill " + numSpills);
+     ++numSpills;
+   } finally {
+     if (out != null) out.close();
+   }
+ }
+
+ /**
+  * Handles the degenerate case where serialization fails to fit in
+  * the in-memory buffer, so we must spill the record from collect
+  * directly to a spill file. Consider this "losing".
+  *
+  * A segment is written for every partition so that the spill has the same
+  * layout as a regular one; only the segment of {@code partition} contains
+  * the record. The combiner is not run.
+  *
+  * @param key       the record key
+  * @param value     the record value
+  * @param partition the partition the record belongs to
+  * @throws IOException if creating or writing the spill or index file fails
+  */
+ private void spillSingleRecord(final Object key, final Object value,
+     int partition) throws IOException {
+   // size estimate in long arithmetic: the buffer may be close to 2GB, so
+   // adding the header allowance as int could overflow
+   final long size = (long) kvbuffer.length
+       + (long) partitions * APPROX_HEADER_LENGTH;
+   FSDataOutputStream out = null;
+   try {
+     // create spill file
+     final TezSpillRecord spillRec = new TezSpillRecord(partitions);
+     final Path filename =
+         mapOutputFile.getSpillFileForWrite(numSpills, size);
+     out = rfs.create(filename);
+
+     // we don't run the combiner for a single record
+     for (int i = 0; i < partitions; ++i) {
+       IFile.Writer writer = null;
+       try {
+         long segmentStart = out.getPos();
+         // Create a new codec, don't care!
+         writer = new IFile.Writer(job, out, keyClass, valClass, codec,
+             spilledRecordsCounter);
+
+         if (i == partition) {
+           final long recordStart = out.getPos();
+           writer.append(key, value);
+           // Note that our map byte count will not be accurate with
+           // compression
+           mapOutputByteCounter.increment(out.getPos() - recordStart);
+         }
+         writer.close();
+
+         // record offsets
+         TezIndexRecord rec =
+             new TezIndexRecord(
+                 segmentStart,
+                 writer.getRawLength(),
+                 writer.getCompressedLength());
+         spillRec.putIndex(rec, i);
+
+         // closed normally; nothing left for the finally block to do
+         writer = null;
+       } finally {
+         // close on any failure, not only IOException, matching spill()
+         if (null != writer) writer.close();
+       }
+     }
+     if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
+       // create spill index file
+       Path indexFilename =
+           mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+               * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+       spillRec.writeToFile(indexFilename, job);
+     } else {
+       indexCacheList.add(spillRec);
+       totalIndexCacheMemory +=
+           spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
+     }
+     ++numSpills;
+   } finally {
+     if (out != null) out.close();
+   }
+ }
+
+ /**
+  * Computes the length in bytes of the serialized value described by the
+  * metadata block at {@code kvoff}. The value ends where the next record's
+  * key starts; for the block at kvend, bufend is used as the end instead.
+  * A value whose end lies before its start has wrapped, and its length is
+  * measured through bufvoid.
+  *
+  * @param kvoff offset of the metadata block in kvmeta
+  * @return length of the value in bytes
+  */
+ protected int getInMemVBytesLength(int kvoff) {
+   final int valueEnd;
+   if (kvoff == kvend) {
+     valueEnd = bufend;
+   } else {
+     // the following record's metadata sits one block below this one
+     final int nextKeyCell =
+         (kvoff - NMETA + kvmeta.capacity() + KEYSTART) % kvmeta.capacity();
+     valueEnd = kvmeta.get(nextKeyCell);
+   }
+   final int valueStart = kvmeta.get(kvoff + VALSTART);
+   if (valueEnd >= valueStart) {
+     return valueEnd - valueStart;
+   }
+   return (bufvoid - valueStart) + valueEnd;
+ }
+
+ /**
+ * Given an offset, populate vbytes with the associated set of
+ * deserialized value bytes. Should only be called during a spill.
+ */
+ int getVBytesForOffset(int kvoff, InMemValBytes vbytes) {
+ int vallen = getInMemVBytesLength(kvoff);
+ vbytes.reset(kvbuffer, kvmeta.get(kvoff + VALSTART), vallen);
+ return vallen;
+ }
+
+ /**
+  * DataInputBuffer over a value held in the circular sort buffer, used for
+  * appendRaw. A value that runs past {@code bufvoid} continues at the start
+  * of the backing array; such a value is copied into a private contiguous
+  * array before it is exposed.
+  */
+ static class InMemValBytes extends DataInputBuffer {
+   private byte[] buffer;
+   private int start;
+   private int length;
+   private final int bufvoid;
+
+   /**
+    * @param bufvoid position at which values wrap to the buffer start
+    */
+   public InMemValBytes(int bufvoid) {
+     this.bufvoid = bufvoid;
+   }
+
+   /**
+    * Re-targets this buffer at {@code length} bytes beginning at
+    * {@code start}, stitching the two pieces together when the range
+    * crosses {@code bufvoid}.
+    */
+   public void reset(byte[] buffer, int start, int length) {
+     this.length = length;
+     if (start + length > bufvoid) {
+       // wrapped: tail piece up to bufvoid, then the remainder from 0
+       final byte[] joined = new byte[length];
+       final int tailLength = bufvoid - start;
+       System.arraycopy(buffer, start, joined, 0, tailLength);
+       System.arraycopy(buffer, 0, joined, tailLength, length - tailLength);
+       this.buffer = joined;
+       this.start = 0;
+     } else {
+       this.buffer = buffer;
+       this.start = start;
+     }
+     super.reset(this.buffer, this.start, this.length);
+   }
+ }
+
+ InMemValBytes createInMemValBytes() {
+ return new InMemValBytes(bufvoid);
+ }
+
+ /**
+  * Iterator over the sorted in-memory records in the logical metadata range
+  * {@code [start, end)}. The key and value buffers are reused between
+  * calls. {@link #getProgress()} returns null and {@link #close()} does
+  * nothing.
+  */
+ protected class MRResultIterator implements TezRawKeyValueIterator {
+   private final DataInputBuffer keybuf = new DataInputBuffer();
+   private final InMemValBytes vbytes = createInMemValBytes();
+   private final int end;
+   private int current;
+
+   public MRResultIterator(int start, int end) {
+     // positioned before the first record; next() advances onto it
+     this.current = start - 1;
+     this.end = end;
+   }
+
+   public boolean next() throws IOException {
+     current += 1;
+     return current < end;
+   }
+
+   public DataInputBuffer getKey() throws IOException {
+     final int meta = offsetFor(current);
+     final int keyStart = kvmeta.get(meta + KEYSTART);
+     final int keyLength = kvmeta.get(meta + VALSTART) - keyStart;
+     keybuf.reset(kvbuffer, keyStart, keyLength);
+     return keybuf;
+   }
+
+   public DataInputBuffer getValue() throws IOException {
+     final int meta = offsetFor(current);
+     getVBytesForOffset(meta, vbytes);
+     return vbytes;
+   }
+
+   public Progress getProgress() {
+     return null;
+   }
+
+   public void close() {
+   }
+ }
+
+ /**
+  * Merges every on-disk spill into the single final output file and writes
+  * the matching index file.
+  * <ul>
+  * <li>Exactly one spill: the spill becomes the final output by rename, and
+  * its index is either renamed or written out from the index cache.</li>
+  * <li>No spills: an empty segment is written for each partition.</li>
+  * <li>Otherwise: for each partition the segments of all spills are merged
+  * (through the combine processor when one is set and there are at least
+  * minSpillsForCombine spills), then the spill files are deleted.</li>
+  * </ul>
+  * The final output stream is closed even when merging fails.
+  *
+  * @throws IOException if reading a spill or writing the output/index fails
+  * @throws InterruptedException propagated from the merge/combine calls
+  */
+ private void mergeParts() throws IOException, InterruptedException {
+   // get the approximate size of the final output/index files
+   long finalOutFileSize = 0;
+   long finalIndexFileSize = 0;
+   final Path[] filename = new Path[numSpills];
+   final TezTaskAttemptID mapId = task.getTaskAttemptId();
+
+   for(int i = 0; i < numSpills; i++) {
+     filename[i] = mapOutputFile.getSpillFile(i);
+     finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
+   }
+   if (numSpills == 1) { //the spill is the final output
+     sameVolRename(filename[0],
+         mapOutputFile.getOutputFileForWriteInVolume(filename[0]));
+     if (indexCacheList.size() == 0) {
+       sameVolRename(mapOutputFile.getSpillIndexFile(0),
+           mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));
+     } else {
+       indexCacheList.get(0).writeToFile(
+           mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job);
+     }
+     sortPhase.complete();
+     return;
+   }
+
+   // read in paged indices
+   for (int i = indexCacheList.size(); i < numSpills; ++i) {
+     Path indexFileName = mapOutputFile.getSpillIndexFile(i);
+     indexCacheList.add(new TezSpillRecord(indexFileName, job));
+   }
+
+   //make correction in the length to include the sequence file header
+   //lengths for each partition
+   finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
+   finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
+   Path finalOutputFile =
+       mapOutputFile.getOutputFileForWrite(finalOutFileSize);
+   Path finalIndexFile =
+       mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
+
+   //The output stream for the final single output file
+   FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
+
+   if (numSpills == 0) {
+     //create dummy files
+
+     TezSpillRecord sr = new TezSpillRecord(partitions);
+     try {
+       for (int i = 0; i < partitions; i++) {
+         long segmentStart = finalOut.getPos();
+         Writer writer =
+             new Writer(job, finalOut, keyClass, valClass, codec, null);
+         writer.close();
+
+         TezIndexRecord rec =
+             new TezIndexRecord(
+                 segmentStart,
+                 writer.getRawLength(),
+                 writer.getCompressedLength());
+         sr.putIndex(rec, i);
+       }
+       sr.writeToFile(finalIndexFile, job);
+     } finally {
+       finalOut.close();
+     }
+     sortPhase.complete();
+     return;
+   }
+
+   // Same guarantee as the zero-spill path above: whatever happens while
+   // merging, the final output stream must not be leaked.
+   try {
+     sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
+     TezMerger.considerFinalMergeForProgress();
+
+     // the merge factor does not depend on the partition; read it once
+     final int mergeFactor =
+         job.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR,
+             TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_FACTOR);
+
+     final TezSpillRecord spillRec = new TezSpillRecord(partitions);
+     for (int parts = 0; parts < partitions; parts++) {
+       //create the segments to be merged
+       List<Segment> segmentList =
+           new ArrayList<Segment>(numSpills);
+       for(int i = 0; i < numSpills; i++) {
+         TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
+
+         Segment s =
+             new Segment(job, rfs, filename[i], indexRecord.getStartOffset(),
+                 indexRecord.getPartLength(), codec, true);
+         segmentList.add(i, s);
+
+         if (LOG.isDebugEnabled()) {
+           LOG.debug("MapId=" + mapId + " Reducer=" + parts +
+               "Spill =" + i + "(" + indexRecord.getStartOffset() + "," +
+               indexRecord.getRawLength() + ", " +
+               indexRecord.getPartLength() + ")");
+         }
+       }
+
+       // sort the segments only if there are intermediate merges
+       boolean sortSegments = segmentList.size() > mergeFactor;
+       //merge
+       TezRawKeyValueIterator kvIter = TezMerger.merge(job, rfs,
+           keyClass, valClass, codec,
+           segmentList, mergeFactor,
+           new Path(mapId.toString()),
+           (RawComparator)ConfigUtils.getOutputKeyComparator(job),
+           task.getTaskReporter(), sortSegments,
+           null, spilledRecordsCounter,
+           sortPhase.phase());
+
+       //write merged output to disk
+       long segmentStart = finalOut.getPos();
+       Writer writer =
+           new Writer(job, finalOut, keyClass, valClass, codec,
+               spilledRecordsCounter);
+       if (combineProcessor == null || numSpills < minSpillsForCombine) {
+         TezMerger.writeFile(kvIter, writer, task.getTaskReporter(), job);
+         writer.close();
+       } else {
+         runCombineProcessor(kvIter, writer);
+       }
+
+       sortPhase.startNextPhase();
+
+       // record offsets
+       final TezIndexRecord rec =
+           new TezIndexRecord(
+               segmentStart,
+               writer.getRawLength(),
+               writer.getCompressedLength());
+       spillRec.putIndex(rec, parts);
+     }
+     spillRec.writeToFile(finalIndexFile, job);
+   } finally {
+     finalOut.close();
+   }
+
+   // only reached when the merge and the close both succeeded
+   for(int i = 0; i < numSpills; i++) {
+     rfs.delete(filename[i],true);
+   }
+ }
+
+ @Override
+ public OutputContext getOutputContext() { // this sorter has no output context to publish
+ return null; // callers always receive null from this implementation
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,144 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.common.sort.impl.dflt;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.IntBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader;
+import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
+import org.apache.tez.engine.common.sort.impl.IFile;
+import org.apache.tez.engine.records.OutputContext;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+public class InMemoryShuffleSorter extends DefaultSorter {
+
+ private static final Log LOG = LogFactory.getLog(InMemoryShuffleSorter.class);
+
+ static final int IFILE_EOF_LENGTH =
+ 2 * WritableUtils.getVIntSize(IFile.EOF_MARKER);
+ static final int IFILE_CHECKSUM_LENGTH = DataChecksum.Type.CRC32.size;
+
+ private List<Integer> spillIndices = new ArrayList<Integer>();
+ private List<ShuffleHeader> shuffleHeaders = new ArrayList<ShuffleHeader>();
+
+ ShuffleHandler shuffleHandler = new ShuffleHandler(this);
+
+ byte[] kvbuffer;
+ IntBuffer kvmeta;
+
+ @Inject
+ public InMemoryShuffleSorter(
+ @Assisted TezTask task
+ ) throws IOException {
+ super(task);
+ }
+
+ @Override
+ public void initialize(Configuration conf, Master master) throws IOException,
+ InterruptedException {
+ super.initialize(conf, master);
+ shuffleHandler.init(conf, task);
+ }
+
+ @Override
+ protected void spill(int mstart, int mend)
+ throws IOException, InterruptedException {
+ // Start the shuffleHandler
+ shuffleHandler.start();
+
+ // Don't spill!
+
+ // Make a copy
+ this.kvbuffer = super.kvbuffer;
+ this.kvmeta = super.kvmeta;
+
+ // Just save spill-indices for serving later
+ int spindex = mstart;
+ for (int i = 0; i < partitions; ++i) {
+ spillIndices.add(spindex);
+
+ int length = 0;
+ while (spindex < mend &&
+ kvmeta.get(offsetFor(spindex) + PARTITION) == i) {
+
+ final int kvoff = offsetFor(spindex);
+ int keyLen =
+ kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART);
+ int valLen = getInMemVBytesLength(kvoff);
+ length +=
+ (keyLen + WritableUtils.getVIntSize(keyLen)) +
+ (valLen + WritableUtils.getVIntSize(valLen));
+
+ ++spindex;
+ }
+ length += IFILE_EOF_LENGTH;
+
+ shuffleHeaders.add(
+ new ShuffleHeader(
+ task.getTaskAttemptId().toString(),
+ length + IFILE_CHECKSUM_LENGTH, length, i)
+ );
+ LOG.info("shuffleHeader[" + i + "]:" +
+ " rawLen=" + length + " partLen=" + (length + IFILE_CHECKSUM_LENGTH) +
+ " spillIndex=" + spillIndices.get(i));
+ }
+
+ LOG.info("Saved " + spillIndices.size() + " spill-indices and " +
+ shuffleHeaders.size() + " shuffle headers");
+ }
+
+ @Override
+ public InputStream getSortedStream(int partition) {
+ return new SortBufferInputStream(this, partition);
+ }
+
+ @Override
+ public void close() throws IOException, InterruptedException{
+ // FIXME
+ //shuffleHandler.stop();
+ }
+
+ @Override
+ public ShuffleHeader getShuffleHeader(int reduce) {
+ return shuffleHeaders.get(reduce);
+ }
+
+ public int getSpillIndex(int partition) {
+ return spillIndices.get(partition);
+ }
+
+ @Override
+ public OutputContext getOutputContext() {
+ return new OutputContext(shuffleHandler.getPort());
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,271 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.common.sort.impl.dflt;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.IntBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.tez.engine.common.shuffle.impl.InMemoryWriter;
+import org.apache.tez.engine.common.sort.impl.dflt.DefaultSorter.InMemValBytes;
+
+/**
+ * Input stream over one partition of an {@link InMemoryShuffleSorter}'s
+ * in-memory sort buffer. Records are appended through an
+ * {@link InMemoryWriter} straight into the caller's buffer; whatever does not
+ * fit is parked in {@code dualBuf} and handed out first on the next read.
+ * Not safe for concurrent use.
+ */
+class SortBufferInputStream extends InputStream {
+
+  private static final Log LOG = LogFactory.getLog(SortBufferInputStream.class);
+
+  private final InMemoryShuffleSorter sorter;
+  private InMemoryWriter sortOutput;
+
+  private int mend;
+  private int recIndex;
+  private final byte[] kvbuffer;
+  private final IntBuffer kvmeta;
+  private final int partitionBytes;
+  private final int partition;
+
+  // Overflow area for bytes that did not fit in the caller's buffer.
+  // NOTE(review): a single overflow larger than this array is not handled.
+  byte[] dualBuf = new byte[8192];
+  DualBufferOutputStream out;
+  private int readBytes = 0;
+  // Set once the ifile trailer has been emitted, so it is written only once.
+  private boolean ifileClosed = false;
+
+  public SortBufferInputStream(
+      InMemoryShuffleSorter sorter, int partition) {
+    this.sorter = sorter;
+    this.partitionBytes =
+        (int)sorter.getShuffleHeader(partition).getCompressedLength();
+    this.partition = partition;
+    this.mend = sorter.getMetaEnd();
+    this.recIndex = sorter.getSpillIndex(partition);
+    this.kvbuffer = sorter.kvbuffer;
+    this.kvmeta = sorter.kvmeta;
+    out = new DualBufferOutputStream(null, 0, 0, dualBuf);
+    sortOutput = new InMemoryWriter(out);
+  }
+
+  byte[] one = new byte[1];
+
+  /**
+   * Reads a single byte.
+   *
+   * @return the byte as a value in 0..255, or -1 at end of stream
+   */
+  @Override
+  public int read() throws IOException {
+    int b = read(one, 0, 1);
+    // mask so bytes >= 0x80 are not returned as negative values (0xFF would
+    // otherwise be indistinguishable from end-of-stream)
+    return (b == -1) ? b : (one[0] & 0xff);
+  }
+
+  @Override
+  public int read(byte[] b) throws IOException {
+    return read(b, 0, b.length);
+  }
+
+  /**
+   * Fills {@code b} with leftover bytes from the previous call, then with as
+   * many further records of this partition as there is room to start, and
+   * finally with the ifile trailer once the partition is exhausted.
+   *
+   * @return the number of bytes placed in {@code b}, or -1 when the whole
+   *         partition has been delivered
+   */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    if (available() == 0) {
+      return -1;
+    }
+
+    int currentOffset = off;
+    int currentLength = len;
+    int currentReadBytes = 0;
+
+    // Check if there is residual data in the dualBuf
+    int residualLen = out.getCurrent();
+    if (residualLen > 0) {
+      int readable = Math.min(currentLength, residualLen);
+      System.arraycopy(dualBuf, 0, b, currentOffset, readable);
+      if (readable < residualLen) {
+        // The caller's buffer could not take the whole residual: slide the
+        // unread tail to the front, because the next call copies from index 0.
+        System.arraycopy(dualBuf, readable, dualBuf, 0, residualLen - readable);
+      }
+      currentOffset += readable;
+      currentReadBytes += readable;
+      out.setCurrentPointer(-readable);
+
+      // buffer has less capacity
+      currentLength -= readable;
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("XXX read_residual:" +
+            " readable=" + readable +
+            " readBytes=" + readBytes);
+      }
+    }
+
+    // Now, use the provided buffer
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("XXX read: out.reset" +
+          " b=" + b +
+          " currentOffset=" + currentOffset +
+          " currentLength=" + currentLength +
+          " recIndex=" + recIndex);
+    }
+    out.reset(b, currentOffset, currentLength);
+
+    // Read from sort-buffer into the provided buffer, space permitting
+    DataInputBuffer key = new DataInputBuffer();
+    final InMemValBytes value = sorter.createInMemValBytes();
+
+    int kvPartition = 0;
+    int numRec = 0;
+    for (;
+        currentLength > 0 && recIndex < mend &&
+            (kvPartition = getKVPartition(recIndex)) == partition;
+        ++recIndex) {
+
+      final int kvoff = sorter.offsetFor(recIndex);
+
+      int keyLen =
+          (kvmeta.get(kvoff + InMemoryShuffleSorter.VALSTART) -
+              kvmeta.get(kvoff + InMemoryShuffleSorter.KEYSTART));
+      key.reset(
+          kvbuffer,
+          kvmeta.get(kvoff + InMemoryShuffleSorter.KEYSTART),
+          keyLen
+          );
+
+      int valLen = sorter.getVBytesForOffset(kvoff, value);
+
+      int recLen =
+          (keyLen + WritableUtils.getVIntSize(keyLen)) +
+          (valLen + WritableUtils.getVIntSize(valLen));
+
+      // may drive currentLength negative: the excess lands in dualBuf
+      currentReadBytes += recLen;
+      currentOffset += recLen;
+      currentLength -= recLen;
+
+      // Write out key/value into the in-mem ifile
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("XXX read: sortOutput.append" +
+            " #rec=" + ++numRec +
+            " recIndex=" + recIndex + " kvoff=" + kvoff +
+            " keyLen=" + keyLen + " valLen=" + valLen + " recLen=" + recLen +
+            " readBytes=" + readBytes +
+            " currentReadBytes=" + currentReadBytes +
+            " currentLength=" + currentLength);
+      }
+      sortOutput.append(key, value);
+    }
+
+    // If we are at the end of the segment, close the ifile -- exactly once.
+    // A later call that only drains residual trailer bytes must not close
+    // again or count the trailer a second time.
+    if (!ifileClosed && currentLength > 0 &&
+        (recIndex == mend || kvPartition != partition)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("XXX About to call close:" +
+            " currentLength=" + currentLength +
+            " recIndex=" + recIndex + " mend=" + mend +
+            " kvPartition=" + kvPartition + " partitino=" + partition);
+      }
+      sortOutput.close();
+      ifileClosed = true;
+      currentReadBytes +=
+          (InMemoryShuffleSorter.IFILE_EOF_LENGTH +
+              InMemoryShuffleSorter.IFILE_CHECKSUM_LENGTH);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("XXX Hmm..." +
+            " currentLength=" + currentLength +
+            " recIndex=" + recIndex + " mend=" + mend +
+            " kvPartition=" + kvPartition + " partitino=" + partition);
+      }
+    }
+
+    // bytes beyond len were diverted to dualBuf and are reported next time
+    int retVal = Math.min(currentReadBytes, len);
+    readBytes += retVal;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("XXX read: done" +
+          " retVal=" + retVal +
+          " currentReadBytes=" + currentReadBytes +
+          " len=" + len +
+          " readBytes=" + readBytes +
+          " partitionBytes=" + partitionBytes +
+          " residualBytes=" + out.getCurrent());
+    }
+    return retVal;
+  }
+
+  /** Returns the partition stored in the metadata of record {@code recIndex}. */
+  private int getKVPartition(int recIndex) {
+    return kvmeta.get(
+        sorter.offsetFor(recIndex) + InMemoryShuffleSorter.PARTITION);
+  }
+
+  /** Bytes of this partition (per its shuffle header) not yet returned. */
+  @Override
+  public int available() throws IOException {
+    return (partitionBytes - readBytes);
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+  }
+
+  @Override
+  public boolean markSupported() {
+    return false;
+  }
+
+  /**
+   * Bounded output stream that writes into a primary buffer and diverts
+   * whatever does not fit into a secondary ("dual") buffer.
+   */
+  static class DualBufferOutputStream extends BoundedByteArrayOutputStream {
+
+    byte[] dualBuf;
+    // number of overflow bytes currently held at the front of dualBuf
+    int currentPointer = 0;
+    byte[] one = new byte[1];
+
+    public DualBufferOutputStream(
+        byte[] buf, int offset, int length,
+        byte[] altBuf) {
+      super(buf, offset, length);
+      this.dualBuf = altBuf;
+    }
+
+    /** Re-targets the primary buffer; the overflow buffer is left untouched. */
+    public void reset(byte[] b, int off, int len) {
+      super.resetBuffer(b, off, len);
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      one[0] = (byte)b;
+      write(one, 0, 1);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+      write(b, 0, b.length);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      int available = super.available();
+      if (available >= len) {
+        super.write(b, off, len);
+      } else {
+        // fill what is left of the primary buffer, park the rest in dualBuf
+        super.write(b, off, available);
+        System.arraycopy(b, off+available, dualBuf, currentPointer, len-available);
+        currentPointer += (len - available);
+      }
+    }
+
+    int getCurrent() {
+      return currentPointer;
+    }
+
+    /** Moves the overflow marker by {@code delta} (negative to consume). */
+    void setCurrentPointer(int delta) {
+      if ((currentPointer + delta) > dualBuf.length) {
+        throw new IndexOutOfBoundsException("Trying to set dualBuf 'current'" +
+            " marker to " + (currentPointer+delta) + " when " +
+            " dualBuf.length is " + dualBuf.length);
+      }
+      currentPointer = (currentPointer + delta) % dualBuf.length;
+    }
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/CombineValuesIterator.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/CombineValuesIterator.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/CombineValuesIterator.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/CombineValuesIterator.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,51 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.common.task.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.tez.common.TezTaskReporter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+
+/**
+ * Values iterator used when combining: behaves like {@link ValuesIterator}
+ * and additionally bumps the supplied combine-input counter by one on every
+ * call to {@link #next()}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class CombineValuesIterator<KEY,VALUE>
+    extends ValuesIterator<KEY,VALUE> {
+
+  private final TezCounter combineInputCounter;
+
+  public CombineValuesIterator(TezRawKeyValueIterator in,
+      RawComparator<KEY> comparator, Class<KEY> keyClass,
+      Class<VALUE> valClass, Configuration conf, TezTaskReporter reporter,
+      TezCounter combineInputCounter) throws IOException {
+    super(in, comparator, keyClass, valClass, conf, reporter);
+    this.combineInputCounter = combineInputCounter;
+  }
+
+  @Override
+  public VALUE next() {
+    // counted before the value is fetched
+    combineInputCounter.increment(1);
+    final VALUE combined = super.next();
+    return combined;
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/CombineValuesIterator.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,142 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.common.task.impl;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+
+/** Iterates values while keys match in sorted input. */
+public class ValuesIterator<KEY,VALUE> implements Iterator<VALUE> {
+ protected TezRawKeyValueIterator in; //input iterator
+ private KEY key; // current key
+ private KEY nextKey;
+ private VALUE value; // current value
+ private boolean hasNext; // more w/ this key
+ private boolean more; // more in file
+ private RawComparator<KEY> comparator;
+ protected Progressable reporter;
+ private Deserializer<KEY> keyDeserializer;
+ private Deserializer<VALUE> valDeserializer;
+ private DataInputBuffer keyIn = new DataInputBuffer();
+ private DataInputBuffer valueIn = new DataInputBuffer();
+
+ public ValuesIterator (TezRawKeyValueIterator in,
+ RawComparator<KEY> comparator,
+ Class<KEY> keyClass,
+ Class<VALUE> valClass, Configuration conf,
+ Progressable reporter)
+ throws IOException {
+ this.in = in;
+ this.comparator = comparator;
+ this.reporter = reporter;
+ SerializationFactory serializationFactory = new SerializationFactory(conf);
+ this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+ this.keyDeserializer.open(keyIn);
+ this.valDeserializer = serializationFactory.getDeserializer(valClass);
+ this.valDeserializer.open(this.valueIn);
+ readNextKey();
+ key = nextKey;
+ nextKey = null; // force new instance creation
+ hasNext = more;
+ }
+
+ TezRawKeyValueIterator getRawIterator() { return in; }
+
+ /// Iterator methods
+
+ public boolean hasNext() { return hasNext; }
+
+ private int ctr = 0;
+ public VALUE next() {
+ if (!hasNext) {
+ throw new NoSuchElementException("iterate past last value");
+ }
+ try {
+ readNextValue();
+ readNextKey();
+ } catch (IOException ie) {
+ throw new RuntimeException("problem advancing post rec#"+ctr, ie);
+ }
+ reporter.progress();
+ return value;
+ }
+
+ public void remove() { throw new RuntimeException("not implemented"); }
+
+ /// Auxiliary methods
+
+ /** Start processing next unique key. */
+ public void nextKey() throws IOException {
+ // read until we find a new key
+ while (hasNext) {
+ readNextKey();
+ }
+ ++ctr;
+
+ // move the next key to the current one
+ KEY tmpKey = key;
+ key = nextKey;
+ nextKey = tmpKey;
+ hasNext = more;
+ }
+
+ /** True iff more keys remain. */
+ public boolean more() {
+ return more;
+ }
+
+ /** The current key. */
+ public KEY getKey() {
+ return key;
+ }
+
+ /**
+ * read the next key
+ */
+ private void readNextKey() throws IOException {
+ more = in.next();
+ if (more) {
+ DataInputBuffer nextKeyBytes = in.getKey();
+ keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength());
+ nextKey = keyDeserializer.deserialize(nextKey);
+ hasNext = key != null && (comparator.compare(key, nextKey) == 0);
+ } else {
+ hasNext = false;
+ }
+ }
+
+ /**
+ * Read the next value
+ * @throws IOException
+ */
+ private void readNextValue() throws IOException {
+ DataInputBuffer nextValueBytes = in.getValue();
+ valueIn.reset(nextValueBytes.getData(), nextValueBytes.getPosition(), nextValueBytes.getLength());
+ value = valDeserializer.deserialize(value);
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.task.local.output;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.Constants;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.engine.records.TezTaskID;
+
+/**
+ * Manipulate the working area for the transient store for maps and reduces.
+ *
+ * This class is used by map and reduce tasks to identify the directories that
+ * they need to write to/read from for intermediate files. The callers of
+ * these methods are from the Child running the Task.
+ *
+ * All paths are resolved through a {@link LocalDirAllocator} over the
+ * directories configured under {@code TezJobConfig.LOCAL_DIR}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TezLocalTaskOutputFiles extends TezTaskOutput {
+
+  private final LocalDirAllocator lDirAlloc =
+      new LocalDirAllocator(TezJobConfig.LOCAL_DIR);
+
+  public TezLocalTaskOutputFiles() {
+  }
+
+  /** Relative path of the final task output file. */
+  private static String outputFileName() {
+    return Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
+        + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING;
+  }
+
+  /** Relative path of the index file that accompanies the final output. */
+  private static String outputIndexFileName() {
+    return outputFileName()
+        + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING;
+  }
+
+  /** Relative path of spill file number {@code spillNumber}. */
+  private static String spillFileName(int spillNumber) {
+    return Constants.TASK_OUTPUT_DIR + "/spill" + spillNumber + ".out";
+  }
+
+  /** Relative path of the index file of spill number {@code spillNumber}. */
+  private static String spillIndexFileName(int spillNumber) {
+    return spillFileName(spillNumber) + ".index";
+  }
+
+  /**
+   * Return the path to local map output file created earlier
+   *
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getOutputFile()
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(outputFileName(), getConf());
+  }
+
+  /**
+   * Create a local map output file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getOutputFileForWrite(long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(outputFileName(), size, getConf());
+  }
+
+  /**
+   * Create a local map output file name on the same volume.
+   *
+   * @param existing a file whose parent directory receives the output file
+   * @return path of the output file next to {@code existing}
+   */
+  @Override
+  public Path getOutputFileForWriteInVolume(Path existing) {
+    return new Path(existing.getParent(),
+        Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
+  }
+
+  /**
+   * Return the path to a local map output index file created earlier
+   *
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getOutputIndexFile()
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(outputIndexFileName(), getConf());
+  }
+
+  /**
+   * Create a local map output index file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getOutputIndexFileForWrite(long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(outputIndexFileName(), size,
+        getConf());
+  }
+
+  /**
+   * Create a local map output index file name on the same volume.
+   *
+   * @param existing a file whose parent directory receives the index file
+   * @return path of the index file next to {@code existing}
+   */
+  @Override
+  public Path getOutputIndexFileForWriteInVolume(Path existing) {
+    return new Path(existing.getParent(),
+        Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING
+            + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+  }
+
+  /**
+   * Return a local map spill file created earlier.
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getSpillFile(int spillNumber)
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(spillFileName(spillNumber), getConf());
+  }
+
+  /**
+   * Create a local map spill file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(spillFileName(spillNumber), size,
+        getConf());
+  }
+
+  /**
+   * Return a local map spill index file created earlier
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getSpillIndexFile(int spillNumber)
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(spillIndexFileName(spillNumber),
+        getConf());
+  }
+
+  /**
+   * Create a local map spill index file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(spillIndexFileName(spillNumber),
+        size, getConf());
+  }
+
+  /**
+   * Return a local reduce input file created earlier
+   *
+   * @param mapId a map task id
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getInputFile(int mapId)
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(String.format(
+        Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING,
+        Constants.TASK_OUTPUT_DIR, Integer.valueOf(mapId)), getConf());
+  }
+
+  /**
+   * Create a local reduce input file name.
+   *
+   * @param mapId a map task id
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getInputFileForWrite(TezTaskID mapId,
+      long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(String.format(
+        Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING,
+        Constants.TASK_OUTPUT_DIR, mapId.getId()),
+        size, getConf());
+  }
+
+  /** Removes all of the files related to a task. */
+  @Override
+  public void removeAll()
+      throws IOException {
+    deleteLocalFiles(Constants.TASK_OUTPUT_DIR);
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+  }
+
+  /**
+   * Returns the configured local directories, or {@code null} when the
+   * property is not set.
+   */
+  private String[] getLocalDirs() throws IOException {
+    return getConf().getStrings(TezJobConfig.LOCAL_DIR);
+  }
+
+  /**
+   * Recursively deletes {@code subdir} under every configured local
+   * directory. Does nothing when no local directory is configured.
+   */
+  private void deleteLocalFiles(String subdir) throws IOException {
+    final String[] localDirs = getLocalDirs();
+    if (localDirs == null || localDirs.length == 0) {
+      // getStrings() yields null for an unset property; nothing to clean up
+      return;
+    }
+    // one lookup is enough; the explicit recursive flag replaces the
+    // deprecated single-argument delete, which was recursive as well
+    final FileSystem localFs = FileSystem.getLocal(getConf());
+    for (String localDir : localDirs) {
+      localFs.delete(new Path(localDir, subdir), true);
+    }
+  }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.task.local.output;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.engine.records.TezTaskID;
+
+/**
+ * Manages the working area of the transient store used by maps and reduces.
+ *
+ * Map and reduce tasks use this class to identify the directories they write
+ * intermediate files to and read them from. Callers of these methods run in
+ * child space and see mapreduce.cluster.local.dir as
+ * taskTracker/jobCache/jobId/attemptId.
+ * Do not use this class from TaskTracker space.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class TezTaskOutput implements Configurable {
+
+  /** Configuration most recently supplied through {@link #setConf}. */
+  private Configuration conf;
+
+  /** Creates an instance; no configuration is set until {@link #setConf}. */
+  public TezTaskOutput() {
+  }
+
+  /**
+   * Returns the path of a local map output file that was created earlier.
+   *
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getOutputFile() throws IOException;
+
+  /**
+   * Creates the name of a local map output file.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getOutputFileForWrite(long size) throws IOException;
+
+  /**
+   * Creates the name of a local map output file on the same volume.
+   *
+   * @param existing an existing path
+   * @return path
+   */
+  public abstract Path getOutputFileForWriteInVolume(Path existing);
+
+  /**
+   * Returns the path of a local map output index file that was created
+   * earlier.
+   *
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getOutputIndexFile() throws IOException;
+
+  /**
+   * Creates the name of a local map output index file.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getOutputIndexFileForWrite(long size) throws IOException;
+
+  /**
+   * Creates the name of a local map output index file on the same volume.
+   *
+   * @param existing an existing path
+   * @return path
+   */
+  public abstract Path getOutputIndexFileForWriteInVolume(Path existing);
+
+  /**
+   * Returns a local map spill file that was created earlier.
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getSpillFile(int spillNumber) throws IOException;
+
+  /**
+   * Creates the name of a local map spill file.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getSpillFileForWrite(int spillNumber, long size) throws IOException;
+
+  /**
+   * Returns a local map spill index file that was created earlier.
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getSpillIndexFile(int spillNumber) throws IOException;
+
+  /**
+   * Creates the name of a local map spill index file.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getSpillIndexFileForWrite(int spillNumber, long size) throws IOException;
+
+  /**
+   * Returns a local reduce input file that was created earlier.
+   *
+   * @param mapId a map task id
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getInputFile(int mapId) throws IOException;
+
+  /**
+   * Creates the name of a local reduce input file.
+   *
+   * @param mapId a map task id
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getInputFileForWrite(TezTaskID mapId, long size) throws IOException;
+
+  /**
+   * Removes all of the files related to a task.
+   *
+   * @throws IOException
+   */
+  public abstract void removeAll() throws IOException;
+
+  /**
+   * Stores the given configuration for later retrieval via {@link #getConf}.
+   *
+   * @param conf the configuration to keep
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Returns the configuration last passed to {@link #setConf}.
+   *
+   * @return the stored configuration, or null if none has been set
+   */
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java
------------------------------------------------------------------------------
svn:eol-style = native