You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/06/24 11:29:16 UTC

tez git commit: TEZ-1314. Port MAPREDUCE-5821 to Tez (rbalamohan)

Repository: tez
Updated Branches:
  refs/heads/master 142bd428b -> d48d9f04d


TEZ-1314. Port MAPREDUCE-5821 to Tez (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d48d9f04
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d48d9f04
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d48d9f04

Branch: refs/heads/master
Commit: d48d9f04df34a4b770587dcba7092ad957b0f29c
Parents: 142bd42
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Wed Jun 24 15:00:34 2015 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Wed Jun 24 15:00:34 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../library/common/sort/impl/TezMerger.java     |  12 ++-
 .../library/common/sort/impl/TestTezMerger.java | 107 ++++++++++++++++++-
 3 files changed, 113 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/d48d9f04/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ac8d8d6..f989d7e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-2468. Change the minimum Java version to Java 7.
 
 ALL CHANGES:
+  TEZ-1314. Port MAPREDUCE-5821 to Tez.
   TEZ-2558. Upload additional Tez images.
   TEZ-2486. Update tez website to include links based on
     http://www.apache.org/foundation/marks/pmcs.html#navigation.

http://git-wip-us.apache.org/repos/asf/tez/blob/d48d9f04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index 3b7bf05..35a9276 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -384,7 +385,8 @@ public class TezMerger {
     }
   }
 
-  private static class MergeQueue<K extends Object, V extends Object> 
+  @VisibleForTesting
+  static class MergeQueue<K extends Object, V extends Object>
   extends PriorityQueue<Segment> implements TezRawKeyValueIterator {
     Configuration conf;
     FileSystem fs;
@@ -575,6 +577,8 @@ public class TezMerger {
         }
       }
       minSegment = top();
