You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by go...@apache.org on 2015/01/17 01:38:01 UTC
tez git commit: TEZ-1593: Refactor PipelinedSorter to remove all MMAP based ByteBuffer references (gopalv, rbalamohan)
Repository: tez
Updated Branches:
refs/heads/master d676ef2ff -> c68465397
TEZ-1593: Refactor PipelinedSorter to remove all MMAP based ByteBuffer references (gopalv, rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c6846539
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c6846539
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c6846539
Branch: refs/heads/master
Commit: c68465397abf8d2ef23f165d15187cee92908be0
Parents: d676ef2
Author: Gopal V <go...@apache.org>
Authored: Fri Jan 16 16:37:56 2015 -0800
Committer: Gopal V <go...@apache.org>
Committed: Fri Jan 16 16:37:56 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../common/sort/impl/PipelinedSorter.java | 233 ++++++++++---------
2 files changed, 120 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/c6846539/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1b7f19c..d270f46 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -36,6 +36,7 @@ ALL CHANGES:
TEZ-1313. Setup pre-commit build to test submitted patches.
TEZ-1856. Remove LocalOnFileSortedOutput, LocalMergedInput, LocalTaskOutputFiles.
TEZ-1949. Whitelist TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH for broadcast edges.
+ TEZ-1593. Refactor PipelinedSorter to remove all MMAP based ByteBuffer references.
Release 0.6.0: Unreleased
http://git-wip-us.apache.org/repos/asf/tez/blob/c6846539/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index e44d943..35ea954 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -108,10 +109,11 @@ public class PipelinedSorter extends ExternalSorter {
int maxMemUsage = sortmb << 20;
maxMemUsage -= maxMemUsage % METASIZE;
largeBuffer = ByteBuffer.allocate(maxMemUsage);
+ Preconditions.checkArgument(largeBuffer.hasArray(), "Expected array backed byte buffer");
LOG.info(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + " = " + sortmb);
// TODO: configurable setting?
- span = new SortSpan(largeBuffer, 1024*1024, 16);
- merger = new SpanMerger();
+ span = new SortSpan(largeBuffer, 1024*1024, 16, comparator);
+ merger = new SpanMerger(); // SpanIterators are comparable
final int sortThreads =
this.conf.getInt(
TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS,
@@ -148,7 +150,7 @@ public class PipelinedSorter extends ExternalSorter {
if(newSpan == null) {
// sort in the same thread, do not wait for the thread pool
- merger.add(span.sort(sorter, comparator));
+ merger.add(span.sort(sorter));
spill();
int items = 1024*1024;
int perItem = 16;
@@ -160,11 +162,11 @@ public class PipelinedSorter extends ExternalSorter {
// our goal is to have 1M splits and sort early
items = 1024*1024;
}
- }
- span = new SortSpan(largeBuffer, items, perItem);
+ }
+ span = new SortSpan(largeBuffer, items, perItem, this.comparator);
} else {
// queue up the sort
- SortTask task = new SortTask(span, sorter, comparator);
+ SortTask task = new SortTask(span, sorter);
Future<SpanIterator> future = sortmaster.submit(task);
merger.add(future);
span = newSpan;
@@ -234,20 +236,14 @@ public class PipelinedSorter extends ExternalSorter {
span.kvmeta.put(keystart);
span.kvmeta.put(valstart);
span.kvmeta.put(valend - valstart);
- if((valstart - keystart) > span.keymax) {
- span.keymax = (valstart - keystart);
- }
- if((valend - valstart) > span.valmax) {
- span.valmax = (valend - valstart);
- }
mapOutputRecordCounter.increment(1);
mapOutputByteCounter.increment(valend - keystart);
}
public void spill() throws IOException {
// create spill file
- final long size = largeBuffer.capacity() +
- (partitions * APPROX_HEADER_LENGTH);
+ final long size = largeBuffer.capacity()
+ + (partitions * APPROX_HEADER_LENGTH);
final TezSpillRecord spillRec = new TezSpillRecord(partitions);
final Path filename =
mapOutputFile.getSpillFileForWrite(numSpills, size);
@@ -305,12 +301,14 @@ public class PipelinedSorter extends ExternalSorter {
LOG.info("Starting flush of map output");
span.end();
- merger.add(span.sort(sorter, comparator));
+ merger.add(span.sort(sorter));
spill();
sortmaster.shutdown();
largeBuffer = null;
+ numAdditionalSpills.increment(numSpills - 1);
+
if(numSpills == 1) {
// someday be able to pass this directly to shuffle
// without writing to disk
@@ -420,50 +418,50 @@ public class PipelinedSorter extends ExternalSorter {
public void write(byte[] b, int off, int len) throws IOException { out.put(b, off, len); }
}
- protected static class InputByteBuffer extends DataInputBuffer {
+ private static final class InputByteBuffer extends DataInputBuffer {
private byte[] buffer = new byte[256];
private ByteBuffer wrapped = ByteBuffer.wrap(buffer);
private void resize(int length) {
- if(length > buffer.length) {
+ if(length > buffer.length || (buffer.length > 10 * (1+length))) {
+ // scale down as well as scale up across values
buffer = new byte[length];
wrapped = ByteBuffer.wrap(buffer);
}
wrapped.limit(length);
}
- public void reset(ByteBuffer b, int start, int length) {
- resize(length);
- b.position(start);
- b.get(buffer, 0, length);
- super.reset(buffer, 0, length);
- }
- // clone-ish function
+
+ // shallow copy
public void reset(DataInputBuffer clone) {
byte[] data = clone.getData();
int start = clone.getPosition();
- int length = clone.getLength();
+ int length = clone.getLength() - start;
+ super.reset(data, start, length);
+ }
+
+ // deep copy
+ @SuppressWarnings("unused")
+ public void copy(DataInputBuffer clone) {
+ byte[] data = clone.getData();
+ int start = clone.getPosition();
+ int length = clone.getLength() - start;
resize(length);
System.arraycopy(data, start, buffer, 0, length);
super.reset(buffer, 0, length);
}
}
- private class SortSpan implements IndexedSortable {
+ private final class SortSpan implements IndexedSortable {
final IntBuffer kvmeta;
final ByteBuffer kvbuffer;
- final DataOutputStream out;
- private RawComparator comparator;
+ final DataOutputStream out;
+ final RawComparator comparator;
final int imeta[] = new int[NMETA];
final int jmeta[] = new int[NMETA];
- int keymax = 1;
- int valmax = 1;
- private int i,j;
- private byte[] ki;
- private byte[] kj;
+
private int index = 0;
- private InputByteBuffer hay = new InputByteBuffer();
private long eq = 0;
- public SortSpan(ByteBuffer source, int maxItems, int perItem) {
+ public SortSpan(ByteBuffer source, int maxItems, int perItem, RawComparator comparator) {
int capacity = source.remaining();
int metasize = METASIZE*maxItems;
int dataSize = maxItems * perItem;
@@ -473,8 +471,8 @@ public class PipelinedSorter extends ExternalSorter {
}
ByteBuffer reserved = source.duplicate();
reserved.mark();
- LOG.info("reserved.remaining() = "+reserved.remaining());
- LOG.info("reserved.size = "+metasize);
+ LOG.info("reserved.remaining() = " + reserved.remaining());
+ LOG.info("reserved.size = "+ metasize);
reserved.position(metasize);
kvbuffer = reserved.slice();
reserved.flip();
@@ -485,12 +483,10 @@ public class PipelinedSorter extends ExternalSorter {
.asIntBuffer();
out = new DataOutputStream(
new BufferStreamWrapper(kvbuffer));
+ this.comparator = comparator;
}
- public SpanIterator sort(IndexedSorter sorter, RawComparator comparator) {
- this.comparator = comparator;
- ki = new byte[keymax];
- kj = new byte[keymax];
+ public SpanIterator sort(IndexedSorter sorter) {
long start = System.currentTimeMillis();
if(length() > 1) {
sorter.sort(this, 0, length(), nullProgressable);
@@ -512,11 +508,31 @@ public class PipelinedSorter extends ExternalSorter {
kvmeta.position(kvj); kvmeta.get(jmeta);
kvmeta.position(kvj); kvmeta.put(imeta);
kvmeta.position(kvi); kvmeta.put(jmeta);
+ }
- if(i == mi || j == mj) i = -1;
- if(i == mi || j == mj) j = -1;
+ private int compareKeys(final int kvi, final int kvj) {
+ final int istart = kvmeta.get(kvi + KEYSTART);
+ final int jstart = kvmeta.get(kvj + KEYSTART);
+ final int ilen = kvmeta.get(kvi + VALSTART) - istart;
+ final int jlen = kvmeta.get(kvj + VALSTART) - jstart;
+
+ if (ilen == 0 || jlen == 0) {
+ if (ilen == jlen) {
+ eq++;
+ }
+ return ilen - jlen;
+ }
+
+ final byte[] buf = kvbuffer.array();
+ final int off = kvbuffer.arrayOffset();
+
+ // sort by key
+ final int cmp = comparator.compare(buf, off + istart, ilen, buf, off + jstart, jlen);
+ if(cmp == 0) eq++;
+ return cmp;
}
+
public int compare(final int mi, final int mj) {
final int kvi = offsetFor(mi);
final int kvj = offsetFor(mj);
@@ -526,20 +542,7 @@ public class PipelinedSorter extends ExternalSorter {
if (kvip != kvjp) {
return kvip - kvjp;
}
-
- final int istart = kvmeta.get(kvi + KEYSTART);
- final int jstart = kvmeta.get(kvj + KEYSTART);
- final int ilen = kvmeta.get(kvi + VALSTART) - istart;
- final int jlen = kvmeta.get(kvj + VALSTART) - jstart;
-
- kvbuffer.position(istart);
- kvbuffer.get(ki, 0, ilen);
- kvbuffer.position(jstart);
- kvbuffer.get(kj, 0, jlen);
- // sort by key
- final int cmp = comparator.compare(ki, 0, ilen, kj, 0, jlen);
- if(cmp == 0) eq++;
- return cmp;
+ return compareKeys(kvi, kvj);
}
public SortSpan next() {
@@ -547,7 +550,7 @@ public class PipelinedSorter extends ExternalSorter {
if(remaining != null) {
int items = length();
int perItem = kvbuffer.position()/items;
- SortSpan newSpan = new SortSpan(remaining, items, perItem);
+ SortSpan newSpan = new SortSpan(remaining, items, perItem, this.comparator);
newSpan.index = index+1;
return newSpan;
}
@@ -576,22 +579,22 @@ public class PipelinedSorter extends ExternalSorter {
return remaining;
}
- private int compareInternal(DataInputBuffer needle, int needlePart, int index) {
+ public int compareInternal(final DataInputBuffer needle, final int needlePart, final int index) {
int cmp = 0;
- int keystart;
- int valstart;
- int partition;
- partition = kvmeta.get(span.offsetFor(index) + PARTITION);
+ final int keystart;
+ final int valstart;
+ final int partition;
+ partition = kvmeta.get(this.offsetFor(index) + PARTITION);
if(partition != needlePart) {
cmp = (partition-needlePart);
} else {
- keystart = kvmeta.get(span.offsetFor(index) + KEYSTART);
- valstart = kvmeta.get(span.offsetFor(index) + VALSTART);
- // hay is allocated ahead of time
- hay.reset(kvbuffer, keystart, valstart - keystart);
- cmp = comparator.compare(hay.getData(),
- hay.getPosition(), hay.getLength(),
- needle.getData(),
+ keystart = kvmeta.get(this.offsetFor(index) + KEYSTART);
+ valstart = kvmeta.get(this.offsetFor(index) + VALSTART);
+ final byte[] buf = kvbuffer.array();
+ final int off = kvbuffer.arrayOffset();
+ cmp = comparator.compare(buf,
+ keystart + off , (valstart - keystart),
+ needle.getData(),
needle.getPosition(), needle.getLength());
}
return cmp;
@@ -609,13 +612,13 @@ public class PipelinedSorter extends ExternalSorter {
private static class SpanIterator implements PartitionedRawKeyValueIterator, Comparable<SpanIterator> {
private int kvindex = -1;
- private int maxindex;
- private IntBuffer kvmeta;
- private ByteBuffer kvbuffer;
- private SortSpan span;
- private InputByteBuffer key = new InputByteBuffer();
- private InputByteBuffer value = new InputByteBuffer();
- private Progress progress = new Progress();
+ private final int maxindex;
+ private final IntBuffer kvmeta;
+ private final ByteBuffer kvbuffer;
+ private final SortSpan span;
+ private final InputByteBuffer key = new InputByteBuffer();
+ private final InputByteBuffer value = new InputByteBuffer();
+ private final Progress progress = new Progress();
private static final int minrun = (1 << 4);
@@ -626,21 +629,25 @@ public class PipelinedSorter extends ExternalSorter {
this.maxindex = (kvmeta.limit()/NMETA) - 1;
}
- public DataInputBuffer getKey() throws IOException {
+ public DataInputBuffer getKey() {
final int keystart = kvmeta.get(span.offsetFor(kvindex) + KEYSTART);
final int valstart = kvmeta.get(span.offsetFor(kvindex) + VALSTART);
- key.reset(kvbuffer, keystart, valstart - keystart);
+ final byte[] buf = kvbuffer.array();
+ final int off = kvbuffer.arrayOffset();
+ key.reset(buf, off + keystart, valstart - keystart);
return key;
}
- public DataInputBuffer getValue() throws IOException {
+ public DataInputBuffer getValue() {
final int valstart = kvmeta.get(span.offsetFor(kvindex) + VALSTART);
final int vallen = kvmeta.get(span.offsetFor(kvindex) + VALLEN);
- value.reset(kvbuffer, valstart, vallen);
+ final byte[] buf = kvbuffer.array();
+ final int off = kvbuffer.arrayOffset();
+ value.reset(buf, off + valstart, vallen);
return value;
}
- public boolean next() throws IOException {
+ public boolean next() {
// caveat: since we use this as a comparable in the merger
if(kvindex == maxindex) return false;
if(kvindex % 100 == 0) {
@@ -650,7 +657,7 @@ public class PipelinedSorter extends ExternalSorter {
return true;
}
- public void close() throws IOException {
+ public void close() {
}
public Progress getProgress() {
@@ -667,22 +674,18 @@ public class PipelinedSorter extends ExternalSorter {
return partition;
}
+ @SuppressWarnings("unused")
public int size() {
return (maxindex - kvindex);
}
public int compareTo(SpanIterator other) {
- try {
- return span.compareInternal(other.getKey(), other.getPartition(), kvindex);
- } catch(IOException ie) {
- // since we're not reading off disk, how could getKey() throw exceptions?
- }
- return -1;
+ return span.compareInternal(other.getKey(), other.getPartition(), kvindex);
}
@Override
public String toString() {
- return String.format("SpanIterator<%d:%d> (span=%s)", kvindex, maxindex, span.toString());
+ return String.format("SpanIterator<%d:%d> (span=%s)", kvindex, maxindex, span.toString());
}
/**
@@ -744,17 +747,14 @@ public class PipelinedSorter extends ExternalSorter {
private static class SortTask implements Callable<SpanIterator> {
private final SortSpan sortable;
private final IndexedSorter sorter;
- private final RawComparator comparator;
-
- public SortTask(SortSpan sortable,
- IndexedSorter sorter, RawComparator comparator) {
+
+ public SortTask(SortSpan sortable, IndexedSorter sorter) {
this.sortable = sortable;
this.sorter = sorter;
- this.comparator = comparator;
}
public SpanIterator call() {
- return sortable.sort(sorter, comparator);
+ return sortable.sort(sorter);
}
}
@@ -795,12 +795,15 @@ public class PipelinedSorter extends ExternalSorter {
this.partition = partition;
}
+ @SuppressWarnings("unused")
public int getPartition() {
return this.partition;
}
}
private static class SpanHeap extends java.util.PriorityQueue<SpanIterator> {
+ private static final long serialVersionUID = 1L;
+
public SpanHeap() {
super(256);
}
@@ -813,7 +816,7 @@ public class PipelinedSorter extends ExternalSorter {
}
}
- private class SpanMerger implements PartitionedRawKeyValueIterator {
+ private final class SpanMerger implements PartitionedRawKeyValueIterator {
InputByteBuffer key = new InputByteBuffer();
InputByteBuffer value = new InputByteBuffer();
int partition;
@@ -829,20 +832,21 @@ public class PipelinedSorter extends ExternalSorter {
private long eq = 0;
public SpanMerger() {
+ // SpanIterators are comparable
partIter = new PartitionFilter(this);
}
- public void add(SpanIterator iter) throws IOException{
+ public final void add(SpanIterator iter) {
if(iter.next()) {
heap.add(iter);
}
}
- public void add(Future<SpanIterator> iter) throws IOException{
+ public final void add(Future<SpanIterator> iter) {
this.futures.add(iter);
}
- public boolean ready() throws IOException, InterruptedException {
+ public final boolean ready() throws IOException, InterruptedException {
try {
SpanIterator iter = null;
while(this.futures.size() > 0) {
@@ -866,7 +870,7 @@ public class PipelinedSorter extends ExternalSorter {
}
}
- private SpanIterator pop() throws IOException {
+ private SpanIterator pop() {
if(gallop > 0) {
gallop--;
return horse;
@@ -875,7 +879,7 @@ public class PipelinedSorter extends ExternalSorter {
SpanIterator next = heap.peek();
if(next != null && current != null &&
((Object)horse) == ((Object)current)) {
- // TODO: a better threshold check
+ // TODO: a better threshold check than 1 key repeating
gallop = current.bisect(next.getKey(), next.getPartition())-1;
}
horse = current;
@@ -885,26 +889,27 @@ public class PipelinedSorter extends ExternalSorter {
public boolean needsRLE() {
return (eq > 0.1 * total);
}
-
- private SpanIterator peek() throws IOException {
- if(gallop > 0) {
- return horse;
- }
- return heap.peek();
+
+ @SuppressWarnings("unused")
+ private SpanIterator peek() {
+ if (gallop > 0) {
+ return horse;
+ }
+ return heap.peek();
}
- public boolean next() throws IOException {
+ public final boolean next() {
SpanIterator current = pop();
if(current != null) {
- // keep local copies, since add() will move it all out
+ partition = current.getPartition();
key.reset(current.getKey());
value.reset(current.getValue());
- partition = current.getPartition();
if(gallop <= 0) {
+ // since all keys and values are references to the kvbuffer, no more deep copies
this.add(current);
} else {
- // galloping
+ // galloping, no deep copies required anyway
current.next();
}
return true;
@@ -912,8 +917,8 @@ public class PipelinedSorter extends ExternalSorter {
return false;
}
- public DataInputBuffer getKey() throws IOException { return key; }
- public DataInputBuffer getValue() throws IOException { return value; }
+ public DataInputBuffer getKey() { return key; }
+ public DataInputBuffer getValue() { return value; }
public int getPartition() { return partition; }
public void close() throws IOException {