You are viewing a plain-text version of this content; the canonical version is in the commits@tez.apache.org mailing-list archive (the original hyperlink is not preserved in plain text).
Posted to commits@tez.apache.org by rb...@apache.org on 2015/06/24 11:29:16 UTC
tez git commit: TEZ-1314. Port MAPREDUCE-5821 to Tez (rbalamohan)
Repository: tez
Updated Branches:
refs/heads/master 142bd428b -> d48d9f04d
TEZ-1314. Port MAPREDUCE-5821 to Tez (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d48d9f04
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d48d9f04
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d48d9f04
Branch: refs/heads/master
Commit: d48d9f04df34a4b770587dcba7092ad957b0f29c
Parents: 142bd42
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Wed Jun 24 15:00:34 2015 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Wed Jun 24 15:00:34 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../library/common/sort/impl/TezMerger.java | 12 ++-
.../library/common/sort/impl/TestTezMerger.java | 107 ++++++++++++++++++-
3 files changed, 113 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/d48d9f04/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ac8d8d6..f989d7e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
TEZ-2468. Change the minimum Java version to Java 7.
ALL CHANGES:
+ TEZ-1314. Port MAPREDUCE-5821 to Tez.
TEZ-2558. Upload additional Tez images.
TEZ-2486. Update tez website to include links based on
http://www.apache.org/foundation/marks/pmcs.html#navigation.
http://git-wip-us.apache.org/repos/asf/tez/blob/d48d9f04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index 3b7bf05..35a9276 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -384,7 +385,8 @@ public class TezMerger {
}
}
- private static class MergeQueue<K extends Object, V extends Object>
+ @VisibleForTesting
+ static class MergeQueue<K extends Object, V extends Object>
extends PriorityQueue<Segment> implements TezRawKeyValueIterator {
Configuration conf;
FileSystem fs;
@@ -575,6 +577,8 @@ public class TezMerger {
}
}
minSegment = top();
+ long startPos = minSegment.getPosition();
+ key = minSegment.getKey();
if (!minSegment.inMemory()) {
//When we load the value from an inmemory segment, we reset
//the "value" DIB in this class to the inmem segment's byte[].
@@ -585,11 +589,11 @@ public class TezMerger {
//segment, we reset the "value" DIB to the byte[] in that (so
//we reuse the disk segment DIB whenever we consider
//a disk segment).
+ minSegment.getValue(diskIFileValue);
value.reset(diskIFileValue.getData(), diskIFileValue.getLength());
+ } else {
+ minSegment.getValue(value);
}
- long startPos = minSegment.getPosition();
- key = minSegment.getKey();
- minSegment.getValue(value);
long endPos = minSegment.getPosition();
totalBytesProcessed += endPos - startPos;
mergeProgress.set(totalBytesProcessed * progPerByte);
http://git-wip-us.apache.org/repos/asf/tez/blob/d48d9f04/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
index b86d054..b654b0f 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
@@ -24,13 +24,13 @@ import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.TreeMultimap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BoundedByteArrayOutputStream;
import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
@@ -41,9 +41,14 @@ import org.apache.hadoop.util.Progressable;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryReader;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryWriter;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.TestMergeManager;
import org.junit.AfterClass;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -53,6 +58,7 @@ import java.util.Map;
import java.util.Random;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
public class TestTezMerger {
@@ -70,6 +76,8 @@ public class TestTezMerger {
//store the generated data for final verification
private static ListMultimap<Integer, Long> verificationDataSet = LinkedListMultimap.create();
+ private MergeManager merger = mock(MergeManager.class);
+
static {
defaultConf.set("fs.defaultFS", "file:///");
try {
@@ -601,6 +609,11 @@ public class TestTezMerger {
null,
new Progress());
+ verifyData(records);
+ verificationDataSet.clear();
+ }
+
+ private void verifyData(TezRawKeyValueIterator records) throws IOException {
//Verify the merged data is correct
Map<Integer, Integer> dataMap = Maps.newHashMap();
int pk = -1;
@@ -647,7 +660,6 @@ public class TestTezMerger {
}
LOG.info("******************");
- verificationDataSet.clear();
}
private List<Path> createIFiles(int fileCount, int keysPerFile)
@@ -662,6 +674,95 @@ public class TestTezMerger {
return pathList;
}
+ @Test(timeout = 20000)
+ public void testMergeSegments() throws Exception {
+ List<TezMerger.Segment> segments = Lists.newLinkedList();
+ segments.addAll(createInMemorySegments(10, 100));
+ segments.addAll(createDiskSegments(10, 100));
+ mergeSegments(segments, 5, true);
+ verificationDataSet.clear();
+ segments.clear();
+
+ segments.addAll(createDiskSegments(10, 100));
+ mergeSegments(segments, 5, true);
+ verificationDataSet.clear();
+ segments.clear();
+
+ segments.addAll(createInMemorySegments(3, 100));
+ mergeSegments(segments, 5, false);
+ verificationDataSet.clear();
+ segments.clear();
+ }
+
+ @SuppressWarnings("unchecked")
+ private void mergeSegments(List<TezMerger.Segment> segmentList, int mergeFactor, boolean
+ hasDiskSegments) throws Exception {
+ //Merge datasets
+ TezMerger.MergeQueue mergeQueue = new TezMerger.MergeQueue(defaultConf, localFs, segmentList,
+ comparator, new Reporter(), false, false);
+
+ TezRawKeyValueIterator records = mergeQueue.merge(IntWritable.class, LongWritable.class,
+ mergeFactor, new Path(workDir, "tmp_"
+ + System.nanoTime()), null, null, null, null);
+
+ //Verify the merged data is correct
+ verifyData(records);
+
+ //ensure disk buffers are used
+ int diskBufLen = mergeQueue.diskIFileValue.getLength();
+ assertTrue(diskBufLen + " disk buf length should be > 0", (hasDiskSegments == diskBufLen > 0));
+
+ verificationDataSet.clear();
+ }
+
+ private List<TezMerger.Segment> createInMemorySegments(int segmentCount, int keysPerSegment)
+ throws IOException {
+ List<TezMerger.Segment> segmentList = Lists.newLinkedList();
+ Random rnd = new Random();
+ DataInputBuffer key = new DataInputBuffer();
+ DataInputBuffer value = new DataInputBuffer();
+ for (int i = 0; i < segmentCount; i++) {
+ BoundedByteArrayOutputStream stream = new BoundedByteArrayOutputStream(10000);
+ InMemoryWriter writer = new InMemoryWriter(stream);
+
+ for (int j = 0; j < keysPerSegment; j++) {
+ populateData(new IntWritable(rnd.nextInt()), new LongWritable(rnd.nextLong()), key, value);
+ writer.append(key, value);
+ }
+ writer.close();
+ InMemoryReader reader = new InMemoryReader(merger, null, stream.getBuffer(), 0, stream.getLimit());
+
+ segmentList.add(new TezMerger.Segment(reader, true, null));
+ }
+ return segmentList;
+ }
+
+ private void populateData(IntWritable intKey, LongWritable longVal, DataInputBuffer key,
+ DataInputBuffer value)
+ throws IOException {
+ DataOutputBuffer k = new DataOutputBuffer();
+ DataOutputBuffer v = new DataOutputBuffer();
+ intKey.write(k);
+ longVal.write(v);
+ key.reset(k.getData(), 0, k.getLength());
+ value.reset(v.getData(), 0, v.getLength());
+ verificationDataSet.put(intKey.get(), longVal.get());
+ }
+
+ private List<TezMerger.Segment> createDiskSegments(int segmentCount, int keysPerSegment) throws
+ IOException {
+ List<TezMerger.Segment> segmentList = Lists.newLinkedList();
+ Random rnd = new Random();
+ for (int i = 0; i < segmentCount; i++) {
+ int repeatCount = ((i % 2 == 0) && keysPerSegment > 0) ? rnd.nextInt(keysPerSegment) : 0;
+ Path ifilePath = writeIFile(keysPerSegment, repeatCount);
+
+ segmentList.add(new TezMerger.Segment(localFs, ifilePath, 0, localFs.getFileStatus
+ (ifilePath).getLen(), null, false, 1024, 1024, false, null));
+ }
+ return segmentList;
+ }
+
static Path writeIFile(int keysPerFile, int repeatCount) throws
IOException {
TreeMultimap<Integer, Long> dataSet = createDataForIFile(keysPerFile, repeatCount);