You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/02/05 22:00:47 UTC
[3/8] tez git commit: TEZ-1999. IndexOutOfBoundsException during
merge (rbalamohan)
TEZ-1999. IndexOutOfBoundsException during merge (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ad6bf07e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ad6bf07e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ad6bf07e
Branch: refs/heads/TEZ-2003
Commit: ad6bf07eba9923fca2627503652d16cfceb72d39
Parents: b726869
Author: Rajesh Balamohan <rb...@hortonworks.com>
Authored: Sat Jan 31 19:53:23 2015 +0530
Committer: Rajesh Balamohan <rb...@hortonworks.com>
Committed: Sat Jan 31 19:53:23 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../library/common/sort/impl/TezMerger.java | 29 +-
.../library/common/sort/impl/TestTezMerger.java | 518 ++++++++++++++++++-
3 files changed, 525 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/ad6bf07e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5c0bec0..03b0624 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-1999. IndexOutOfBoundsException during merge.
TEZ-2000. Source vertex exists error during DAG submission.
TEZ-2008. Add methods to SecureShuffleUtils to verify a reply based on a provided Key.
TEZ-1995. Build failure against hadoop 2.2.
http://git-wip-us.apache.org/repos/asf/tez/blob/ad6bf07e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index ed9a59d..5dd538a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -487,13 +487,32 @@ public class TezMerger {
return value;
}
+ private void populatePreviousKey() throws IOException {
+ key.reset();
+ BufferUtils.copy(key, prevKey);
+ }
+
private void adjustPriorityQueue(Segment reader) throws IOException{
long startPos = reader.getPosition();
- if (hasNext != null && hasNext != KeyState.SAME_KEY) {
- key.reset();
- // TODO: This copy can be an unwanted operation when all keys are unique. Revisit this
- // when we have better stats.
- BufferUtils.copy(key, prevKey);
+ if (hasNext == null) {
+ /**
+ * hasNext can be null during first iteration & prevKey is initialized here.
+ * In cases of NO_KEY/NEW_KEY, we readjust the queue later. If new segment/file is found
+ * during this process, we need to compare keys for RLE across segment boundaries.
+ * prevKey can't be empty at that time (e.g custom comparators)
+ */
+ populatePreviousKey();
+ } else {
+ //indicates a key has been read already
+ if (hasNext != KeyState.SAME_KEY) {
+ /**
+ * Store previous key before reading next for later key comparisons.
+ * If all keys in a segment are unique, it would always hit this code path and key copies
+ * are wasteful in such condition, as these comparisons are mainly done for RLE.
+ * TODO: When better stats are available, this condition can be avoided.
+ */
+ populatePreviousKey();
+ }
}
hasNext = reader.readRawKey();
long endPos = reader.getPosition();
http://git-wip-us.apache.org/repos/asf/tez/blob/ad6bf07e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
index ac17d8d..1e14b9b 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
@@ -18,6 +18,7 @@
package org.apache.tez.runtime.library.common.sort.impl;
+import com.google.common.base.Preconditions;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
@@ -32,23 +33,27 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.TestMergeManager;
-import org.junit.After;
import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import static org.junit.Assert.assertTrue;
+
public class TestTezMerger {
private static final Log LOG = LogFactory.getLog(TestTezMerger.class);
@@ -56,6 +61,11 @@ public class TestTezMerger {
private static Configuration defaultConf = new Configuration();
private static FileSystem localFs = null;
private static Path workDir = null;
+ private static RawComparator comparator = null;
+ private static Random rnd = new Random();
+
+ private static final String SAME_KEY = "SAME_KEY";
+ private static final String DIFF_KEY = "DIFF_KEY";
//store the generated data for final verification
private static ListMultimap<Integer, Long> verificationDataSet = LinkedListMultimap.create();
@@ -76,6 +86,7 @@ public class TestTezMerger {
Path baseDir = new Path(workDir, TestMergeManager.class.getName());
String localDirs = baseDir.toString();
defaultConf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs);
+ comparator = ConfigUtils.getIntermediateInputKeyComparator(defaultConf);
}
@AfterClass
@@ -83,8 +94,7 @@ public class TestTezMerger {
localFs.delete(workDir, true);
}
-
- @Test
+ @Test(timeout = 80000)
public void testMerge() throws Exception {
/**
* test with number of files, keys per file and mergefactor
@@ -95,6 +105,7 @@ public class TestTezMerger {
merge(100, 0, 5);
//small files
+ merge(12, 4, 2);
merge(2, 10, 2);
merge(1, 10, 1);
merge(5, 10, 3);
@@ -105,17 +116,487 @@ public class TestTezMerger {
merge(5, 1000, 5);
merge(5, 1000, 10);
merge(5, 1000, 100);
+
+ //Create random mix of files (empty files + files with keys)
+ List<Path> pathList = new LinkedList<Path>();
+ pathList.clear();
+ pathList.addAll(createIFiles(Math.max(2, rnd.nextInt(20)), 0));
+ pathList.addAll(createIFiles(Math.max(2, rnd.nextInt(20)), Math.max(2, rnd.nextInt(10))));
+ merge(pathList, Math.max(2, rnd.nextInt(10)));
+ }
+
+ private Path createIFileWithTextData(List<String> data) throws IOException {
+ Path path = new Path(workDir + "/src", "data_" + System.nanoTime() + ".out");
+ FSDataOutputStream out = localFs.create(path);
+ IFile.Writer writer = new IFile.Writer(defaultConf, out, Text.class,
+ Text.class, null, null, null, true);
+ for (String key : data) {
+ writer.append(new Text(key), new Text(key + "_" + System.nanoTime()));
+ }
+ writer.close();
+ out.close();
+ return path;
+ }
+
+ /**
+ * Verify if the records are as per the expected data set
+ *
+ * @param records
+ * @param expectedResult
+ * @throws IOException
+ */
+ private void verify(TezRawKeyValueIterator records, String[][] expectedResult)
+ throws IOException {
+ //Iterate through merged dataset (shouldn't throw any exceptions)
+ int i = 0;
+ while (records.next()) {
+ DataInputBuffer key = records.getKey();
+ DataInputBuffer value = records.getValue();
+
+ Text k = new Text();
+ k.readFields(key);
+ Text v = new Text();
+ v.readFields(value);
+
+ assertTrue(k.toString().equals(expectedResult[i][0]));
+
+ String correctResult = expectedResult[i][1];
+
+ if (records.isSameKey()) {
+ assertTrue("Expected " + correctResult, correctResult.equalsIgnoreCase(SAME_KEY));
+ LOG.info("\tSame Key : key=" + k + ", val=" + v);
+ } else {
+ assertTrue("Expected " + correctResult, correctResult.equalsIgnoreCase(DIFF_KEY));
+ LOG.info("key=" + k + ", val=" + v);
+ }
+
+ i++;
+ }
+ }
+
+ @Test(timeout = 5000)
+ public void testWithCustomComparator_WithEmptyStrings() throws Exception {
+ List<Path> pathList = new LinkedList<Path>();
+ List<String> data = Lists.newLinkedList();
+ //Merge datasets with custom comparator
+ RawComparator rc = new CustomComparator();
+
+ LOG.info("Test with custom comparator with empty strings in middle");
+
+ //Test with 4 files, where some texts are empty strings
+ data.add("0");
+ data.add("0");
+ pathList.add(createIFileWithTextData(data));
+
+ //Second file with empty key
+ data.clear();
+ data.add("");
+ pathList.add(createIFileWithTextData(data));
+
+ //Third file
+ data.clear();
+ data.add("0");
+ data.add("0");
+ pathList.add(createIFileWithTextData(data));
+
+ //Third file
+ data.clear();
+ data.add("1");
+ data.add("2");
+ pathList.add(createIFileWithTextData(data));
+
+ TezRawKeyValueIterator records = merge(pathList, rc);
+
+ String[][] expectedResult =
+ {
+ //formatting intentionally
+ { "", DIFF_KEY },
+ { "0", DIFF_KEY },
+ { "0", SAME_KEY },
+ { "0", SAME_KEY },
+ { "0", SAME_KEY },
+ { "1", DIFF_KEY },
+ { "2", DIFF_KEY }
+ };
+
+ verify(records, expectedResult);
+ pathList.clear();
+ data.clear();
+ }
+
+ @Test(timeout = 5000)
+ public void testWithCustomComparator_No_RLE() throws Exception {
+ List<Path> pathList = new LinkedList<Path>();
+ List<String> data = Lists.newLinkedList();
+ //Merge datasets with custom comparator
+ RawComparator rc = new CustomComparator();
+
+ LOG.info("Test with custom comparator with no RLE");
+
+ //Test with 3 files,
+ data.add("1");
+ data.add("4");
+ data.add("5");
+ pathList.add(createIFileWithTextData(data));
+
+ //Second file with empty key
+ data.clear();
+ data.add("2");
+ data.add("6");
+ data.add("7");
+ pathList.add(createIFileWithTextData(data));
+
+ //Third file
+ data.clear();
+ data.add("3");
+ data.add("8");
+ data.add("9");
+ pathList.add(createIFileWithTextData(data));
+
+ TezRawKeyValueIterator records = merge(pathList, rc);
+
+ String[][] expectedResult =
+ {
+ { "1", DIFF_KEY },
+ { "2", DIFF_KEY },
+ { "3", DIFF_KEY },
+ { "4", DIFF_KEY },
+ { "5", DIFF_KEY },
+ { "6", DIFF_KEY },
+ { "7", DIFF_KEY },
+ { "8", DIFF_KEY },
+ { "9", DIFF_KEY }
+ };
+
+ verify(records, expectedResult);
+ pathList.clear();
+ data.clear();
+ }
+
+ @Test(timeout = 5000)
+ public void testWithCustomComparator_RLE_acrossFiles() throws Exception {
+ List<Path> pathList = new LinkedList<Path>();
+ List<String> data = Lists.newLinkedList();
+
+ LOG.info("Test with custom comparator with RLE spanning across segment boundaries");
+
+ //Test with 2 files, where the RLE keys can span across files
+ //First file
+ data.clear();
+ data.add("0");
+ data.add("0");
+ pathList.add(createIFileWithTextData(data));
+
+ //Second file
+ data.clear();
+ data.add("0");
+ data.add("1");
+ pathList.add(createIFileWithTextData(data));
+
+ //Merge datasets with custom comparator
+ RawComparator rc = new CustomComparator();
+ TezRawKeyValueIterator records = merge(pathList, rc);
+
+ //expected result
+ String[][] expectedResult =
+ {
+ //formatting intentionally
+ { "0", DIFF_KEY },
+ { "0", SAME_KEY },
+ { "0", SAME_KEY },
+ { "1", DIFF_KEY }
+ };
+
+ verify(records, expectedResult);
+ pathList.clear();
+ data.clear();
+
+ }
+
+ @Test(timeout = 5000)
+ public void testWithCustomComparator_mixedFiles() throws Exception {
+ List<Path> pathList = new LinkedList<Path>();
+ List<String> data = Lists.newLinkedList();
+
+ LOG.info("Test with custom comparator with mixed set of segments (empty, non-empty etc)");
+
+ //Test with 2 files, where the RLE keys can span across files
+ //First file
+ data.clear();
+ data.add("0");
+ pathList.add(createIFileWithTextData(data));
+
+ //Second file; empty file
+ data.clear();
+ pathList.add(createIFileWithTextData(data));
+
+ //Third file with empty key
+ data.clear();
+ data.add("");
+ pathList.add(createIFileWithTextData(data));
+
+ //Fourth file with repeated keys
+ data.clear();
+ data.add("0");
+ data.add("0");
+ data.add("0");
+ pathList.add(createIFileWithTextData(data));
+
+ //Merge datasets with custom comparator
+ RawComparator rc = new CustomComparator();
+ TezRawKeyValueIterator records = merge(pathList, rc);
+
+ //expected result
+ String[][] expectedResult =
+ {
+ //formatting intentionally
+ { "", DIFF_KEY },
+ { "0", DIFF_KEY },
+ { "0", SAME_KEY },
+ { "0", SAME_KEY },
+ { "0", SAME_KEY }
+ };
+
+ verify(records, expectedResult);
+ pathList.clear();
+ data.clear();
+ }
+
+ @Test(timeout = 5000)
+ public void testWithCustomComparator_RLE() throws Exception {
+ List<Path> pathList = new LinkedList<Path>();
+ List<String> data = Lists.newLinkedList();
+
+ LOG.info("Test with custom comparator 2 files one containing RLE and also other segment "
+ + "starting with same key");
+
+ //Test with 2 files, same keys in middle of file
+ //First file
+ data.clear();
+ data.add("1");
+ data.add("2");
+ data.add("2");
+ pathList.add(createIFileWithTextData(data));
+
+ //Second file
+ data.clear();
+ data.add("2");
+ data.add("3");
+ pathList.add(createIFileWithTextData(data));
+
+ TezRawKeyValueIterator records = merge(pathList, new CustomComparator());
+
+ String[][] expectedResult =
+ {
+ //formatting intentionally
+ { "1", DIFF_KEY },
+ { "2", DIFF_KEY },
+ { "2", SAME_KEY },
+ { "2", SAME_KEY },
+ { "3", DIFF_KEY }
+ };
+
+ verify(records, expectedResult);
+ pathList.clear();
+ data.clear();
+ }
+
+ @Test(timeout = 5000)
+ public void testWithCustomComparator_RLE2() throws Exception {
+ List<Path> pathList = new LinkedList<Path>();
+ List<String> data = Lists.newLinkedList();
+
+ LOG.info(
+ "Test with custom comparator 3 files with RLE (starting keys) spanning across boundaries");
+
+ //Test with 3 files, same keys in middle of file
+ //First file
+ data.clear();
+ data.add("0");
+ data.add("1");
+ data.add("1");
+ pathList.add(createIFileWithTextData(data));
+
+ //Second file
+ data.clear();
+ data.add("0");
+ data.add("1");
+ pathList.add(createIFileWithTextData(data));
+
+ //Third file
+ data.clear();
+ data.add("0");
+ data.add("1");
+ data.add("1");
+ pathList.add(createIFileWithTextData(data));
+
+ TezRawKeyValueIterator records = merge(pathList, new CustomComparator());
+ String[][] expectedResult =
+ {
+ //formatting intentionally
+ { "0", DIFF_KEY },
+ { "0", SAME_KEY },
+ { "0", SAME_KEY },
+ { "1", DIFF_KEY },
+ { "1", SAME_KEY },
+ { "1", SAME_KEY },
+ { "1", SAME_KEY },
+ { "1", SAME_KEY }
+
+ };
+
+ verify(records, expectedResult);
+ pathList.clear();
+ data.clear();
+ }
+
+ @Test(timeout = 5000)
+ public void testWithCustomComparator() throws Exception {
+ List<Path> pathList = new LinkedList<Path>();
+ List<String> data = Lists.newLinkedList();
+
+ LOG.info(
+ "Test with custom comparator 3 files with RLE (starting keys) spanning across boundaries");
+
+ //Test with 3 files
+ //First file
+ data.clear();
+ data.add("0");
+ pathList.add(createIFileWithTextData(data));
+
+ //Second file
+ data.clear();
+ data.add("0");
+ pathList.add(createIFileWithTextData(data));
+
+ //Third file
+ data.clear();
+ data.add("1");
+ pathList.add(createIFileWithTextData(data));
+
+ TezRawKeyValueIterator records = merge(pathList, new CustomComparator());
+ String[][] expectedResult =
+ {
+ //formatting intentionally
+ { "0", DIFF_KEY },
+ { "0", SAME_KEY },
+ { "1", DIFF_KEY }
+ };
+
+ verify(records, expectedResult);
+ pathList.clear();
+ data.clear();
+ }
+
+ @Test(timeout = 5000)
+ public void testWithCustomComparator_RLE3() throws Exception {
+ List<Path> pathList = new LinkedList<Path>();
+ List<String> data = Lists.newLinkedList();
+
+ LOG.info("Test with custom comparator");
+
+ //Test with 3 files, same keys in middle of file
+ //First file
+ data.clear();
+ data.add("0");
+ pathList.add(createIFileWithTextData(data));
+
+ //Second file
+ data.clear();
+ data.add("0");
+ data.add("1");
+ data.add("1");
+ pathList.add(createIFileWithTextData(data));
+
+ TezRawKeyValueIterator records = merge(pathList, new CustomComparator());
+
+ String[][] expectedResult =
+ {
+ //formatting intentionally
+ { "0", DIFF_KEY },
+ { "0", SAME_KEY },
+ { "1", DIFF_KEY },
+ { "1", SAME_KEY } };
+
+ verify(records, expectedResult);
+ pathList.clear();
+ data.clear();
+ }
+
+ @Test(timeout = 5000)
+ public void testWithCustomComparator_allEmptyFiles() throws Exception {
+ List<Path> pathList = new LinkedList<Path>();
+ List<String> data = Lists.newLinkedList();
+
+ LOG.info("Test with custom comparator where all files are empty");
+
+ //First file
+ pathList.add(createIFileWithTextData(data));
+
+ //Second file
+ pathList.add(createIFileWithTextData(data));
+
+ //Third file
+ pathList.add(createIFileWithTextData(data));
+
+ //Fourth file
+ pathList.add(createIFileWithTextData(data));
+
+ TezRawKeyValueIterator records = merge(pathList, new CustomComparator());
+
+ String[][] expectedResult = new String[0][0];
+
+ verify(records, expectedResult);
+ }
+
+ /**
+ * Merge the data sets
+ *
+ * @param pathList
+ * @param rc
+ * @return
+ * @throws IOException
+ */
+ private TezRawKeyValueIterator merge(List<Path> pathList, RawComparator rc) throws IOException {
+ TezMerger merger = new TezMerger();
+ TezRawKeyValueIterator records = merger.merge(defaultConf, localFs, IntWritable.class,
+ LongWritable.class, null, false, 0, 1024, pathList.toArray(new Path[pathList.size()]),
+ true, 4, new Path(workDir, "tmp_" + System.nanoTime()), ((rc == null) ? comparator : rc),
+ new Reporter(), null, null,
+ null, new Progress());
+ return records;
+ }
+
+
+
+ //Sample comparator to test TEZ-1999 corner case
+ static class CustomComparator extends WritableComparator {
+ @Override
+ //Not a valid comparison, but just to check byte boundaries
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ Preconditions.checkArgument(l2 > 0 && l1 > 0, "l2=" + l2 + ",l1=" + l1);
+ ByteBuffer bb1 = ByteBuffer.wrap(b1, s1, l1);
+ ByteBuffer bb2 = ByteBuffer.wrap(b2, s2, l2);
+ return bb1.compareTo(bb2);
+ }
+ }
+
+ private void merge(List<Path> pathList, int mergeFactor) throws Exception {
+ merge(pathList, mergeFactor, null);
}
private void merge(int fileCount, int keysPerFile, int mergeFactor) throws Exception {
List<Path> pathList = createIFiles(fileCount, keysPerFile);
+ merge(pathList, mergeFactor, null);
+ }
+ private void merge(List<Path> pathList, int mergeFactor, RawComparator rc) throws Exception {
//Merge datasets
TezMerger merger = new TezMerger();
TezRawKeyValueIterator records = merger.merge(defaultConf, localFs, IntWritable.class,
LongWritable.class, null, false, 0, 1024, pathList.toArray(new Path[pathList.size()]),
true, mergeFactor, new Path(workDir, "tmp_" + System.nanoTime()),
- ConfigUtils.getIntermediateInputKeyComparator(defaultConf), new Reporter(), null, null,
+ ((rc == null) ? comparator : rc), new Reporter(), null, null,
null,
new Progress());
@@ -134,9 +615,9 @@ public class TestTezMerger {
if (records.isSameKey()) {
LOG.info("\tSame Key : key=" + k.get() + ", val=" + v.get());
//More than one key should be present in the source data
- Assert.assertTrue(verificationDataSet.get(k.get()).size() > 1);
+ assertTrue(verificationDataSet.get(k.get()).size() > 1);
//Ensure this is same as the previous key we saw
- Assert.assertTrue(pk == k.get());
+ assertTrue("previousKey=" + pk + ", current=" + k.get(), pk == k.get());
} else {
LOG.info("key=" + k.get() + ", val=" + v.get());
}
@@ -147,30 +628,30 @@ public class TestTezMerger {
}
//Verify if the number of distinct entries is the same in source and the test
- Assert.assertTrue("dataMap=" + dataMap.keySet().size() + ", verificationSet=" +
- verificationDataSet.keySet().size(),
+ assertTrue("dataMap=" + dataMap.keySet().size() + ", verificationSet=" +
+ verificationDataSet.keySet().size(),
dataMap.keySet().size() == verificationDataSet.keySet().size());
//Verify with source data
for (Integer key : verificationDataSet.keySet()) {
- Assert.assertTrue("Data size for " + key + " not matching with source; dataSize:" + dataMap
+ assertTrue("Data size for " + key + " not matching with source; dataSize:" + dataMap
.get(key).intValue() + ", source:" + verificationDataSet.get(key).size(),
dataMap.get(key).intValue() == verificationDataSet.get(key).size());
}
//Verify if every key has the same number of repeated items in the source dataset as well
for (Map.Entry<Integer, Integer> entry : dataMap.entrySet()) {
- Assert.assertTrue(entry.getKey() + "", verificationDataSet.get(entry.getKey()).size() == entry
+ assertTrue(entry.getKey() + "", verificationDataSet.get(entry.getKey()).size() == entry
.getValue());
}
LOG.info("******************");
+ verificationDataSet.clear();
}
private List<Path> createIFiles(int fileCount, int keysPerFile)
throws IOException {
List<Path> pathList = Lists.newLinkedList();
- verificationDataSet.clear();
Random rnd = new Random();
for (int i = 0; i < fileCount; i++) {
int repeatCount = ((i % 2 == 0) && keysPerFile > 0) ? rnd.nextInt(keysPerFile) : 0;
@@ -180,8 +661,10 @@ public class TestTezMerger {
return pathList;
}
- static Path writeIFile(int keysPerFile, int repeatCount) throws IOException {
+ static Path writeIFile(int keysPerFile, int repeatCount) throws
+ IOException {
TreeMultimap<Integer, Long> dataSet = createDataForIFile(keysPerFile, repeatCount);
+ LOG.info("DataSet size : " + dataSet.size());
Path path = new Path(workDir + "/src", "data_" + System.nanoTime() + ".out");
FSDataOutputStream out = localFs.create(path);
//create IFile with RLE
@@ -202,7 +685,7 @@ public class TestTezMerger {
/**
* Generate data set for ifile. Create repeated keys if needed.
*
- * @param keyCount approximate number of keys to be created
+ * @param keyCount approximate number of keys to be created
* @param repeatCount number of times a key should be repeated
* @return
*/
@@ -210,9 +693,9 @@ public class TestTezMerger {
TreeMultimap<Integer, Long> dataSet = TreeMultimap.create();
Random rnd = new Random();
for (int i = 0; i < keyCount; i++) {
- if (repeatCount > 0 && (rnd.nextInt(keyCount) % 2 == 0)) {
+ if (repeatCount > 0 && (rnd.nextInt(keyCount) % 2 == 0)) {
//repeat this key
- for(int j = 0; j < repeatCount; j++) {
+ for (int j = 0; j < repeatCount; j++) {
IntWritable key = new IntWritable(rnd.nextInt(keyCount));
LongWritable value = new LongWritable(System.nanoTime());
dataSet.put(key.get(), value.get());
@@ -234,7 +717,6 @@ public class TestTezMerger {
return dataSet;
}
-
private static class Reporter implements Progressable {
@Override
public void progress() {