You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC
svn commit: r1469642 [25/36] - in /incubator/tez/branches/TEZ-1: ./
example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/
example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/
tez-common/src/main/ tez-common/src/main/java/ t...
Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,961 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tez.engine.common.sort.impl;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.HashComparator;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.util.IndexedSortable;
+import org.apache.hadoop.util.IndexedSorter;
+import org.apache.hadoop.util.Progress;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.sort.SortingOutput;
+import org.apache.tez.engine.common.sort.impl.IFile.Writer;
+import org.apache.tez.engine.common.sort.impl.TezMerger.Segment;
+import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class PipelinedSorter extends ExternalSorter implements SortingOutput {
+
+ private static final Log LOG = LogFactory.getLog(PipelinedSorter.class);
+
+ /**
+ * The size of each record in the index file for the map-outputs.
+ * Used by spill() to size the spill index file request; presumably three
+ * 8-byte longs (start offset, raw length, part length) -- confirm against
+ * TezSpillRecord.
+ */
+ public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
+
+ // Per-partition allowance added to the buffer capacity when spill()
+ // estimates the size of a spill file.
+ private final static int APPROX_HEADER_LENGTH = 150;
+
+ // Number of high-order bits of the 32-bit sort prefix that carry the
+ // partition number. initialize() sets it to bitcount(partitions)+1; the
+ // extra bit keeps the prefix packed in collect() non-negative.
+ int partitionBits;
+
+ // Layout of one record's metadata entry: NMETA consecutive ints in a
+ // span's kvmeta buffer. Note that the PARTITION slot holds the packed sort
+ // prefix (partition in the high bits, key hash below), not the bare
+ // partition number.
+ private static final int PARTITION = 0; // partition offset in acct
+ private static final int KEYSTART = 1; // key offset in acct
+ private static final int VALSTART = 2; // val offset in acct
+ private static final int VALLEN = 3; // val len in acct
+ private static final int NMETA = 4; // num meta ints
+ private static final int METASIZE = NMETA * 4; // size in bytes
+
+ // spill accounting
+ // NOTE(review): not read or written anywhere else in this class.
+ volatile Throwable sortSpillException = null;
+
+ // Number of spill files written so far; also the index of the next spill.
+ int numSpills = 0;
+ // flush() runs the combiner during the final merge only when at least
+ // this many spills were written.
+ int minSpillsForCombine;
+ // Non-null only when the output key comparator is a HashComparator; its
+ // hash code supplies the low bits of each record's sort prefix.
+ private HashComparator hasher;
+ // SortSpans
+ // span: the span currently being filled by collect().
+ // largeBuffer: the single backing buffer every span is sliced from.
+ private SortSpan span;
+ private ByteBuffer largeBuffer;
+ // Merger
+ // merger: gathers sorted spans (directly or as futures) and merges them
+ // when spilling. sortmaster: thread pool that sorts spans in the background.
+ private SpanMerger merger;
+ private ExecutorService sortmaster;
+
+ // NOTE(review): indexCacheList and totalIndexCacheMemory are not used
+ // elsewhere in this class (flush() declares its own local list), and
+ // indexCacheMemoryLimit is only assigned, in initialize().
+ final ArrayList<TezSpillRecord> indexCacheList =
+ new ArrayList<TezSpillRecord>();
+ private int totalIndexCacheMemory;
+ private int indexCacheMemoryLimit;
+
+
+ /**
+ * Injection constructor. The body is empty; all setup happens in
+ * initialize().
+ * NOTE(review): {@code task} is neither stored nor forwarded here, yet
+ * initialize() bails out when the {@code task} field it reads is null --
+ * confirm how ExternalSorter populates that field.
+ */
+ @Inject
+ public PipelinedSorter(
+ @Assisted TezTask task
+ ) throws IOException {
+ }
+
+  /**
+   * Reads the sort settings from the job configuration, allocates the single
+   * backing sort buffer and its first span, and starts the sort thread pool.
+   * Logs and returns without doing anything when {@code task} is null.
+   *
+   * @param conf configuration passed on to the superclass initializer
+   * @param master passed on to the superclass initializer
+   * @throws IOException if the spill percentage, the sort buffer size or the
+   *         number of sort threads is out of range
+   * @throws InterruptedException propagated from the superclass initializer
+   */
+  public void initialize(Configuration conf, Master master)
+      throws IOException, InterruptedException {
+
+    if (task == null) {
+      LOG.info("Bailing!", new IOException());
+      return;
+    }
+    super.initialize(conf, master);
+
+    // One bit more than the partition number needs, so the sort prefix
+    // packed in collect() is never negative.
+    partitionBits = bitcount(partitions)+1;
+
+    //sanity checks
+    final float spillper =
+        job.getFloat(
+            TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT,
+            TezJobConfig.DEFAULT_TEZ_ENGINE_SORT_SPILL_PERCENT);
+    final int sortmb =
+        job.getInt(
+            TezJobConfig.TEZ_ENGINE_IO_SORT_MB,
+            TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_MB);
+    final int sortThreads =
+        job.getInt(
+            TezJobConfig.TEZ_ENGINE_SORT_THREADS,
+            TezJobConfig.DEFAULT_TEZ_ENGINE_SORT_THREADS);
+    indexCacheMemoryLimit = job.getInt(TezJobConfig.TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES,
+        TezJobConfig.DEFAULT_TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES);
+    if (spillper > (float)1.0 || spillper <= (float)0.0) {
+      throw new IOException("Invalid \"" + TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT +
+          "\": " + spillper);
+    }
+    // sortmb << 20 must fit in an int, so at most 0x7FF (2047) MB. Zero is
+    // rejected too: a zero-byte buffer can never hold a record, and collect()
+    // would keep sorting, spilling and retrying until the stack overflows.
+    if (sortmb <= 0 || (sortmb & 0x7FF) != sortmb) {
+      throw new IOException(
+          "Invalid \"" + TezJobConfig.TEZ_ENGINE_IO_SORT_MB + "\": " + sortmb);
+    }
+    // A fixed-size pool needs at least one thread; check before allocating
+    // the (potentially large) sort buffer.
+    if (sortThreads < 1) {
+      throw new IOException(
+          "Invalid \"" + TezJobConfig.TEZ_ENGINE_SORT_THREADS + "\": " + sortThreads);
+    }
+
+    // buffers and accounting
+    int maxMemUsage = sortmb << 20;
+    // round down to a whole number of metadata entries
+    maxMemUsage -= maxMemUsage % METASIZE;
+    largeBuffer = ByteBuffer.allocate(maxMemUsage);
+    LOG.info(TezJobConfig.TEZ_ENGINE_IO_SORT_MB + " = " + sortmb);
+    // TODO: configurable setting?
+    span = new SortSpan(largeBuffer, 1024*1024, 16);
+    merger = new SpanMerger(comparator);
+    sortmaster = Executors.newFixedThreadPool(sortThreads);
+
+    // k/v serialization
+    if(comparator instanceof HashComparator) {
+      hasher = (HashComparator)comparator;
+      LOG.info("Using the HashComparator");
+    } else {
+      hasher = null;
+    }
+    valSerializer.open(span.out);
+    keySerializer.open(span.out);
+    minSpillsForCombine = job.getInt(TezJobConfig.TEZ_ENGINE_COMBINE_MIN_SPILLS, 3);
+  }
+
+  /**
+   * Returns the number of bits needed to represent {@code n}: the 1-based
+   * position of its highest set bit, or 0 when {@code n} is 0.
+   *
+   * <p>Computed from the leading-zero count so negative input yields 32
+   * instead of hanging; a sign-extending {@code n >>= 1} loop never reaches 0
+   * for negative {@code n}.
+   */
+  private int bitcount(int n) {
+    return Integer.SIZE - Integer.numberOfLeadingZeros(n);
+  }
+
+ /**
+ * Closes the current span and starts a new one. collect() calls this when
+ * the current span runs out of metadata or data space.
+ *
+ * If span.next() can carve a new span out of the unused tail of the
+ * current one, the full span is queued for sorting on sortmaster and
+ * collection continues in the new span. Otherwise the backing buffer is
+ * exhausted: the span is sorted on this thread, everything sorted so far is
+ * spilled, and a fresh span is laid over the whole of largeBuffer, sized
+ * from the average record size of the span just spilled.
+ */
+ public void sort() throws IOException {
+ SortSpan newSpan = span.next();
+
+ if(newSpan == null) {
+ // sort in the same thread, do not wait for the thread pool
+ merger.add(span.sort(sorter, comparator));
+ spill();
+ // defaults used when the spilled span held no records
+ int items = 1024*1024;
+ int perItem = 16;
+ if(span.length() != 0) {
+ items = span.length();
+ // next() -> end() already capped kvbuffer's limit at the bytes written
+ perItem = span.kvbuffer.limit()/items;
+ items = (largeBuffer.capacity())/(METASIZE+perItem);
+ if(items > 1024*1024) {
+ // our goal is to have 1M splits and sort early
+ items = 1024*1024;
+ }
+ }
+ span = new SortSpan(largeBuffer, items, perItem);
+ } else {
+ // queue up the sort
+ // (this local 'task' shadows the task field used elsewhere in the class)
+ SortTask task = new SortTask(span, sorter, comparator);
+ Future<SpanIterator> future = sortmaster.submit(task);
+ merger.add(future);
+ span = newSpan;
+ }
+ // point the serializers at the new span's output stream
+ valSerializer.open(span.out);
+ keySerializer.open(span.out);
+ }
+
+  /**
+   * Routes a key/value pair to its partition via the configured partitioner
+   * and buffers it for sorting.
+   *
+   * @param key record key; its class must match the configured key class
+   * @param value record value; its class must match the configured value class
+   */
+  public void write(Object key, Object value)
+      throws IOException, InterruptedException {
+    final int targetPartition = partitioner.getPartition(key, value, partitions);
+    collect(key, value, targetPartition);
+  }
+
+ /**
+ * Serialize the key, value to intermediate storage.
+ * Validates the key/value classes and the partition, serializes the key and
+ * then the value into the current span's data region, and appends one
+ * NMETA-int metadata entry (sort prefix, key start, value start, value
+ * length) to the span's kvmeta. Calls sort() to switch spans whenever the
+ * metadata or the data region is full.
+ *
+ * @throws IOException on a key/value class mismatch, an out-of-range
+ * partition, or a failure while sorting/spilling
+ */
+ synchronized void collect(Object key, Object value, final int partition
+ ) throws IOException {
+ task.getTaskReporter().progress();
+ if (key.getClass() != keyClass) {
+ throw new IOException("Type mismatch in key from map: expected "
+ + keyClass.getName() + ", received "
+ + key.getClass().getName());
+ }
+ if (value.getClass() != valClass) {
+ throw new IOException("Type mismatch in value from map: expected "
+ + valClass.getName() + ", received "
+ + value.getClass().getName());
+ }
+ if (partition < 0 || partition >= partitions) {
+ throw new IOException("Illegal partition for " + key + " (" +
+ partition + ")");
+ }
+ // NOTE(review): kvmeta is an IntBuffer, so remaining() counts ints, but it
+ // is compared with METASIZE (bytes, 16) rather than NMETA (4): the span is
+ // switched with up to three entries' worth of metadata space still unused.
+ if(span.kvmeta.remaining() < METASIZE) {
+ this.sort();
+ }
+ int keystart = span.kvbuffer.position();
+ int valstart = -1;
+ int valend = -1;
+ try {
+ keySerializer.serialize(key);
+ valstart = span.kvbuffer.position();
+ valSerializer.serialize(value);
+ valend = span.kvbuffer.position();
+ } catch(BufferOverflowException overflow) {
+ // data region full: rewind the position to drop the partially
+ // serialized record, so the span ends just before it
+ span.kvbuffer.position(keystart);
+ this.sort();
+ // try again
+ // NOTE(review): if one record cannot fit even in a freshly created span,
+ // this retry recurses without bound (spilling each time) until the stack
+ // overflows.
+ this.collect(key, value, partition);
+ return;
+ }
+
+ int prefix = 0;
+
+ if(hasher != null) {
+ prefix = hasher.getHashCode(key);
+ }
+
+ // Pack the sort prefix: partition number in the top partitionBits bits,
+ // the key hash (if any) shifted into the remaining low bits. Spans sort
+ // by this int first, so records order by partition, then by hash.
+ prefix = (partition << (32 - partitionBits)) | (prefix >>> partitionBits);
+
+ /* maintain order as in PARTITION, KEYSTART, VALSTART, VALLEN */
+ span.kvmeta.put(prefix);
+ span.kvmeta.put(keystart);
+ span.kvmeta.put(valstart);
+ span.kvmeta.put(valend - valstart);
+ // track the largest key/value seen; SortSpan.sort() sizes its key
+ // scratch arrays from keymax
+ if((valstart - keystart) > span.keymax) {
+ span.keymax = (valstart - keystart);
+ }
+ if((valend - valstart) > span.valmax) {
+ span.valmax = (valend - valstart);
+ }
+ mapOutputRecordCounter.increment(1);
+ mapOutputByteCounter.increment(valend - keystart);
+ task.getTaskReporter().progress();
+ }
+
+  /**
+   * Merges every sorted span handed to the merger so far and writes spill
+   * number {@code numSpills}. The spill file holds one IFile segment per
+   * partition (passed through the combiner when one is configured) and is
+   * followed by a matching spill index file.
+   *
+   * @throws IOException if writing fails, if a background sort task failed
+   *         (records would be missing), or if the thread was interrupted; in
+   *         the last case the interrupt status is restored first
+   */
+  public void spill() throws IOException {
+    // create spill file
+    final long size = largeBuffer.capacity() +
+        (partitions * APPROX_HEADER_LENGTH);
+    final TezSpillRecord spillRec = new TezSpillRecord(partitions);
+    final Path filename =
+        mapOutputFile.getSpillFileForWrite(numSpills, size);
+    FSDataOutputStream out = rfs.create(filename, true, 4096);
+
+    try {
+      // Wait for all the future results from sort threads. If any of them
+      // failed, some records never reached the merger, so writing this spill
+      // would silently lose data.
+      if (!merger.ready()) {
+        throw new IOException("Failed to collect sorted spans for spill "
+            + numSpills);
+      }
+      LOG.info("Spilling to " + filename.toString());
+      for (int i = 0; i < partitions; ++i) {
+        TezRawKeyValueIterator kvIter = merger.filter(i);
+        //write merged output to disk
+        long segmentStart = out.getPos();
+        Writer writer =
+            new Writer(job, out, keyClass, valClass, codec,
+                spilledRecordsCounter);
+        writer.setRLE(merger.needsRLE());
+        if (combineProcessor == null) {
+          while(kvIter.next()) {
+            writer.append(kvIter.getKey(), kvIter.getValue());
+          }
+        } else {
+          runCombineProcessor(kvIter, writer);
+        }
+        //close
+        writer.close();
+
+        // record offsets
+        final TezIndexRecord rec =
+            new TezIndexRecord(
+                segmentStart,
+                writer.getRawLength(),
+                writer.getCompressedLength());
+        spillRec.putIndex(rec, i);
+      }
+
+      Path indexFilename =
+          mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+              * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+      // TODO: cache
+      spillRec.writeToFile(indexFilename, job);
+      ++numSpills;
+    } catch(InterruptedException ie) {
+      // Interrupted part-way (e.g. in the combiner). Keep the interrupt
+      // visible and fail loudly: a half-written spill that is neither indexed
+      // nor counted must not be skipped silently.
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while writing spill " + numSpills, ie);
+    } finally {
+      out.close();
+    }
+  }
+
+  /**
+   * Sorts and spills the records still buffered, then produces the final
+   * map output.
+   *
+   * <p>With a single spill, the spill file and its index are renamed into
+   * place. Otherwise, each partition's segments from all spills are merged
+   * into one output file plus index, and the spill files are deleted. The
+   * merge goes through the combiner when one is configured and at least
+   * {@code minSpillsForCombine} spills exist.
+   *
+   * @throws IOException if sorting, merging or any file operation fails
+   * @throws InterruptedException propagated from the merge/combine helpers
+   */
+  @Override
+  public void flush() throws IOException, InterruptedException {
+    final TezTaskAttemptID mapId = task.getTaskAttemptId();
+    Path finalOutputFile =
+        mapOutputFile.getOutputFileForWrite(0); //TODO
+    Path finalIndexFile =
+        mapOutputFile.getOutputIndexFileForWrite(0); //TODO
+
+    LOG.info("Starting flush of map output");
+    span.end();
+    merger.add(span.sort(sorter, comparator));
+    spill();
+    sortmaster.shutdown();
+
+    largeBuffer = null;
+
+    if(numSpills == 1) {
+      // someday be able to pass this directly to shuffle
+      // without writing to disk
+      final Path filename =
+          mapOutputFile.getSpillFile(0);
+      Path indexFilename =
+          mapOutputFile.getSpillIndexFile(0);
+      sameVolRename(filename, finalOutputFile);
+      sameVolRename(indexFilename, finalIndexFile);
+      return;
+    }
+
+    sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
+    TezMerger.considerFinalMergeForProgress();
+
+    // Load every spill's index first: each partition's merge needs that
+    // partition's segment offsets from all spills.
+    final List<TezSpillRecord> spillIndexes =
+        new ArrayList<TezSpillRecord>(numSpills);
+    for(int i = 0; i < numSpills; i++) {
+      // TODO: build this cache before
+      Path indexFilename = mapOutputFile.getSpillIndexFile(i);
+      spillIndexes.add(new TezSpillRecord(indexFilename, job));
+    }
+
+    // loop-invariant, so read once rather than once per partition
+    final int mergeFactor =
+        job.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR,
+            TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_FACTOR);
+    final TezSpillRecord spillRec = new TezSpillRecord(partitions);
+
+    //The output stream for the final single output file
+    FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
+    try {
+      for (int parts = 0; parts < partitions; parts++) {
+        //create the segments to be merged
+        List<Segment> segmentList =
+            new ArrayList<Segment>(numSpills);
+        for(int i = 0; i < numSpills; i++) {
+          Path spillFilename = mapOutputFile.getSpillFile(i);
+          TezIndexRecord indexRecord = spillIndexes.get(i).getIndex(parts);
+
+          Segment s =
+              new Segment(job, rfs, spillFilename, indexRecord.getStartOffset(),
+                  indexRecord.getPartLength(), codec, true);
+          segmentList.add(i, s);
+        }
+
+        // sort the segments only if there are intermediate merges
+        boolean sortSegments = segmentList.size() > mergeFactor;
+        //merge
+        @SuppressWarnings("unchecked")
+        TezRawKeyValueIterator kvIter = TezMerger.merge(job, rfs,
+            keyClass, valClass, codec,
+            segmentList, mergeFactor,
+            new Path(mapId.toString()),
+            (RawComparator)ConfigUtils.getOutputKeyComparator(job),
+            task.getTaskReporter(), sortSegments,
+            null, spilledRecordsCounter, sortPhase.phase());
+
+        //write merged output to disk
+        long segmentStart = finalOut.getPos();
+        Writer writer =
+            new Writer(job, finalOut, keyClass, valClass, codec,
+                spilledRecordsCounter);
+        writer.setRLE(merger.needsRLE());
+        if (combineProcessor == null || numSpills < minSpillsForCombine) {
+          TezMerger.writeFile(kvIter, writer, task.getTaskReporter(), job);
+        } else {
+          runCombineProcessor(kvIter, writer);
+        }
+
+        //close
+        writer.close();
+
+        sortPhase.startNextPhase();
+
+        // record offsets
+        final TezIndexRecord rec =
+            new TezIndexRecord(
+                segmentStart,
+                writer.getRawLength(),
+                writer.getCompressedLength());
+        spillRec.putIndex(rec, parts);
+      }
+    } finally {
+      // close even when merging fails so the stream is not leaked
+      finalOut.close();
+    }
+
+    // publish the index only once the data file is completely written
+    spillRec.writeToFile(finalIndexFile, job);
+    for(int i = 0; i < numSpills; i++) {
+      Path indexFilename = mapOutputFile.getSpillIndexFile(i);
+      Path spillFilename = mapOutputFile.getSpillFile(i);
+      rfs.delete(indexFilename,true);
+      rfs.delete(spillFilename,true);
+    }
+  }
+
+ public void close() { }
+
+ /**
+ * A raw key/value iterator that also exposes the PARTITION metadata of its
+ * current record. In this class's implementations (SpanIterator,
+ * SpanMerger) that value is the packed sort prefix written by collect():
+ * partition number in the top partitionBits bits, hash bits below.
+ * PartitionFilter shifts it right to recover the partition.
+ */
+ private interface PartitionedRawKeyValueIterator extends TezRawKeyValueIterator {
+ int getPartition();
+ }
+
+  /**
+   * Presents a {@link ByteBuffer} as an {@link OutputStream}, so the key and
+   * value serializers can write straight into a span's data region.
+   * Every write is a relative put at the buffer's position; writing past
+   * the limit raises {@link BufferOverflowException}, which collect()
+   * catches to detect a full span.
+   */
+  private class BufferStreamWrapper extends OutputStream {
+    private final ByteBuffer target;
+
+    public BufferStreamWrapper(ByteBuffer out) {
+      target = out;
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      target.put((byte) b);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+      target.put(b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      target.put(b, off, len);
+    }
+  }
+
+  /**
+   * A reusable {@link DataInputBuffer} that exposes a private copy of the
+   * bytes it is reset to. The backing array only grows, so once it is large
+   * enough, repeated resets do not allocate.
+   */
+  protected class InputByteBuffer extends DataInputBuffer {
+    private byte[] buffer = new byte[256];
+    private ByteBuffer wrapped = ByteBuffer.wrap(buffer);
+
+    /** Ensures the backing array can hold {@code length} bytes. */
+    private void resize(int length) {
+      if(length > buffer.length) {
+        buffer = new byte[length];
+        wrapped = ByteBuffer.wrap(buffer);
+      }
+      wrapped.limit(length);
+    }
+
+    /**
+     * Copies {@code length} bytes of {@code b}, starting at absolute index
+     * {@code start}, and exposes them from position 0. Side effect: moves
+     * {@code b}'s position to {@code start + length}.
+     */
+    public void reset(ByteBuffer b, int start, int length) {
+      resize(length);
+      b.position(start);
+      b.get(buffer, 0, length);
+      super.reset(buffer, 0, length);
+    }
+
+    /**
+     * Copies the unread bytes of {@code clone} (from its current position to
+     * its end) and exposes them from position 0.
+     *
+     * <p>{@link DataInputBuffer#getLength()} reports the end offset of the
+     * data, not a byte count, so the count is {@code getLength() -
+     * getPosition()}; using getLength() alone over-copies (and can run past
+     * the source array) whenever the clone's position is not 0.
+     */
+    public void reset(DataInputBuffer clone) {
+      byte[] data = clone.getData();
+      int start = clone.getPosition();
+      int length = clone.getLength() - start;
+      resize(length);
+      System.arraycopy(data, start, buffer, 0, length);
+      super.reset(buffer, 0, length);
+    }
+  }
+
+  /**
+   * A contiguous slice of the shared sort buffer holding one batch of records.
+   *
+   * <p>The slice is split into a metadata region ({@code kvmeta}) followed by
+   * a data region ({@code kvbuffer}).
+   * <ul>
+   *   <li>{@code kvmeta} holds NMETA ints per record: sort prefix, key start,
+   *       value start and value length.</li>
+   *   <li>Keys and values are serialized back to back into {@code kvbuffer}
+   *       through {@code out}.</li>
+   * </ul>
+   * Sorting permutes only the metadata entries; record bytes never move.
+   */
+  private class SortSpan implements IndexedSortable {
+    final IntBuffer kvmeta;
+    final ByteBuffer kvbuffer;
+    final DataOutputStream out;
+    private RawComparator comparator;
+    // scratch copies of one metadata entry each, used by swap()
+    final int imeta[] = new int[NMETA];
+    final int jmeta[] = new int[NMETA];
+    // longest key / value written so far, in bytes (maintained by collect())
+    int keymax = 1;
+    int valmax = 1;
+    // key scratch arrays for compare(), sized from keymax in sort()
+    private byte[] ki;
+    private byte[] kj;
+    // sequence number, used only in log messages
+    private int index = 0;
+    private InputByteBuffer hay = new InputByteBuffer();
+    // number of compare() calls that found equal keys
+    private long eq = 0;
+
+    /**
+     * Carves a span out of the remaining bytes of {@code source}. The first
+     * part becomes the metadata region, with room for {@code maxItems}
+     * entries, or fewer when the source cannot hold {@code maxItems} records
+     * of about {@code perItem} bytes each. The rest becomes the data region.
+     * {@code source}'s own position and limit are left unchanged.
+     */
+    public SortSpan(ByteBuffer source, int maxItems, int perItem) {
+      int capacity = source.remaining();
+      int metasize = METASIZE*maxItems;
+      int dataSize = maxItems * perItem;
+      if(capacity < (metasize+dataSize)) {
+        // try to allocate less meta space, because we have sample data
+        metasize = METASIZE*(capacity/(perItem+METASIZE));
+      }
+      ByteBuffer reserved = source.duplicate();
+      reserved.mark();
+      LOG.info("reserved.remaining() = "+reserved.remaining());
+      LOG.info("reserved.size = "+metasize);
+      reserved.position(metasize);
+      kvbuffer = reserved.slice();
+      reserved.flip();
+      reserved.limit(metasize);
+      kvmeta = reserved
+          .slice()
+          .order(ByteOrder.nativeOrder())
+          .asIntBuffer();
+      out = new DataOutputStream(
+          new BufferStreamWrapper(kvbuffer));
+    }
+
+    /**
+     * Sorts this span's metadata entries in place (by sort prefix, then key)
+     * and returns an iterator over the sorted records.
+     */
+    public SpanIterator sort(IndexedSorter sorter, RawComparator comparator) {
+      this.comparator = comparator;
+      ki = new byte[keymax];
+      kj = new byte[keymax];
+      LOG.info("begin sorting Span"+index + " ("+length()+")");
+      if(length() > 1) {
+        sorter.sort(this, 0, length(), task.getTaskReporter());
+      }
+      LOG.info("done sorting Span"+index);
+      return new SpanIterator(this);
+    }
+
+    /** Index in kvmeta of the first int of record {@code i}'s entry. */
+    int offsetFor(int i) {
+      return (i * NMETA);
+    }
+
+    /** Swaps the metadata entries of records {@code mi} and {@code mj}. */
+    public void swap(final int mi, final int mj) {
+      final int kvi = offsetFor(mi);
+      final int kvj = offsetFor(mj);
+
+      kvmeta.position(kvi); kvmeta.get(imeta);
+      kvmeta.position(kvj); kvmeta.get(jmeta);
+      kvmeta.position(kvj); kvmeta.put(imeta);
+      kvmeta.position(kvi); kvmeta.put(jmeta);
+    }
+
+    /** Orders records by sort prefix, then by raw key bytes. */
+    public int compare(final int mi, final int mj) {
+      final int kvi = offsetFor(mi);
+      final int kvj = offsetFor(mj);
+      final int kvip = kvmeta.get(kvi + PARTITION);
+      final int kvjp = kvmeta.get(kvj + PARTITION);
+      // sort by partition
+      // (collect() packs prefixes so they are non-negative, hence the
+      // subtraction cannot overflow)
+      if (kvip != kvjp) {
+        return kvip - kvjp;
+      }
+
+      final int istart = kvmeta.get(kvi + KEYSTART);
+      final int jstart = kvmeta.get(kvj + KEYSTART);
+      final int ilen = kvmeta.get(kvi + VALSTART) - istart;
+      final int jlen = kvmeta.get(kvj + VALSTART) - jstart;
+
+      kvbuffer.position(istart);
+      kvbuffer.get(ki, 0, ilen);
+      kvbuffer.position(jstart);
+      kvbuffer.get(kj, 0, jlen);
+      // sort by key
+      final int cmp = comparator.compare(ki, 0, ilen, kj, 0, jlen);
+      if(cmp == 0) eq++;
+      return cmp;
+    }
+
+    /**
+     * Ends this span and, when enough space is left behind it, returns a new
+     * span over the unused tail, sized from this span's average record size.
+     * Returns null when no follow-on span fits.
+     */
+    public SortSpan next() {
+      ByteBuffer remaining = end();
+      if(remaining != null) {
+        int items = length();
+        int perItem = kvbuffer.position()/items;
+        SortSpan newSpan = new SortSpan(remaining, items, perItem);
+        newSpan.index = index+1;
+        return newSpan;
+      }
+      return null;
+    }
+
+    /** Number of records covered by kvmeta's current limit. */
+    public int length() {
+      return kvmeta.limit()/NMETA;
+    }
+
+    /**
+     * Freezes this span by capping the data and metadata regions at what has
+     * been written. Returns the unused tail of the data region, or null if
+     * the span is empty or the tail cannot hold one metadata entry plus one
+     * average-sized record.
+     */
+    public ByteBuffer end() {
+      ByteBuffer remaining = kvbuffer.duplicate();
+      remaining.position(kvbuffer.position());
+      remaining = remaining.slice();
+      kvbuffer.limit(kvbuffer.position());
+      kvmeta.limit(kvmeta.position());
+      int items = length();
+      if(items == 0) {
+        return null;
+      }
+      int perItem = kvbuffer.position()/items;
+      LOG.info(String.format("Span%d.length = %d, perItem = %d", index, length(), perItem));
+      // remaining() is a byte count, so compare against METASIZE (bytes per
+      // metadata entry), not NMETA (ints per entry)
+      if(remaining.remaining() < METASIZE+perItem) {
+        return null;
+      }
+      return remaining;
+    }
+
+    /**
+     * Compares record {@code index} of this span against a needle record
+     * taken from another span: by sort prefix first, then by key bytes.
+     */
+    private int compareInternal(DataInputBuffer needle, int needlePart, int index) {
+      final int meta = offsetFor(index);
+      final int partition = kvmeta.get(meta + PARTITION);
+      if(partition != needlePart) {
+        return partition - needlePart;
+      }
+      final int keystart = kvmeta.get(meta + KEYSTART);
+      final int valstart = kvmeta.get(meta + VALSTART);
+      // hay is allocated ahead of time
+      hay.reset(kvbuffer, keystart, valstart - keystart);
+      // DataInputBuffer.getLength() is an end offset: subtract the position
+      // to get the number of key bytes
+      return comparator.compare(hay.getData(),
+          hay.getPosition(), hay.getLength() - hay.getPosition(),
+          needle.getData(),
+          needle.getPosition(), needle.getLength() - needle.getPosition());
+    }
+
+    public long getEq() {
+      return eq;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("Span[%d,%d]", NMETA*kvmeta.capacity(), kvbuffer.limit());
+    }
+  }
+
+  /**
+   * Walks the records of one sorted {@link SortSpan} in sorted order, reading
+   * each record's key and value through its metadata entry. It is Comparable
+   * so the merge heap can order spans by their current record.
+   */
+  private class SpanIterator implements PartitionedRawKeyValueIterator, Comparable<SpanIterator> {
+    // current record index; -1 until the first next()
+    private int kvindex = -1;
+    // index of the last record; -1 for an empty span
+    private int maxindex;
+    private IntBuffer kvmeta;
+    private ByteBuffer kvbuffer;
+    private SortSpan span;
+    private InputByteBuffer key = new InputByteBuffer();
+    private InputByteBuffer value = new InputByteBuffer();
+    private Progress progress = new Progress();
+
+    // minimum run length bisect() requires before it will gallop
+    private final int minrun = (1 << 4);
+
+    public SpanIterator(SortSpan span) {
+      this.kvmeta = span.kvmeta;
+      this.kvbuffer = span.kvbuffer;
+      this.span = span;
+      this.maxindex = (kvmeta.limit()/NMETA) - 1;
+    }
+
+    public DataInputBuffer getKey() throws IOException {
+      final int keystart = kvmeta.get(span.offsetFor(kvindex) + KEYSTART);
+      final int valstart = kvmeta.get(span.offsetFor(kvindex) + VALSTART);
+      key.reset(kvbuffer, keystart, valstart - keystart);
+      return key;
+    }
+
+    public DataInputBuffer getValue() throws IOException {
+      final int valstart = kvmeta.get(span.offsetFor(kvindex) + VALSTART);
+      final int vallen = kvmeta.get(span.offsetFor(kvindex) + VALLEN);
+      value.reset(kvbuffer, valstart, vallen);
+      return value;
+    }
+
+    /**
+     * Advances to the next record. Once the last record is reached, it stays
+     * there and returns false (this iterator is also used as a Comparable in
+     * the merger).
+     */
+    public boolean next() throws IOException {
+      if(kvindex == maxindex) return false;
+      if(kvindex % 100 == 0) {
+        // Fraction of the span consumed. Here kvindex >= 0 and
+        // kvindex < maxindex, so maxindex >= 1; the float cast avoids the
+        // integer division that would always yield 0 or -1.
+        progress.set((float) kvindex / maxindex);
+      }
+      kvindex += 1;
+      return true;
+    }
+
+    public void close() throws IOException {
+    }
+
+    public Progress getProgress() {
+      return progress;
+    }
+
+    /** Packed sort prefix of the current record (see collect()). */
+    public int getPartition() {
+      final int partition = kvmeta.get(span.offsetFor(kvindex) + PARTITION);
+      return partition;
+    }
+
+    /** Number of records after the current one. */
+    public int size() {
+      return (maxindex - kvindex);
+    }
+
+    /** Orders iterators by their current records (prefix, then key). */
+    public int compareTo(SpanIterator other) {
+      try {
+        return span.compareInternal(other.getKey(), other.getPartition(), kvindex);
+      } catch(IOException ie) {
+        // since we're not reading off disk, how could getKey() throw exceptions?
+      }
+      return -1;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("SpanIterator<%d:%d> (span=%s)", kvindex, maxindex, span.toString());
+    }
+
+    /**
+     * bisect returns the next insertion point for a given raw key, skipping keys
+     * which are <= needle using a binary search instead of a linear comparison.
+     * This is massively efficient when long strings of identical keys occur.
+     *
+     * @param needle key of the record at the head of the other spans
+     * @param needlePart packed sort prefix of that record
+     * @return the offset, relative to the current record, of a record that
+     *         still sorts no later than the needle; 0 when galloping is not
+     *         worthwhile or the current record already sorts after the needle
+     */
+    int bisect(DataInputBuffer needle, int needlePart) {
+      int start = kvindex;
+      int end = maxindex-1;
+      int mid = start;
+      int cmp = 0;
+
+      if(end - start < minrun) {
+        return 0;
+      }
+
+      if(span.compareInternal(needle, needlePart, start) > 0) {
+        // Nothing to skip. The result is a count relative to kvindex, so
+        // return 0 here rather than the absolute index kvindex.
+        return 0;
+      }
+
+      // bail out early if we haven't got a min run
+      if(span.compareInternal(needle, needlePart, start+minrun) > 0) {
+        return 0;
+      }
+
+      if(span.compareInternal(needle, needlePart, end) < 0) {
+        return end - kvindex;
+      }
+
+      boolean found = false;
+
+      // binary search for the last record <= needle, capped at 16 probes
+      for(int i = 0; start < end && i < 16; i++) {
+        mid = start + (end - start)/2;
+        cmp = span.compareInternal(needle, needlePart, mid);
+        if(cmp <= 0) {
+          // record at mid sorts no later than the needle: skip past it
+          start = mid;
+          found = true;
+        } else {
+          end = mid;
+        }
+      }
+
+      if(found) {
+        return start - kvindex;
+      }
+      return 0;
+    }
+  }
+
+  /**
+   * Sorts one {@link SortSpan} on the sort thread pool and hands back an
+   * iterator over its sorted records.
+   */
+  private class SortTask implements Callable<SpanIterator> {
+    private final SortSpan target;
+    private final IndexedSorter indexedSorter;
+    private final RawComparator keyComparator;
+
+    public SortTask(SortSpan sortable,
+        IndexedSorter sorter, RawComparator comparator) {
+      target = sortable;
+      indexedSorter = sorter;
+      keyComparator = comparator;
+    }
+
+    public SpanIterator call() {
+      final SpanIterator sorted = target.sort(indexedSorter, keyComparator);
+      return sorted;
+    }
+  }
+
+ /**
+ * Exposes only the records of one partition from an underlying iterator
+ * whose records arrive ordered by partition (the SpanMerger). Call
+ * reset(p) for each partition in turn and drain next().
+ *
+ * The underlying iterator cannot be pushed back, so next() reads one record
+ * past the end of the current partition. 'dirty' records that it is already
+ * positioned on such a record, which the following call (typically after
+ * reset() to a later partition) re-examines without advancing.
+ */
+ private class PartitionFilter implements TezRawKeyValueIterator {
+ private final PartitionedRawKeyValueIterator iter;
+ private int partition;
+ // true when iter is positioned on an already-read record that did not
+ // belong to the partition being filtered at the time
+ private boolean dirty = false;
+ public PartitionFilter(PartitionedRawKeyValueIterator iter) {
+ this.iter = iter;
+ }
+ public DataInputBuffer getKey() throws IOException { return iter.getKey(); }
+ public DataInputBuffer getValue() throws IOException { return iter.getValue(); }
+ public void close() throws IOException { }
+ public Progress getProgress() {
+ return new Progress();
+ }
+ /**
+ * Returns true if the current (or, when not dirty, the next) underlying
+ * record belongs to the selected partition; false once a record of
+ * another partition, or the end of the input, is reached.
+ */
+ public boolean next() throws IOException {
+ if(dirty || iter.next()) {
+ // packed sort prefix; its top partitionBits bits are the partition
+ int prefix = iter.getPartition();
+
+ if((prefix >>> (32 - partitionBits)) == partition) {
+ dirty = false; // we found what we were looking for, good
+ return true;
+ } else if(!dirty) {
+ dirty = true; // we did a lookahead and failed to find partition
+ }
+ }
+ return false;
+ }
+
+ /** Selects the partition whose records next() should return. */
+ public void reset(int partition) {
+ this.partition = partition;
+ }
+
+ /** The selected partition number (not a packed prefix). */
+ public int getPartition() {
+ return this.partition;
+ }
+ }
+
+  /**
+   * Min-heap of span iterators ordered by each iterator's current record
+   * (via {@code SpanIterator.compareTo}), starting with capacity 256.
+   */
+  private class SpanHeap extends java.util.PriorityQueue<SpanIterator> {
+    public SpanHeap() {
+      super(256);
+    }
+
+    /**
+     * Removes and returns the smallest iterator, or {@code null} when the
+     * heap is empty; an alias for {@link PriorityQueue#poll()}.
+     */
+    public SpanIterator pop() {
+      return poll();
+    }
+  }
+
+  /**
+   * Merges the sorted spans of one spill into a single ordered stream.
+   *
+   * <p>Sorted spans arrive in two ways: directly through
+   * {@link #add(SpanIterator)}, or as futures from the sort pool through
+   * {@link #add(Future)}; {@link #ready()} waits for the futures. Records are
+   * then taken in order from a heap keyed on each span's current record.
+   *
+   * <p>When the same span wins twice in a row it "gallops": bisect() finds a
+   * run of its following records that still sort no later than the next
+   * span's head, and those are returned without heap operations.
+   *
+   * <p>next() copies the record into {@code key}/{@code value} because the
+   * winning span is advanced before the caller reads it.
+   */
+  private class SpanMerger implements PartitionedRawKeyValueIterator {
+    private final RawComparator comparator;
+    InputByteBuffer key = new InputByteBuffer();
+    InputByteBuffer value = new InputByteBuffer();
+    // packed sort prefix of the current record
+    int partition;
+
+    private ArrayList< Future<SpanIterator>> futures = new ArrayList< Future<SpanIterator>>();
+
+    private SpanHeap heap = new SpanHeap();
+    private PartitionFilter partIter;
+
+    // records still to take from 'horse' without consulting the heap
+    private int gallop = 0;
+    // the span that produced the previous record
+    private SpanIterator horse;
+    // span lengths and equal-key comparison counts summed by ready() over
+    // every spill so far; they drive needsRLE()
+    private long total = 0;
+    private long count = 0;
+    private long eq = 0;
+
+    public SpanMerger(RawComparator comparator) {
+      this.comparator = comparator;
+      partIter = new PartitionFilter(this);
+    }
+
+    /** Advances {@code iter} one record and (re)inserts it unless exhausted. */
+    public void add(SpanIterator iter) throws IOException{
+      if(iter.next()) {
+        heap.add(iter);
+      }
+    }
+
+    /** Queues a span whose sort is still running on the sort pool. */
+    public void add(Future<SpanIterator> iter) throws IOException{
+      this.futures.add(iter);
+    }
+
+    /**
+     * Waits for every queued sort, moves the results onto the heap, logs the
+     * heap and updates the equal-key statistics.
+     *
+     * @return true on success. false if waiting failed or a sort task threw;
+     *         the cause is logged with its stack trace, and the thread's
+     *         interrupt status is restored if it was an interruption
+     */
+    public boolean ready() throws IOException, InterruptedException {
+      try {
+        SpanIterator iter = null;
+        while(this.futures.size() > 0) {
+          Future<SpanIterator> futureIter = this.futures.remove(0);
+          iter = futureIter.get();
+          this.add(iter);
+        }
+
+        StringBuilder sb = new StringBuilder();
+        for(SpanIterator sp: heap) {
+          sb.append(sp.toString());
+          sb.append(",");
+          total += sp.span.length();
+          eq += sp.span.getEq();
+        }
+        LOG.info("Heap = " + sb.toString());
+        return true;
+      } catch(Exception e) {
+        if (e instanceof InterruptedException) {
+          // failure is reported via the return value; keep the interrupt visible
+          Thread.currentThread().interrupt();
+        }
+        // log the full stack trace so a failed sort task can be diagnosed
+        LOG.error("Failed to collect sorted spans", e);
+        return false;
+      }
+    }
+
+    /** Next span to read from: the galloping horse, or the heap minimum. */
+    private SpanIterator pop() throws IOException {
+      if(gallop > 0) {
+        gallop--;
+        return horse;
+      }
+      SpanIterator current = heap.pop();
+      SpanIterator next = heap.peek();
+      if(next != null && current != null && horse == current) {
+        // the same span won twice in a row: try to gallop through it
+        // TODO: a better threshold check
+        gallop = current.bisect(next.getKey(), next.getPartition())-1;
+      }
+      horse = current;
+      return current;
+    }
+
+    /** True when more than 10% of comparisons so far found equal keys. */
+    public boolean needsRLE() {
+      return (eq > 0.1 * total);
+    }
+
+    private SpanIterator peek() throws IOException {
+      if(gallop > 0) {
+        return horse;
+      }
+      return heap.peek();
+    }
+
+    public boolean next() throws IOException {
+      SpanIterator current = pop();
+
+      if(current != null) {
+        // keep local copies, since add() will move it all out
+        key.reset(current.getKey());
+        value.reset(current.getValue());
+        partition = current.getPartition();
+        if(gallop <= 0) {
+          this.add(current);
+        } else {
+          // galloping
+          current.next();
+        }
+        return true;
+      }
+      return false;
+    }
+
+    public DataInputBuffer getKey() throws IOException { return key; }
+    public DataInputBuffer getValue() throws IOException { return value; }
+    public int getPartition() { return partition; }
+
+    public void close() throws IOException {
+    }
+
+    public Progress getProgress() {
+      // TODO
+      return new Progress();
+    }
+
+    /** Returns a view of this merger limited to records of {@code partition}. */
+    public TezRawKeyValueIterator filter(int partition) {
+      partIter.reset(partition);
+      return partIter;
+    }
+
+  }
+
+  /**
+   * Returns the output context for this sorter. None is tracked here, so the
+   * result is always {@code null}.
+   */
+  @Override
+  public OutputContext getOutputContext() {
+    final OutputContext none = null;
+    return none;
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezIndexRecord.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezIndexRecord.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezIndexRecord.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezIndexRecord.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,45 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.common.sort.impl;
+
+/**
+ * Location of one partition's segment inside a spill file or the final map
+ * output file: the byte offset where the segment starts, the segment's raw
+ * length, and the number of bytes it occupies in the file.
+ *
+ * <p>Instances are immutable.
+ */
+public class TezIndexRecord {
+  private final long startOffset;
+  private final long rawLength;
+  private final long partLength;
+
+  /** Creates a record whose offset and lengths are all zero. */
+  public TezIndexRecord() {
+    this(0L, 0L, 0L);
+  }
+
+  /**
+   * @param startOffset byte offset of the segment within its file
+   * @param rawLength raw length of the segment's data
+   * @param partLength number of bytes the segment occupies in the file
+   */
+  public TezIndexRecord(long startOffset, long rawLength, long partLength) {
+    this.startOffset = startOffset;
+    this.rawLength = rawLength;
+    this.partLength = partLength;
+  }
+
+  /** @return byte offset of the segment within its file */
+  public long getStartOffset() {
+    return startOffset;
+  }
+
+  /** @return raw length of the segment's data */
+  public long getRawLength() {
+    return rawLength;
+  }
+
+  /** @return number of bytes the segment occupies in the file */
+  public long getPartLength() {
+    return partLength;
+  }
+
+  @Override
+  public String toString() {
+    return "TezIndexRecord[startOffset=" + startOffset
+        + ", rawLength=" + rawLength
+        + ", partLength=" + partLength + "]";
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezIndexRecord.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,797 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.sort.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumFileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.util.PriorityQueue;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.common.Constants;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.common.sort.impl.IFile.Reader;
+import org.apache.tez.engine.common.sort.impl.IFile.Writer;
+
+/**
+ * Merger is a utility class used by the Map and Reduce tasks for merging
+ * both their memory and disk segments.
+ * <p>
+ * Segments are merged in passes of at most {@code mergeFactor} segments.
+ * Every pass except the last writes its output to a local intermediate file,
+ * which is put back into the list of remaining segments; the last pass is
+ * returned to the caller as a {@link TezRawKeyValueIterator}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class TezMerger {
+  private static final Log LOG = LogFactory.getLog(TezMerger.class);
+
+  // Picks the local directory in which intermediate merge files are written.
+  private static final LocalDirAllocator lDirAlloc =
+      new LocalDirAllocator(TezJobConfig.LOCAL_DIR);
+
+  /**
+   * Merges a set of on-disk files.
+   *
+   * @param conf configuration handed to readers, writers and the local
+   *        directory allocator
+   * @param fs file system holding {@code inputs}
+   * @param keyClass key class of intermediate merge output
+   * @param valueClass value class of intermediate merge output
+   * @param codec codec used to read the inputs and write intermediate output
+   * @param inputs files to merge
+   * @param deleteInputs if true, an input file is deleted when its segment is
+   *        closed
+   * @param mergeFactor maximum number of segments merged in one pass
+   * @param tmpDir path prefix for intermediate merge files
+   * @param comparator raw comparator defining the key order
+   * @param reporter notified of progress while intermediate files are written
+   * @param readsCounter counter handed to segment readers
+   * @param writesCounter counter handed to intermediate writers
+   * @param mergePhase progress object to update; if null an internal one is
+   *        used
+   * @return iterator over the fully merged records
+   * @throws IOException on any read or write failure
+   */
+  public static
+  TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                               Class keyClass, Class valueClass,
+                               CompressionCodec codec,
+                               Path[] inputs, boolean deleteInputs,
+                               int mergeFactor, Path tmpDir,
+                               RawComparator comparator, Progressable reporter,
+                               TezCounter readsCounter,
+                               TezCounter writesCounter,
+                               Progress mergePhase)
+  throws IOException {
+    return
+      new MergeQueue(conf, fs, inputs, deleteInputs, codec, comparator,
+                     reporter, null).merge(keyClass, valueClass,
+                                           mergeFactor, tmpDir,
+                                           readsCounter, writesCounter,
+                                           mergePhase);
+  }
+
+  /**
+   * Same as the previous overload, but additionally increments
+   * {@code mergedMapOutputsCounter} once for every input segment that is
+   * initialized, except inputs whose name ends with
+   * {@code Constants.MERGED_OUTPUT_PREFIX}.
+   */
+  public static
+  TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                               Class keyClass, Class valueClass,
+                               CompressionCodec codec,
+                               Path[] inputs, boolean deleteInputs,
+                               int mergeFactor, Path tmpDir,
+                               RawComparator comparator,
+                               Progressable reporter,
+                               TezCounter readsCounter,
+                               TezCounter writesCounter,
+                               TezCounter mergedMapOutputsCounter,
+                               Progress mergePhase)
+  throws IOException {
+    return
+      new MergeQueue(conf, fs, inputs, deleteInputs, codec, comparator,
+                     reporter, mergedMapOutputsCounter).merge(
+                                           keyClass, valueClass,
+                                           mergeFactor, tmpDir,
+                                           readsCounter, writesCounter,
+                                           mergePhase);
+  }
+
+  /**
+   * Merges already-constructed segments in the order given (they are not
+   * re-sorted by length), without a codec for intermediate output.
+   */
+  public static
+  TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                               Class keyClass, Class valueClass,
+                               List<Segment> segments,
+                               int mergeFactor, Path tmpDir,
+                               RawComparator comparator, Progressable reporter,
+                               TezCounter readsCounter,
+                               TezCounter writesCounter,
+                               Progress mergePhase)
+  throws IOException {
+    return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
+                 comparator, reporter, false, readsCounter, writesCounter,
+                 mergePhase);
+  }
+
+  /**
+   * Merges already-constructed segments.
+   *
+   * @param sortSegments if true, {@code segments} is first sorted by
+   *        ascending length (the list is sorted in place)
+   */
+  public static <K extends Object, V extends Object>
+  TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                               Class keyClass, Class valueClass,
+                               List<Segment> segments,
+                               int mergeFactor, Path tmpDir,
+                               RawComparator comparator, Progressable reporter,
+                               boolean sortSegments,
+                               TezCounter readsCounter,
+                               TezCounter writesCounter,
+                               Progress mergePhase)
+  throws IOException {
+    return new MergeQueue(conf, fs, segments, comparator, reporter,
+                          sortSegments).merge(keyClass, valueClass,
+                                              mergeFactor, tmpDir,
+                                              readsCounter, writesCounter,
+                                              mergePhase);
+  }
+
+  /**
+   * Merges already-constructed segments, writing intermediate output with
+   * {@code codec}.
+   *
+   * @param sortSegments if true, {@code segments} is first sorted by
+   *        ascending length (the list is sorted in place)
+   */
+  public static <K extends Object, V extends Object>
+  TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                               Class keyClass, Class valueClass,
+                               CompressionCodec codec,
+                               List<Segment> segments,
+                               int mergeFactor, Path tmpDir,
+                               RawComparator comparator, Progressable reporter,
+                               boolean sortSegments,
+                               TezCounter readsCounter,
+                               TezCounter writesCounter,
+                               Progress mergePhase)
+  throws IOException {
+    return new MergeQueue(conf, fs, segments, comparator, reporter,
+                          sortSegments, codec).merge(keyClass, valueClass,
+                                                     mergeFactor, tmpDir,
+                                                     readsCounter, writesCounter,
+                                                     mergePhase);
+  }
+
+  /**
+   * Merges already-constructed segments, the first {@code inMemSegments} of
+   * which are in memory. In the first pass those in-memory segments are
+   * merged on top of the regular pass factor.
+   */
+  public static <K extends Object, V extends Object>
+  TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                               Class keyClass, Class valueClass,
+                               List<Segment> segments,
+                               int mergeFactor, int inMemSegments, Path tmpDir,
+                               RawComparator comparator, Progressable reporter,
+                               boolean sortSegments,
+                               TezCounter readsCounter,
+                               TezCounter writesCounter,
+                               Progress mergePhase)
+  throws IOException {
+    return new MergeQueue(conf, fs, segments, comparator, reporter,
+                          sortSegments).merge(keyClass, valueClass,
+                                              mergeFactor, inMemSegments,
+                                              tmpDir,
+                                              readsCounter, writesCounter,
+                                              mergePhase);
+  }
+
+  /**
+   * Like the previous overload, but writes intermediate output with
+   * {@code codec}.
+   */
+  static <K extends Object, V extends Object>
+  TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                               Class keyClass, Class valueClass,
+                               CompressionCodec codec,
+                               List<Segment> segments,
+                               int mergeFactor, int inMemSegments, Path tmpDir,
+                               RawComparator comparator, Progressable reporter,
+                               boolean sortSegments,
+                               TezCounter readsCounter,
+                               TezCounter writesCounter,
+                               Progress mergePhase)
+  throws IOException {
+    return new MergeQueue(conf, fs, segments, comparator, reporter,
+                          sortSegments, codec).merge(keyClass, valueClass,
+                                                     mergeFactor, inMemSegments,
+                                                     tmpDir,
+                                                     readsCounter, writesCounter,
+                                                     mergePhase);
+  }
+
+  /**
+   * Drains {@code records} into {@code writer}, calling
+   * {@code progressable.progress()} on the first record and then every
+   * {@code TezJobConfig.RECORDS_BEFORE_PROGRESS} records.
+   * The writer is not closed here.
+   */
+  public static <K extends Object, V extends Object>
+  void writeFile(TezRawKeyValueIterator records, Writer writer,
+                 Progressable progressable, Configuration conf)
+  throws IOException {
+    long progressBar =
+        conf.getLong(TezJobConfig.RECORDS_BEFORE_PROGRESS,
+            TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
+    if (progressBar <= 0) {
+      // A zero interval would make the modulo below throw
+      // ArithmeticException; report on every record instead.
+      progressBar = 1;
+    }
+    long recordCtr = 0;
+    while (records.next()) {
+      writer.append(records.getKey(), records.getValue());
+
+      if (((recordCtr++) % progressBar) == 0) {
+        progressable.progress();
+      }
+    }
+  }
+
+  /**
+   * A sorted run of records taking part in a merge. A segment is either
+   * backed by an already-open {@link Reader} (no file system; reported as
+   * in-memory) or by a byte range of a file that is opened lazily in
+   * {@link #init}.
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public static class Segment<K extends Object, V extends Object> {
+    Reader reader = null;
+    // Holds the current raw key; refilled by nextRawKey().
+    final DataInputBuffer key = new DataInputBuffer();
+
+    Configuration conf = null;
+    FileSystem fs = null;
+    Path file = null;
+    // When false, the backing file is deleted on close().
+    boolean preserve = false;
+    CompressionCodec codec = null;
+    long segmentOffset = 0;
+    long segmentLength = -1;
+
+    // Incremented once per init() when non-null.
+    TezCounter mapOutputsCounter = null;
+
+    /** File-backed segment covering the whole file. */
+    public Segment(Configuration conf, FileSystem fs, Path file,
+                   CompressionCodec codec, boolean preserve)
+    throws IOException {
+      this(conf, fs, file, codec, preserve, null);
+    }
+
+    /** File-backed segment covering the whole file, with an output counter. */
+    public Segment(Configuration conf, FileSystem fs, Path file,
+        CompressionCodec codec, boolean preserve,
+        TezCounter mergedMapOutputsCounter)
+    throws IOException {
+      this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve,
+           mergedMapOutputsCounter);
+    }
+
+    /** File-backed segment covering {@code segmentLength} bytes from {@code segmentOffset}. */
+    public Segment(Configuration conf, FileSystem fs, Path file,
+                   long segmentOffset, long segmentLength,
+                   CompressionCodec codec,
+                   boolean preserve) throws IOException {
+      this(conf, fs, file, segmentOffset, segmentLength, codec, preserve, null);
+    }
+
+    /** File-backed byte-range segment with an output counter. */
+    public Segment(Configuration conf, FileSystem fs, Path file,
+        long segmentOffset, long segmentLength, CompressionCodec codec,
+        boolean preserve, TezCounter mergedMapOutputsCounter)
+    throws IOException {
+      this.conf = conf;
+      this.fs = fs;
+      this.file = file;
+      this.codec = codec;
+      this.preserve = preserve;
+
+      this.segmentOffset = segmentOffset;
+      this.segmentLength = segmentLength;
+
+      this.mapOutputsCounter = mergedMapOutputsCounter;
+    }
+
+    /** Segment backed by an already-open reader (no file system). */
+    public Segment(Reader reader, boolean preserve) {
+      this(reader, preserve, null);
+    }
+
+    /** Reader-backed segment with an output counter. */
+    public Segment(Reader reader, boolean preserve,
+                   TezCounter mapOutputsCounter) {
+      this.reader = reader;
+      this.preserve = preserve;
+
+      this.segmentLength = reader.getLength();
+
+      this.mapOutputsCounter = mapOutputsCounter;
+    }
+
+    /**
+     * Opens the backing file at {@code segmentOffset} if no reader exists yet,
+     * and bumps the map-outputs counter if one was supplied.
+     */
+    void init(TezCounter readsCounter) throws IOException {
+      if (reader == null) {
+        FSDataInputStream in = fs.open(file);
+        boolean opened = false;
+        try {
+          in.seek(segmentOffset);
+          reader = new Reader(conf, in, segmentLength, codec, readsCounter);
+          opened = true;
+        } finally {
+          if (!opened) {
+            // Do not leak the stream when seeking or building the reader
+            // fails.
+            try {
+              in.close();
+            } catch (IOException ignored) {
+              // Let the original failure be the one that propagates.
+            }
+          }
+        }
+      }
+
+      if (mapOutputsCounter != null) {
+        mapOutputsCounter.increment(1);
+      }
+    }
+
+    /** @return true when the segment has no file system behind it */
+    boolean inMemory() {
+      return fs == null;
+    }
+
+    DataInputBuffer getKey() { return key; }
+
+    /** Reads the next raw value into {@code value} and returns it. */
+    DataInputBuffer getValue(DataInputBuffer value) throws IOException {
+      nextRawValue(value);
+      return value;
+    }
+
+    /** @return the reader's length once opened, else the configured length */
+    public long getLength() {
+      return (reader == null) ?
+        segmentLength : reader.getLength();
+    }
+
+    /** Advances to the next key; returns false when the segment is exhausted. */
+    boolean nextRawKey() throws IOException {
+      return reader.nextRawKey(key);
+    }
+
+    void nextRawValue(DataInputBuffer value) throws IOException {
+      reader.nextRawValue(value);
+    }
+
+    void closeReader() throws IOException {
+      if (reader != null) {
+        reader.close();
+        reader = null;
+      }
+    }
+
+    /** Closes the reader and deletes the backing file unless preserved. */
+    void close() throws IOException {
+      closeReader();
+      if (!preserve && fs != null) {
+        fs.delete(file, false);
+      }
+    }
+
+    public long getPosition() throws IOException {
+      return reader.getPosition();
+    }
+
+    // This method is used by BackupStore to extract the
+    // absolute position after a reset
+    long getActualPosition() throws IOException {
+      return segmentOffset + reader.getPosition();
+    }
+
+    Reader getReader() {
+      return reader;
+    }
+
+    // This method is used by BackupStore to reinitialize the
+    // reader to start reading from a different segment offset
+    void reinitReader(int offset) throws IOException {
+      if (!inMemory()) {
+        closeReader();
+        segmentOffset = offset;
+        segmentLength = fs.getFileStatus(file).getLen() - segmentOffset;
+        init(null);
+      }
+    }
+  }
+
+  // Boolean variable for including/considering final merge as part of sort
+  // phase or not. This is true in map task, false in reduce task. It is
+  // used in calculating mergeProgress.
+  // NOTE(review): this is JVM-wide static state and is never reset to false
+  // once set; confirm that is acceptable when a JVM runs several tasks.
+  static boolean includeFinalMerge = false;
+
+  /**
+   * Sets the boolean variable includeFinalMerge to true. Called from
+   * map task before calling merge() so that final merge of map task
+   * is also considered as part of sort phase.
+   */
+  public static void considerFinalMergeForProgress() {
+    includeFinalMerge = true;
+  }
+
+  /**
+   * Priority queue of segments ordered by current key; it doubles as the
+   * iterator over the merged output of the final pass.
+   */
+  private static class MergeQueue<K extends Object, V extends Object>
+  extends PriorityQueue<Segment> implements TezRawKeyValueIterator {
+    Configuration conf;
+    FileSystem fs;
+    CompressionCodec codec;
+
+    List<Segment> segments = new ArrayList<Segment>();
+
+    RawComparator comparator;
+
+    private long totalBytesProcessed;
+    private float progPerByte;
+    private Progress mergeProgress = new Progress();
+
+    Progressable reporter;
+
+    DataInputBuffer key;
+    final DataInputBuffer value = new DataInputBuffer();
+    final DataInputBuffer diskIFileValue = new DataInputBuffer();
+
+    Segment minSegment;
+    // Orders segments by ascending length.
+    Comparator<Segment> segmentComparator =
+      new Comparator<Segment>() {
+      @Override
+      public int compare(Segment o1, Segment o2) {
+        if (o1.getLength() == o2.getLength()) {
+          return 0;
+        }
+
+        return o1.getLength() < o2.getLength() ? -1 : 1;
+      }
+    };
+
+    /** Builds the queue from files, sorted by ascending file length. */
+    public MergeQueue(Configuration conf, FileSystem fs,
+                      Path[] inputs, boolean deleteInputs,
+                      CompressionCodec codec, RawComparator comparator,
+                      Progressable reporter,
+                      TezCounter mergedMapOutputsCounter)
+    throws IOException {
+      this.conf = conf;
+      this.fs = fs;
+      this.codec = codec;
+      this.comparator = comparator;
+      this.reporter = reporter;
+
+      for (Path file : inputs) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("MergeQ: adding: " + file);
+        }
+        segments.add(new Segment(conf, fs, file, codec, !deleteInputs,
+                                 (file.toString().endsWith(
+                                     Constants.MERGED_OUTPUT_PREFIX) ?
+                                  null : mergedMapOutputsCounter)));
+      }
+
+      // Sort segments on file-lengths
+      Collections.sort(segments, segmentComparator);
+    }
+
+    /** Wraps the caller's segment list (used directly, not copied). */
+    public MergeQueue(Configuration conf, FileSystem fs,
+        List<Segment> segments, RawComparator comparator,
+        Progressable reporter, boolean sortSegments) {
+      this.conf = conf;
+      this.fs = fs;
+      this.comparator = comparator;
+      this.segments = segments;
+      this.reporter = reporter;
+      if (sortSegments) {
+        Collections.sort(segments, segmentComparator);
+      }
+    }
+
+    public MergeQueue(Configuration conf, FileSystem fs,
+        List<Segment> segments, RawComparator comparator,
+        Progressable reporter, boolean sortSegments, CompressionCodec codec) {
+      this(conf, fs, segments, comparator, reporter, sortSegments);
+      this.codec = codec;
+    }
+
+    /** Pops and closes every segment still in the queue. */
+    @Override
+    public void close() throws IOException {
+      Segment segment;
+      while((segment = pop()) != null) {
+        segment.close();
+      }
+    }
+
+    @Override
+    public DataInputBuffer getKey() throws IOException {
+      return key;
+    }
+
+    @Override
+    public DataInputBuffer getValue() throws IOException {
+      return value;
+    }
+
+    /**
+     * Advances {@code reader} (the current top) to its next key. The queue is
+     * re-heapified if a key exists; otherwise the segment is popped and closed.
+     */
+    private void adjustPriorityQueue(Segment reader) throws IOException{
+      long startPos = reader.getPosition();
+      boolean hasNext = reader.nextRawKey();
+      long endPos = reader.getPosition();
+      totalBytesProcessed += endPos - startPos;
+      mergeProgress.set(totalBytesProcessed * progPerByte);
+      if (hasNext) {
+        adjustTop();
+      } else {
+        pop();
+        reader.close();
+      }
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      if (size() == 0)
+        return false;
+
+      if (minSegment != null) {
+        //minSegment is non-null for all invocations of next except the first
+        //one. For the first invocation, the priority queue is ready for use
+        //but for the subsequent invocations, first adjust the queue
+        adjustPriorityQueue(minSegment);
+        if (size() == 0) {
+          minSegment = null;
+          return false;
+        }
+      }
+      minSegment = top();
+      if (!minSegment.inMemory()) {
+        //When we load the value from an inmemory segment, we reset
+        //the "value" DIB in this class to the inmem segment's byte[].
+        //When we load the value bytes from disk, we shouldn't use
+        //the same byte[] since it would corrupt the data in the inmem
+        //segment. So we maintain an explicit DIB for value bytes
+        //obtained from disk, and if the current segment is a disk
+        //segment, we reset the "value" DIB to the byte[] in that (so
+        //we reuse the disk segment DIB whenever we consider
+        //a disk segment).
+        value.reset(diskIFileValue.getData(), diskIFileValue.getLength());
+      }
+      long startPos = minSegment.getPosition();
+      key = minSegment.getKey();
+      minSegment.getValue(value);
+      long endPos = minSegment.getPosition();
+      totalBytesProcessed += endPos - startPos;
+      mergeProgress.set(totalBytesProcessed * progPerByte);
+      return true;
+    }
+
+    /** Orders segments by their current key using {@code comparator}. */
+    @Override
+    protected boolean lessThan(Object a, Object b) {
+      DataInputBuffer key1 = ((Segment)a).getKey();
+      DataInputBuffer key2 = ((Segment)b).getKey();
+      int s1 = key1.getPosition();
+      int l1 = key1.getLength() - s1;
+      int s2 = key2.getPosition();
+      int l2 = key2.getLength() - s2;
+
+      return comparator.compare(key1.getData(), s1, l1, key2.getData(), s2, l2) < 0;
+    }
+
+    /** Merge with no in-memory segments. */
+    public TezRawKeyValueIterator merge(Class keyClass, Class valueClass,
+                                     int factor, Path tmpDir,
+                                     TezCounter readsCounter,
+                                     TezCounter writesCounter,
+                                     Progress mergePhase)
+        throws IOException {
+      return merge(keyClass, valueClass, factor, 0, tmpDir,
+                   readsCounter, writesCounter, mergePhase);
+    }
+
+    /**
+     * Runs intermediate merge passes until at most {@code factor} segments
+     * remain, then loads those into this queue and returns {@code this} as
+     * the iterator for the final merge.
+     */
+    TezRawKeyValueIterator merge(Class keyClass, Class valueClass,
+                                     int factor, int inMem, Path tmpDir,
+                                     TezCounter readsCounter,
+                                     TezCounter writesCounter,
+                                     Progress mergePhase)
+        throws IOException {
+      LOG.info("Merging " + segments.size() + " sorted segments");
+
+      /*
+       * If there are inMemory segments, then they come first in the segments
+       * list and then the sorted disk segments. Otherwise(if there are only
+       * disk segments), then they are sorted segments if there are more than
+       * factor segments in the segments list.
+       */
+      int numSegments = segments.size();
+      int origFactor = factor;
+      int passNo = 1;
+      if (mergePhase != null) {
+        mergeProgress = mergePhase;
+      }
+
+      long totalBytes = computeBytesInMerges(factor, inMem);
+      if (totalBytes != 0) {
+        progPerByte = 1.0f / (float)totalBytes;
+      }
+
+      //create the MergeStreams from the sorted map created in the constructor
+      //and dump the final output to a file
+      do {
+        //get the factor for this pass of merge. We assume in-memory segments
+        //are the first entries in the segment list and that the pass factor
+        //doesn't apply to them
+        factor = getPassFactor(factor, passNo, numSegments - inMem);
+        if (1 == passNo) {
+          factor += inMem;
+        }
+        List<Segment> segmentsToMerge =
+          new ArrayList<Segment>();
+        int segmentsConsidered = 0;
+        int numSegmentsToConsider = factor;
+        long startBytes = 0; // starting bytes of segments of this merge
+        while (true) {
+          //extract the smallest 'factor' number of segments
+          //Call cleanup on the empty segments (no key/value data)
+          List<Segment> mStream =
+            getSegmentDescriptors(numSegmentsToConsider);
+          for (Segment segment : mStream) {
+            // Initialize the segment at the last possible moment;
+            // this helps in ensuring we don't use buffers until we need them
+            segment.init(readsCounter);
+            long startPos = segment.getPosition();
+            boolean hasNext = segment.nextRawKey();
+            long endPos = segment.getPosition();
+
+            if (hasNext) {
+              startBytes += endPos - startPos;
+              segmentsToMerge.add(segment);
+              segmentsConsidered++;
+            }
+            else {
+              segment.close();
+              numSegments--; //we ignore this segment for the merge
+            }
+          }
+          //if we have the desired number of segments
+          //or looked at all available segments, we break
+          if (segmentsConsidered == factor ||
+              segments.size() == 0) {
+            break;
+          }
+
+          numSegmentsToConsider = factor - segmentsConsidered;
+        }
+
+        //feed the streams to the priority queue
+        initialize(segmentsToMerge.size());
+        clear();
+        for (Segment segment : segmentsToMerge) {
+          put(segment);
+        }
+
+        //if we have fewer segments remaining, then just return the
+        //iterator, else do another single level merge
+        if (numSegments <= factor) {
+          if (!includeFinalMerge) { // for reduce task
+
+            // Reset totalBytesProcessed and recalculate totalBytes from the
+            // remaining segments to track the progress of the final merge.
+            // Final merge is considered as the progress of the reducePhase,
+            // the 3rd phase of reduce task.
+            totalBytesProcessed = 0;
+            totalBytes = 0;
+            for (int i = 0; i < segmentsToMerge.size(); i++) {
+              totalBytes += segmentsToMerge.get(i).getLength();
+            }
+          }
+          if (totalBytes != 0) //being paranoid
+            progPerByte = 1.0f / (float)totalBytes;
+
+          totalBytesProcessed += startBytes;
+          if (totalBytes != 0)
+            mergeProgress.set(totalBytesProcessed * progPerByte);
+          else
+            mergeProgress.set(1.0f); // Last pass and no segments left - we're done
+
+          LOG.info("Down to the last merge-pass, with " + numSegments +
+                   " segments left of total size: " +
+                   (totalBytes - totalBytesProcessed) + " bytes");
+          return this;
+        } else {
+          LOG.info("Merging " + segmentsToMerge.size() +
+                   " intermediate segments out of a total of " +
+                   (segments.size()+segmentsToMerge.size()));
+
+          long bytesProcessedInPrevMerges = totalBytesProcessed;
+          totalBytesProcessed += startBytes;
+
+          //we want to spread the creation of temp files on multiple disks if
+          //available under the space constraints
+          long approxOutputSize = 0;
+          for (Segment s : segmentsToMerge) {
+            approxOutputSize += s.getLength() +
+                                ChecksumFileSystem.getApproxChkSumLength(
+                                s.getLength());
+          }
+          Path tmpFilename =
+            new Path(tmpDir, "intermediate").suffix("." + passNo);
+
+          Path outputFile = lDirAlloc.getLocalPathForWrite(
+                                              tmpFilename.toString(),
+                                              approxOutputSize, conf);
+
+          Writer writer =
+            new Writer(conf, fs, outputFile, keyClass, valueClass, codec,
+                       writesCounter);
+          boolean written = false;
+          try {
+            writeFile(this, writer, reporter, conf);
+            written = true;
+          } finally {
+            if (!written) {
+              // Release the intermediate output stream on failure without
+              // masking the exception already propagating.
+              try {
+                writer.close();
+              } catch (IOException ignored) {
+                // The original failure is the one worth reporting.
+              }
+            }
+          }
+          writer.close();
+
+          //we finished one single level merge; now clean up the priority
+          //queue
+          this.close();
+
+          // Add the newly created segment to the list of segments to be merged
+          Segment tempSegment =
+            new Segment(conf, fs, outputFile, codec, false);
+
+          // Insert new merged segment into the sorted list
+          int pos = Collections.binarySearch(segments, tempSegment,
+                                             segmentComparator);
+          if (pos < 0) {
+            // binary search failed. So position to be inserted at is -pos-1
+            pos = -pos-1;
+          }
+          segments.add(pos, tempSegment);
+          numSegments = segments.size();
+
+          // Subtract the difference between expected size of new segment and
+          // actual size of new segment(Expected size of new segment is
+          // inputBytesOfThisMerge) from totalBytes. Expected size and actual
+          // size will match(almost) if combiner is not called in merge.
+          long inputBytesOfThisMerge = totalBytesProcessed -
+                                       bytesProcessedInPrevMerges;
+          totalBytes -= inputBytesOfThisMerge - tempSegment.getLength();
+          if (totalBytes != 0) {
+            progPerByte = 1.0f / (float)totalBytes;
+          }
+
+          passNo++;
+        }
+        //we are worried about only the first pass merge factor. So reset the
+        //factor to what it originally was
+        factor = origFactor;
+      } while(true);
+    }
+
+    /**
+     * Determine the number of segments to merge in a given pass. Assuming more
+     * than factor segments, the first pass should attempt to bring the total
+     * number of segments - 1 to be divisible by the factor - 1 (each pass
+     * takes X segments and produces 1) to minimize the number of merges.
+     */
+    private int getPassFactor(int factor, int passNo, int numSegments) {
+      if (passNo > 1 || numSegments <= factor || factor == 1)
+        return factor;
+      int mod = (numSegments - 1) % (factor - 1);
+      if (mod == 0)
+        return factor;
+      return mod + 1;
+    }
+
+    /**
+     * Return (and remove) the requested number of segment descriptors from
+     * the head of the sorted segment list. If fewer are available, all of
+     * them are returned and the list is emptied.
+     */
+    private List<Segment> getSegmentDescriptors(int numDescriptors) {
+      if (numDescriptors > segments.size()) {
+        List<Segment> subList = new ArrayList<Segment>(segments);
+        segments.clear();
+        return subList;
+      }
+
+      // Copy the head, then drop it with one range removal rather than
+      // numDescriptors separate remove(0) calls (each of which shifts the
+      // whole array list).
+      List<Segment> head = segments.subList(0, numDescriptors);
+      List<Segment> subList = new ArrayList<Segment>(head);
+      head.clear();
+      return subList;
+    }
+
+    /**
+     * Compute expected size of input bytes to merges, will be used in
+     * calculating mergeProgress. This simulates the above merge() method and
+     * tries to obtain the number of bytes that are going to be merged in all
+     * merges(assuming that there is no combiner called while merging).
+     * @param factor mapreduce.task.io.sort.factor
+     * @param inMem number of segments in memory to be merged
+     */
+    long computeBytesInMerges(int factor, int inMem) {
+      int numSegments = segments.size();
+      List<Long> segmentSizes = new ArrayList<Long>(numSegments);
+      long totalBytes = 0;
+      int n = numSegments - inMem;
+      // factor for 1st pass
+      int f = getPassFactor(factor, 1, n) + inMem;
+      n = numSegments;
+
+      for (int i = 0; i < numSegments; i++) {
+        // Not handling empty segments here assuming that it would not affect
+        // much in calculation of mergeProgress.
+        segmentSizes.add(segments.get(i).getLength());
+      }
+
+      // If includeFinalMerge is true, allow the following while loop iterate
+      // for 1 more iteration. This is to include final merge as part of the
+      // computation of expected input bytes of merges
+      boolean considerFinalMerge = includeFinalMerge;
+
+      while (n > f || considerFinalMerge) {
+        if (n <=f ) {
+          considerFinalMerge = false;
+        }
+        long mergedSize = 0;
+        f = Math.min(f, segmentSizes.size());
+        for (int j = 0; j < f; j++) {
+          mergedSize += segmentSizes.remove(0);
+        }
+        totalBytes += mergedSize;
+
+        // insert new size into the sorted list
+        int pos = Collections.binarySearch(segmentSizes, mergedSize);
+        if (pos < 0) {
+          pos = -pos-1;
+        }
+        segmentSizes.add(pos, mergedSize);
+
+        n -= (f-1);
+        f = factor;
+      }
+
+      return totalBytes;
+    }
+
+    @Override
+    public Progress getProgress() {
+      return mergeProgress;
+    }
+
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezRawKeyValueIterator.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezRawKeyValueIterator.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezRawKeyValueIterator.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezRawKeyValueIterator.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.sort.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.Progress;
+
+/**
+ * Iterator over raw (serialized) key/value pairs, used while sorting and
+ * merging intermediate data.
+ * <p>
+ * Typical use: call {@link #next()} until it returns {@code false}; after each
+ * {@code true} result, {@link #getKey()} and {@link #getValue()} expose the
+ * current record. Call {@link #close()} when done.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface TezRawKeyValueIterator {
+
+  /**
+   * Moves to the next record so that it becomes visible through
+   * {@link #getKey()} and {@link #getValue()}.
+   *
+   * @return {@code true} when a key/value pair is available,
+   *         {@code false} when the iterator is exhausted
+   * @throws IOException if the underlying data cannot be read
+   */
+  boolean next() throws IOException;
+
+  /**
+   * Returns the key of the current record in raw form.
+   *
+   * @return the current raw key, wrapped in a {@link DataInputBuffer}
+   * @throws IOException if the key cannot be obtained
+   */
+  DataInputBuffer getKey() throws IOException;
+
+  /**
+   * Returns the value of the current record in raw form.
+   *
+   * @return the current raw value, wrapped in a {@link DataInputBuffer}
+   * @throws IOException if the value cannot be obtained
+   */
+  DataInputBuffer getValue() throws IOException;
+
+  /**
+   * Returns the progress of this iterator: a float between 0.0 and 1.0
+   * reflecting the bytes processed so far.
+   *
+   * @return the progress tracker of this iterator
+   */
+  Progress getProgress();
+
+  /**
+   * Releases the iterator so that its underlying streams can be closed.
+   *
+   * @throws IOException if closing an underlying stream fails
+   */
+  void close() throws IOException;
+}
Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezRawKeyValueIterator.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezSpillRecord.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezSpillRecord.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezSpillRecord.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezSpillRecord.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.sort.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.Checksum;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.tez.common.Constants;
+
+/**
+ * In-memory image of a spill index holding one fixed-size record
+ * ({@code Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH} bytes) per partition.
+ * Each record stores the partition's start offset, raw length and part
+ * length as consecutive longs. An index can be loaded from, and written to,
+ * a file on the raw local file system, optionally followed by a checksum
+ * over the records.
+ */
+public class TezSpillRecord {
+
+  /** Backing store: partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH bytes. */
+  private final ByteBuffer buf;
+  /**
+   * View of the backing store as longs; partition {@code p} starts at slot
+   * {@code p * MAP_OUTPUT_INDEX_RECORD_LENGTH / 8}.
+   */
+  private final LongBuffer entries;
+
+  /**
+   * Creates a zero-filled index with room for {@code numPartitions} records.
+   *
+   * @param numPartitions number of partitions the index holds
+   */
+  public TezSpillRecord(int numPartitions) {
+    buf = ByteBuffer.allocate(
+        numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH);
+    entries = buf.asLongBuffer();
+  }
+
+  /**
+   * Loads an index file, verifying it with a {@link PureJavaCrc32} checksum.
+   *
+   * @param indexFileName index file on the local file system
+   * @param job configuration used to obtain the local file system
+   * @throws IOException if the file cannot be read or the checksum mismatches
+   */
+  public TezSpillRecord(Path indexFileName, Configuration job) throws IOException {
+    this(indexFileName, job, null);
+  }
+
+  /**
+   * Loads an index file, verifying it with a {@link PureJavaCrc32} checksum.
+   *
+   * @param indexFileName index file on the local file system
+   * @param job configuration used to obtain the local file system
+   * @param expectedIndexOwner currently unused (see the 4-argument constructor)
+   * @throws IOException if the file cannot be read or the checksum mismatches
+   */
+  public TezSpillRecord(Path indexFileName, Configuration job, String expectedIndexOwner)
+      throws IOException {
+    this(indexFileName, job, new PureJavaCrc32(), expectedIndexOwner);
+  }
+
+  /**
+   * Loads an index file from the raw local file system.
+   *
+   * @param indexFileName index file to read
+   * @param job configuration used to obtain the local file system
+   * @param crc checksum used to verify the records against the long stored
+   *        right after them, or {@code null} to skip verification
+   * @param expectedIndexOwner currently unused
+   * @throws ChecksumException if {@code crc} is non-null and does not match
+   * @throws IOException if the file cannot be read or is too large to load
+   */
+  public TezSpillRecord(Path indexFileName, Configuration job, Checksum crc,
+                        String expectedIndexOwner)
+      throws IOException {
+    // NOTE(review): expectedIndexOwner is accepted but never used, so the
+    // owner of the index file is not verified here.
+    final FileSystem rfs = FileSystem.getLocal(job).getRaw();
+    final FSDataInputStream in = rfs.open(indexFileName);
+    try {
+      final long length = rfs.getFileStatus(indexFileName).getLen();
+      // Divide in long arithmetic before narrowing to int; casting first
+      // would truncate lengths above Integer.MAX_VALUE. The trailing 8-byte
+      // checksum is shorter than one record, so the division drops it.
+      final long partitions = length / Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
+      if (partitions > Integer.MAX_VALUE / Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH) {
+        throw new IOException("Spill index too large to load: " + indexFileName
+            + " (" + length + " bytes)");
+      }
+      final int size = (int) partitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
+
+      buf = ByteBuffer.allocate(size);
+      if (crc != null) {
+        crc.reset();
+        // Only the records pass through the checksum; the stored value that
+        // follows them is read directly from the underlying stream.
+        final CheckedInputStream chk = new CheckedInputStream(in, crc);
+        IOUtils.readFully(chk, buf.array(), 0, size);
+        if (chk.getChecksum().getValue() != in.readLong()) {
+          throw new ChecksumException("Checksum error reading spill index: " +
+              indexFileName, -1);
+        }
+      } else {
+        IOUtils.readFully(in, buf.array(), 0, size);
+      }
+      entries = buf.asLongBuffer();
+    } finally {
+      in.close();
+    }
+  }
+
+  /**
+   * Returns the number of index records (partitions) in this spill.
+   */
+  public int size() {
+    return entries.capacity() / (Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8);
+  }
+
+  /**
+   * Returns the offsets recorded for the given partition.
+   *
+   * @param partition zero-based partition number
+   * @return a new record holding the start offset, raw length and part length
+   */
+  public TezIndexRecord getIndex(int partition) {
+    final int pos = partition * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8;
+    return new TezIndexRecord(entries.get(pos), entries.get(pos + 1),
+                              entries.get(pos + 2));
+  }
+
+  /**
+   * Stores the offsets of {@code rec} as the record for the given partition.
+   *
+   * @param rec offsets to store
+   * @param partition zero-based partition number
+   */
+  public void putIndex(TezIndexRecord rec, int partition) {
+    final int pos = partition * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8;
+    entries.put(pos, rec.getStartOffset());
+    entries.put(pos + 1, rec.getRawLength());
+    entries.put(pos + 2, rec.getPartLength());
+  }
+
+  /**
+   * Writes this index to {@code loc}, followed by a {@link PureJavaCrc32}
+   * checksum of the records.
+   *
+   * @param loc destination on the local file system
+   * @param job configuration used to obtain the local file system
+   * @throws IOException if the file cannot be written
+   */
+  public void writeToFile(Path loc, Configuration job)
+      throws IOException {
+    writeToFile(loc, job, new PureJavaCrc32());
+  }
+
+  /**
+   * Writes this index to {@code loc} on the raw local file system.
+   *
+   * @param loc destination file
+   * @param job configuration used to obtain the local file system
+   * @param crc checksum computed over the records and appended as a long,
+   *        or {@code null} to write the records only
+   * @throws IOException if the file cannot be written
+   */
+  public void writeToFile(Path loc, Configuration job, Checksum crc)
+      throws IOException {
+    final FileSystem rfs = FileSystem.getLocal(job).getRaw();
+    CheckedOutputStream chk = null;
+    final FSDataOutputStream out = rfs.create(loc);
+    try {
+      if (crc != null) {
+        crc.reset();
+        chk = new CheckedOutputStream(out, crc);
+        chk.write(buf.array());
+        // Written straight to 'out' so the checksum does not cover itself.
+        out.writeLong(chk.getChecksum().getValue());
+      } else {
+        out.write(buf.array());
+      }
+    } finally {
+      // Closing the wrapper also closes 'out'; close exactly one of them.
+      if (chk != null) {
+        chk.close();
+      } else {
+        out.close();
+      }
+    }
+  }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezSpillRecord.java
------------------------------------------------------------------------------
svn:eol-style = native