You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/25 09:31:47 UTC
[40/50] [abbrv] Rename tez-engine-api to tez-runtime-api, and split
tez-engine into two modules: tez-engine-library (user-visible
Input/Output/Processor implementations) and tez-engine-internals
(framework internals)
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
deleted file mode 100644
index 1bf17a3..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
+++ /dev/null
@@ -1,932 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.tez.engine.common.sort.impl;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.BufferOverflowException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.IntBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.PriorityQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.HashComparator;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.util.IndexedSortable;
-import org.apache.hadoop.util.IndexedSorter;
-import org.apache.hadoop.util.Progress;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.sort.impl.IFile.Writer;
-import org.apache.tez.engine.common.sort.impl.TezMerger.Segment;
-
-@SuppressWarnings({"unchecked", "rawtypes"})
-public class PipelinedSorter extends ExternalSorter {
-
- private static final Log LOG = LogFactory.getLog(PipelinedSorter.class);
-
- /**
- * The size of each record in the index file for the map-outputs.
- */
- public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
-
- private final static int APPROX_HEADER_LENGTH = 150;
-
- int partitionBits;
-
- private static final int PARTITION = 0; // partition offset in acct
- private static final int KEYSTART = 1; // key offset in acct
- private static final int VALSTART = 2; // val offset in acct
- private static final int VALLEN = 3; // val len in acct
- private static final int NMETA = 4; // num meta ints
- private static final int METASIZE = NMETA * 4; // size in bytes
-
- // spill accounting
- volatile Throwable sortSpillException = null;
-
- int numSpills = 0;
- int minSpillsForCombine;
- private HashComparator hasher;
- // SortSpans
- private SortSpan span;
- private ByteBuffer largeBuffer;
- // Merger
- private SpanMerger merger;
- private ExecutorService sortmaster;
-
- final ArrayList<TezSpillRecord> indexCacheList =
- new ArrayList<TezSpillRecord>();
- private int totalIndexCacheMemory;
- private int indexCacheMemoryLimit;
-
- public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
- super.initialize(outputContext, conf, numOutputs);
-
- partitionBits = bitcount(partitions)+1;
-
- //sanity checks
- final float spillper =
- this.conf.getFloat(
- TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT,
- TezJobConfig.DEFAULT_TEZ_ENGINE_SORT_SPILL_PERCENT);
- final int sortmb =
- this.conf.getInt(
- TezJobConfig.TEZ_ENGINE_IO_SORT_MB,
- TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_MB);
- indexCacheMemoryLimit = this.conf.getInt(TezJobConfig.TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES,
- TezJobConfig.DEFAULT_TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES);
- if (spillper > (float)1.0 || spillper <= (float)0.0) {
- throw new IOException("Invalid \"" + TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT +
- "\": " + spillper);
- }
- if ((sortmb & 0x7FF) != sortmb) {
- throw new IOException(
- "Invalid \"" + TezJobConfig.TEZ_ENGINE_IO_SORT_MB + "\": " + sortmb);
- }
-
- // buffers and accounting
- int maxMemUsage = sortmb << 20;
- maxMemUsage -= maxMemUsage % METASIZE;
- largeBuffer = ByteBuffer.allocate(maxMemUsage);
- LOG.info(TezJobConfig.TEZ_ENGINE_IO_SORT_MB + " = " + sortmb);
- // TODO: configurable setting?
- span = new SortSpan(largeBuffer, 1024*1024, 16);
- merger = new SpanMerger(comparator);
- final int sortThreads =
- this.conf.getInt(
- TezJobConfig.TEZ_ENGINE_SORT_THREADS,
- TezJobConfig.DEFAULT_TEZ_ENGINE_SORT_THREADS);
- sortmaster = Executors.newFixedThreadPool(sortThreads);
-
- // k/v serialization
- if(comparator instanceof HashComparator) {
- hasher = (HashComparator)comparator;
- LOG.info("Using the HashComparator");
- } else {
- hasher = null;
- }
- valSerializer.open(span.out);
- keySerializer.open(span.out);
- minSpillsForCombine = this.conf.getInt(TezJobConfig.TEZ_ENGINE_COMBINE_MIN_SPILLS, 3);
- }
-
- private int bitcount(int n) {
- int bit = 0;
- while(n!=0) {
- bit++;
- n >>= 1;
- }
- return bit;
- }
-
- public void sort() throws IOException {
- SortSpan newSpan = span.next();
-
- if(newSpan == null) {
- // sort in the same thread, do not wait for the thread pool
- merger.add(span.sort(sorter, comparator));
- spill();
- int items = 1024*1024;
- int perItem = 16;
- if(span.length() != 0) {
- items = span.length();
- perItem = span.kvbuffer.limit()/items;
- items = (largeBuffer.capacity())/(METASIZE+perItem);
- if(items > 1024*1024) {
- // our goal is to have 1M splits and sort early
- items = 1024*1024;
- }
- }
- span = new SortSpan(largeBuffer, items, perItem);
- } else {
- // queue up the sort
- SortTask task = new SortTask(span, sorter, comparator);
- Future<SpanIterator> future = sortmaster.submit(task);
- merger.add(future);
- span = newSpan;
- }
- valSerializer.open(span.out);
- keySerializer.open(span.out);
- }
-
- @Override
- public void write(Object key, Object value)
- throws IOException {
- collect(
- key, value, partitioner.getPartition(key, value, partitions));
- }
-
- /**
- * Serialize the key, value to intermediate storage.
- * When this method returns, kvindex must refer to sufficient unused
- * storage to store one METADATA.
- */
- synchronized void collect(Object key, Object value, final int partition
- ) throws IOException {
- if (key.getClass() != keyClass) {
- throw new IOException("Type mismatch in key from map: expected "
- + keyClass.getName() + ", received "
- + key.getClass().getName());
- }
- if (value.getClass() != valClass) {
- throw new IOException("Type mismatch in value from map: expected "
- + valClass.getName() + ", received "
- + value.getClass().getName());
- }
- if (partition < 0 || partition >= partitions) {
- throw new IOException("Illegal partition for " + key + " (" +
- partition + ")");
- }
- if(span.kvmeta.remaining() < METASIZE) {
- this.sort();
- }
- int keystart = span.kvbuffer.position();
- int valstart = -1;
- int valend = -1;
- try {
- keySerializer.serialize(key);
- valstart = span.kvbuffer.position();
- valSerializer.serialize(value);
- valend = span.kvbuffer.position();
- } catch(BufferOverflowException overflow) {
- // restore limit
- span.kvbuffer.position(keystart);
- this.sort();
- // try again
- this.collect(key, value, partition);
- return;
- }
-
- int prefix = 0;
-
- if(hasher != null) {
- prefix = hasher.getHashCode(key);
- }
-
- prefix = (partition << (32 - partitionBits)) | (prefix >>> partitionBits);
-
- /* maintain order as in PARTITION, KEYSTART, VALSTART, VALLEN */
- span.kvmeta.put(prefix);
- span.kvmeta.put(keystart);
- span.kvmeta.put(valstart);
- span.kvmeta.put(valend - valstart);
- if((valstart - keystart) > span.keymax) {
- span.keymax = (valstart - keystart);
- }
- if((valend - valstart) > span.valmax) {
- span.valmax = (valend - valstart);
- }
- mapOutputRecordCounter.increment(1);
- mapOutputByteCounter.increment(valend - keystart);
- }
-
- public void spill() throws IOException {
- // create spill file
- final long size = largeBuffer.capacity() +
- (partitions * APPROX_HEADER_LENGTH);
- final TezSpillRecord spillRec = new TezSpillRecord(partitions);
- final Path filename =
- mapOutputFile.getSpillFileForWrite(numSpills, size);
- FSDataOutputStream out = rfs.create(filename, true, 4096);
-
- try {
- merger.ready(); // wait for all the future results from sort threads
- LOG.info("Spilling to " + filename.toString());
- for (int i = 0; i < partitions; ++i) {
- TezRawKeyValueIterator kvIter = merger.filter(i);
- //write merged output to disk
- long segmentStart = out.getPos();
- Writer writer =
- new Writer(conf, out, keyClass, valClass, codec,
- spilledRecordsCounter);
- writer.setRLE(merger.needsRLE());
- if (combiner == null) {
- while(kvIter.next()) {
- writer.append(kvIter.getKey(), kvIter.getValue());
- }
- } else {
- runCombineProcessor(kvIter, writer);
- }
- //close
- writer.close();
-
- // record offsets
- final TezIndexRecord rec =
- new TezIndexRecord(
- segmentStart,
- writer.getRawLength(),
- writer.getCompressedLength());
- spillRec.putIndex(rec, i);
- }
-
- Path indexFilename =
- mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
- * MAP_OUTPUT_INDEX_RECORD_LENGTH);
- // TODO: cache
- spillRec.writeToFile(indexFilename, conf);
- ++numSpills;
- } catch(InterruptedException ie) {
- // TODO:the combiner has been interrupted
- } finally {
- out.close();
- }
- }
-
- @Override
- public void flush() throws IOException {
- final String uniqueIdentifier = outputContext.getUniqueIdentifier();
- Path finalOutputFile =
- mapOutputFile.getOutputFileForWrite(0); //TODO
- Path finalIndexFile =
- mapOutputFile.getOutputIndexFileForWrite(0); //TODO
-
- LOG.info("Starting flush of map output");
- span.end();
- merger.add(span.sort(sorter, comparator));
- spill();
- sortmaster.shutdown();
-
- largeBuffer = null;
-
- if(numSpills == 1) {
- // someday be able to pass this directly to shuffle
- // without writing to disk
- final Path filename =
- mapOutputFile.getSpillFile(0);
- Path indexFilename =
- mapOutputFile.getSpillIndexFile(0);
- sameVolRename(filename, finalOutputFile);
- sameVolRename(indexFilename, finalIndexFile);
- return;
- }
-
- //The output stream for the final single output file
- FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
-
- TezMerger.considerFinalMergeForProgress();
-
- final TezSpillRecord spillRec = new TezSpillRecord(partitions);
- final ArrayList<TezSpillRecord> indexCacheList = new ArrayList<TezSpillRecord>();
-
- for(int i = 0; i < numSpills; i++) {
- // TODO: build this cache before
- Path indexFilename = mapOutputFile.getSpillIndexFile(i);
- TezSpillRecord spillIndex = new TezSpillRecord(indexFilename, conf);
- indexCacheList.add(spillIndex);
- }
-
- for (int parts = 0; parts < partitions; parts++) {
- //create the segments to be merged
- List<Segment> segmentList =
- new ArrayList<Segment>(numSpills);
- for(int i = 0; i < numSpills; i++) {
- Path spillFilename = mapOutputFile.getSpillFile(i);
- TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
-
- Segment s =
- new Segment(conf, rfs, spillFilename, indexRecord.getStartOffset(),
- indexRecord.getPartLength(), codec, true);
- segmentList.add(i, s);
- }
-
- int mergeFactor =
- this.conf.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR,
- TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_FACTOR);
- // sort the segments only if there are intermediate merges
- boolean sortSegments = segmentList.size() > mergeFactor;
- //merge
- TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
- keyClass, valClass, codec,
- segmentList, mergeFactor,
- new Path(uniqueIdentifier),
- (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf),
- nullProgressable, sortSegments,
- null, spilledRecordsCounter,
- null); // Not using any Progress in TezMerger. Should just work.
-
- //write merged output to disk
- long segmentStart = finalOut.getPos();
- Writer writer =
- new Writer(conf, finalOut, keyClass, valClass, codec,
- spilledRecordsCounter);
- writer.setRLE(merger.needsRLE());
- if (combiner == null || numSpills < minSpillsForCombine) {
- TezMerger.writeFile(kvIter, writer, nullProgressable, conf);
- } else {
- runCombineProcessor(kvIter, writer);
- }
-
- //close
- writer.close();
-
- // record offsets
- final TezIndexRecord rec =
- new TezIndexRecord(
- segmentStart,
- writer.getRawLength(),
- writer.getCompressedLength());
- spillRec.putIndex(rec, parts);
- }
-
- spillRec.writeToFile(finalIndexFile, conf);
- finalOut.close();
- for(int i = 0; i < numSpills; i++) {
- Path indexFilename = mapOutputFile.getSpillIndexFile(i);
- Path spillFilename = mapOutputFile.getSpillFile(i);
- rfs.delete(indexFilename,true);
- rfs.delete(spillFilename,true);
- }
- }
-
- public void close() { }
-
- private interface PartitionedRawKeyValueIterator extends TezRawKeyValueIterator {
- int getPartition();
- }
-
- private class BufferStreamWrapper extends OutputStream
- {
- private final ByteBuffer out;
- public BufferStreamWrapper(ByteBuffer out) {
- this.out = out;
- }
-
- @Override
- public void write(int b) throws IOException { out.put((byte)b); }
- @Override
- public void write(byte[] b) throws IOException { out.put(b); }
- @Override
- public void write(byte[] b, int off, int len) throws IOException { out.put(b, off, len); }
- }
-
- protected class InputByteBuffer extends DataInputBuffer {
- private byte[] buffer = new byte[256];
- private ByteBuffer wrapped = ByteBuffer.wrap(buffer);
- private void resize(int length) {
- if(length > buffer.length) {
- buffer = new byte[length];
- wrapped = ByteBuffer.wrap(buffer);
- }
- wrapped.limit(length);
- }
- public void reset(ByteBuffer b, int start, int length) {
- resize(length);
- b.position(start);
- b.get(buffer, 0, length);
- super.reset(buffer, 0, length);
- }
- // clone-ish function
- public void reset(DataInputBuffer clone) {
- byte[] data = clone.getData();
- int start = clone.getPosition();
- int length = clone.getLength();
- resize(length);
- System.arraycopy(data, start, buffer, 0, length);
- super.reset(buffer, 0, length);
- }
- }
-
- private class SortSpan implements IndexedSortable {
- final IntBuffer kvmeta;
- final ByteBuffer kvbuffer;
- final DataOutputStream out;
- private RawComparator comparator;
- final int imeta[] = new int[NMETA];
- final int jmeta[] = new int[NMETA];
- int keymax = 1;
- int valmax = 1;
- private int i,j;
- private byte[] ki;
- private byte[] kj;
- private int index = 0;
- private InputByteBuffer hay = new InputByteBuffer();
- private long eq = 0;
-
- public SortSpan(ByteBuffer source, int maxItems, int perItem) {
- int capacity = source.remaining();
- int metasize = METASIZE*maxItems;
- int dataSize = maxItems * perItem;
- if(capacity < (metasize+dataSize)) {
- // try to allocate less meta space, because we have sample data
- metasize = METASIZE*(capacity/(perItem+METASIZE));
- }
- ByteBuffer reserved = source.duplicate();
- reserved.mark();
- LOG.info("reserved.remaining() = "+reserved.remaining());
- LOG.info("reserved.size = "+metasize);
- reserved.position(metasize);
- kvbuffer = reserved.slice();
- reserved.flip();
- reserved.limit(metasize);
- kvmeta = reserved
- .slice()
- .order(ByteOrder.nativeOrder())
- .asIntBuffer();
- out = new DataOutputStream(
- new BufferStreamWrapper(kvbuffer));
- }
-
- public SpanIterator sort(IndexedSorter sorter, RawComparator comparator) {
- this.comparator = comparator;
- ki = new byte[keymax];
- kj = new byte[keymax];
- LOG.info("begin sorting Span"+index + " ("+length()+")");
- if(length() > 1) {
- sorter.sort(this, 0, length(), nullProgressable);
- }
- LOG.info("done sorting Span"+index);
- return new SpanIterator(this);
- }
-
- int offsetFor(int i) {
- return (i * NMETA);
- }
-
- public void swap(final int mi, final int mj) {
- final int kvi = offsetFor(mi);
- final int kvj = offsetFor(mj);
-
- kvmeta.position(kvi); kvmeta.get(imeta);
- kvmeta.position(kvj); kvmeta.get(jmeta);
- kvmeta.position(kvj); kvmeta.put(imeta);
- kvmeta.position(kvi); kvmeta.put(jmeta);
-
- if(i == mi || j == mj) i = -1;
- if(i == mi || j == mj) j = -1;
- }
-
- public int compare(final int mi, final int mj) {
- final int kvi = offsetFor(mi);
- final int kvj = offsetFor(mj);
- final int kvip = kvmeta.get(kvi + PARTITION);
- final int kvjp = kvmeta.get(kvj + PARTITION);
- // sort by partition
- if (kvip != kvjp) {
- return kvip - kvjp;
- }
-
- final int istart = kvmeta.get(kvi + KEYSTART);
- final int jstart = kvmeta.get(kvj + KEYSTART);
- final int ilen = kvmeta.get(kvi + VALSTART) - istart;
- final int jlen = kvmeta.get(kvj + VALSTART) - jstart;
-
- kvbuffer.position(istart);
- kvbuffer.get(ki, 0, ilen);
- kvbuffer.position(jstart);
- kvbuffer.get(kj, 0, jlen);
- // sort by key
- final int cmp = comparator.compare(ki, 0, ilen, kj, 0, jlen);
- if(cmp == 0) eq++;
- return cmp;
- }
-
- public SortSpan next() {
- ByteBuffer remaining = end();
- if(remaining != null) {
- int items = length();
- int perItem = kvbuffer.position()/items;
- SortSpan newSpan = new SortSpan(remaining, items, perItem);
- newSpan.index = index+1;
- return newSpan;
- }
- return null;
- }
-
- public int length() {
- return kvmeta.limit()/NMETA;
- }
-
- public ByteBuffer end() {
- ByteBuffer remaining = kvbuffer.duplicate();
- remaining.position(kvbuffer.position());
- remaining = remaining.slice();
- kvbuffer.limit(kvbuffer.position());
- kvmeta.limit(kvmeta.position());
- int items = length();
- if(items == 0) {
- return null;
- }
- int perItem = kvbuffer.position()/items;
- LOG.info(String.format("Span%d.length = %d, perItem = %d", index, length(), perItem));
- if(remaining.remaining() < NMETA+perItem) {
- return null;
- }
- return remaining;
- }
-
- private int compareInternal(DataInputBuffer needle, int needlePart, int index) {
- int cmp = 0;
- int keystart;
- int valstart;
- int partition;
- partition = kvmeta.get(span.offsetFor(index) + PARTITION);
- if(partition != needlePart) {
- cmp = (partition-needlePart);
- } else {
- keystart = kvmeta.get(span.offsetFor(index) + KEYSTART);
- valstart = kvmeta.get(span.offsetFor(index) + VALSTART);
- // hay is allocated ahead of time
- hay.reset(kvbuffer, keystart, valstart - keystart);
- cmp = comparator.compare(hay.getData(),
- hay.getPosition(), hay.getLength(),
- needle.getData(),
- needle.getPosition(), needle.getLength());
- }
- return cmp;
- }
-
- public long getEq() {
- return eq;
- }
-
- @Override
- public String toString() {
- return String.format("Span[%d,%d]", NMETA*kvmeta.capacity(), kvbuffer.limit());
- }
- }
-
- private class SpanIterator implements PartitionedRawKeyValueIterator, Comparable<SpanIterator> {
- private int kvindex = -1;
- private int maxindex;
- private IntBuffer kvmeta;
- private ByteBuffer kvbuffer;
- private SortSpan span;
- private InputByteBuffer key = new InputByteBuffer();
- private InputByteBuffer value = new InputByteBuffer();
- private Progress progress = new Progress();
-
- private final int minrun = (1 << 4);
-
- public SpanIterator(SortSpan span) {
- this.kvmeta = span.kvmeta;
- this.kvbuffer = span.kvbuffer;
- this.span = span;
- this.maxindex = (kvmeta.limit()/NMETA) - 1;
- }
-
- public DataInputBuffer getKey() throws IOException {
- final int keystart = kvmeta.get(span.offsetFor(kvindex) + KEYSTART);
- final int valstart = kvmeta.get(span.offsetFor(kvindex) + VALSTART);
- key.reset(kvbuffer, keystart, valstart - keystart);
- return key;
- }
-
- public DataInputBuffer getValue() throws IOException {
- final int valstart = kvmeta.get(span.offsetFor(kvindex) + VALSTART);
- final int vallen = kvmeta.get(span.offsetFor(kvindex) + VALLEN);
- value.reset(kvbuffer, valstart, vallen);
- return value;
- }
-
- public boolean next() throws IOException {
- // caveat: since we use this as a comparable in the merger
- if(kvindex == maxindex) return false;
- if(kvindex % 100 == 0) {
- progress.set((kvindex-maxindex) / maxindex);
- }
- kvindex += 1;
- return true;
- }
-
- public void close() throws IOException {
- }
-
- public Progress getProgress() {
- return progress;
- }
-
- public int getPartition() {
- final int partition = kvmeta.get(span.offsetFor(kvindex) + PARTITION);
- return partition;
- }
-
- public int size() {
- return (maxindex - kvindex);
- }
-
- public int compareTo(SpanIterator other) {
- try {
- return span.compareInternal(other.getKey(), other.getPartition(), kvindex);
- } catch(IOException ie) {
- // since we're not reading off disk, how could getKey() throw exceptions?
- }
- return -1;
- }
-
- @Override
- public String toString() {
- return String.format("SpanIterator<%d:%d> (span=%s)", kvindex, maxindex, span.toString());
- }
-
- /**
- * bisect returns the next insertion point for a given raw key, skipping keys
- * which are <= needle using a binary search instead of a linear comparison.
- * This is massively efficient when long strings of identical keys occur.
- * @param needle
- * @param needlePart
- * @return
- */
- int bisect(DataInputBuffer needle, int needlePart) {
- int start = kvindex;
- int end = maxindex-1;
- int mid = start;
- int cmp = 0;
-
- if(end - start < minrun) {
- return 0;
- }
-
- if(span.compareInternal(needle, needlePart, start) > 0) {
- return kvindex;
- }
-
- // bail out early if we haven't got a min run
- if(span.compareInternal(needle, needlePart, start+minrun) > 0) {
- return 0;
- }
-
- if(span.compareInternal(needle, needlePart, end) < 0) {
- return end - kvindex;
- }
-
- boolean found = false;
-
- // we sort 100k items, the max it can do is 20 loops, but break early
- for(int i = 0; start < end && i < 16; i++) {
- mid = start + (end - start)/2;
- cmp = span.compareInternal(needle, needlePart, mid);
- if(cmp == 0) {
- start = mid;
- found = true;
- } else if(cmp < 0) {
- start = mid;
- found = true;
- }
- if(cmp > 0) {
- end = mid;
- }
- }
-
- if(found) {
- return start - kvindex;
- }
- return 0;
- }
- }
-
- private class SortTask implements Callable<SpanIterator> {
- private final SortSpan sortable;
- private final IndexedSorter sorter;
- private final RawComparator comparator;
-
- public SortTask(SortSpan sortable,
- IndexedSorter sorter, RawComparator comparator) {
- this.sortable = sortable;
- this.sorter = sorter;
- this.comparator = comparator;
- }
-
- public SpanIterator call() {
- return sortable.sort(sorter, comparator);
- }
- }
-
- private class PartitionFilter implements TezRawKeyValueIterator {
- private final PartitionedRawKeyValueIterator iter;
- private int partition;
- private boolean dirty = false;
- public PartitionFilter(PartitionedRawKeyValueIterator iter) {
- this.iter = iter;
- }
- public DataInputBuffer getKey() throws IOException { return iter.getKey(); }
- public DataInputBuffer getValue() throws IOException { return iter.getValue(); }
- public void close() throws IOException { }
- public Progress getProgress() {
- return new Progress();
- }
- public boolean next() throws IOException {
- if(dirty || iter.next()) {
- int prefix = iter.getPartition();
-
- if((prefix >>> (32 - partitionBits)) == partition) {
- dirty = false; // we found what we were looking for, good
- return true;
- } else if(!dirty) {
- dirty = true; // we did a lookahead and failed to find partition
- }
- }
- return false;
- }
-
- public void reset(int partition) {
- this.partition = partition;
- }
-
- public int getPartition() {
- return this.partition;
- }
- }
-
- private class SpanHeap extends java.util.PriorityQueue<SpanIterator> {
- public SpanHeap() {
- super(256);
- }
- /**
- * {@link PriorityQueue}.poll() by a different name
- * @return
- */
- public SpanIterator pop() {
- return this.poll();
- }
- }
-
- private class SpanMerger implements PartitionedRawKeyValueIterator {
- private final RawComparator comparator;
- InputByteBuffer key = new InputByteBuffer();
- InputByteBuffer value = new InputByteBuffer();
- int partition;
-
- private ArrayList< Future<SpanIterator>> futures = new ArrayList< Future<SpanIterator>>();
-
- private SpanHeap heap = new SpanHeap();
- private PartitionFilter partIter;
-
- private int gallop = 0;
- private SpanIterator horse;
- private long total = 0;
- private long count = 0;
- private long eq = 0;
-
- public SpanMerger(RawComparator comparator) {
- this.comparator = comparator;
- partIter = new PartitionFilter(this);
- }
-
- public void add(SpanIterator iter) throws IOException{
- if(iter.next()) {
- heap.add(iter);
- }
- }
-
- public void add(Future<SpanIterator> iter) throws IOException{
- this.futures.add(iter);
- }
-
- public boolean ready() throws IOException, InterruptedException {
- try {
- SpanIterator iter = null;
- while(this.futures.size() > 0) {
- Future<SpanIterator> futureIter = this.futures.remove(0);
- iter = futureIter.get();
- this.add(iter);
- }
-
- StringBuilder sb = new StringBuilder();
- for(SpanIterator sp: heap) {
- sb.append(sp.toString());
- sb.append(",");
- total += sp.span.length();
- eq += sp.span.getEq();
- }
- LOG.info("Heap = " + sb.toString());
- return true;
- } catch(Exception e) {
- LOG.info(e.toString());
- return false;
- }
- }
-
- private SpanIterator pop() throws IOException {
- if(gallop > 0) {
- gallop--;
- return horse;
- }
- SpanIterator current = heap.pop();
- SpanIterator next = heap.peek();
- if(next != null && current != null &&
- ((Object)horse) == ((Object)current)) {
- // TODO: a better threshold check
- gallop = current.bisect(next.getKey(), next.getPartition())-1;
- }
- horse = current;
- return current;
- }
-
- public boolean needsRLE() {
- return (eq > 0.1 * total);
- }
-
- private SpanIterator peek() throws IOException {
- if(gallop > 0) {
- return horse;
- }
- return heap.peek();
- }
-
- public boolean next() throws IOException {
- SpanIterator current = pop();
-
- if(current != null) {
- // keep local copies, since add() will move it all out
- key.reset(current.getKey());
- value.reset(current.getValue());
- partition = current.getPartition();
- if(gallop <= 0) {
- this.add(current);
- } else {
- // galloping
- current.next();
- }
- return true;
- }
- return false;
- }
-
- public DataInputBuffer getKey() throws IOException { return key; }
- public DataInputBuffer getValue() throws IOException { return value; }
- public int getPartition() { return partition; }
-
- public void close() throws IOException {
- }
-
- public Progress getProgress() {
- // TODO
- return new Progress();
- }
-
- public TezRawKeyValueIterator filter(int partition) {
- partIter.reset(partition);
- return partIter;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezIndexRecord.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezIndexRecord.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezIndexRecord.java
deleted file mode 100644
index ac0267c..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezIndexRecord.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.common.sort.impl;
-
-public class TezIndexRecord {
- private long startOffset;
- private long rawLength;
- private long partLength;
-
- public TezIndexRecord() { }
-
- public TezIndexRecord(long startOffset, long rawLength, long partLength) {
- this.startOffset = startOffset;
- this.rawLength = rawLength;
- this.partLength = partLength;
- }
-
- public long getStartOffset() {
- return startOffset;
- }
-
- public long getRawLength() {
- return rawLength;
- }
-
- public long getPartLength() {
- return partLength;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java
deleted file mode 100644
index 7815569..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java
+++ /dev/null
@@ -1,798 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.sort.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ChecksumFileSystem;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.util.PriorityQueue;
-import org.apache.hadoop.util.Progress;
-import org.apache.hadoop.util.Progressable;
-import org.apache.tez.common.Constants;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.common.sort.impl.IFile.Reader;
-import org.apache.tez.engine.common.sort.impl.IFile.Writer;
-
-/**
- * Merger is an utility class used by the Map and Reduce tasks for merging
- * both their memory and disk segments
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-@SuppressWarnings({"unchecked", "rawtypes"})
-public class TezMerger {
- private static final Log LOG = LogFactory.getLog(TezMerger.class);
-
-
- // Local directories
- private static LocalDirAllocator lDirAlloc =
- new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
-
- public static
- TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
- Class keyClass, Class valueClass,
- CompressionCodec codec,
- Path[] inputs, boolean deleteInputs,
- int mergeFactor, Path tmpDir,
- RawComparator comparator, Progressable reporter,
- TezCounter readsCounter,
- TezCounter writesCounter,
- Progress mergePhase)
- throws IOException {
- return
- new MergeQueue(conf, fs, inputs, deleteInputs, codec, comparator,
- reporter, null).merge(keyClass, valueClass,
- mergeFactor, tmpDir,
- readsCounter, writesCounter,
- mergePhase);
- }
-
- public static
- TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
- Class keyClass, Class valueClass,
- CompressionCodec codec,
- Path[] inputs, boolean deleteInputs,
- int mergeFactor, Path tmpDir,
- RawComparator comparator,
- Progressable reporter,
- TezCounter readsCounter,
- TezCounter writesCounter,
- TezCounter mergedMapOutputsCounter,
- Progress mergePhase)
- throws IOException {
- return
- new MergeQueue(conf, fs, inputs, deleteInputs, codec, comparator,
- reporter, mergedMapOutputsCounter).merge(
- keyClass, valueClass,
- mergeFactor, tmpDir,
- readsCounter, writesCounter,
- mergePhase);
- }
-
- public static
- TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
- Class keyClass, Class valueClass,
- List<Segment> segments,
- int mergeFactor, Path tmpDir,
- RawComparator comparator, Progressable reporter,
- TezCounter readsCounter,
- TezCounter writesCounter,
- Progress mergePhase)
- throws IOException {
- return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
- comparator, reporter, false, readsCounter, writesCounter,
- mergePhase);
- }
-
- public static <K extends Object, V extends Object>
- TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
- Class keyClass, Class valueClass,
- List<Segment> segments,
- int mergeFactor, Path tmpDir,
- RawComparator comparator, Progressable reporter,
- boolean sortSegments,
- TezCounter readsCounter,
- TezCounter writesCounter,
- Progress mergePhase)
- throws IOException {
- return new MergeQueue(conf, fs, segments, comparator, reporter,
- sortSegments).merge(keyClass, valueClass,
- mergeFactor, tmpDir,
- readsCounter, writesCounter,
- mergePhase);
- }
-
- public static <K extends Object, V extends Object>
- TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
- Class keyClass, Class valueClass,
- CompressionCodec codec,
- List<Segment> segments,
- int mergeFactor, Path tmpDir,
- RawComparator comparator, Progressable reporter,
- boolean sortSegments,
- TezCounter readsCounter,
- TezCounter writesCounter,
- Progress mergePhase)
- throws IOException {
- return new MergeQueue(conf, fs, segments, comparator, reporter,
- sortSegments, codec).merge(keyClass, valueClass,
- mergeFactor, tmpDir,
- readsCounter, writesCounter,
- mergePhase);
- }
-
- public static <K extends Object, V extends Object>
- TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
- Class keyClass, Class valueClass,
- List<Segment> segments,
- int mergeFactor, int inMemSegments, Path tmpDir,
- RawComparator comparator, Progressable reporter,
- boolean sortSegments,
- TezCounter readsCounter,
- TezCounter writesCounter,
- Progress mergePhase)
- throws IOException {
- return new MergeQueue(conf, fs, segments, comparator, reporter,
- sortSegments).merge(keyClass, valueClass,
- mergeFactor, inMemSegments,
- tmpDir,
- readsCounter, writesCounter,
- mergePhase);
- }
-
-
- static <K extends Object, V extends Object>
- TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
- Class keyClass, Class valueClass,
- CompressionCodec codec,
- List<Segment> segments,
- int mergeFactor, int inMemSegments, Path tmpDir,
- RawComparator comparator, Progressable reporter,
- boolean sortSegments,
- TezCounter readsCounter,
- TezCounter writesCounter,
- Progress mergePhase)
- throws IOException {
- return new MergeQueue(conf, fs, segments, comparator, reporter,
- sortSegments, codec).merge(keyClass, valueClass,
- mergeFactor, inMemSegments,
- tmpDir,
- readsCounter, writesCounter,
- mergePhase);
-}
-
- public static <K extends Object, V extends Object>
- void writeFile(TezRawKeyValueIterator records, Writer writer,
- Progressable progressable, Configuration conf)
- throws IOException {
- long progressBar =
- conf.getLong(TezJobConfig.RECORDS_BEFORE_PROGRESS,
- TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
- long recordCtr = 0;
- while(records.next()) {
- writer.append(records.getKey(), records.getValue());
-
- if (((recordCtr++) % progressBar) == 0) {
- progressable.progress();
- }
- }
-}
-
- @InterfaceAudience.Private
- @InterfaceStability.Unstable
- public static class Segment<K extends Object, V extends Object> {
- Reader reader = null;
- final DataInputBuffer key = new DataInputBuffer();
-
- Configuration conf = null;
- FileSystem fs = null;
- Path file = null;
- boolean preserve = false;
- CompressionCodec codec = null;
- long segmentOffset = 0;
- long segmentLength = -1;
-
- TezCounter mapOutputsCounter = null;
-
- public Segment(Configuration conf, FileSystem fs, Path file,
- CompressionCodec codec, boolean preserve)
- throws IOException {
- this(conf, fs, file, codec, preserve, null);
- }
-
- public Segment(Configuration conf, FileSystem fs, Path file,
- CompressionCodec codec, boolean preserve,
- TezCounter mergedMapOutputsCounter)
- throws IOException {
- this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve,
- mergedMapOutputsCounter);
- }
-
- public Segment(Configuration conf, FileSystem fs, Path file,
- long segmentOffset, long segmentLength,
- CompressionCodec codec,
- boolean preserve) throws IOException {
- this(conf, fs, file, segmentOffset, segmentLength, codec, preserve, null);
- }
-
- public Segment(Configuration conf, FileSystem fs, Path file,
- long segmentOffset, long segmentLength, CompressionCodec codec,
- boolean preserve, TezCounter mergedMapOutputsCounter)
- throws IOException {
- this.conf = conf;
- this.fs = fs;
- this.file = file;
- this.codec = codec;
- this.preserve = preserve;
-
- this.segmentOffset = segmentOffset;
- this.segmentLength = segmentLength;
-
- this.mapOutputsCounter = mergedMapOutputsCounter;
- }
-
- public Segment(Reader reader, boolean preserve) {
- this(reader, preserve, null);
- }
-
- public Segment(Reader reader, boolean preserve,
- TezCounter mapOutputsCounter) {
- this.reader = reader;
- this.preserve = preserve;
-
- this.segmentLength = reader.getLength();
-
- this.mapOutputsCounter = mapOutputsCounter;
- }
-
- void init(TezCounter readsCounter) throws IOException {
- if (reader == null) {
- FSDataInputStream in = fs.open(file);
- in.seek(segmentOffset);
- reader = new Reader(conf, in, segmentLength, codec, readsCounter);
- }
-
- if (mapOutputsCounter != null) {
- mapOutputsCounter.increment(1);
- }
- }
-
- boolean inMemory() {
- return fs == null;
- }
-
- DataInputBuffer getKey() { return key; }
-
- DataInputBuffer getValue(DataInputBuffer value) throws IOException {
- nextRawValue(value);
- return value;
- }
-
- public long getLength() {
- return (reader == null) ?
- segmentLength : reader.getLength();
- }
-
- boolean nextRawKey() throws IOException {
- return reader.nextRawKey(key);
- }
-
- void nextRawValue(DataInputBuffer value) throws IOException {
- reader.nextRawValue(value);
- }
-
- void closeReader() throws IOException {
- if (reader != null) {
- reader.close();
- reader = null;
- }
- }
-
- void close() throws IOException {
- closeReader();
- if (!preserve && fs != null) {
- fs.delete(file, false);
- }
- }
-
- public long getPosition() throws IOException {
- return reader.getPosition();
- }
-
- // This method is used by BackupStore to extract the
- // absolute position after a reset
- long getActualPosition() throws IOException {
- return segmentOffset + reader.getPosition();
- }
-
- Reader getReader() {
- return reader;
- }
-
- // This method is used by BackupStore to reinitialize the
- // reader to start reading from a different segment offset
- void reinitReader(int offset) throws IOException {
- if (!inMemory()) {
- closeReader();
- segmentOffset = offset;
- segmentLength = fs.getFileStatus(file).getLen() - segmentOffset;
- init(null);
- }
- }
- }
-
- // Boolean variable for including/considering final merge as part of sort
- // phase or not. This is true in map task, false in reduce task. It is
- // used in calculating mergeProgress.
- static boolean includeFinalMerge = false;
-
- /**
- * Sets the boolean variable includeFinalMerge to true. Called from
- * map task before calling merge() so that final merge of map task
- * is also considered as part of sort phase.
- */
- public static void considerFinalMergeForProgress() {
- includeFinalMerge = true;
- }
-
- private static class MergeQueue<K extends Object, V extends Object>
- extends PriorityQueue<Segment> implements TezRawKeyValueIterator {
- Configuration conf;
- FileSystem fs;
- CompressionCodec codec;
-
- List<Segment> segments = new ArrayList<Segment>();
-
- RawComparator comparator;
-
- private long totalBytesProcessed;
- private float progPerByte;
- private Progress mergeProgress = new Progress();
-
- Progressable reporter;
-
- DataInputBuffer key;
- final DataInputBuffer value = new DataInputBuffer();
- final DataInputBuffer diskIFileValue = new DataInputBuffer();
-
- Segment minSegment;
- Comparator<Segment> segmentComparator =
- new Comparator<Segment>() {
- public int compare(Segment o1, Segment o2) {
- if (o1.getLength() == o2.getLength()) {
- return 0;
- }
-
- return o1.getLength() < o2.getLength() ? -1 : 1;
- }
- };
-
- public MergeQueue(Configuration conf, FileSystem fs,
- Path[] inputs, boolean deleteInputs,
- CompressionCodec codec, RawComparator comparator,
- Progressable reporter,
- TezCounter mergedMapOutputsCounter)
- throws IOException {
- this.conf = conf;
- this.fs = fs;
- this.codec = codec;
- this.comparator = comparator;
- this.reporter = reporter;
-
- for (Path file : inputs) {
- LOG.debug("MergeQ: adding: " + file);
- segments.add(new Segment(conf, fs, file, codec, !deleteInputs,
- (file.toString().endsWith(
- Constants.MERGED_OUTPUT_PREFIX) ?
- null : mergedMapOutputsCounter)));
- }
-
- // Sort segments on file-lengths
- Collections.sort(segments, segmentComparator);
- }
-
- public MergeQueue(Configuration conf, FileSystem fs,
- List<Segment> segments, RawComparator comparator,
- Progressable reporter, boolean sortSegments) {
- this.conf = conf;
- this.fs = fs;
- this.comparator = comparator;
- this.segments = segments;
- this.reporter = reporter;
- if (sortSegments) {
- Collections.sort(segments, segmentComparator);
- }
- }
-
- public MergeQueue(Configuration conf, FileSystem fs,
- List<Segment> segments, RawComparator comparator,
- Progressable reporter, boolean sortSegments, CompressionCodec codec) {
- this(conf, fs, segments, comparator, reporter, sortSegments);
- this.codec = codec;
- }
-
- public void close() throws IOException {
- Segment segment;
- while((segment = pop()) != null) {
- segment.close();
- }
- }
-
- public DataInputBuffer getKey() throws IOException {
- return key;
- }
-
- public DataInputBuffer getValue() throws IOException {
- return value;
- }
-
- private void adjustPriorityQueue(Segment reader) throws IOException{
- long startPos = reader.getPosition();
- boolean hasNext = reader.nextRawKey();
- long endPos = reader.getPosition();
- totalBytesProcessed += endPos - startPos;
- mergeProgress.set(totalBytesProcessed * progPerByte);
- if (hasNext) {
- adjustTop();
- } else {
- pop();
- reader.close();
- }
- }
-
- public boolean next() throws IOException {
- if (size() == 0)
- return false;
-
- if (minSegment != null) {
- //minSegment is non-null for all invocations of next except the first
- //one. For the first invocation, the priority queue is ready for use
- //but for the subsequent invocations, first adjust the queue
- adjustPriorityQueue(minSegment);
- if (size() == 0) {
- minSegment = null;
- return false;
- }
- }
- minSegment = top();
- if (!minSegment.inMemory()) {
- //When we load the value from an inmemory segment, we reset
- //the "value" DIB in this class to the inmem segment's byte[].
- //When we load the value bytes from disk, we shouldn't use
- //the same byte[] since it would corrupt the data in the inmem
- //segment. So we maintain an explicit DIB for value bytes
- //obtained from disk, and if the current segment is a disk
- //segment, we reset the "value" DIB to the byte[] in that (so
- //we reuse the disk segment DIB whenever we consider
- //a disk segment).
- value.reset(diskIFileValue.getData(), diskIFileValue.getLength());
- }
- long startPos = minSegment.getPosition();
- key = minSegment.getKey();
- minSegment.getValue(value);
- long endPos = minSegment.getPosition();
- totalBytesProcessed += endPos - startPos;
- mergeProgress.set(totalBytesProcessed * progPerByte);
- return true;
- }
-
- protected boolean lessThan(Object a, Object b) {
- DataInputBuffer key1 = ((Segment)a).getKey();
- DataInputBuffer key2 = ((Segment)b).getKey();
- int s1 = key1.getPosition();
- int l1 = key1.getLength() - s1;
- int s2 = key2.getPosition();
- int l2 = key2.getLength() - s2;
-
- return comparator.compare(key1.getData(), s1, l1, key2.getData(), s2, l2) < 0;
- }
-
- public TezRawKeyValueIterator merge(Class keyClass, Class valueClass,
- int factor, Path tmpDir,
- TezCounter readsCounter,
- TezCounter writesCounter,
- Progress mergePhase)
- throws IOException {
- return merge(keyClass, valueClass, factor, 0, tmpDir,
- readsCounter, writesCounter, mergePhase);
- }
-
- TezRawKeyValueIterator merge(Class keyClass, Class valueClass,
- int factor, int inMem, Path tmpDir,
- TezCounter readsCounter,
- TezCounter writesCounter,
- Progress mergePhase)
- throws IOException {
- LOG.info("Merging " + segments.size() + " sorted segments");
-
- /*
- * If there are inMemory segments, then they come first in the segments
- * list and then the sorted disk segments. Otherwise(if there are only
- * disk segments), then they are sorted segments if there are more than
- * factor segments in the segments list.
- */
- int numSegments = segments.size();
- int origFactor = factor;
- int passNo = 1;
- if (mergePhase != null) {
- mergeProgress = mergePhase;
- }
-
- long totalBytes = computeBytesInMerges(factor, inMem);
- if (totalBytes != 0) {
- progPerByte = 1.0f / (float)totalBytes;
- }
-
- //create the MergeStreams from the sorted map created in the constructor
- //and dump the final output to a file
- do {
- //get the factor for this pass of merge. We assume in-memory segments
- //are the first entries in the segment list and that the pass factor
- //doesn't apply to them
- factor = getPassFactor(factor, passNo, numSegments - inMem);
- if (1 == passNo) {
- factor += inMem;
- }
- List<Segment> segmentsToMerge =
- new ArrayList<Segment>();
- int segmentsConsidered = 0;
- int numSegmentsToConsider = factor;
- long startBytes = 0; // starting bytes of segments of this merge
- while (true) {
- //extract the smallest 'factor' number of segments
- //Call cleanup on the empty segments (no key/value data)
- List<Segment> mStream =
- getSegmentDescriptors(numSegmentsToConsider);
- for (Segment segment : mStream) {
- // Initialize the segment at the last possible moment;
- // this helps in ensuring we don't use buffers until we need them
- segment.init(readsCounter);
- long startPos = segment.getPosition();
- boolean hasNext = segment.nextRawKey();
- long endPos = segment.getPosition();
-
- if (hasNext) {
- startBytes += endPos - startPos;
- segmentsToMerge.add(segment);
- segmentsConsidered++;
- }
- else {
- segment.close();
- numSegments--; //we ignore this segment for the merge
- }
- }
- //if we have the desired number of segments
- //or looked at all available segments, we break
- if (segmentsConsidered == factor ||
- segments.size() == 0) {
- break;
- }
-
- numSegmentsToConsider = factor - segmentsConsidered;
- }
-
- //feed the streams to the priority queue
- initialize(segmentsToMerge.size());
- clear();
- for (Segment segment : segmentsToMerge) {
- put(segment);
- }
-
- //if we have lesser number of segments remaining, then just return the
- //iterator, else do another single level merge
- if (numSegments <= factor) {
- if (!includeFinalMerge) { // for reduce task
-
- // Reset totalBytesProcessed and recalculate totalBytes from the
- // remaining segments to track the progress of the final merge.
- // Final merge is considered as the progress of the reducePhase,
- // the 3rd phase of reduce task.
- totalBytesProcessed = 0;
- totalBytes = 0;
- for (int i = 0; i < segmentsToMerge.size(); i++) {
- totalBytes += segmentsToMerge.get(i).getLength();
- }
- }
- if (totalBytes != 0) //being paranoid
- progPerByte = 1.0f / (float)totalBytes;
-
- totalBytesProcessed += startBytes;
- if (totalBytes != 0)
- mergeProgress.set(totalBytesProcessed * progPerByte);
- else
- mergeProgress.set(1.0f); // Last pass and no segments left - we're done
-
- LOG.info("Down to the last merge-pass, with " + numSegments +
- " segments left of total size: " +
- (totalBytes - totalBytesProcessed) + " bytes");
- return this;
- } else {
- LOG.info("Merging " + segmentsToMerge.size() +
- " intermediate segments out of a total of " +
- (segments.size()+segmentsToMerge.size()));
-
- long bytesProcessedInPrevMerges = totalBytesProcessed;
- totalBytesProcessed += startBytes;
-
- //we want to spread the creation of temp files on multiple disks if
- //available under the space constraints
- long approxOutputSize = 0;
- for (Segment s : segmentsToMerge) {
- approxOutputSize += s.getLength() +
- ChecksumFileSystem.getApproxChkSumLength(
- s.getLength());
- }
- Path tmpFilename =
- new Path(tmpDir, "intermediate").suffix("." + passNo);
-
- Path outputFile = lDirAlloc.getLocalPathForWrite(
- tmpFilename.toString(),
- approxOutputSize, conf);
-
- Writer writer =
- new Writer(conf, fs, outputFile, keyClass, valueClass, codec,
- writesCounter);
- writeFile(this, writer, reporter, conf);
- writer.close();
-
- //we finished one single level merge; now clean up the priority
- //queue
- this.close();
-
- // Add the newly create segment to the list of segments to be merged
- Segment tempSegment =
- new Segment(conf, fs, outputFile, codec, false);
-
- // Insert new merged segment into the sorted list
- int pos = Collections.binarySearch(segments, tempSegment,
- segmentComparator);
- if (pos < 0) {
- // binary search failed. So position to be inserted at is -pos-1
- pos = -pos-1;
- }
- segments.add(pos, tempSegment);
- numSegments = segments.size();
-
- // Subtract the difference between expected size of new segment and
- // actual size of new segment(Expected size of new segment is
- // inputBytesOfThisMerge) from totalBytes. Expected size and actual
- // size will match(almost) if combiner is not called in merge.
- long inputBytesOfThisMerge = totalBytesProcessed -
- bytesProcessedInPrevMerges;
- totalBytes -= inputBytesOfThisMerge - tempSegment.getLength();
- if (totalBytes != 0) {
- progPerByte = 1.0f / (float)totalBytes;
- }
-
- passNo++;
- }
- //we are worried about only the first pass merge factor. So reset the
- //factor to what it originally was
- factor = origFactor;
- } while(true);
- }
-
- /**
- * Determine the number of segments to merge in a given pass. Assuming more
- * than factor segments, the first pass should attempt to bring the total
- * number of segments - 1 to be divisible by the factor - 1 (each pass
- * takes X segments and produces 1) to minimize the number of merges.
- */
- private int getPassFactor(int factor, int passNo, int numSegments) {
- if (passNo > 1 || numSegments <= factor || factor == 1)
- return factor;
- int mod = (numSegments - 1) % (factor - 1);
- if (mod == 0)
- return factor;
- return mod + 1;
- }
-
- /** Return (& remove) the requested number of segment descriptors from the
- * sorted map.
- */
- private List<Segment> getSegmentDescriptors(int numDescriptors) {
- if (numDescriptors > segments.size()) {
- List<Segment> subList = new ArrayList<Segment>(segments);
- segments.clear();
- return subList;
- }
-
- List<Segment> subList =
- new ArrayList<Segment>(segments.subList(0, numDescriptors));
- for (int i=0; i < numDescriptors; ++i) {
- segments.remove(0);
- }
- return subList;
- }
-
- /**
- * Compute expected size of input bytes to merges, will be used in
- * calculating mergeProgress. This simulates the above merge() method and
- * tries to obtain the number of bytes that are going to be merged in all
- * merges(assuming that there is no combiner called while merging).
- * @param factor mapreduce.task.io.sort.factor
- * @param inMem number of segments in memory to be merged
- */
- long computeBytesInMerges(int factor, int inMem) {
- int numSegments = segments.size();
- List<Long> segmentSizes = new ArrayList<Long>(numSegments);
- long totalBytes = 0;
- int n = numSegments - inMem;
- // factor for 1st pass
- int f = getPassFactor(factor, 1, n) + inMem;
- n = numSegments;
-
- for (int i = 0; i < numSegments; i++) {
- // Not handling empty segments here assuming that it would not affect
- // much in calculation of mergeProgress.
- segmentSizes.add(segments.get(i).getLength());
- }
-
- // If includeFinalMerge is true, allow the following while loop iterate
- // for 1 more iteration. This is to include final merge as part of the
- // computation of expected input bytes of merges
- boolean considerFinalMerge = includeFinalMerge;
-
- while (n > f || considerFinalMerge) {
- if (n <=f ) {
- considerFinalMerge = false;
- }
- long mergedSize = 0;
- f = Math.min(f, segmentSizes.size());
- for (int j = 0; j < f; j++) {
- mergedSize += segmentSizes.remove(0);
- }
- totalBytes += mergedSize;
-
- // insert new size into the sorted list
- int pos = Collections.binarySearch(segmentSizes, mergedSize);
- if (pos < 0) {
- pos = -pos-1;
- }
- segmentSizes.add(pos, mergedSize);
-
- n -= (f-1);
- f = factor;
- }
-
- return totalBytes;
- }
-
- public Progress getProgress() {
- return mergeProgress;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezRawKeyValueIterator.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezRawKeyValueIterator.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezRawKeyValueIterator.java
deleted file mode 100644
index 39cffcb..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezRawKeyValueIterator.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.sort.impl;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.util.Progress;
-
-/**
- * <code>TezRawKeyValueIterator</code> is an iterator used to iterate over
- * the raw keys and values during sort/merge of intermediate data.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public interface TezRawKeyValueIterator {
- /**
- * Gets the current raw key.
- *
- * @return Gets the current raw key as a DataInputBuffer
- * @throws IOException
- */
- DataInputBuffer getKey() throws IOException;
-
- /**
- * Gets the current raw value.
- *
- * @return Gets the current raw value as a DataInputBuffer
- * @throws IOException
- */
- DataInputBuffer getValue() throws IOException;
-
- /**
- * Sets up the current key and value (for getKey and getValue).
- *
- * @return <code>true</code> if there exists a key/value,
- * <code>false</code> otherwise.
- * @throws IOException
- */
- boolean next() throws IOException;
-
- /**
- * Closes the iterator so that the underlying streams can be closed.
- *
- * @throws IOException
- */
- void close() throws IOException;
-
- /** Gets the Progress object; this has a float (0.0 - 1.0)
- * indicating the bytes processed by the iterator so far
- */
- Progress getProgress();
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezSpillRecord.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezSpillRecord.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezSpillRecord.java
deleted file mode 100644
index 19fbd7f..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezSpillRecord.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.sort.impl;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.LongBuffer;
-import java.util.zip.CheckedInputStream;
-import java.util.zip.CheckedOutputStream;
-import java.util.zip.Checksum;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.util.PureJavaCrc32;
-import org.apache.tez.common.Constants;
-
-public class TezSpillRecord {
-
- /** Backing store */
- private final ByteBuffer buf;
- /** View of backing storage as longs */
- private final LongBuffer entries;
-
- public TezSpillRecord(int numPartitions) {
- buf = ByteBuffer.allocate(
- numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH);
- entries = buf.asLongBuffer();
- }
-
- public TezSpillRecord(Path indexFileName, Configuration job) throws IOException {
- this(indexFileName, job, null);
- }
-
- public TezSpillRecord(Path indexFileName, Configuration job, String expectedIndexOwner)
- throws IOException {
- this(indexFileName, job, new PureJavaCrc32(), expectedIndexOwner);
- }
-
- public TezSpillRecord(Path indexFileName, Configuration job, Checksum crc,
- String expectedIndexOwner)
- throws IOException {
-
- final FileSystem rfs = FileSystem.getLocal(job).getRaw();
- final FSDataInputStream in = rfs.open(indexFileName);
- try {
- final long length = rfs.getFileStatus(indexFileName).getLen();
- final int partitions =
- (int) length / Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
- final int size = partitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
-
- buf = ByteBuffer.allocate(size);
- if (crc != null) {
- crc.reset();
- CheckedInputStream chk = new CheckedInputStream(in, crc);
- IOUtils.readFully(chk, buf.array(), 0, size);
- if (chk.getChecksum().getValue() != in.readLong()) {
- throw new ChecksumException("Checksum error reading spill index: " +
- indexFileName, -1);
- }
- } else {
- IOUtils.readFully(in, buf.array(), 0, size);
- }
- entries = buf.asLongBuffer();
- } finally {
- in.close();
- }
- }
-
- /**
- * Return number of IndexRecord entries in this spill.
- */
- public int size() {
- return entries.capacity() / (Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8);
- }
-
- /**
- * Get spill offsets for given partition.
- */
- public TezIndexRecord getIndex(int partition) {
- final int pos = partition * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8;
- return new TezIndexRecord(entries.get(pos), entries.get(pos + 1),
- entries.get(pos + 2));
- }
-
- /**
- * Set spill offsets for given partition.
- */
- public void putIndex(TezIndexRecord rec, int partition) {
- final int pos = partition * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8;
- entries.put(pos, rec.getStartOffset());
- entries.put(pos + 1, rec.getRawLength());
- entries.put(pos + 2, rec.getPartLength());
- }
-
- /**
- * Write this spill record to the location provided.
- */
- public void writeToFile(Path loc, Configuration job)
- throws IOException {
- writeToFile(loc, job, new PureJavaCrc32());
- }
-
- public void writeToFile(Path loc, Configuration job, Checksum crc)
- throws IOException {
- final FileSystem rfs = FileSystem.getLocal(job).getRaw();
- CheckedOutputStream chk = null;
- final FSDataOutputStream out = rfs.create(loc);
- try {
- if (crc != null) {
- crc.reset();
- chk = new CheckedOutputStream(out, crc);
- chk.write(buf.array());
- out.writeLong(chk.getChecksum().getValue());
- } else {
- out.write(buf.array());
- }
- } finally {
- if (chk != null) {
- chk.close();
- } else {
- out.close();
- }
- }
- }
-
-}