You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@tez.apache.org by go...@apache.org on 2015/01/17 01:38:01 UTC

tez git commit: TEZ-1593: Refactor PipelinedSorter to remove all MMAP based ByteBuffer references (gopalv, rbalamohan)

Repository: tez
Updated Branches:
  refs/heads/master d676ef2ff -> c68465397


TEZ-1593: Refactor PipelinedSorter to remove all MMAP based ByteBuffer references (gopalv, rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c6846539
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c6846539
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c6846539

Branch: refs/heads/master
Commit: c68465397abf8d2ef23f165d15187cee92908be0
Parents: d676ef2
Author: Gopal V <go...@apache.org>
Authored: Fri Jan 16 16:37:56 2015 -0800
Committer: Gopal V <go...@apache.org>
Committed: Fri Jan 16 16:37:56 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../common/sort/impl/PipelinedSorter.java       | 233 ++++++++++---------
 2 files changed, 120 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/c6846539/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1b7f19c..d270f46 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -36,6 +36,7 @@ ALL CHANGES:
   TEZ-1313. Setup pre-commit build to test submitted patches.
   TEZ-1856. Remove LocalOnFileSortedOutput, LocalMergedInput, LocalTaskOutputFiles.
   TEZ-1949. Whitelist TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH for broadcast edges.
+  TEZ-1593. Refactor PipelinedSorter to remove all MMAP based ByteBuffer references.
 
 Release 0.6.0: Unreleased
 

http://git-wip-us.apache.org/repos/asf/tez/blob/c6846539/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index e44d943..35ea954 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -108,10 +109,11 @@ public class PipelinedSorter extends ExternalSorter {
     int maxMemUsage = sortmb << 20;
     maxMemUsage -= maxMemUsage % METASIZE;
     largeBuffer = ByteBuffer.allocate(maxMemUsage);
+    Preconditions.checkArgument(largeBuffer.hasArray(), "Expected array backed byte buffer");
     LOG.info(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + " = " + sortmb);
     // TODO: configurable setting?
-    span = new SortSpan(largeBuffer, 1024*1024, 16);
-    merger = new SpanMerger();
+    span = new SortSpan(largeBuffer, 1024*1024, 16, comparator);
+    merger = new SpanMerger(); // SpanIterators are comparable
     final int sortThreads = 
             this.conf.getInt(
                 TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 
@@ -148,7 +150,7 @@ public class PipelinedSorter extends ExternalSorter {
 
     if(newSpan == null) {
       // sort in the same thread, do not wait for the thread pool
-      merger.add(span.sort(sorter, comparator));
+      merger.add(span.sort(sorter));
       spill();
       int items = 1024*1024;
       int perItem = 16;
@@ -160,11 +162,11 @@ public class PipelinedSorter extends ExternalSorter {
             // our goal is to have 1M splits and sort early
             items = 1024*1024;
         }
-      }      
-      span = new SortSpan(largeBuffer, items, perItem);
+      }
+      span = new SortSpan(largeBuffer, items, perItem, this.comparator);
     } else {
       // queue up the sort
-      SortTask task = new SortTask(span, sorter, comparator);
+      SortTask task = new SortTask(span, sorter);
       Future<SpanIterator> future = sortmaster.submit(task);
       merger.add(future);
       span = newSpan;
@@ -234,20 +236,14 @@ public class PipelinedSorter extends ExternalSorter {
     span.kvmeta.put(keystart);
     span.kvmeta.put(valstart);
     span.kvmeta.put(valend - valstart);
-    if((valstart - keystart) > span.keymax) {
-      span.keymax = (valstart - keystart);
-    }
-    if((valend - valstart) > span.valmax) {
-      span.valmax = (valend - valstart);
-    }
     mapOutputRecordCounter.increment(1);
     mapOutputByteCounter.increment(valend - keystart);
   }
 
   public void spill() throws IOException { 
     // create spill file
-    final long size = largeBuffer.capacity() + 
-      (partitions * APPROX_HEADER_LENGTH);
+    final long size = largeBuffer.capacity()
+        + (partitions * APPROX_HEADER_LENGTH);
     final TezSpillRecord spillRec = new TezSpillRecord(partitions);
     final Path filename =
       mapOutputFile.getSpillFileForWrite(numSpills, size);    
@@ -305,12 +301,14 @@ public class PipelinedSorter extends ExternalSorter {
 
     LOG.info("Starting flush of map output");
     span.end();
-    merger.add(span.sort(sorter, comparator));
+    merger.add(span.sort(sorter));
     spill();
     sortmaster.shutdown();
 
     largeBuffer = null;
 
+    numAdditionalSpills.increment(numSpills - 1);
+
     if(numSpills == 1) {
       // someday be able to pass this directly to shuffle
       // without writing to disk
@@ -420,50 +418,50 @@ public class PipelinedSorter extends ExternalSorter {
     public void write(byte[] b, int off, int len) throws IOException { out.put(b, off, len); }
   }
 
-  protected static class InputByteBuffer extends DataInputBuffer {
+  private static final class InputByteBuffer extends DataInputBuffer {
     private byte[] buffer = new byte[256]; 
     private ByteBuffer wrapped = ByteBuffer.wrap(buffer);
     private void resize(int length) {
-      if(length > buffer.length) {
+      if(length > buffer.length || (buffer.length > 10 * (1+length))) {
+        // scale down as well as scale up across values
         buffer = new byte[length];
         wrapped = ByteBuffer.wrap(buffer);
       }
       wrapped.limit(length);
     }
-    public void reset(ByteBuffer b, int start, int length) {
-      resize(length);
-      b.position(start);
-      b.get(buffer, 0, length);
-      super.reset(buffer, 0, length);
-    }
-    // clone-ish function
+
+    // shallow copy
     public void reset(DataInputBuffer clone) {
       byte[] data = clone.getData();
       int start = clone.getPosition();
-      int length = clone.getLength();
+      int length = clone.getLength() - start;
+      super.reset(data, start, length);
+    }
+
+    // deep copy
+    @SuppressWarnings("unused")
+    public void copy(DataInputBuffer clone) {
+      byte[] data = clone.getData();
+      int start = clone.getPosition();
+      int length = clone.getLength() - start;
       resize(length);
       System.arraycopy(data, start, buffer, 0, length);
       super.reset(buffer, 0, length);
     }
   }
 
-  private class SortSpan  implements IndexedSortable {
+  private final class SortSpan  implements IndexedSortable {
     final IntBuffer kvmeta;
     final ByteBuffer kvbuffer;
-    final DataOutputStream out;    
-    private RawComparator comparator; 
+    final DataOutputStream out;
+    final RawComparator comparator;
     final int imeta[] = new int[NMETA];
     final int jmeta[] = new int[NMETA];
-    int keymax = 1;
-    int valmax = 1;
-    private int i,j;
-    private byte[] ki;
-    private byte[] kj;
+
     private int index = 0;
-    private InputByteBuffer hay = new InputByteBuffer();
     private long eq = 0;
 
-    public SortSpan(ByteBuffer source, int maxItems, int perItem) {
+    public SortSpan(ByteBuffer source, int maxItems, int perItem, RawComparator comparator) {
       int capacity = source.remaining(); 
       int metasize = METASIZE*maxItems;
       int dataSize = maxItems * perItem;
@@ -473,8 +471,8 @@ public class PipelinedSorter extends ExternalSorter {
       }
       ByteBuffer reserved = source.duplicate();
       reserved.mark();
-      LOG.info("reserved.remaining() = "+reserved.remaining());
-      LOG.info("reserved.size = "+metasize);
+      LOG.info("reserved.remaining() = " + reserved.remaining());
+      LOG.info("reserved.size = "+ metasize);
       reserved.position(metasize);
       kvbuffer = reserved.slice();
       reserved.flip();
@@ -485,12 +483,10 @@ public class PipelinedSorter extends ExternalSorter {
                .asIntBuffer();
       out = new DataOutputStream(
               new BufferStreamWrapper(kvbuffer));
+      this.comparator = comparator;
     }
 
-    public SpanIterator sort(IndexedSorter sorter, RawComparator comparator) {
-    	this.comparator = comparator;
-      ki = new byte[keymax];
-      kj = new byte[keymax];
+    public SpanIterator sort(IndexedSorter sorter) {
       long start = System.currentTimeMillis();
       if(length() > 1) {
         sorter.sort(this, 0, length(), nullProgressable);
@@ -512,11 +508,31 @@ public class PipelinedSorter extends ExternalSorter {
       kvmeta.position(kvj); kvmeta.get(jmeta);
       kvmeta.position(kvj); kvmeta.put(imeta);
       kvmeta.position(kvi); kvmeta.put(jmeta);
+    }
 
-      if(i == mi || j == mj) i = -1;
-      if(i == mi || j == mj) j = -1;
+    private int compareKeys(final int kvi, final int kvj) {
+      final int istart = kvmeta.get(kvi + KEYSTART);
+      final int jstart = kvmeta.get(kvj + KEYSTART);
+      final int ilen   = kvmeta.get(kvi + VALSTART) - istart;
+      final int jlen   = kvmeta.get(kvj + VALSTART) - jstart;
+
+      if (ilen == 0 || jlen == 0) {
+        if (ilen == jlen) {
+          eq++;
+        }
+        return ilen - jlen;
+      }
+
+      final byte[] buf = kvbuffer.array();
+      final int off = kvbuffer.arrayOffset();
+
+      // sort by key
+      final int cmp = comparator.compare(buf, off + istart, ilen, buf, off + jstart, jlen);
+      if(cmp == 0) eq++;
+      return cmp;
     }
 
+
     public int compare(final int mi, final int mj) {
       final int kvi = offsetFor(mi);
       final int kvj = offsetFor(mj);
@@ -526,20 +542,7 @@ public class PipelinedSorter extends ExternalSorter {
       if (kvip != kvjp) {
         return kvip - kvjp;
       }
-      
-      final int istart = kvmeta.get(kvi + KEYSTART);
-      final int jstart = kvmeta.get(kvj + KEYSTART);
-      final int ilen   = kvmeta.get(kvi + VALSTART) - istart;
-      final int jlen   = kvmeta.get(kvj + VALSTART) - jstart;
-
-      kvbuffer.position(istart);
-      kvbuffer.get(ki, 0, ilen);
-      kvbuffer.position(jstart);
-      kvbuffer.get(kj, 0, jlen);
-      // sort by key
-      final int cmp = comparator.compare(ki, 0, ilen, kj, 0, jlen);
-      if(cmp == 0) eq++;
-      return cmp;
+      return compareKeys(kvi, kvj);
     }
 
     public SortSpan next() {
@@ -547,7 +550,7 @@ public class PipelinedSorter extends ExternalSorter {
       if(remaining != null) {
         int items = length();
         int perItem = kvbuffer.position()/items;
-        SortSpan newSpan = new SortSpan(remaining, items, perItem);
+        SortSpan newSpan = new SortSpan(remaining, items, perItem, this.comparator);
         newSpan.index = index+1;
         return newSpan;
       }
@@ -576,22 +579,22 @@ public class PipelinedSorter extends ExternalSorter {
       return remaining;
     }
 
-    private int compareInternal(DataInputBuffer needle, int needlePart, int index) {
+    public int compareInternal(final DataInputBuffer needle, final int needlePart, final int index) {
       int cmp = 0;
-      int keystart;
-      int valstart;
-      int partition;
-      partition = kvmeta.get(span.offsetFor(index) + PARTITION);
+      final int keystart;
+      final int valstart;
+      final int partition;
+      partition = kvmeta.get(this.offsetFor(index) + PARTITION);
       if(partition != needlePart) {
           cmp = (partition-needlePart);
       } else {
-        keystart = kvmeta.get(span.offsetFor(index) + KEYSTART);
-        valstart = kvmeta.get(span.offsetFor(index) + VALSTART);
-        // hay is allocated ahead of time
-        hay.reset(kvbuffer, keystart, valstart - keystart);
-        cmp = comparator.compare(hay.getData(), 
-            hay.getPosition(), hay.getLength(),
-            needle.getData(), 
+        keystart = kvmeta.get(this.offsetFor(index) + KEYSTART);
+        valstart = kvmeta.get(this.offsetFor(index) + VALSTART);
+        final byte[] buf = kvbuffer.array();
+        final int off = kvbuffer.arrayOffset();
+        cmp = comparator.compare(buf,
+            keystart + off , (valstart - keystart),
+            needle.getData(),
             needle.getPosition(), needle.getLength());
       }
       return cmp;
@@ -609,13 +612,13 @@ public class PipelinedSorter extends ExternalSorter {
 
   private static class SpanIterator implements PartitionedRawKeyValueIterator, Comparable<SpanIterator> {
     private int kvindex = -1;
-    private int maxindex;
-    private IntBuffer kvmeta;
-    private ByteBuffer kvbuffer;
-    private SortSpan span;
-    private InputByteBuffer key = new InputByteBuffer();
-    private InputByteBuffer value = new InputByteBuffer();
-    private Progress progress = new Progress();
+    private final int maxindex;
+    private final IntBuffer kvmeta;
+    private final ByteBuffer kvbuffer;
+    private final SortSpan span;
+    private final InputByteBuffer key = new InputByteBuffer();
+    private final InputByteBuffer value = new InputByteBuffer();
+    private final Progress progress = new Progress();
 
     private static final int minrun = (1 << 4);
 
@@ -626,21 +629,25 @@ public class PipelinedSorter extends ExternalSorter {
       this.maxindex = (kvmeta.limit()/NMETA) - 1;
     }
 
-    public DataInputBuffer getKey() throws IOException {
+    public DataInputBuffer getKey()  {
       final int keystart = kvmeta.get(span.offsetFor(kvindex) + KEYSTART);
       final int valstart = kvmeta.get(span.offsetFor(kvindex) + VALSTART);
-      key.reset(kvbuffer, keystart, valstart - keystart);
+      final byte[] buf = kvbuffer.array();
+      final int off = kvbuffer.arrayOffset();
+      key.reset(buf, off + keystart, valstart - keystart);
       return key;
     }
 
-    public DataInputBuffer getValue() throws IOException {
+    public DataInputBuffer getValue() {
       final int valstart = kvmeta.get(span.offsetFor(kvindex) + VALSTART);
       final int vallen = kvmeta.get(span.offsetFor(kvindex) + VALLEN);
-      value.reset(kvbuffer, valstart, vallen);
+      final byte[] buf = kvbuffer.array();
+      final int off = kvbuffer.arrayOffset();
+      value.reset(buf, off + valstart, vallen);
       return value;
     }
 
-    public boolean next() throws IOException {
+    public boolean next() {
       // caveat: since we use this as a comparable in the merger 
       if(kvindex == maxindex) return false;
       if(kvindex % 100 == 0) {
@@ -650,7 +657,7 @@ public class PipelinedSorter extends ExternalSorter {
       return true;
     }
 
-    public void close() throws IOException {
+    public void close() {
     }
 
     public Progress getProgress() { 
@@ -667,22 +674,18 @@ public class PipelinedSorter extends ExternalSorter {
       return partition;
     }
 
+    @SuppressWarnings("unused")
     public int size() {
       return (maxindex - kvindex);
     }
 
     public int compareTo(SpanIterator other) {
-      try {
-        return span.compareInternal(other.getKey(), other.getPartition(), kvindex);
-      } catch(IOException ie) {
-        // since we're not reading off disk, how could getKey() throw exceptions?
-      }
-      return -1;
+      return span.compareInternal(other.getKey(), other.getPartition(), kvindex);
     }
     
     @Override
     public String toString() {
-        return String.format("SpanIterator<%d:%d> (span=%s)", kvindex, maxindex, span.toString());
+      return String.format("SpanIterator<%d:%d> (span=%s)", kvindex, maxindex, span.toString());
     }
 
     /**
@@ -744,17 +747,14 @@ public class PipelinedSorter extends ExternalSorter {
   private static class SortTask implements Callable<SpanIterator> {
     private final SortSpan sortable;
     private final IndexedSorter sorter;
-    private final RawComparator comparator;
-    
-    public SortTask(SortSpan sortable, 
-              IndexedSorter sorter, RawComparator comparator) {
+
+    public SortTask(SortSpan sortable, IndexedSorter sorter) {
         this.sortable = sortable;
         this.sorter = sorter;
-        this.comparator = comparator;
     }
 
     public SpanIterator call() {
-      return sortable.sort(sorter, comparator);
+      return sortable.sort(sorter);
     }
   }
 
@@ -795,12 +795,15 @@ public class PipelinedSorter extends ExternalSorter {
       this.partition = partition;
     }
 
+    @SuppressWarnings("unused")
     public int getPartition() {
       return this.partition;
     }
   }
 
   private static class SpanHeap extends java.util.PriorityQueue<SpanIterator> {
+    private static final long serialVersionUID = 1L;
+
     public SpanHeap() {
       super(256);
     }
@@ -813,7 +816,7 @@ public class PipelinedSorter extends ExternalSorter {
     }
   }
 
-  private class SpanMerger implements PartitionedRawKeyValueIterator {
+  private final class SpanMerger implements PartitionedRawKeyValueIterator {
     InputByteBuffer key = new InputByteBuffer();
     InputByteBuffer value = new InputByteBuffer();
     int partition;
@@ -829,20 +832,21 @@ public class PipelinedSorter extends ExternalSorter {
     private long eq = 0;
     
     public SpanMerger() {
+      // SpanIterators are comparable
       partIter = new PartitionFilter(this);
     }
 
-    public void add(SpanIterator iter) throws IOException{
+    public final void add(SpanIterator iter) {
       if(iter.next()) {
         heap.add(iter);
       }
     }
 
-    public void add(Future<SpanIterator> iter) throws IOException{
+    public final void add(Future<SpanIterator> iter) {
       this.futures.add(iter);
     }
 
-    public boolean ready() throws IOException, InterruptedException {
+    public final boolean ready() throws IOException, InterruptedException {
       try {
         SpanIterator iter = null;
         while(this.futures.size() > 0) {
@@ -866,7 +870,7 @@ public class PipelinedSorter extends ExternalSorter {
       }
     }
 
-    private SpanIterator pop() throws IOException {
+    private SpanIterator pop() {
       if(gallop > 0) {
         gallop--;
         return horse;
@@ -875,7 +879,7 @@ public class PipelinedSorter extends ExternalSorter {
       SpanIterator next = heap.peek();
       if(next != null && current != null &&
         ((Object)horse) == ((Object)current)) {
-        // TODO: a better threshold check
+        // TODO: a better threshold check than 1 key repeating
         gallop = current.bisect(next.getKey(), next.getPartition())-1;
       }
       horse = current;
@@ -885,26 +889,27 @@ public class PipelinedSorter extends ExternalSorter {
     public boolean needsRLE() {
       return (eq > 0.1 * total);
     }
-    
-    private SpanIterator peek() throws IOException {
-    	if(gallop > 0) {
-            return horse;
-        }
-    	return heap.peek();
+
+    @SuppressWarnings("unused")
+    private SpanIterator peek() {
+      if (gallop > 0) {
+        return horse;
+      }
+      return heap.peek();
     }
 
-    public boolean next() throws IOException {
+    public final boolean next() {
       SpanIterator current = pop();
 
       if(current != null) {
-        // keep local copies, since add() will move it all out
+        partition = current.getPartition();
         key.reset(current.getKey());
         value.reset(current.getValue());
-        partition = current.getPartition();
         if(gallop <= 0) {
+          // since all keys and values are references to the kvbuffer, no more deep copies
           this.add(current);
         } else {
-          // galloping
+          // galloping, no deep copies required anyway
           current.next();
         }
         return true;
@@ -912,8 +917,8 @@ public class PipelinedSorter extends ExternalSorter {
       return false;
     }
 
-    public DataInputBuffer getKey() throws IOException { return key; }
-    public DataInputBuffer getValue() throws IOException { return value; }
+    public DataInputBuffer getKey() { return key; }
+    public DataInputBuffer getValue() { return value; }
     public int getPartition() { return partition; }
 
     public void close() throws IOException {