You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/25 00:44:21 UTC
[12/20] Rename tez-engine-api to tez-runtime-api and tez-engine is
split into 2: - tez-engine-library for user-visible Input/Output/Processor
implementations - tez-engine-internals for framework internals
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
deleted file mode 100644
index 6b48270..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
+++ /dev/null
@@ -1,1108 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.common.sort.impl.dflt;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.IntBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.util.IndexedSortable;
-import org.apache.hadoop.util.Progress;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.sort.impl.ExternalSorter;
-import org.apache.tez.engine.common.sort.impl.IFile;
-import org.apache.tez.engine.common.sort.impl.TezIndexRecord;
-import org.apache.tez.engine.common.sort.impl.TezMerger;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.engine.common.sort.impl.TezSpillRecord;
-import org.apache.tez.engine.common.sort.impl.IFile.Writer;
-import org.apache.tez.engine.common.sort.impl.TezMerger.Segment;
-
-@SuppressWarnings({"unchecked", "rawtypes"})
-public class DefaultSorter extends ExternalSorter implements IndexedSortable {
-
- private static final Log LOG = LogFactory.getLog(DefaultSorter.class);
-
- // TODO NEWTEZ Progress reporting to Tez framework. (making progress vs %complete)
-
- /**
- * The size of each record in the index file for the map-outputs.
- */
- public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
-
- // Per-partition allowance added to byte-count estimates when sizing spill
- // and output files.
- private final static int APPROX_HEADER_LENGTH = 150;
-
- // k/v accounting
- // kvbuffer is a single circular byte array: serialized key/value bytes grow
- // forward from the equator while fixed-size metadata records grow backward
- // from it. kvmeta is an int view of the same array, so kvstart/kvend/kvindex
- // are int indices (multiply by 4 for the byte offset). The buf* fields are
- // byte offsets into kvbuffer.
- IntBuffer kvmeta; // metadata overlay on backing store
- int kvstart; // marks origin of spill metadata
- int kvend; // marks end of spill metadata
- int kvindex; // marks end of fully serialized records
-
- int equator; // marks origin of meta/serialization
- int bufstart; // marks beginning of spill
- int bufend; // marks beginning of collectable
- int bufmark; // marks end of record
- int bufindex; // marks end of collected
- int bufvoid; // marks the point where we should stop
- // reading at the end of the buffer
-
- byte[] kvbuffer; // main output buffer
- // source array for the zero-length write collect() performs so that the
- // buffer's boundary checks run even for empty records
- private final byte[] b0 = new byte[0];
-
- // Layout of one metadata record (NMETA ints) inside kvmeta. Only the INDEX
- // slot is permuted by swap(); it holds the int offset of the metadata block
- // that actually describes the record.
- protected static final int INDEX = 0; // index offset in acct
- protected static final int VALSTART = 1; // val offset in acct
- protected static final int KEYSTART = 2; // key offset in acct
- protected static final int PARTITION = 3; // partition offset in acct
- protected static final int NMETA = 4; // num meta ints
- protected static final int METASIZE = NMETA * 4; // size in bytes
-
- // spill accounting
- // maximum number of metadata records kvmeta can hold
- int maxRec;
- // bytes of buffered data at which collection starts a background spill
- int softLimit;
- // true from startSpill() until SpillThread finishes that spill; read and
- // written under spillLock once the spill thread is running
- boolean spillInProgress;
- // bytes that may still be written before collect/Buffer.write must take
- // spillLock and re-check the real space and soft-limit bounds
- int bufferRemaining;
- // failure recorded by SpillThread; rethrown by checkSpillException()
- volatile Throwable sortSpillException = null;
-
- // number of spill files written so far (also the index of the next spill)
- int numSpills = 0;
- // read from TEZ_ENGINE_COMBINE_MIN_SPILLS in initialize; NOTE(review): not
- // referenced in the visible code -- presumably consulted by the final merge
- int minSpillsForCombine;
- // spillLock guards the buffer indices shared with SpillThread. spillReady
- // wakes SpillThread when a spill is started; spillDone wakes threads that
- // wait for a spill (or for the spill thread's start-up) to finish.
- final ReentrantLock spillLock = new ReentrantLock();
- final Condition spillDone = spillLock.newCondition();
- final Condition spillReady = spillLock.newCondition();
- final BlockingBuffer bb = new BlockingBuffer();
- volatile boolean spillThreadRunning = false;
- final SpillThread spillThread = new SpillThread();
-
- // Spill index records are kept in memory while totalIndexCacheMemory is
- // below indexCacheMemoryLimit; past that, each spill writes an index file.
- final ArrayList<TezSpillRecord> indexCacheList =
- new ArrayList<TezSpillRecord>();
- private int totalIndexCacheMemory;
- private int indexCacheMemoryLimit;
-
-  /**
-   * Validates the sort settings, allocates the circular collection buffer,
-   * opens the key/value serializers on it and starts the background spill
-   * thread, waiting until that thread reports it is running.
-   *
-   * Settings are read from {@code this.conf} (presumably populated by
-   * {@code super.initialize} -- confirm in ExternalSorter).
-   *
-   * @param outputContext context of the output being sorted
-   * @param conf configuration, passed to the superclass
-   * @param numOutputs passed through to the superclass
-   * @throws IOException if the spill percent is outside (0, 1], the sort
-   *           buffer size in MB is outside [1, 2047], or the spill thread
-   *           fails or is interrupted while starting (the interrupt status
-   *           is restored in that case)
-   */
-  @Override
-  public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
-    super.initialize(outputContext, conf, numOutputs);
-
-    // sanity checks
-    final float spillper = this.conf.getFloat(
-        TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT,
-        TezJobConfig.DEFAULT_TEZ_ENGINE_SORT_SPILL_PERCENT);
-    final int sortmb = this.conf.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_MB,
-        TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_MB);
-    if (spillper > (float) 1.0 || spillper <= (float) 0.0) {
-      throw new IOException("Invalid \""
-          + TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT + "\": " + spillper);
-    }
-    // The size must fit in 11 bits so that sortmb << 20 cannot overflow an
-    // int, and must be non-zero: a zero-length buffer would make the modular
-    // index arithmetic (setEquator) divide by zero.
-    if (sortmb == 0 || (sortmb & 0x7FF) != sortmb) {
-      throw new IOException("Invalid \"" + TezJobConfig.TEZ_ENGINE_IO_SORT_MB
-          + "\": " + sortmb);
-    }
-
-    indexCacheMemoryLimit = this.conf.getInt(TezJobConfig.TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES,
-        TezJobConfig.DEFAULT_TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES);
-
-    // buffers and accounting
-    int maxMemUsage = sortmb << 20;
-    // round down so the buffer holds a whole number of metadata records
-    maxMemUsage -= maxMemUsage % METASIZE;
-    kvbuffer = new byte[maxMemUsage];
-    bufvoid = kvbuffer.length;
-    // metadata ints are stored directly in the same byte array
-    kvmeta = ByteBuffer.wrap(kvbuffer)
-        .order(ByteOrder.nativeOrder())
-        .asIntBuffer();
-    setEquator(0);
-    bufstart = bufend = bufindex = equator;
-    kvstart = kvend = kvindex;
-
-    maxRec = kvmeta.capacity() / NMETA;
-    softLimit = (int) (kvbuffer.length * spillper);
-    bufferRemaining = softLimit;
-    if (LOG.isInfoEnabled()) {
-      LOG.info(TezJobConfig.TEZ_ENGINE_IO_SORT_MB + ": " + sortmb);
-      LOG.info("soft limit at " + softLimit);
-      LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
-      LOG.info("kvstart = " + kvstart + "; length = " + maxRec);
-    }
-
-    // k/v serialization
-    valSerializer.open(bb);
-    keySerializer.open(bb);
-
-    spillInProgress = false;
-    minSpillsForCombine = this.conf.getInt(TezJobConfig.TEZ_ENGINE_COMBINE_MIN_SPILLS, 3);
-    spillThread.setDaemon(true);
-    spillThread.setName("SpillThread");
-    spillLock.lock();
-    try {
-      spillThread.start();
-      // SpillThread sets spillThreadRunning and signals spillDone once it
-      // holds the lock
-      while (!spillThreadRunning) {
-        spillDone.await();
-      }
-    } catch (InterruptedException e) {
-      // preserve the interrupt for callers further up the stack
-      Thread.currentThread().interrupt();
-      throw new IOException("Spill thread failed to initialize", e);
-    } finally {
-      spillLock.unlock();
-    }
-    if (sortSpillException != null) {
-      throw new IOException("Spill thread failed to initialize",
-          sortSpillException);
-    }
-  }
-
-  /**
-   * Buffers one record, routing it to the partition chosen by the
-   * configured partitioner.
-   */
-  @Override
-  public void write(Object key, Object value)
-      throws IOException {
-    final int partition = partitioner.getPartition(key, value, partitions);
-    collect(key, value, partition);
-  }
-
- /**
- * Serialize the key, value to intermediate storage.
- * When this method returns, kvindex must refer to sufficient unused
- * storage to store one METADATA.
- *
- * Record bytes are written forward from bufindex through bb. The record's
- * metadata (INDEX, PARTITION, KEYSTART, VALSTART) is then written at kvindex,
- * and kvindex moves back by NMETA. Once the lock-free bufferRemaining budget
- * is used up, spillLock is taken either to reclaim space from a finished
- * spill or to start a new one. A record too large for the buffer is written
- * to its own spill file by spillSingleRecord.
- *
- * @param key the key; its class must be exactly keyClass
- * @param value the value; its class must be exactly valClass
- * @param partition target partition, in [0, partitions)
- * @throws IOException on a key/value class mismatch, an illegal partition,
- * a failure recorded by the spill thread, or a serialization/spill error
- */
- synchronized void collect(Object key, Object value, final int partition
- ) throws IOException {
-
- if (key.getClass() != keyClass) {
- throw new IOException("Type mismatch in key from map: expected "
- + keyClass.getName() + ", received "
- + key.getClass().getName());
- }
- if (value.getClass() != valClass) {
- throw new IOException("Type mismatch in value from map: expected "
- + valClass.getName() + ", received "
- + value.getClass().getName());
- }
- if (partition < 0 || partition >= partitions) {
- throw new IOException("Illegal partition for " + key + " (" +
- partition + ")" + ", TotalPartitions: " + partitions);
- }
- // fail fast if the background spill thread has recorded an error
- checkSpillException();
- // reserve this record's metadata against the lock-free budget; the real
- // limits are only re-checked (under spillLock) when it is exhausted
- bufferRemaining -= METASIZE;
- if (bufferRemaining <= 0) {
- // start spill if the thread is not running and the soft limit has been
- // reached
- spillLock.lock();
- try {
- // do/while(false): 'continue' jumps to the (false) condition, so it
- // simply leaves the block after reclaiming space
- do {
- if (!spillInProgress) {
- final int kvbidx = 4 * kvindex;
- final int kvbend = 4 * kvend;
- // serialized, unspilled bytes always lie between kvindex and
- // bufindex, crossing the equator. Note that any void space
- // created by a reset must be included in "used" bytes
- final int bUsed = distanceTo(kvbidx, bufindex);
- final boolean bufsoftlimit = bUsed >= softLimit;
- // kvend no longer sits just below the equator: a spill was
- // started since the last reset and, as !spillInProgress, is done
- if ((kvbend + METASIZE) % kvbuffer.length !=
- equator - (equator % METASIZE)) {
- // spill finished, reclaim space
- resetSpill();
- bufferRemaining = Math.min(
- distanceTo(bufindex, kvbidx) - 2 * METASIZE,
- softLimit - bUsed) - METASIZE;
- continue;
- } else if (bufsoftlimit && kvindex != kvend) {
- // spill records, if any collected; check latter, as it may
- // be possible for metadata alignment to hit spill pcnt
- startSpill();
- // average serialized record size so far; used to divide the
- // free space between metadata and serialized bytes
- final int avgRec = (int)
- (mapOutputByteCounter.getValue() /
- mapOutputRecordCounter.getValue());
- // leave at least half the split buffer for serialization data
- // ensure that kvindex >= bufindex
- final int distkvi = distanceTo(bufindex, kvbidx);
- final int newPos = (bufindex +
- Math.max(2 * METASIZE - 1,
- Math.min(distkvi / 2,
- distkvi / (METASIZE + avgRec) * METASIZE)))
- % kvbuffer.length;
- // collection continues from a new equator inside the free
- // space while the spill thread drains the old region
- setEquator(newPos);
- bufmark = bufindex = newPos;
- final int serBound = 4 * kvend;
- // bytes remaining before the lock must be held and limits
- // checked is the minimum of three arcs: the metadata space, the
- // serialization space, and the soft limit
- bufferRemaining = Math.min(
- // metadata max
- distanceTo(bufend, newPos),
- Math.min(
- // serialization max
- distanceTo(newPos, serBound),
- // soft limit
- softLimit)) - 2 * METASIZE;
- }
- }
- } while (false);
- } finally {
- spillLock.unlock();
- }
- }
-
- try {
- // serialize key bytes into buffer
- int keystart = bufindex;
- keySerializer.serialize(key);
- // bufindex moving backwards means the key wrapped past bufvoid;
- // RawComparator needs each key in one contiguous range
- if (bufindex < keystart) {
- // wrapped the key; must make contiguous
- bb.shiftBufferedKey();
- keystart = 0;
- }
- // serialize value bytes into buffer
- final int valstart = bufindex;
- valSerializer.serialize(value);
- // It's possible for records to have zero length, i.e. the serializer
- // will perform no writes. To ensure that the boundary conditions are
- // checked and that the kvindex invariant is maintained, perform a
- // zero-length write into the buffer. The logic monitoring this could be
- // moved into collect, but this is cleaner and inexpensive. For now, it
- // is acceptable.
- bb.write(b0, 0, 0);
-
- // the record must be marked after the preceding write, as the metadata
- // for this record are not yet written
- int valend = bb.markRecord();
-
- mapOutputRecordCounter.increment(1);
- mapOutputByteCounter.increment(
- distanceTo(keystart, valend, bufvoid));
-
- // write accounting info
- // INDEX initially points at its own block; sorting only permutes INDEX
- kvmeta.put(kvindex + INDEX, kvindex);
- kvmeta.put(kvindex + PARTITION, partition);
- kvmeta.put(kvindex + KEYSTART, keystart);
- kvmeta.put(kvindex + VALSTART, valstart);
- // advance kvindex
- // (metadata grows downward, wrapping modulo the kvmeta capacity)
- kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
- } catch (MapBufferTooSmallException e) {
- // Buffer.write throws this when nothing is buffered and this record
- // alone does not fit; it has already reset the buffer to empty
- LOG.info("Record too large for in-memory buffer: " + e.getMessage());
- spillSingleRecord(key, value, partition);
- mapOutputRecordCounter.increment(1);
- return;
- }
- }
-
-  /**
-   * Moves the equator -- the point from which serialized bytes grow forward
-   * and metadata grows backward -- to {@code pos}. kvindex is placed on the
-   * metadata slot just before the equator, rounded down to a METASIZE
-   * boundary, so a metadata record never straddles the physical end of the
-   * circular buffer.
-   */
-  private void setEquator(int pos) {
-    equator = pos;
-    final int metaAlignedPos = pos - (pos % METASIZE);
-    final int len = kvbuffer.length;
-    final int slotByteOffset = (metaAlignedPos - METASIZE + len) % len;
-    kvindex = slotByteOffset / 4;
-    if (LOG.isInfoEnabled()) {
-      LOG.info("(EQUATOR) " + pos + " kvi " + kvindex +
-          "(" + (kvindex * 4) + ")");
-    }
-  }
-
-  /**
-   * Called once a spill has completed: moves the spill start/end markers
-   * (both buffer and metadata) onto the current equator so the spilled space
-   * can be reused. bufindex and kvindex are left untouched. When
-   * kvindex == kvend == kvstart the buffer holds no records.
-   */
-  private void resetSpill() {
-    final int eq = equator;
-    bufstart = eq;
-    bufend = eq;
-    final int metaAligned = eq - (eq % METASIZE);
-    // first metadata slot below the equator, as an int index
-    final int firstMeta =
-        ((metaAligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
-    kvstart = firstMeta;
-    kvend = firstMeta;
-    if (LOG.isInfoEnabled()) {
-      LOG.info("(RESET) equator " + eq + " kv " + kvstart + "(" +
-          (kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");
-    }
-  }
-
-  /**
-   * Forward distance in bytes from index {@code i} to index {@code j} in the
-   * serialization buffer, wrapping at the full buffer length.
-   * @see #distanceTo(int,int,int)
-   */
-  final int distanceTo(final int i, final int j) {
-    final int capacity = kvbuffer.length;
-    return distanceTo(i, j, capacity);
-  }
-
-  /**
-   * Forward distance from {@code i} to {@code j} in a circular space of size
-   * {@code mod}: {@code j - i} when j is not behind i, otherwise the distance
-   * that wraps past the end.
-   */
-  int distanceTo(final int i, final int j, final int mod) {
-    if (j >= i) {
-      return j - i;
-    }
-    // j lies behind i, so the path wraps around the end of the space
-    return mod - i + j;
-  }
-
- /**
- * For the given meta position, return the dereferenced position in the
- * integer array. Each meta block contains several integers describing
- * record data in its serialized form, but the INDEX is not necessarily
- * related to the proximate metadata. The index value at the referenced int
- * position is the start offset of the associated metadata block. So the
- * metadata INDEX at metapos may point to the metadata described by the
- * metadata block at metapos + k, which contains information about that
- * serialized record.
- */
- int offsetFor(int metapos) {
- return kvmeta.get((metapos % maxRec) * NMETA + INDEX);
- }
-
-  /**
-   * Orders two logical record positions (taken modulo the metadata capacity)
-   * first by partition, then by raw key bytes via the configured
-   * RawComparator.
-   *
-   * @param mi first logical record position
-   * @param mj second logical record position
-   * @return negative, zero or positive as record mi sorts before, equal to,
-   *         or after record mj
-   * @see IndexedSortable#compare
-   */
-  @Override
-  public int compare(final int mi, final int mj) {
-    final int kvi = offsetFor(mi);
-    final int kvj = offsetFor(mj);
-    final int kvip = kvmeta.get(kvi + PARTITION);
-    final int kvjp = kvmeta.get(kvj + PARTITION);
-    // sort by partition; Integer.compare cannot overflow, unlike subtraction
-    if (kvip != kvjp) {
-      return Integer.compare(kvip, kvjp);
-    }
-    // sort by key: keys are contiguous (collect unwraps wrapped keys), so a
-    // key spans [KEYSTART, VALSTART)
-    final int keyStartI = kvmeta.get(kvi + KEYSTART);
-    final int keyStartJ = kvmeta.get(kvj + KEYSTART);
-    return comparator.compare(kvbuffer,
-        keyStartI, kvmeta.get(kvi + VALSTART) - keyStartI,
-        kvbuffer,
-        keyStartJ, kvmeta.get(kvj + VALSTART) - keyStartJ);
-  }
-
-  /**
-   * Exchanges the INDEX entries of two logical record positions (taken
-   * modulo the metadata capacity). Only the indirection is swapped; record
-   * bytes and the other metadata fields stay in place.
-   * @see IndexedSortable#swap
-   */
-  public void swap(final int mi, final int mj) {
-    final int iPos = (mi % maxRec) * NMETA + INDEX;
-    final int jPos = (mj % maxRec) * NMETA + INDEX;
-    final int iVal = kvmeta.get(iPos);
-    final int jVal = kvmeta.get(jPos);
-    kvmeta.put(iPos, jVal);
-    kvmeta.put(jPos, iVal);
-  }
-
- /**
- * DataOutputStream over the circular collection buffer. Bytes written here
- * go to a {@link Buffer}, and the key and value serializers are opened on
- * this stream in initialize. It also records the end of the last complete
- * record (bufmark) and makes keys that wrap around the buffer end contiguous.
- */
- protected class BlockingBuffer extends DataOutputStream {
-
- public BlockingBuffer() {
- super(new Buffer());
- }
-
- /**
- * Mark end of record. Note that this is required if the buffer is to
- * cut the spill in the proper place.
- *
- * @return the new mark, i.e. the current bufindex
- */
- public int markRecord() {
- bufmark = bufindex;
- return bufindex;
- }
-
- /**
- * Set position from last mark to end of writable buffer, then rewrite
- * the data between last mark and kvindex.
- * This handles a special case where the key wraps around the buffer.
- * If the key is to be passed to a RawComparator, then it must be
- * contiguous in the buffer. This recopies the data in the buffer back
- * into itself, but starting at the beginning of the buffer. Note that
- * this method should <b>only</b> be called immediately after detecting
- * this condition. To call it at any other time is undefined and would
- * likely result in data loss or corruption.
- * @see #markRecord()
- */
- protected void shiftBufferedKey() throws IOException {
- // spillLock unnecessary; both kvend and kvindex are current
- // key bytes written before the wrap, i.e. from bufmark to the old bufvoid
- int headbytelen = bufvoid - bufmark;
- // the buffer now logically ends where this record starts
- bufvoid = bufmark;
- final int kvbidx = 4 * kvindex;
- final int kvbend = 4 * kvend;
- // free bytes at the start of the buffer before either the current or
- // the in-spill metadata is reached
- final int avail =
- Math.min(distanceTo(0, kvbidx), distanceTo(0, kvbend));
- if (bufindex + headbytelen < avail) {
- // room to fix in place: slide the wrapped tail right, then copy the
- // head in front of it (System.arraycopy handles the overlap)
- System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
- System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
- bufindex += headbytelen;
- // the abandoned region past the new bufvoid now counts as used
- bufferRemaining -= kvbuffer.length - bufvoid;
- } else {
- // no room: rewrite the whole key through the stream from offset 0 so
- // Buffer.write applies its blocking/spill checks
- byte[] keytmp = new byte[bufindex];
- System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);
- bufindex = 0;
- out.write(kvbuffer, bufmark, headbytelen);
- out.write(keytmp);
- }
- }
- }
-
- /**
- * OutputStream that copies bytes into kvbuffer at bufindex, wrapping at
- * bufvoid. When the lock-free bufferRemaining budget runs out, it takes
- * spillLock and then does one of three things: reclaims space from a
- * finished spill, starts a new spill, or blocks until the running spill
- * completes.
- */
- public class Buffer extends OutputStream {
- // one-byte array reused by write(int)
- private final byte[] scratch = new byte[1];
-
- // single-byte writes take the array path so they get the same checks
- @Override
- public void write(int v)
- throws IOException {
- scratch[0] = (byte)v;
- write(scratch, 0, 1);
- }
-
- /**
- * Attempt to write a sequence of bytes to the collection buffer.
- * This method will block if the spill thread is running and it
- * cannot write.
- * @throws MapBufferTooSmallException if record is too large to
- * serialize into the collection buffer.
- */
- @Override
- public void write(byte b[], int off, int len)
- throws IOException {
- // must always verify the invariant that at least METASIZE bytes are
- // available beyond kvindex, even when len == 0
- bufferRemaining -= len;
- if (bufferRemaining <= 0) {
- // writing these bytes could exhaust available buffer space or fill
- // the buffer to soft limit. check if spill or blocking are necessary
- boolean blockwrite = false;
- spillLock.lock();
- try {
- do {
- checkSpillException();
-
- final int kvbidx = 4 * kvindex;
- final int kvbend = 4 * kvend;
- // ser distance to key index
- final int distkvi = distanceTo(bufindex, kvbidx);
- // ser distance to spill end index
- final int distkve = distanceTo(bufindex, kvbend);
-
- // if kvindex is closer than kvend, then a spill is neither in
- // progress nor complete and reset since the lock was held. The
- // write should block only if there is insufficient space to
- // complete the current write, write the metadata for this record,
- // and write the metadata for the next record. If kvend is closer,
- // then the write should block if there is too little space for
- // either the metadata or the current write. Note that collect
- // ensures its metadata requirement with a zero-length write
- blockwrite = distkvi <= distkve
- ? distkvi <= len + 2 * METASIZE
- : distkve <= len || distanceTo(bufend, kvbidx) < 2 * METASIZE;
-
- if (!spillInProgress) {
- if (blockwrite) {
- if ((kvbend + METASIZE) % kvbuffer.length !=
- equator - (equator % METASIZE)) {
- // spill finished, reclaim space
- // need to use meta exclusively; zero-len rec & 100% spill
- // pcnt would fail
- resetSpill(); // resetSpill doesn't move bufindex, kvindex
- bufferRemaining = Math.min(
- distkvi - 2 * METASIZE,
- softLimit - distanceTo(kvbidx, bufindex)) - len;
- // 'continue' re-tests blockwrite (true), so the space checks
- // run again against the reset indices
- continue;
- }
- // we have records we can spill; only spill if blocked
- if (kvindex != kvend) {
- startSpill();
- // Blocked on this write, waiting for the spill just
- // initiated to finish. Instead of repositioning the marker
- // and copying the partial record, we set the record start
- // to be the new equator
- setEquator(bufmark);
- } else {
- // We have no buffered records, and this record is too large
- // to write into kvbuffer. We must spill it directly from
- // collect
- // Reset the buffer to empty before throwing; collect catches
- // the exception and calls spillSingleRecord.
- final int size = distanceTo(bufstart, bufindex) + len;
- setEquator(0);
- bufstart = bufend = bufindex = equator;
- kvstart = kvend = kvindex;
- bufvoid = kvbuffer.length;
- throw new MapBufferTooSmallException(size + " bytes");
- }
- }
- }
-
- if (blockwrite) {
- // wait for spill
- // NOTE(review): the interrupt status is not restored before
- // rethrowing; consider Thread.currentThread().interrupt()
- try {
- while (spillInProgress) {
- spillDone.await();
- }
- } catch (InterruptedException e) {
- throw new IOException(
- "Buffer interrupted while waiting for the writer", e);
- }
- }
- } while (blockwrite);
- } finally {
- spillLock.unlock();
- }
- }
- // here, we know that we have sufficient space to write
- // the write straddles bufvoid: copy the part that fits, continue at 0
- if (bufindex + len > bufvoid) {
- final int gaplen = bufvoid - bufindex;
- System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
- len -= gaplen;
- off += gaplen;
- bufindex = 0;
- }
- System.arraycopy(b, off, kvbuffer, bufindex, len);
- bufindex += len;
- }
- }
-
-  /**
-   * Completes the output in four steps:
-   * <ol>
-   *   <li>waits for any running spill to finish;</li>
-   *   <li>sorts and spills whatever is still buffered, on the calling
-   *       thread;</li>
-   *   <li>stops the spill thread;</li>
-   *   <li>merges all spill files into the final output and adds that
-   *       file's length to fileOutputByteCounter.</li>
-   * </ol>
-   *
-   * @throws IOException if a spill failed, the merge failed, or this thread
-   *           was interrupted while waiting (its interrupt status is then
-   *           restored)
-   */
-  @Override
-  public void flush() throws IOException {
-    LOG.info("Starting flush of map output");
-    spillLock.lock();
-    try {
-      while (spillInProgress) {
-        spillDone.await();
-      }
-      checkSpillException();
-
-      final int kvbend = 4 * kvend;
-      if ((kvbend + METASIZE) % kvbuffer.length !=
-          equator - (equator % METASIZE)) {
-        // spill finished
-        resetSpill();
-      }
-      if (kvindex != kvend) {
-        // records remain: spill them synchronously on this thread
-        kvend = (kvindex + NMETA) % kvmeta.capacity();
-        bufend = bufmark;
-        if (LOG.isInfoEnabled()) {
-          LOG.info("Sorting & Spilling map output");
-          LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
-              "; bufvoid = " + bufvoid);
-          LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
-              "); kvend = " + kvend + "(" + (kvend * 4) +
-              "); length = " + (distanceTo(kvend, kvstart,
-              kvmeta.capacity()) + 1) + "/" + maxRec);
-        }
-        sortAndSpill();
-      }
-    } catch (InterruptedException e) {
-      // preserve the interrupt for callers further up the stack
-      Thread.currentThread().interrupt();
-      throw new IOException("Interrupted while waiting for the writer", e);
-    } finally {
-      spillLock.unlock();
-    }
-    assert !spillLock.isHeldByCurrentThread();
-    // shut down spill thread and wait for it to exit. Since the preceding
-    // ensures that it is finished with its work (and sortAndSpill did not
-    // throw), we elect to use an interrupt instead of setting a flag.
-    // Spilling simultaneously from this thread while the spill thread
-    // finishes its work might be both a useful way to extend this and also
-    // sufficient motivation for the latter approach.
-    try {
-      spillThread.interrupt();
-      spillThread.join();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IOException("Spill failed", e);
-    }
-    // release sort buffer before the merge
-    //FIXME
-    //kvbuffer = null;
-    mergeParts();
-    Path outputPath = mapOutputFile.getOutputFile();
-    fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
-  }
-
- @Override
- public void close() throws IOException { }
-
- /**
- * Background thread that sorts and spills buffered records.
- *
- * It waits on spillReady until startSpill sets spillInProgress. It then
- * runs sortAndSpill with spillLock released, marks the spilled region free
- * and signals spillDone. It exits when interrupted (flush interrupts it
- * after the final spill). Failures are stored in sortSpillException for
- * writers to rethrow, and the loop keeps running after a failure.
- */
- protected class SpillThread extends Thread {
-
- @Override
- public void run() {
- spillLock.lock();
- // initialize waits on spillDone until this flag is set
- spillThreadRunning = true;
- try {
- while (true) {
- // first pass: wakes initialize; later passes: wakes a thread
- // waiting for the spill that just finished
- spillDone.signal();
- while (!spillInProgress) {
- spillReady.await();
- }
- try {
- // sort and write without the lock so collection can continue
- // in the rest of the buffer
- spillLock.unlock();
- sortAndSpill();
- } catch (Throwable t) {
- LOG.warn("Got an exception in sortAndSpill", t);
- sortSpillException = t;
- } finally {
- spillLock.lock();
- // the spilled region wrapped past bufvoid; with it released the
- // whole buffer length is usable again
- if (bufend < bufstart) {
- bufvoid = kvbuffer.length;
- }
- // the next spill starts where this one ended
- kvstart = kvend;
- bufstart = bufend;
- spillInProgress = false;
- }
- }
- } catch (InterruptedException e) {
- // interruption is the shutdown signal; keep the flag set and exit
- Thread.currentThread().interrupt();
- } finally {
- spillLock.unlock();
- spillThreadRunning = false;
- }
- }
- }
-
-  /**
-   * Rethrows, wrapped in an IOException, any failure recorded by the spill
-   * thread. If that failure is an Error, it is first reported to the
-   * framework as fatal via outputContext.fatalError.
-   */
-  private void checkSpillException() throws IOException {
-    // read the volatile field once so both checks see the same value
-    final Throwable failure = sortSpillException;
-    if (failure == null) {
-      return;
-    }
-    if (failure instanceof Error) {
-      final String message = "Task " + outputContext.getUniqueIdentifier()
-          + " failed : " + StringUtils.stringifyException(failure);
-      outputContext.fatalError(failure, message);
-    }
-    throw new IOException("Spill failed", failure);
-  }
-
- private void startSpill() {
- assert !spillInProgress;
- kvend = (kvindex + NMETA) % kvmeta.capacity();
- bufend = bufmark;
- spillInProgress = true;
- if (LOG.isInfoEnabled()) {
- LOG.info("Spilling map output");
- LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
- "; bufvoid = " + bufvoid);
- LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
- "); kvend = " + kvend + "(" + (kvend * 4) +
- "); length = " + (distanceTo(kvend, kvstart,
- kvmeta.capacity()) + 1) + "/" + maxRec);
- }
- spillReady.signal();
- }
-
- int getMetaStart() {
- return kvend / NMETA;
- }
-
- int getMetaEnd() {
- return 1 + // kvend is a valid record
- (kvstart >= kvend
- ? kvstart
- : kvmeta.capacity() + kvstart) / NMETA;
- }
-
-  /**
-   * Sorts the pending records in place, by partition and then key (see
-   * compare/swap), and writes them to a new spill file.
-   */
-  protected void sortAndSpill()
-      throws IOException, InterruptedException {
-    final int first = getMetaStart();
-    final int last = getMetaEnd();
-    sorter.sort(this, first, last, nullProgressable);
-    spill(first, last);
-  }
-
-  /**
-   * Writes the records at logical positions [mstart, mend) to a new spill
-   * file, one IFile segment per partition. The records must already be
-   * sorted by partition and then key. When a combiner is configured it is
-   * run over each partition's records.
-   *
-   * The spill's index record is cached in memory while the index cache is
-   * below its limit; otherwise it is written to a spill index file.
-   *
-   * @param mstart first logical record position (inclusive)
-   * @param mend last logical record position (exclusive)
-   */
-  protected void spill(int mstart, int mend)
-      throws IOException, InterruptedException {
-
-    // Approximate the length of the output file as the serialized bytes being
-    // spilled plus a header allowance per partition. The spilled bytes are
-    // [bufstart, bufend), which wraps at bufvoid when bufend < bufstart:
-    // (bufvoid - bufstart) + bufend.
-    final long size = distanceTo(bufstart, bufend, bufvoid) +
-        (long) partitions * APPROX_HEADER_LENGTH;
-    FSDataOutputStream out = null;
-    try {
-      // create spill file
-      final TezSpillRecord spillRec = new TezSpillRecord(partitions);
-      final Path filename =
-          mapOutputFile.getSpillFileForWrite(numSpills, size);
-      out = rfs.create(filename);
-
-      int spindex = mstart;
-      final InMemValBytes value = createInMemValBytes();
-      for (int i = 0; i < partitions; ++i) {
-        IFile.Writer writer = null;
-        try {
-          long segmentStart = out.getPos();
-          writer = new Writer(conf, out, keyClass, valClass, codec,
-              spilledRecordsCounter);
-          if (combiner == null) {
-            // spill directly
-            DataInputBuffer key = new DataInputBuffer();
-            while (spindex < mend &&
-                kvmeta.get(offsetFor(spindex) + PARTITION) == i) {
-              final int kvoff = offsetFor(spindex);
-              key.reset(
-                  kvbuffer,
-                  kvmeta.get(kvoff + KEYSTART),
-                  (kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART))
-              );
-              getVBytesForOffset(kvoff, value);
-              writer.append(key, value);
-              ++spindex;
-            }
-          } else {
-            // find the run of records that belong to partition i
-            int spstart = spindex;
-            while (spindex < mend &&
-                kvmeta.get(offsetFor(spindex)
-                    + PARTITION) == i) {
-              ++spindex;
-            }
-            // Note: we would like to avoid the combiner if we've fewer
-            // than some threshold of records for a partition
-            if (spstart != spindex) {
-              TezRawKeyValueIterator kvIter =
-                  new MRResultIterator(spstart, spindex);
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Running combine processor");
-              }
-              runCombineProcessor(kvIter, writer);
-            }
-          }
-
-          // close the writer
-          writer.close();
-
-          // record offsets
-          final TezIndexRecord rec =
-              new TezIndexRecord(
-                  segmentStart,
-                  writer.getRawLength(),
-                  writer.getCompressedLength());
-          spillRec.putIndex(rec, i);
-
-          writer = null;
-        } finally {
-          if (null != writer) writer.close();
-        }
-      }
-
-      if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
-        // create spill index file
-        Path indexFilename =
-            mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
-                * MAP_OUTPUT_INDEX_RECORD_LENGTH);
-        spillRec.writeToFile(indexFilename, conf);
-      } else {
-        indexCacheList.add(spillRec);
-        totalIndexCacheMemory +=
-            spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
-      }
-      LOG.info("Finished spill " + numSpills);
-      ++numSpills;
-    } finally {
-      if (out != null) out.close();
-    }
-  }
-
-  /**
-   * Handles the degenerate case where serialization fails to fit in
-   * the in-memory buffer, so we must spill the record from collect
-   * directly to a spill file. Consider this "losing".
-   *
-   * The spill file gets one IFile segment per partition. Only the segment
-   * for {@code partition} holds the record; the others are empty. The
-   * combiner is not run.
-   *
-   * @param key record key
-   * @param value record value
-   * @param partition partition the record belongs to
-   * @throws IOException if creating or writing the spill file or its index
-   *           fails
-   */
-  private void spillSingleRecord(final Object key, final Object value,
-      int partition) throws IOException {
-    // size estimate: buffer length plus a header allowance per partition
-    long size = kvbuffer.length + (long) partitions * APPROX_HEADER_LENGTH;
-    FSDataOutputStream out = null;
-    try {
-      // create spill file
-      final TezSpillRecord spillRec = new TezSpillRecord(partitions);
-      final Path filename =
-          mapOutputFile.getSpillFileForWrite(numSpills, size);
-      out = rfs.create(filename);
-
-      // we don't run the combiner for a single record
-      for (int i = 0; i < partitions; ++i) {
-        IFile.Writer writer = null;
-        try {
-          long segmentStart = out.getPos();
-          // every partition gets a segment (empty unless i == partition) so
-          // the index has an entry for each partition
-          writer = new IFile.Writer(conf, out, keyClass, valClass, codec,
-              spilledRecordsCounter);
-
-          if (i == partition) {
-            final long recordStart = out.getPos();
-            writer.append(key, value);
-            // Note that our map byte count will not be accurate with
-            // compression
-            mapOutputByteCounter.increment(out.getPos() - recordStart);
-          }
-          writer.close();
-
-          // record offsets
-          TezIndexRecord rec =
-              new TezIndexRecord(
-                  segmentStart,
-                  writer.getRawLength(),
-                  writer.getCompressedLength());
-          spillRec.putIndex(rec, i);
-
-          writer = null;
-        } finally {
-          // close on any failure, not only IOException, as spill() does
-          if (null != writer) writer.close();
-        }
-      }
-      if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
-        // create spill index file
-        Path indexFilename =
-            mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
-                * MAP_OUTPUT_INDEX_RECORD_LENGTH);
-        spillRec.writeToFile(indexFilename, conf);
-      } else {
-        indexCacheList.add(spillRec);
-        totalIndexCacheMemory +=
-            spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
-      }
-      ++numSpills;
-    } finally {
-      if (out != null) out.close();
-    }
-  }
-
-  /**
-   * Length in bytes of the serialized value of the record whose metadata
-   * starts at int offset {@code kvoff}. A value ends where the next
-   * collected record's key begins, and that record's metadata sits NMETA
-   * ints lower. The last record of the spill (kvoff == kvend) ends at
-   * bufend instead. Values that wrap past bufvoid are measured across the
-   * wrap.
-   */
-  protected int getInMemVBytesLength(int kvoff) {
-    final int capacity = kvmeta.capacity();
-    final int valueEnd;
-    if (kvoff == kvend) {
-      valueEnd = bufend;
-    } else {
-      valueEnd = kvmeta.get((kvoff - NMETA + capacity + KEYSTART) % capacity);
-    }
-    final int valueStart = kvmeta.get(kvoff + VALSTART);
-    if (valueEnd >= valueStart) {
-      return valueEnd - valueStart;
-    }
-    // the value wraps past bufvoid
-    return (bufvoid - valueStart) + valueEnd;
-  }
-
- /**
- * Given an offset, populate vbytes with the associated set of
- * deserialized value bytes. Should only be called during a spill.
- */
- int getVBytesForOffset(int kvoff, InMemValBytes vbytes) {
- int vallen = getInMemVBytesLength(kvoff);
- vbytes.reset(kvbuffer, kvmeta.get(kvoff + VALSTART), vallen);
- return vallen;
- }
-
-  /**
-   * DataInputBuffer view of a value held in the circular collection buffer,
-   * used when appending values to a spill writer. A value lying entirely
-   * below bufvoid is viewed in place. A value that runs past bufvoid is
-   * stitched (tail, then head from offset 0) into a private array.
-   */
-  static class InMemValBytes extends DataInputBuffer {
-    private byte[] buffer;
-    private int start;
-    private int length;
-    private final int bufvoid;
-
-    public InMemValBytes(int bufvoid) {
-      this.bufvoid = bufvoid;
-    }
-
-    public void reset(byte[] buffer, int start, int length) {
-      this.length = length;
-      if (start + length <= bufvoid) {
-        // contiguous: view the caller's array directly
-        this.buffer = buffer;
-        this.start = start;
-      } else {
-        // wrapped: copy the tail up to bufvoid, then the head from offset 0
-        final byte[] joined = new byte[length];
-        final int taillen = bufvoid - start;
-        System.arraycopy(buffer, start, joined, 0, taillen);
-        System.arraycopy(buffer, 0, joined, taillen, length - taillen);
-        this.buffer = joined;
-        this.start = 0;
-      }
-      super.reset(this.buffer, this.start, this.length);
-    }
-  }
-
- InMemValBytes createInMemValBytes() {
- return new InMemValBytes(bufvoid);
- }
-
-  /**
-   * Iterates over the in-memory records at logical positions [start, end),
-   * exposing raw key and value bytes. Used to feed one partition's records
-   * to the combiner during a spill. No progress is reported.
-   */
-  protected class MRResultIterator implements TezRawKeyValueIterator {
-    private final DataInputBuffer keybuf = new DataInputBuffer();
-    private final InMemValBytes vbytes = createInMemValBytes();
-    private final int end;
-    private int current;
-
-    public MRResultIterator(int start, int end) {
-      this.end = end;
-      // positioned before the first record; next() advances onto it
-      this.current = start - 1;
-    }
-
-    public boolean next() throws IOException {
-      current++;
-      return current < end;
-    }
-
-    public DataInputBuffer getKey() throws IOException {
-      final int kvoff = offsetFor(current);
-      final int keyStart = kvmeta.get(kvoff + KEYSTART);
-      final int keyLength = kvmeta.get(kvoff + VALSTART) - keyStart;
-      keybuf.reset(kvbuffer, keyStart, keyLength);
-      return keybuf;
-    }
-
-    public DataInputBuffer getValue() throws IOException {
-      final int kvoff = offsetFor(current);
-      getVBytesForOffset(kvoff, vbytes);
-      return vbytes;
-    }
-
-    public Progress getProgress() {
-      return null;
-    }
-
-    public void close() {
-      // nothing to release
-    }
-  }
-
-  /**
-   * Builds the task's final output file and its index from the spill files.
-   * <ul>
-   * <li>One spill: the spill file (and its index) is renamed into place.</li>
-   * <li>No spills: an empty segment is written for every partition.</li>
-   * <li>Several spills: each partition's segments are merged, optionally
-   * through the combiner, and appended to a single output file.</li>
-   * </ul>
-   * The final output stream is closed on every path, including failures;
-   * spill files are deleted only after the merged output was written.
-   *
-   * @throws IOException if reading spills or writing the output fails
-   */
-  private void mergeParts() throws IOException {
-    // Approximate sizes of the final output/index files, handed to the
-    // output-file allocator as size hints.
-    long finalOutFileSize = 0;
-    final Path[] filename = new Path[numSpills];
-    final String taskIdentifier = outputContext.getUniqueIdentifier();
-
-    for (int i = 0; i < numSpills; i++) {
-      filename[i] = mapOutputFile.getSpillFile(i);
-      finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
-    }
-    if (numSpills == 1) {
-      // The single spill already is the final output; just move it.
-      sameVolRename(filename[0],
-          mapOutputFile.getOutputFileForWriteInVolume(filename[0]));
-      if (indexCacheList.size() == 0) {
-        sameVolRename(mapOutputFile.getSpillIndexFile(0),
-            mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));
-      } else {
-        indexCacheList.get(0).writeToFile(
-            mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), conf);
-      }
-      return;
-    }
-
-    // Load spill indices that are not cached in memory from their index files.
-    for (int i = indexCacheList.size(); i < numSpills; ++i) {
-      Path indexFileName = mapOutputFile.getSpillIndexFile(i);
-      indexCacheList.add(new TezSpillRecord(indexFileName, conf));
-    }
-
-    // Correct the length estimate to include per-partition header lengths.
-    finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
-    final long finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
-    Path finalOutputFile =
-        mapOutputFile.getOutputFileForWrite(finalOutFileSize);
-    Path finalIndexFile =
-        mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
-
-    // The output stream for the final single output file. Closed in the
-    // finally block so a failed merge cannot leak it.
-    FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
-    try {
-      if (numSpills == 0) {
-        // Nothing was spilled: write an empty segment per partition so the
-        // index still has an entry for every partition.
-        TezSpillRecord sr = new TezSpillRecord(partitions);
-        for (int i = 0; i < partitions; i++) {
-          long segmentStart = finalOut.getPos();
-          Writer writer =
-              new Writer(conf, finalOut, keyClass, valClass, codec, null);
-          writer.close();
-          sr.putIndex(new TezIndexRecord(segmentStart,
-              writer.getRawLength(), writer.getCompressedLength()), i);
-        }
-        sr.writeToFile(finalIndexFile, conf);
-      } else {
-        TezMerger.considerFinalMergeForProgress();
-
-        // Loop-invariant: read the merge fan-in once, not once per partition.
-        final int mergeFactor =
-            this.conf.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR,
-                TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_FACTOR);
-        // Every partition merges exactly numSpills segments; sort the
-        // segments only if there will be intermediate merges.
-        final boolean sortSegments = numSpills > mergeFactor;
-
-        final TezSpillRecord spillRec = new TezSpillRecord(partitions);
-        for (int parts = 0; parts < partitions; parts++) {
-          // Collect this partition's segment from every spill file.
-          List<Segment> segmentList = new ArrayList<Segment>(numSpills);
-          for (int i = 0; i < numSpills; i++) {
-            TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
-
-            Segment s =
-                new Segment(conf, rfs, filename[i], indexRecord.getStartOffset(),
-                    indexRecord.getPartLength(), codec, true);
-            segmentList.add(i, s);
-
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("TaskIdentifier=" + taskIdentifier + " Partition=" + parts +
-                  " Spill =" + i + "(" + indexRecord.getStartOffset() + "," +
-                  indexRecord.getRawLength() + ", " +
-                  indexRecord.getPartLength() + ")");
-            }
-          }
-
-          TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
-              keyClass, valClass, codec,
-              segmentList, mergeFactor,
-              new Path(taskIdentifier),
-              (RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf),
-              nullProgressable, sortSegments,
-              null, spilledRecordsCounter,
-              null); // Not using any Progress in TezMerger. Should just work.
-
-          // Append the merged partition to the final output file.
-          long segmentStart = finalOut.getPos();
-          Writer writer =
-              new Writer(conf, finalOut, keyClass, valClass, codec,
-                  spilledRecordsCounter);
-          if (combiner == null || numSpills < minSpillsForCombine) {
-            TezMerger.writeFile(kvIter, writer, nullProgressable, conf);
-          } else {
-            runCombineProcessor(kvIter, writer);
-          }
-          writer.close();
-
-          // Record where this partition landed in the output file.
-          spillRec.putIndex(new TezIndexRecord(segmentStart,
-              writer.getRawLength(), writer.getCompressedLength()), parts);
-        }
-        spillRec.writeToFile(finalIndexFile, conf);
-      }
-    } finally {
-      finalOut.close();
-    }
-
-    // Only reached when the final output was written successfully.
-    for (int i = 0; i < numSpills; i++) {
-      rfs.delete(filename[i], true);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java
deleted file mode 100644
index e2b3315..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.common.sort.impl.dflt;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.IntBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader;
-import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
-import org.apache.tez.engine.common.sort.impl.IFile;
-
-public class InMemoryShuffleSorter extends DefaultSorter {
-
- private static final Log LOG = LogFactory.getLog(InMemoryShuffleSorter.class);
-
- static final int IFILE_EOF_LENGTH =
- 2 * WritableUtils.getVIntSize(IFile.EOF_MARKER);
- static final int IFILE_CHECKSUM_LENGTH = DataChecksum.Type.CRC32.size;
-
- private List<Integer> spillIndices = new ArrayList<Integer>();
- private List<ShuffleHeader> shuffleHeaders = new ArrayList<ShuffleHeader>();
-
- ShuffleHandler shuffleHandler = new ShuffleHandler(this);
-
- byte[] kvbuffer;
- IntBuffer kvmeta;
-
- @Override
- public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
- super.initialize(outputContext, conf, numOutputs);
- shuffleHandler.initialize(outputContext, conf);
- }
-
- @Override
- protected void spill(int mstart, int mend)
- throws IOException, InterruptedException {
- // Start the shuffleHandler
- shuffleHandler.start();
-
- // Don't spill!
-
- // Make a copy
- this.kvbuffer = super.kvbuffer;
- this.kvmeta = super.kvmeta;
-
- // Just save spill-indices for serving later
- int spindex = mstart;
- for (int i = 0; i < partitions; ++i) {
- spillIndices.add(spindex);
-
- int length = 0;
- while (spindex < mend &&
- kvmeta.get(offsetFor(spindex) + PARTITION) == i) {
-
- final int kvoff = offsetFor(spindex);
- int keyLen =
- kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART);
- int valLen = getInMemVBytesLength(kvoff);
- length +=
- (keyLen + WritableUtils.getVIntSize(keyLen)) +
- (valLen + WritableUtils.getVIntSize(valLen));
-
- ++spindex;
- }
- length += IFILE_EOF_LENGTH;
-
- shuffleHeaders.add(
- new ShuffleHeader(
- outputContext.getUniqueIdentifier(), // TODO Verify that this is correct.
- length + IFILE_CHECKSUM_LENGTH, length, i)
- );
- LOG.info("shuffleHeader[" + i + "]:" +
- " rawLen=" + length + " partLen=" + (length + IFILE_CHECKSUM_LENGTH) +
- " spillIndex=" + spillIndices.get(i));
- }
-
- LOG.info("Saved " + spillIndices.size() + " spill-indices and " +
- shuffleHeaders.size() + " shuffle headers");
- }
-
- @Override
- public InputStream getSortedStream(int partition) {
- return new SortBufferInputStream(this, partition);
- }
-
- @Override
- public void close() throws IOException {
- // FIXME
- //shuffleHandler.stop();
- }
-
- @Override
- public ShuffleHeader getShuffleHeader(int reduce) {
- return shuffleHeaders.get(reduce);
- }
-
- public int getSpillIndex(int partition) {
- return spillIndices.get(partition);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java
deleted file mode 100644
index d74e159..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.common.sort.impl.dflt;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.IntBuffer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.BoundedByteArrayOutputStream;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.tez.engine.common.shuffle.impl.InMemoryWriter;
-import org.apache.tez.engine.common.sort.impl.dflt.DefaultSorter.InMemValBytes;
-
-/**
- * Streams one partition of an {@link InMemoryShuffleSorter}'s sort buffer
- * as IFile-formatted bytes. Records are serialized on demand straight into
- * the caller's buffer; bytes that do not fit are kept in an overflow buffer
- * and returned first by the next read.
- */
-public class SortBufferInputStream extends InputStream {
-
-  private static final Log LOG = LogFactory.getLog(SortBufferInputStream.class);
-
-  private final InMemoryShuffleSorter sorter;
-  private InMemoryWriter sortOutput;
-
-  private int mend;
-  private int recIndex;
-  private final byte[] kvbuffer;
-  private final IntBuffer kvmeta;
-  private final int partitionBytes;
-  private final int partition;
-
-  // Initial overflow buffer; DualBufferOutputStream grows its own copy when
-  // a single write overflows it.
-  byte[] dualBuf = new byte[8192];
-  DualBufferOutputStream out;
-  private int readBytes = 0;
-  // True once the IFile trailer (EOF marker + checksum) has been written,
-  // so it is never written twice.
-  private boolean trailerWritten = false;
-
-  public SortBufferInputStream(
-      InMemoryShuffleSorter sorter, int partition) {
-    this.sorter = sorter;
-    this.partitionBytes =
-        (int) sorter.getShuffleHeader(partition).getCompressedLength();
-    this.partition = partition;
-    this.mend = sorter.getMetaEnd();
-    this.recIndex = sorter.getSpillIndex(partition);
-    this.kvbuffer = sorter.kvbuffer;
-    this.kvmeta = sorter.kvmeta;
-    out = new DualBufferOutputStream(null, 0, 0, dualBuf);
-    sortOutput = new InMemoryWriter(out);
-  }
-
-  byte[] one = new byte[1];
-
-  /**
-   * Reads a single byte.
-   *
-   * @return the next byte as an unsigned value 0-255, or -1 at end of stream
-   */
-  @Override
-  public int read() throws IOException {
-    int n = read(one, 0, 1);
-    // Mask to 0-255: returning the signed byte would make 0xFF look like EOF.
-    return (n <= 0) ? -1 : (one[0] & 0xFF);
-  }
-
-  @Override
-  public int read(byte[] b) throws IOException {
-    return read(b, 0, b.length);
-  }
-
-  @Override
-  public int read(byte[] b, int off, int len) throws IOException {
-    if (len == 0) {
-      // InputStream contract: a zero-length read transfers nothing.
-      return 0;
-    }
-    if (available() == 0) {
-      return -1;
-    }
-
-    int currentOffset = off;
-    int currentLength = len;
-    int currentReadBytes = 0;
-
-    // First hand back bytes that overflowed the caller's buffer last time.
-    if (out.getCurrent() > 0) {
-      int readable = out.drainTo(b, currentOffset, currentLength);
-      currentOffset += readable;
-      currentReadBytes += readable;
-      // The caller's buffer now has less free space.
-      currentLength -= readable;
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("XXX read_residual:" +
-            " readable=" + readable +
-            " readBytes=" + readBytes);
-      }
-    }
-
-    // Now serialize further records directly into the provided buffer.
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("XXX read: out.reset" +
-          " b=" + b +
-          " currentOffset=" + currentOffset +
-          " currentLength=" + currentLength +
-          " recIndex=" + recIndex);
-    }
-    out.reset(b, currentOffset, currentLength);
-
-    DataInputBuffer key = new DataInputBuffer();
-    final InMemValBytes value = sorter.createInMemValBytes();
-
-    int kvPartition = 0;
-    int numRec = 0;
-    // Append this partition's records while the caller's buffer has room;
-    // the tail of the last record may spill into the overflow buffer.
-    for (;
-        currentLength > 0 && recIndex < mend &&
-        (kvPartition = getKVPartition(recIndex)) == partition;
-        ++recIndex) {
-
-      final int kvoff = sorter.offsetFor(recIndex);
-
-      int keyLen =
-          (kvmeta.get(kvoff + InMemoryShuffleSorter.VALSTART) -
-           kvmeta.get(kvoff + InMemoryShuffleSorter.KEYSTART));
-      key.reset(
-          kvbuffer,
-          kvmeta.get(kvoff + InMemoryShuffleSorter.KEYSTART),
-          keyLen
-          );
-
-      int valLen = sorter.getVBytesForOffset(kvoff, value);
-
-      int recLen =
-          (keyLen + WritableUtils.getVIntSize(keyLen)) +
-          (valLen + WritableUtils.getVIntSize(valLen));
-
-      currentReadBytes += recLen;
-      currentOffset += recLen;
-      currentLength -= recLen;
-
-      // Write out key/value into the in-mem ifile
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("XXX read: sortOutput.append" +
-            " #rec=" + ++numRec +
-            " recIndex=" + recIndex + " kvoff=" + kvoff +
-            " keyLen=" + keyLen + " valLen=" + valLen + " recLen=" + recLen +
-            " readBytes=" + readBytes +
-            " currentReadBytes=" + currentReadBytes +
-            " currentLength=" + currentLength);
-      }
-      sortOutput.append(key, value);
-    }
-
-    // At the end of the segment, close the ifile exactly once. Without the
-    // flag, a trailer that overflowed into the residual buffer would cause
-    // the next read to reach this point and write a second trailer.
-    if (!trailerWritten && currentLength > 0 &&
-        (recIndex == mend || kvPartition != partition)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("XXX About to call close:" +
-            " currentLength=" + currentLength +
-            " recIndex=" + recIndex + " mend=" + mend +
-            " kvPartition=" + kvPartition + " partitino=" + partition);
-      }
-      sortOutput.close();
-      trailerWritten = true;
-      currentReadBytes +=
-          (InMemoryShuffleSorter.IFILE_EOF_LENGTH +
-           InMemoryShuffleSorter.IFILE_CHECKSUM_LENGTH);
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("XXX Hmm..." +
-            " currentLength=" + currentLength +
-            " recIndex=" + recIndex + " mend=" + mend +
-            " kvPartition=" + kvPartition + " partitino=" + partition);
-      }
-    }
-
-    // Bytes beyond len went to the overflow buffer and are counted next call.
-    int retVal = Math.min(currentReadBytes, len);
-    readBytes += retVal;
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("XXX read: done" +
-          " retVal=" + retVal +
-          " currentReadBytes=" + currentReadBytes +
-          " len=" + len +
-          " readBytes=" + readBytes +
-          " partitionBytes=" + partitionBytes +
-          " residualBytes=" + out.getCurrent());
-    }
-    return retVal;
-  }
-
-  private int getKVPartition(int recIndex) {
-    return kvmeta.get(
-        sorter.offsetFor(recIndex) + InMemoryShuffleSorter.PARTITION);
-  }
-
-  /** Returns the number of bytes of this partition not yet returned. */
-  @Override
-  public int available() throws IOException {
-    return (partitionBytes - readBytes);
-  }
-
-  @Override
-  public void close() throws IOException {
-    super.close();
-  }
-
-  @Override
-  public boolean markSupported() {
-    return false;
-  }
-
-  /**
-   * Output stream that writes into a caller-supplied bounded buffer and
-   * puts whatever does not fit into a growable overflow buffer.
-   */
-  static class DualBufferOutputStream extends BoundedByteArrayOutputStream {
-
-    byte[] dualBuf;
-    int currentPointer = 0;
-    byte[] one = new byte[1];
-
-    public DualBufferOutputStream(
-        byte[] buf, int offset, int length,
-        byte[] altBuf) {
-      super(buf, offset, length);
-      this.dualBuf = altBuf;
-    }
-
-    public void reset(byte[] b, int off, int len) {
-      super.resetBuffer(b, off, len);
-    }
-
-    @Override
-    public void write(int b) throws IOException {
-      one[0] = (byte) b;
-      write(one, 0, 1);
-    }
-
-    @Override
-    public void write(byte[] b) throws IOException {
-      write(b, 0, b.length);
-    }
-
-    @Override
-    public void write(byte[] b, int off, int len) throws IOException {
-      int available = super.available();
-      if (available >= len) {
-        super.write(b, off, len);
-      } else {
-        super.write(b, off, available);
-        int overflow = len - available;
-        ensureOverflowCapacity(currentPointer + overflow);
-        System.arraycopy(b, off + available, dualBuf, currentPointer, overflow);
-        currentPointer += overflow;
-      }
-    }
-
-    // Grows the overflow buffer so a single large record cannot overrun it.
-    private void ensureOverflowCapacity(int required) {
-      if (required > dualBuf.length) {
-        int newLength = Math.max(required, dualBuf.length << 1);
-        byte[] grown = new byte[newLength];
-        System.arraycopy(dualBuf, 0, grown, 0, currentPointer);
-        dualBuf = grown;
-      }
-    }
-
-    /**
-     * Moves up to {@code len} overflow bytes into {@code dest} at
-     * {@code off}, then shifts any unconsumed bytes to the front of the
-     * overflow buffer so the next drain continues where this one stopped.
-     *
-     * @return the number of bytes copied
-     */
-    int drainTo(byte[] dest, int off, int len) {
-      int n = Math.min(len, currentPointer);
-      System.arraycopy(dualBuf, 0, dest, off, n);
-      int remaining = currentPointer - n;
-      if (remaining > 0) {
-        System.arraycopy(dualBuf, n, dualBuf, 0, remaining);
-      }
-      currentPointer = remaining;
-      return n;
-    }
-
-    int getCurrent() {
-      return currentPointer;
-    }
-
-    void setCurrentPointer(int delta) {
-      if ((currentPointer + delta) > dualBuf.length) {
-        throw new IndexOutOfBoundsException("Trying to set dualBuf 'current'" +
-            " marker to " + (currentPointer+delta) + " when " +
-            " dualBuf.length is " + dualBuf.length);
-      }
-      currentPointer = (currentPointer + delta) % dualBuf.length;
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java
deleted file mode 100644
index 841e54d..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.common.task.impl;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.util.Progressable;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-
-
-/**
- * Iterates values while keys match in sorted input.
- *
- * Usage: while {@link #more()} is true, {@link #getKey()} returns the
- * current key and {@link #hasNext()}/{@link #next()} iterate its values;
- * {@link #nextKey()} then skips any remaining values and advances to the
- * next distinct key. Key and value objects are reused by the deserializers.
- */
-public class ValuesIterator<KEY,VALUE> implements Iterator<VALUE> {
-  protected TezRawKeyValueIterator in; //input iterator
-  private KEY key;               // current key
-  private KEY nextKey;
-  private VALUE value;             // current value
-  private boolean hasNext;                      // more w/ this key
-  private boolean more;                         // more in file
-  private RawComparator<KEY> comparator;
-  protected Progressable reporter;
-  private Deserializer<KEY> keyDeserializer;
-  private Deserializer<VALUE> valDeserializer;
-  private DataInputBuffer keyIn = new DataInputBuffer();
-  private DataInputBuffer valueIn = new DataInputBuffer();
-
-  /**
-   * Opens deserializers for the key/value classes and positions the
-   * iterator on the first key.
-   *
-   * @param reporter progress callback invoked per value; may be null
-   */
-  public ValuesIterator (TezRawKeyValueIterator in,
-                         RawComparator<KEY> comparator,
-                         Class<KEY> keyClass,
-                         Class<VALUE> valClass, Configuration conf,
-                         Progressable reporter)
-    throws IOException {
-    this.in = in;
-    this.comparator = comparator;
-    this.reporter = reporter;
-    SerializationFactory serializationFactory = new SerializationFactory(conf);
-    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
-    this.keyDeserializer.open(keyIn);
-    this.valDeserializer = serializationFactory.getDeserializer(valClass);
-    this.valDeserializer.open(this.valueIn);
-    readNextKey();
-    key = nextKey;
-    nextKey = null; // force new instance creation
-    hasNext = more;
-  }
-
-  TezRawKeyValueIterator getRawIterator() { return in; }
-
-  /// Iterator methods
-
-  /** True iff the current key has another value. */
-  public boolean hasNext() { return hasNext; }
-
-  private int ctr = 0;
-
-  /**
-   * Returns the next value of the current key. The returned object may be
-   * reused by subsequent calls.
-   *
-   * @throws NoSuchElementException if the current key has no more values
-   */
-  public VALUE next() {
-    if (!hasNext) {
-      throw new NoSuchElementException("iterate past last value");
-    }
-    try {
-      readNextValue();
-      readNextKey();
-    } catch (IOException ie) {
-      throw new RuntimeException("problem advancing post rec#"+ctr, ie);
-    }
-    if (reporter != null) {
-      reporter.progress();
-    }
-    return value;
-  }
-
-  /** Not supported; always throws {@link UnsupportedOperationException}. */
-  public void remove() { throw new UnsupportedOperationException("not implemented"); }
-
-  /// Auxiliary methods
-
-  /** Start processing next unique key. */
-  public void nextKey() throws IOException {
-    // read until we find a new key
-    while (hasNext) {
-      readNextKey();
-    }
-    ++ctr;
-
-    // move the next key to the current one; swap so the old key object can
-    // be reused by the deserializer
-    KEY tmpKey = key;
-    key = nextKey;
-    nextKey = tmpKey;
-    hasNext = more;
-  }
-
-  /** True iff more keys remain. */
-  public boolean more() {
-    return more;
-  }
-
-  /** The current key. */
-  public KEY getKey() {
-    return key;
-  }
-
-  /**
-   * Reads the next raw key and records whether it equals the current key.
-   */
-  private void readNextKey() throws IOException {
-    more = in.next();
-    if (more) {
-      DataInputBuffer nextKeyBytes = in.getKey();
-      keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength());
-      nextKey = keyDeserializer.deserialize(nextKey);
-      hasNext = key != null && (comparator.compare(key, nextKey) == 0);
-    } else {
-      hasNext = false;
-    }
-  }
-
-  /**
-   * Deserializes the value at the raw iterator's current position.
-   * @throws IOException
-   */
-  private void readNextValue() throws IOException {
-    DataInputBuffer nextValueBytes = in.getValue();
-    valueIn.reset(nextValueBytes.getData(), nextValueBytes.getPosition(), nextValueBytes.getLength());
-    value = valDeserializer.deserialize(value);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java
deleted file mode 100644
index 40e6b1a..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.task.local.output;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.tez.common.Constants;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-
-/**
- * Manipulate the working area for the transient store for maps and reduces.
- *
- * This class is used by map and reduce tasks to identify the directories that
- * they need to write to/read from for intermediate files. The callers of
- * these methods are from the Child running the Task.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class TezLocalTaskOutputFiles extends TezTaskOutput {
-
- public TezLocalTaskOutputFiles(Configuration conf, String uniqueId) {
- super(conf, uniqueId);
- }
-
- private LocalDirAllocator lDirAlloc =
- new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
-
-
- /**
- * Return the path to local map output file created earlier
- *
- * @return path
- * @throws IOException
- */
- @Override
- public Path getOutputFile()
- throws IOException {
- return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
- + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING, conf);
- }
-
- /**
- * Create a local map output file name.
- *
- * @param size the size of the file
- * @return path
- * @throws IOException
- */
- @Override
- public Path getOutputFileForWrite(long size)
- throws IOException {
- return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
- + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING, size, conf);
- }
-
- /**
- * Create a local map output file name. This should *only* be used if the size
- * of the file is not known. Otherwise use the equivalent which accepts a size
- * parameter.
- *
- * @return path
- * @throws IOException
- */
- @Override
- public Path getOutputFileForWrite() throws IOException {
- return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR
- + Path.SEPARATOR + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING,
- conf);
- }
-
- /**
- * Create a local map output file name on the same volume.
- */
- @Override
- public Path getOutputFileForWriteInVolume(Path existing) {
- return new Path(existing.getParent(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
- }
-
- /**
- * Return the path to a local map output index file created earlier
- *
- * @return path
- * @throws IOException
- */
- @Override
- public Path getOutputIndexFile()
- throws IOException {
- return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
- + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING,
- conf);
- }
-
- /**
- * Create a local map output index file name.
- *
- * @param size the size of the file
- * @return path
- * @throws IOException
- */
- @Override
- public Path getOutputIndexFileForWrite(long size)
- throws IOException {
- return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
- + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING,
- size, conf);
- }
-
- /**
- * Create a local map output index file name on the same volume.
- */
- @Override
- public Path getOutputIndexFileForWriteInVolume(Path existing) {
- return new Path(existing.getParent(),
- Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
- }
-
- /**
- * Return a local map spill file created earlier.
- *
- * @param spillNumber the number
- * @return path
- * @throws IOException
- */
- @Override
- public Path getSpillFile(int spillNumber)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + "/spill"
- + spillNumber + ".out", conf);
- }
-
- /**
- * Create a local map spill file name.
- *
- * @param spillNumber the number
- * @param size the size of the file
- * @return path
- * @throws IOException
- */
- @Override
- public Path getSpillFileForWrite(int spillNumber, long size)
- throws IOException {
- return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + "/spill"
- + spillNumber + ".out", size, conf);
- }
-
- /**
- * Return a local map spill index file created earlier
- *
- * @param spillNumber the number
- * @return path
- * @throws IOException
- */
- @Override
- public Path getSpillIndexFile(int spillNumber)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + "/spill"
- + spillNumber + ".out.index", conf);
- }
-
- /**
- * Create a local map spill index file name.
- *
- * @param spillNumber the number
- * @param size the size of the file
- * @return path
- * @throws IOException
- */
- @Override
- public Path getSpillIndexFileForWrite(int spillNumber, long size)
- throws IOException {
- return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + "/spill"
- + spillNumber + ".out.index", size, conf);
- }
-
- /**
- * Return a local reduce input file created earlier
- *
- * @param mapId a map task id
- * @return path
- * @throws IOException
- */
- @Override
- public Path getInputFile(InputAttemptIdentifier mapId)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(String.format(
- Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING,
- Constants.TASK_OUTPUT_DIR, Integer.valueOf(mapId.getInputIdentifier().getSrcTaskIndex())), conf);
- }
-
- /**
- * Create a local reduce input file name.
- *
- * @param mapId a map task id
- * @param size the size of the file
- * @return path
- * @throws IOException
- */
- @Override
- public Path getInputFileForWrite(int taskId,
- long size)
- throws IOException {
- return lDirAlloc.getLocalPathForWrite(String.format(
- Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING, Constants.TASK_OUTPUT_DIR, taskId),
- size, conf);
- }
-
- /** Removes all of the files related to a task. */
- @Override
- public void removeAll()
- throws IOException {
- deleteLocalFiles(Constants.TASK_OUTPUT_DIR);
- }
-
- private String[] getLocalDirs() throws IOException {
- return conf.getStrings(TezJobConfig.LOCAL_DIRS);
- }
-
- @SuppressWarnings("deprecation")
- private void deleteLocalFiles(String subdir) throws IOException {
- String[] localDirs = getLocalDirs();
- for (int i = 0; i < localDirs.length; i++) {
- FileSystem.getLocal(conf).delete(new Path(localDirs[i], subdir));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java
deleted file mode 100644
index e1d83ad..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.task.local.output;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-
-/**
- * Manipulate the working area for the transient store for maps and reduces.
- *
- * <p>This class is used by map and reduce tasks to identify the directories
- * that they need to write to/read from for intermediate files. The callers of
- * these methods are from child space and see mapreduce.cluster.local.dir as
- * taskTracker/jobCache/jobId/attemptId.
- * This class should not be used from TaskTracker space.
- *
- * <p>This base class only stores the configuration and a unique identifier;
- * the path layout is defined entirely by implementations.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public abstract class TezTaskOutput {
-
-  /** Configuration supplied at construction, available to implementations. */
-  protected Configuration conf;
-  /** Identifier supplied at construction, available to implementations for naming files. */
-  protected String uniqueId;
-
-  /**
-   * @param conf configuration made available to implementations
-   * @param uniqueId identifier made available to implementations
-   */
-  public TezTaskOutput(Configuration conf, String uniqueId) {
-    this.conf = conf;
-    this.uniqueId = uniqueId;
-  }
-
-  /**
-   * Return the path to local map output file created earlier
-   *
-   * @return path
-   * @throws IOException if the path cannot be resolved
-   */
-  public abstract Path getOutputFile() throws IOException;
-
-  /**
-   * Create a local map output file name.
-   *
-   * @param size the size of the file
-   * @return path
-   * @throws IOException if the path cannot be resolved
-   */
-  public abstract Path getOutputFileForWrite(long size) throws IOException;
-
-  /**
-   * Create a local output file name. This method is meant to be used *only* if
-   * the size of the file is not known up front.
-   *
-   * @return path
-   * @throws IOException if the path cannot be resolved
-   */
-  public abstract Path getOutputFileForWrite() throws IOException;
-
-  /**
-   * Create a local map output file name on the same volume.
-   *
-   * @param existing a path on the volume the new file should be placed on
-   * @return path
-   */
-  public abstract Path getOutputFileForWriteInVolume(Path existing);
-
-  /**
-   * Return the path to a local map output index file created earlier
-   *
-   * @return path
-   * @throws IOException if the path cannot be resolved
-   */
-  public abstract Path getOutputIndexFile() throws IOException;
-
-  /**
-   * Create a local map output index file name.
-   *
-   * @param size the size of the file
-   * @return path
-   * @throws IOException if the path cannot be resolved
-   */
-  public abstract Path getOutputIndexFileForWrite(long size) throws IOException;
-
-  /**
-   * Create a local map output index file name on the same volume.
-   *
-   * @param existing a path on the volume the new file should be placed on
-   * @return path
-   */
-  public abstract Path getOutputIndexFileForWriteInVolume(Path existing);
-
-  /**
-   * Return a local map spill file created earlier.
-   *
-   * @param spillNumber the number
-   * @return path
-   * @throws IOException if the path cannot be resolved
-   */
-  public abstract Path getSpillFile(int spillNumber) throws IOException;
-
-  /**
-   * Create a local map spill file name.
-   *
-   * @param spillNumber the number
-   * @param size the size of the file
-   * @return path
-   * @throws IOException if the path cannot be resolved
-   */
-  public abstract Path getSpillFileForWrite(int spillNumber, long size)
-      throws IOException;
-
-  /**
-   * Return a local map spill index file created earlier
-   *
-   * @param spillNumber the number
-   * @return path
-   * @throws IOException if the path cannot be resolved
-   */
-  public abstract Path getSpillIndexFile(int spillNumber) throws IOException;
-
-  /**
-   * Create a local map spill index file name.
-   *
-   * @param spillNumber the number
-   * @param size the size of the file
-   * @return path
-   * @throws IOException if the path cannot be resolved
-   */
-  public abstract Path getSpillIndexFileForWrite(int spillNumber, long size)
-      throws IOException;
-
-  /**
-   * Return a local reduce input file created earlier
-   *
-   * @param attemptIdentifier The identifier for the source task
-   * @return path
-   * @throws IOException if the path cannot be resolved
-   */
-  public abstract Path getInputFile(InputAttemptIdentifier attemptIdentifier) throws IOException;
-
-  /**
-   * Create a local reduce input file name.
-   *
-   * @param taskIdentifier The identifier for the source task
-   * @param size the size of the file
-   * @return path
-   * @throws IOException if the path cannot be resolved
-   */
-  public abstract Path getInputFileForWrite(
-      int taskIdentifier, long size) throws IOException;
-
-  /**
-   * Removes all of the files related to a task.
-   *
-   * @throws IOException if deletion fails
-   */
-  public abstract void removeAll() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
deleted file mode 100644
index b8f051b..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.task.local.output;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.tez.common.Constants;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-
-/**
- * Manipulate the working area for the transient store for maps and reduces.
- *
- * <p>This class is used by map and reduce tasks to identify the directories
- * that they need to write to/read from for intermediate files. The callers of
- * these methods are from child space and see mapreduce.cluster.local.dir as
- * taskTracker/jobCache/jobId/attemptId.
- * This class should not be used from TaskTracker space.
- *
- * <p>Output and index files live under
- * {@code Constants.TASK_OUTPUT_DIR/uniqueId}; spill files are named from
- * {@code uniqueId} and the spill number. These relative paths are resolved
- * against the directories in {@code TezJobConfig.LOCAL_DIRS} via a
- * {@link LocalDirAllocator}.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class TezTaskOutputFiles extends TezTaskOutput {
-
-  private static final Log LOG = LogFactory.getLog(TezTaskOutputFiles.class);
-
-  private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out";
-  private static final String SPILL_INDEX_FILE_PATTERN = SPILL_FILE_PATTERN
-      + ".index";
-
-  // assume configured to $localdir/usercache/$user/appcache/$appId
-  private final LocalDirAllocator lDirAlloc =
-      new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
-
-  public TezTaskOutputFiles(Configuration conf, String uniqueId) {
-    super(conf, uniqueId);
-  }
-
-  /** Returns {@code TASK_OUTPUT_DIR/uniqueId}, relative to a local dir. */
-  private Path getAttemptOutputDir() {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("getAttemptOutputDir: "
-          + Constants.TASK_OUTPUT_DIR + "/"
-          + uniqueId);
-    }
-    return new Path(Constants.TASK_OUTPUT_DIR, uniqueId);
-  }
-
-  /** Relative path of this attempt's output file. */
-  private Path getAttemptOutputFile() {
-    return new Path(getAttemptOutputDir(),
-        Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
-  }
-
-  /** Relative path of this attempt's output index file. */
-  private Path getAttemptOutputIndexFile() {
-    return new Path(getAttemptOutputDir(),
-        Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING
-            + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
-  }
-
-  /** Returns {@code <parent of existing>/TASK_OUTPUT_DIR/uniqueId}. */
-  private Path getAttemptOutputDirInVolume(Path existing) {
-    Path outputDir = new Path(existing.getParent(), Constants.TASK_OUTPUT_DIR);
-    return new Path(outputDir, uniqueId);
-  }
-
-  /**
-   * Spill file name for {@code spillNumber}. Formatted exactly once so that
-   * characters such as '%' in {@code uniqueId} are not re-interpreted.
-   */
-  private String getSpillFileName(int spillNumber) {
-    return String.format(SPILL_FILE_PATTERN, uniqueId, spillNumber);
-  }
-
-  /** Spill index file name for {@code spillNumber}. */
-  private String getSpillIndexFileName(int spillNumber) {
-    return String.format(SPILL_INDEX_FILE_PATTERN, uniqueId, spillNumber);
-  }
-
-  /**
-   * Return the path to local map output file created earlier
-   *
-   * @return path
-   * @throws IOException if no local directory contains the file
-   */
-  @Override
-  public Path getOutputFile() throws IOException {
-    return lDirAlloc.getLocalPathToRead(getAttemptOutputFile().toString(), conf);
-  }
-
-  /**
-   * Create a local map output file name.
-   *
-   * @param size the size of the file
-   * @return path
-   * @throws IOException if no writable local directory is available
-   */
-  @Override
-  public Path getOutputFileForWrite(long size) throws IOException {
-    return lDirAlloc.getLocalPathForWrite(getAttemptOutputFile().toString(),
-        size, conf);
-  }
-
-  /**
-   * Create a local map output file name. This should *only* be used if the size
-   * of the file is not known. Otherwise use the equivalent which accepts a size
-   * parameter.
-   *
-   * @return path
-   * @throws IOException if no writable local directory is available
-   */
-  @Override
-  public Path getOutputFileForWrite() throws IOException {
-    return lDirAlloc.getLocalPathForWrite(getAttemptOutputFile().toString(), conf);
-  }
-
-  /**
-   * Create a local map output file name on the same volume.
-   *
-   * @param existing a path whose parent directory anchors the new path
-   * @return {@code <parent of existing>/TASK_OUTPUT_DIR/uniqueId/<output file>}
-   */
-  @Override
-  public Path getOutputFileForWriteInVolume(Path existing) {
-    return new Path(getAttemptOutputDirInVolume(existing),
-        Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
-  }
-
-  /**
-   * Return the path to a local map output index file created earlier
-   *
-   * @return path
-   * @throws IOException if no local directory contains the file
-   */
-  @Override
-  public Path getOutputIndexFile() throws IOException {
-    return lDirAlloc.getLocalPathToRead(getAttemptOutputIndexFile().toString(),
-        conf);
-  }
-
-  /**
-   * Create a local map output index file name.
-   *
-   * @param size the size of the file
-   * @return path
-   * @throws IOException if no writable local directory is available
-   */
-  @Override
-  public Path getOutputIndexFileForWrite(long size) throws IOException {
-    return lDirAlloc.getLocalPathForWrite(getAttemptOutputIndexFile().toString(),
-        size, conf);
-  }
-
-  /**
-   * Create a local map output index file name on the same volume.
-   *
-   * @param existing a path whose parent directory anchors the new path
-   * @return {@code <parent of existing>/TASK_OUTPUT_DIR/uniqueId/<index file>}
-   */
-  @Override
-  public Path getOutputIndexFileForWriteInVolume(Path existing) {
-    return new Path(getAttemptOutputDirInVolume(existing),
-        Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING
-            + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
-  }
-
-  /**
-   * Return a local map spill file created earlier.
-   *
-   * @param spillNumber the number
-   * @return path
-   * @throws IOException if no local directory contains the file
-   */
-  @Override
-  public Path getSpillFile(int spillNumber) throws IOException {
-    return lDirAlloc.getLocalPathToRead(getSpillFileName(spillNumber), conf);
-  }
-
-  /**
-   * Create a local map spill file name.
-   *
-   * @param spillNumber the number
-   * @param size the size of the file
-   * @return path
-   * @throws IOException if no writable local directory is available
-   */
-  @Override
-  public Path getSpillFileForWrite(int spillNumber, long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(getSpillFileName(spillNumber),
-        size, conf);
-  }
-
-  /**
-   * Return a local map spill index file created earlier
-   *
-   * @param spillNumber the number
-   * @return path
-   * @throws IOException if no local directory contains the file
-   */
-  @Override
-  public Path getSpillIndexFile(int spillNumber) throws IOException {
-    return lDirAlloc.getLocalPathToRead(getSpillIndexFileName(spillNumber),
-        conf);
-  }
-
-  /**
-   * Create a local map spill index file name.
-   *
-   * @param spillNumber the number
-   * @param size the size of the file
-   * @return path
-   * @throws IOException if no writable local directory is available
-   */
-  @Override
-  public Path getSpillIndexFileForWrite(int spillNumber, long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(getSpillIndexFileName(spillNumber),
-        size, conf);
-  }
-
-  /**
-   * Return a local reduce input file created earlier. Not supported by this
-   * implementation.
-   *
-   * @param attemptIdentifier an identifier for a task. The attempt information is ignored.
-   * @return never returns normally
-   * @throws UnsupportedOperationException always
-   */
-  @Override
-  public Path getInputFile(InputAttemptIdentifier attemptIdentifier) throws IOException {
-    throw new UnsupportedOperationException("Incompatible with LocalRunner");
-  }
-
-  /**
-   * Create a local reduce input file name under this attempt's output
-   * directory, one file per source task.
-   *
-   * @param srcTaskId identifier of the source task whose data is written
-   * @param size the size of the file
-   * @return path
-   * @throws IOException if no writable local directory is available
-   */
-  @Override
-  public Path getInputFileForWrite(int srcTaskId,
-      long size) throws IOException {
-    // The input-file pattern takes (directory, source task id); formatting it
-    // with both keeps files for different source tasks distinct.
-    return lDirAlloc.getLocalPathForWrite(String.format(
-        Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING,
-        getAttemptOutputDir().toString(), srcTaskId),
-        size, conf);
-  }
-
-  /**
-   * Removes all of the files related to a task. Not supported by this
-   * implementation.
-   *
-   * @throws UnsupportedOperationException always
-   */
-  @Override
-  public void removeAll() throws IOException {
-    throw new UnsupportedOperationException("Incompatible with LocalRunner");
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/hadoop/compat/NullProgressable.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/hadoop/compat/NullProgressable.java b/tez-engine/src/main/java/org/apache/tez/engine/hadoop/compat/NullProgressable.java
deleted file mode 100644
index 5071dd2..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/hadoop/compat/NullProgressable.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.hadoop.compat;
-
-import org.apache.hadoop.util.Progressable;
-
-/**
- * A {@link Progressable} whose {@link #progress()} method does nothing.
- */
-public class NullProgressable implements Progressable {
-
-  /** Intentionally a no-op. */
-  @Override
-  public void progress() {
-    // no-op
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java
deleted file mode 100644
index 6371787..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.lib.input;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.LogicalInput;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.common.localshuffle.LocalShuffle;
-
-/**
- * <code>LocalMergedInput</code> is a {@link LogicalInput} which shuffles
- * intermediate sorted data, merges them and provides key/&lt;values&gt; to the
- * consumer. The shuffle and merge are performed by {@link LocalShuffle}.
- */
-public class LocalMergedInput extends ShuffledMergedInputLegacy {
-
-  /**
-   * Builds the configuration from the user payload, runs the local shuffle
-   * and prepares the values iterator over its merged output.
-   *
-   * @param inputContext the context for this input
-   * @return an empty list; this input emits no events
-   * @throws IOException propagated from configuration parsing or the shuffle
-   */
-  @Override
-  public List<Event> initialize(TezInputContext inputContext) throws IOException {
-    this.inputContext = inputContext;
-    this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
-
-    LocalShuffle localShuffle = new LocalShuffle(inputContext, conf, numInputs);
-    rawIter = localShuffle.run();
-    createValuesIterator();
-    return Collections.emptyList();
-  }
-
-  /**
-   * Closes the merged iterator if {@link #initialize} got far enough to
-   * create it.
-   *
-   * @return an empty list; this input emits no events
-   * @throws IOException if closing the iterator fails
-   */
-  @Override
-  public List<Event> close() throws IOException {
-    // rawIter stays null when initialize() failed before the shuffle finished.
-    if (rawIter != null) {
-      rawIter.close();
-    }
-    return Collections.emptyList();
-  }
-}