+      long startPos = minSegment.getPosition();
+      key = minSegment.getKey();
       if (!minSegment.inMemory()) {
         //When we load the value from an inmemory segment, we reset
         //the "value" DIB in this class to the inmem segment's byte[].
@@ -585,11 +589,11 @@ public class TezMerger {
         //segment, we reset the "value" DIB to the byte[] in that (so 
         //we reuse the disk segment DIB whenever we consider
         //a disk segment).
+        minSegment.getValue(diskIFileValue);
         value.reset(diskIFileValue.getData(), diskIFileValue.getLength());
+      } else {
+        minSegment.getValue(value);
       }
-      long startPos = minSegment.getPosition();
-      key = minSegment.getKey();
-      minSegment.getValue(value);
       long endPos = minSegment.getPosition();
       totalBytesProcessed += endPos - startPos;
       mergeProgress.set(totalBytesProcessed * progPerByte);

http://git-wip-us.apache.org/repos/asf/tez/blob/d48d9f04/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
index b86d054..b654b0f 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
@@ -24,13 +24,13 @@ import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.TreeMultimap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BoundedByteArrayOutputStream;
 import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.RawComparator;
@@ -41,9 +41,14 @@ import org.apache.hadoop.util.Progressable;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryReader;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryWriter;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.TestMergeManager;
 import org.junit.AfterClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -53,6 +58,7 @@ import java.util.Map;
 import java.util.Random;
 
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 public class TestTezMerger {
 
@@ -70,6 +76,8 @@ public class TestTezMerger {
   //store the generated data for final verification
   private static ListMultimap<Integer, Long> verificationDataSet = LinkedListMultimap.create();
 
+  private final MergeManager merger = mock(MergeManager.class); // passed to InMemoryReader
+
   static {
     defaultConf.set("fs.defaultFS", "file:///");
     try {
@@ -601,6 +609,11 @@ public class TestTezMerger {
         null,
         new Progress());
 
+    verifyData(records);
+    verificationDataSet.clear();
+  }
+
+  private void verifyData(TezRawKeyValueIterator records) throws IOException {
     //Verify the merged data is correct
     Map<Integer, Integer> dataMap = Maps.newHashMap();
     int pk = -1;
@@ -647,7 +660,6 @@ public class TestTezMerger {
     }
 
     LOG.info("******************");
-    verificationDataSet.clear();
   }
 
   private List<Path> createIFiles(int fileCount, int keysPerFile)
@@ -662,6 +674,95 @@ public class TestTezMerger {
     return pathList;
   }
 
+  @Test(timeout = 20000)
+  public void testMergeSegments() throws Exception {
+    // Start clean: the static verificationDataSet may hold leftovers from a failed test.
+    verificationDataSet.clear();
+    List<TezMerger.Segment> segments = Lists.newLinkedList();
+    // In-memory + on-disk segments; mergeSegments() clears verificationDataSet on success.
+    segments.addAll(createInMemorySegments(10, 100));
+    segments.addAll(createDiskSegments(10, 100));
+    mergeSegments(segments, 5, true);
+    segments.clear();
+    // On-disk segments only.
+    segments.addAll(createDiskSegments(10, 100));
+    mergeSegments(segments, 5, true);
+    segments.clear();
+    // In-memory segments only: the disk value buffer must remain unused.
+    segments.addAll(createInMemorySegments(3, 100));
+    mergeSegments(segments, 5, false);
+    segments.clear();
+  }
+
+  @SuppressWarnings("unchecked")
+  private void mergeSegments(List<TezMerger.Segment> segmentList, int mergeFactor,
+      boolean hasDiskSegments) throws Exception {
+    // Merge via a raw MergeQueue so its diskIFileValue buffer can be inspected afterwards.
+    TezMerger.MergeQueue mergeQueue = new TezMerger.MergeQueue(defaultConf, localFs, segmentList,
+        comparator, new Reporter(), false, false);
+
+    TezRawKeyValueIterator records = mergeQueue.merge(IntWritable.class, LongWritable.class,
+        mergeFactor, new Path(workDir, "tmp_" + System.nanoTime()), null, null, null, null);
+    // Verify the merged data against verificationDataSet.
+    verifyData(records);
+
+    // Expect diskIFileValue to be non-empty exactly when disk segments took part in the merge.
+    int diskBufLen = mergeQueue.diskIFileValue.getLength();
+    assertTrue("diskIFileValue length=" + diskBufLen + " but hasDiskSegments=" + hasDiskSegments,
+        hasDiskSegments == (diskBufLen > 0));
+
+    // Reset the shared verification data for the caller's next merge.
+    verificationDataSet.clear();
+  }
+
+  private List<TezMerger.Segment> createInMemorySegments(int segmentCount, int keysPerSegment)
+      throws IOException {
+    List<TezMerger.Segment> segments = Lists.newLinkedList();
+    Random random = new Random();
+    DataInputBuffer keyBuf = new DataInputBuffer(); // scratch buffers, repointed by populateData
+    DataInputBuffer valueBuf = new DataInputBuffer();
+    for (int seg = 0; seg < segmentCount; seg++) {
+      BoundedByteArrayOutputStream out = new BoundedByteArrayOutputStream(10000);
+      InMemoryWriter writer = new InMemoryWriter(out);
+      // NOTE(review): keys are appended unsorted; confirm merge inputs need not be sorted here.
+      for (int rec = 0; rec < keysPerSegment; rec++) {
+        populateData(new IntWritable(random.nextInt()), new LongWritable(random.nextLong()),
+            keyBuf, valueBuf);
+        writer.append(keyBuf, valueBuf);
+      }
+      writer.close();
+      InMemoryReader reader = new InMemoryReader(merger, null, out.getBuffer(), 0, out.getLimit());
+      segments.add(new TezMerger.Segment(reader, true, null));
+    }
+    return segments;
+  }
+
+  private void populateData(IntWritable intKey, LongWritable longVal, DataInputBuffer key,
+      DataInputBuffer value) throws IOException {
+    // Serialize key and value into fresh buffers, then point the caller's DIBs at them.
+    DataOutputBuffer keyBytes = new DataOutputBuffer();
+    intKey.write(keyBytes);
+    key.reset(keyBytes.getData(), 0, keyBytes.getLength());
+    DataOutputBuffer valueBytes = new DataOutputBuffer();
+    longVal.write(valueBytes);
+    value.reset(valueBytes.getData(), 0, valueBytes.getLength());
+    verificationDataSet.put(intKey.get(), longVal.get()); // remembered for verifyData()
+  }
+
+  private List<TezMerger.Segment> createDiskSegments(int segmentCount, int keysPerSegment)
+      throws IOException {
+    List<TezMerger.Segment> segments = Lists.newLinkedList();
+    Random random = new Random();
+    for (int idx = 0; idx < segmentCount; idx++) {
+      boolean withRepeats = idx % 2 == 0 && keysPerSegment > 0; // even-indexed segments only
+      Path file = writeIFile(keysPerSegment, withRepeats ? random.nextInt(keysPerSegment) : 0);
+      long fileLength = localFs.getFileStatus(file).getLen();
+      segments.add(new TezMerger.Segment(localFs, file, 0, fileLength, null, false, 1024, 1024,
+          false, null));
+    }
+    return segments;
+  }
+
   static Path writeIFile(int keysPerFile, int repeatCount) throws
       IOException {
     TreeMultimap<Integer, Long> dataSet = createDataForIFile(keysPerFile, repeatCount);