You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/02/05 22:00:47 UTC

[3/8] tez git commit: TEZ-1999. IndexOutOfBoundsException during merge (rbalamohan)

TEZ-1999. IndexOutOfBoundsException during merge (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ad6bf07e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ad6bf07e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ad6bf07e

Branch: refs/heads/TEZ-2003
Commit: ad6bf07eba9923fca2627503652d16cfceb72d39
Parents: b726869
Author: Rajesh Balamohan <rb...@hortonworks.com>
Authored: Sat Jan 31 19:53:23 2015 +0530
Committer: Rajesh Balamohan <rb...@hortonworks.com>
Committed: Sat Jan 31 19:53:23 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../library/common/sort/impl/TezMerger.java     |  29 +-
 .../library/common/sort/impl/TestTezMerger.java | 518 ++++++++++++++++++-
 3 files changed, 525 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/ad6bf07e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5c0bec0..03b0624 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1999. IndexOutOfBoundsException during merge.
   TEZ-2000. Source vertex exists error during DAG submission.
   TEZ-2008. Add methods to SecureShuffleUtils to verify a reply based on a provided Key.
   TEZ-1995. Build failure against hadoop 2.2.

http://git-wip-us.apache.org/repos/asf/tez/blob/ad6bf07e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index ed9a59d..5dd538a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -487,13 +487,32 @@ public class TezMerger {
       return value;
     }
 
+    private void populatePreviousKey() throws IOException {
+      key.reset();
+      BufferUtils.copy(key, prevKey);
+    }
+
     private void adjustPriorityQueue(Segment reader) throws IOException{
       long startPos = reader.getPosition();
-      if (hasNext != null && hasNext != KeyState.SAME_KEY) {
-        key.reset();
-        // TODO: This copy can be an unwanted operation when all keys are unique. Revisit this
-        // when we have better stats.
-        BufferUtils.copy(key, prevKey);
+      if (hasNext == null) {
+        /**
+         * hasNext can be null during first iteration & prevKey is initialized here.
+         * In cases of NO_KEY/NEW_KEY, we readjust the queue later. If new segment/file is found
+         * during this process, we need to compare keys for RLE across segment boundaries.
+         * prevKey can't be empty at that time (e.g. custom comparators)
+         */
+        populatePreviousKey();
+      } else {
+        //indicates a key has been read already
+        if (hasNext != KeyState.SAME_KEY) {
+          /**
+           * Store previous key before reading next for later key comparisons.
+           * If all keys in a segment are unique, it would always hit this code path and key copies
+           * are wasteful in such condition, as these comparisons are mainly done for RLE.
+           * TODO: When better stats are available, this condition can be avoided.
+           */
+          populatePreviousKey();
+        }
       }
       hasNext = reader.readRawKey();
       long endPos = reader.getPosition();

http://git-wip-us.apache.org/repos/asf/tez/blob/ad6bf07e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
index ac17d8d..1e14b9b 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.runtime.library.common.sort.impl;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
@@ -32,23 +33,27 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.TestMergeManager;
-import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import static org.junit.Assert.assertTrue;
+
 public class TestTezMerger {
 
   private static final Log LOG = LogFactory.getLog(TestTezMerger.class);
@@ -56,6 +61,11 @@ public class TestTezMerger {
   private static Configuration defaultConf = new Configuration();
   private static FileSystem localFs = null;
   private static Path workDir = null;
+  private static RawComparator comparator = null;
+  private static Random rnd = new Random();
+
+  private static final String SAME_KEY = "SAME_KEY";
+  private static final String DIFF_KEY = "DIFF_KEY";
 
   //store the generated data for final verification
   private static ListMultimap<Integer, Long> verificationDataSet = LinkedListMultimap.create();
@@ -76,6 +86,7 @@ public class TestTezMerger {
     Path baseDir = new Path(workDir, TestMergeManager.class.getName());
     String localDirs = baseDir.toString();
     defaultConf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs);
+    comparator = ConfigUtils.getIntermediateInputKeyComparator(defaultConf);
   }
 
   @AfterClass
@@ -83,8 +94,7 @@ public class TestTezMerger {
     localFs.delete(workDir, true);
   }
 
-
-  @Test
+  @Test(timeout = 80000)
   public void testMerge() throws Exception {
     /**
      * test with number of files, keys per file and mergefactor
@@ -95,6 +105,7 @@ public class TestTezMerger {
     merge(100, 0, 5);
 
     //small files
+    merge(12, 4, 2);
     merge(2, 10, 2);
     merge(1, 10, 1);
     merge(5, 10, 3);
@@ -105,17 +116,487 @@ public class TestTezMerger {
     merge(5, 1000, 5);
     merge(5, 1000, 10);
     merge(5, 1000, 100);
+
+    //Create random mix of files (empty files + files with keys)
+    List<Path> pathList = new LinkedList<Path>();
+    pathList.clear();
+    pathList.addAll(createIFiles(Math.max(2, rnd.nextInt(20)), 0));
+    pathList.addAll(createIFiles(Math.max(2, rnd.nextInt(20)), Math.max(2, rnd.nextInt(10))));
+    merge(pathList, Math.max(2, rnd.nextInt(10)));
+  }
+
+  private Path createIFileWithTextData(List<String> data) throws IOException {
+    Path path = new Path(workDir + "/src", "data_" + System.nanoTime() + ".out");
+    FSDataOutputStream out = localFs.create(path);
+    IFile.Writer writer = new IFile.Writer(defaultConf, out, Text.class,
+        Text.class, null, null, null, true);
+    for (String key : data) {
+      writer.append(new Text(key), new Text(key + "_" + System.nanoTime()));
+    }
+    writer.close();
+    out.close();
+    return path;
+  }
+
+  /**
+   * Verify if the records are as per the expected data set
+   *
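+   * @param records merged key/value iterator to walk through
+   * @param expectedResult rows of {key, SAME_KEY or DIFF_KEY}, in the expected iteration order
+   * @throws IOException if reading or deserializing the merged records fails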
+   */
+  private void verify(TezRawKeyValueIterator records, String[][] expectedResult)
+      throws IOException {
+    //Iterate through merged dataset (shouldn't throw any exceptions)
+    int i = 0;
+    while (records.next()) {
+      DataInputBuffer key = records.getKey();
+      DataInputBuffer value = records.getValue();
+
+      Text k = new Text();
+      k.readFields(key);
+      Text v = new Text();
+      v.readFields(value);
+
+      assertTrue(k.toString().equals(expectedResult[i][0]));
+
+      String correctResult = expectedResult[i][1];
+
+      if (records.isSameKey()) {
+        assertTrue("Expected " + correctResult, correctResult.equalsIgnoreCase(SAME_KEY));
+        LOG.info("\tSame Key : key=" + k + ", val=" + v);
+      } else {
+        assertTrue("Expected " + correctResult, correctResult.equalsIgnoreCase(DIFF_KEY));
+        LOG.info("key=" + k + ", val=" + v);
+      }
+
+      i++;
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testWithCustomComparator_WithEmptyStrings() throws Exception {
+    List<Path> pathList = new LinkedList<Path>();
+    List<String> data = Lists.newLinkedList();
+    //Merge datasets with custom comparator
+    RawComparator rc = new CustomComparator();
+
+    LOG.info("Test with custom comparator with empty strings in middle");
+
+    //Test with 4 files, where some texts are empty strings
+    data.add("0");
+    data.add("0");
+    pathList.add(createIFileWithTextData(data));
+
+    //Second file with empty key
+    data.clear();
+    data.add("");
+    pathList.add(createIFileWithTextData(data));
+
+    //Third file
+    data.clear();
+    data.add("0");
+    data.add("0");
+    pathList.add(createIFileWithTextData(data));
+
+    //Fourth file
+    data.clear();
+    data.add("1");
+    data.add("2");
+    pathList.add(createIFileWithTextData(data));
+
+    TezRawKeyValueIterator records = merge(pathList, rc);
+
+    String[][] expectedResult =
+        {
+            //formatting intentionally
+            { "", DIFF_KEY },
+            { "0", DIFF_KEY },
+              { "0", SAME_KEY },
+              { "0", SAME_KEY },
+              { "0", SAME_KEY },
+            { "1", DIFF_KEY },
+            { "2", DIFF_KEY }
+        };
+
+    verify(records, expectedResult);
+    pathList.clear();
+    data.clear();
+  }
+
+  @Test(timeout = 5000)
+  public void testWithCustomComparator_No_RLE() throws Exception {
+    List<Path> pathList = new LinkedList<Path>();
+    List<String> data = Lists.newLinkedList();
+    //Merge datasets with custom comparator
+    RawComparator rc = new CustomComparator();
+
+    LOG.info("Test with custom comparator with no RLE");
+
+    //Test with 3 files, where every key is unique (no RLE)
+    data.add("1");
+    data.add("4");
+    data.add("5");
+    pathList.add(createIFileWithTextData(data));
+
+    //Second file
+    data.clear();
+    data.add("2");
+    data.add("6");
+    data.add("7");
+    pathList.add(createIFileWithTextData(data));
+
+    //Third file
+    data.clear();
+    data.add("3");
+    data.add("8");
+    data.add("9");
+    pathList.add(createIFileWithTextData(data));
+
+    TezRawKeyValueIterator records = merge(pathList, rc);
+
+    String[][] expectedResult =
+        {
+            { "1", DIFF_KEY },
+            { "2", DIFF_KEY },
+            { "3", DIFF_KEY },
+            { "4", DIFF_KEY },
+            { "5", DIFF_KEY },
+            { "6", DIFF_KEY },
+            { "7", DIFF_KEY },
+            { "8", DIFF_KEY },
+            { "9", DIFF_KEY }
+        };
+
+    verify(records, expectedResult);
+    pathList.clear();
+    data.clear();
+  }
+
+  @Test(timeout = 5000)
+  public void testWithCustomComparator_RLE_acrossFiles() throws Exception {
+    List<Path> pathList = new LinkedList<Path>();
+    List<String> data = Lists.newLinkedList();
+
+    LOG.info("Test with custom comparator with RLE spanning across segment boundaries");
+
+    //Test with 2 files, where the RLE keys can span across files
+    //First file
+    data.clear();
+    data.add("0");
+    data.add("0");
+    pathList.add(createIFileWithTextData(data));
+
+    //Second file
+    data.clear();
+    data.add("0");
+    data.add("1");
+    pathList.add(createIFileWithTextData(data));
+
+    //Merge datasets with custom comparator
+    RawComparator rc = new CustomComparator();
+    TezRawKeyValueIterator records = merge(pathList, rc);
+
+    //expected result
+    String[][] expectedResult =
+        {
+            //formatting intentionally
+            { "0", DIFF_KEY },
+              { "0", SAME_KEY },
+              { "0", SAME_KEY },
+            { "1", DIFF_KEY }
+        };
+
+    verify(records, expectedResult);
+    pathList.clear();
+    data.clear();
+
+  }
+
+  @Test(timeout = 5000)
+  public void testWithCustomComparator_mixedFiles() throws Exception {
+    List<Path> pathList = new LinkedList<Path>();
+    List<String> data = Lists.newLinkedList();
+
+    LOG.info("Test with custom comparator with mixed set of segments (empty, non-empty etc)");
+
+    //Test with 4 files (one empty, one with an empty key), where RLE keys span across files
+    //First file
+    data.clear();
+    data.add("0");
+    pathList.add(createIFileWithTextData(data));
+
+    //Second file; empty file
+    data.clear();
+    pathList.add(createIFileWithTextData(data));
+
+    //Third file with empty key
+    data.clear();
+    data.add("");
+    pathList.add(createIFileWithTextData(data));
+
+    //Fourth file with repeated keys
+    data.clear();
+    data.add("0");
+    data.add("0");
+    data.add("0");
+    pathList.add(createIFileWithTextData(data));
+
+    //Merge datasets with custom comparator
+    RawComparator rc = new CustomComparator();
+    TezRawKeyValueIterator records = merge(pathList, rc);
+
+    //expected result
+    String[][] expectedResult =
+        {
+            //formatting intentionally
+            { "", DIFF_KEY },
+            { "0", DIFF_KEY },
+              { "0", SAME_KEY },
+              { "0", SAME_KEY },
+              { "0", SAME_KEY }
+        };
+
+    verify(records, expectedResult);
+    pathList.clear();
+    data.clear();
+  }
+
+  @Test(timeout = 5000)
+  public void testWithCustomComparator_RLE() throws Exception {
+    List<Path> pathList = new LinkedList<Path>();
+    List<String> data = Lists.newLinkedList();
+
+    LOG.info("Test with custom comparator 2 files one containing RLE and also other segment "
+        + "starting with same key");
+
+    //Test with 2 files, same keys in middle of file
+    //First file
+    data.clear();
+    data.add("1");
+    data.add("2");
+    data.add("2");
+    pathList.add(createIFileWithTextData(data));
+
+    //Second file
+    data.clear();
+    data.add("2");
+    data.add("3");
+    pathList.add(createIFileWithTextData(data));
+
+    TezRawKeyValueIterator records = merge(pathList, new CustomComparator());
+
+    String[][] expectedResult =
+        {
+            //formatting intentionally
+            { "1", DIFF_KEY },
+            { "2", DIFF_KEY },
+              { "2", SAME_KEY },
+              { "2", SAME_KEY },
+            { "3", DIFF_KEY }
+        };
+
+    verify(records, expectedResult);
+    pathList.clear();
+    data.clear();
+  }
+
+  @Test(timeout = 5000)
+  public void testWithCustomComparator_RLE2() throws Exception {
+    List<Path> pathList = new LinkedList<Path>();
+    List<String> data = Lists.newLinkedList();
+
+    LOG.info(
+        "Test with custom comparator 3 files with RLE (starting keys) spanning across boundaries");
+
+    //Test with 3 files, same keys in middle of file
+    //First file
+    data.clear();
+    data.add("0");
+    data.add("1");
+    data.add("1");
+    pathList.add(createIFileWithTextData(data));
+
+    //Second file
+    data.clear();
+    data.add("0");
+    data.add("1");
+    pathList.add(createIFileWithTextData(data));
+
+    //Third file
+    data.clear();
+    data.add("0");
+    data.add("1");
+    data.add("1");
+    pathList.add(createIFileWithTextData(data));
+
+    TezRawKeyValueIterator records = merge(pathList, new CustomComparator());
+    String[][] expectedResult =
+        {
+            //formatting intentionally
+            { "0", DIFF_KEY },
+              { "0", SAME_KEY },
+              { "0", SAME_KEY },
+            { "1", DIFF_KEY },
+              { "1", SAME_KEY },
+              { "1", SAME_KEY },
+              { "1", SAME_KEY },
+              { "1", SAME_KEY }
+
+        };
+
+    verify(records, expectedResult);
+    pathList.clear();
+    data.clear();
+  }
+
+  @Test(timeout = 5000)
+  public void testWithCustomComparator() throws Exception {
+    List<Path> pathList = new LinkedList<Path>();
+    List<String> data = Lists.newLinkedList();
+
+    LOG.info(
+        "Test with custom comparator 3 files with RLE (starting keys) spanning across boundaries");
+
+    //Test with 3 files
+    //First file
+    data.clear();
+    data.add("0");
+    pathList.add(createIFileWithTextData(data));
+
+    //Second file
+    data.clear();
+    data.add("0");
+    pathList.add(createIFileWithTextData(data));
+
+    //Third file
+    data.clear();
+    data.add("1");
+    pathList.add(createIFileWithTextData(data));
+
+    TezRawKeyValueIterator records = merge(pathList, new CustomComparator());
+    String[][] expectedResult =
+        {
+            //formatting intentionally
+            { "0", DIFF_KEY },
+              { "0", SAME_KEY },
+            { "1", DIFF_KEY }
+        };
+
+    verify(records, expectedResult);
+    pathList.clear();
+    data.clear();
+  }
+
+  @Test(timeout = 5000)
+  public void testWithCustomComparator_RLE3() throws Exception {
+    List<Path> pathList = new LinkedList<Path>();
+    List<String> data = Lists.newLinkedList();
+
+    LOG.info("Test with custom comparator");
+
+    //Test with 2 files, same keys in middle of file
+    //First file
+    data.clear();
+    data.add("0");
+    pathList.add(createIFileWithTextData(data));
+
+    //Second file
+    data.clear();
+    data.add("0");
+    data.add("1");
+    data.add("1");
+    pathList.add(createIFileWithTextData(data));
+
+    TezRawKeyValueIterator records = merge(pathList, new CustomComparator());
+
+    String[][] expectedResult =
+        {
+            //formatting intentionally
+            { "0", DIFF_KEY },
+              { "0", SAME_KEY },
+            { "1", DIFF_KEY },
+              { "1", SAME_KEY } };
+
+    verify(records, expectedResult);
+    pathList.clear();
+    data.clear();
+  }
+
+  @Test(timeout = 5000)
+  public void testWithCustomComparator_allEmptyFiles() throws Exception {
+    List<Path> pathList = new LinkedList<Path>();
+    List<String> data = Lists.newLinkedList();
+
+    LOG.info("Test with custom comparator where all files are empty");
+
+    //First file
+    pathList.add(createIFileWithTextData(data));
+
+    //Second file
+    pathList.add(createIFileWithTextData(data));
+
+    //Third file
+    pathList.add(createIFileWithTextData(data));
+
+    //Fourth file
+    pathList.add(createIFileWithTextData(data));
+
+    TezRawKeyValueIterator records = merge(pathList, new CustomComparator());
+
+    String[][] expectedResult = new String[0][0];
+
+    verify(records, expectedResult);
+  }
+
+  /**
+   * Merge the data sets
+   *
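+   * @param pathList paths of the IFiles to be merged
+   * @param rc comparator to merge with; when null, the default comparator is used
+   * @return iterator over the merged records
+   * @throws IOException if the merge fails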
+   */
+  private TezRawKeyValueIterator merge(List<Path> pathList, RawComparator rc) throws IOException {
+    TezMerger merger = new TezMerger();
+    TezRawKeyValueIterator records = merger.merge(defaultConf, localFs, IntWritable.class,
+        LongWritable.class, null, false, 0, 1024, pathList.toArray(new Path[pathList.size()]),
+        true, 4, new Path(workDir, "tmp_" + System.nanoTime()), ((rc == null) ? comparator : rc),
+        new Reporter(), null, null,
+        null, new Progress());
+    return records;
+  }
+
+
+
+  //Sample comparator to test TEZ-1999 corner case
+  static class CustomComparator extends WritableComparator {
+    @Override
+    //Not a valid comparison, but just to check byte boundaries
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      Preconditions.checkArgument(l2 > 0 && l1 > 0, "l2=" + l2 + ",l1=" + l1);
+      ByteBuffer bb1 = ByteBuffer.wrap(b1, s1, l1);
+      ByteBuffer bb2 = ByteBuffer.wrap(b2, s2, l2);
+      return bb1.compareTo(bb2);
+    }
+  }
+
+  private void merge(List<Path> pathList, int mergeFactor) throws Exception {
+    merge(pathList, mergeFactor, null);
   }
 
   private void merge(int fileCount, int keysPerFile, int mergeFactor) throws Exception {
     List<Path> pathList = createIFiles(fileCount, keysPerFile);
+    merge(pathList, mergeFactor, null);
+  }
 
+  private void merge(List<Path> pathList, int mergeFactor, RawComparator rc) throws Exception {
     //Merge datasets
     TezMerger merger = new TezMerger();
     TezRawKeyValueIterator records = merger.merge(defaultConf, localFs, IntWritable.class,
         LongWritable.class, null, false, 0, 1024, pathList.toArray(new Path[pathList.size()]),
         true, mergeFactor, new Path(workDir, "tmp_" + System.nanoTime()),
-        ConfigUtils.getIntermediateInputKeyComparator(defaultConf), new Reporter(), null, null,
+        ((rc == null) ? comparator : rc), new Reporter(), null, null,
         null,
         new Progress());
 
@@ -134,9 +615,9 @@ public class TestTezMerger {
       if (records.isSameKey()) {
         LOG.info("\tSame Key : key=" + k.get() + ", val=" + v.get());
         //More than one key should be present in the source data
-        Assert.assertTrue(verificationDataSet.get(k.get()).size() > 1);
+        assertTrue(verificationDataSet.get(k.get()).size() > 1);
         //Ensure this is same as the previous key we saw
-        Assert.assertTrue(pk == k.get());
+        assertTrue("previousKey=" + pk + ", current=" + k.get(), pk == k.get());
       } else {
         LOG.info("key=" + k.get() + ", val=" + v.get());
       }
@@ -147,30 +628,30 @@ public class TestTezMerger {
     }
 
     //Verify if the number of distinct entries is the same in source and the test
-    Assert.assertTrue("dataMap=" + dataMap.keySet().size() + ", verificationSet=" +
-        verificationDataSet.keySet().size(),
+    assertTrue("dataMap=" + dataMap.keySet().size() + ", verificationSet=" +
+            verificationDataSet.keySet().size(),
         dataMap.keySet().size() == verificationDataSet.keySet().size());
 
     //Verify with source data
     for (Integer key : verificationDataSet.keySet()) {
-      Assert.assertTrue("Data size for " + key + " not matching with source; dataSize:" + dataMap
+      assertTrue("Data size for " + key + " not matching with source; dataSize:" + dataMap
               .get(key).intValue() + ", source:" + verificationDataSet.get(key).size(),
           dataMap.get(key).intValue() == verificationDataSet.get(key).size());
     }
 
     //Verify if every key has the same number of repeated items in the source dataset as well
     for (Map.Entry<Integer, Integer> entry : dataMap.entrySet()) {
-      Assert.assertTrue(entry.getKey() + "", verificationDataSet.get(entry.getKey()).size() == entry
+      assertTrue(entry.getKey() + "", verificationDataSet.get(entry.getKey()).size() == entry
           .getValue());
     }
 
     LOG.info("******************");
+    verificationDataSet.clear();
   }
 
   private List<Path> createIFiles(int fileCount, int keysPerFile)
       throws IOException {
     List<Path> pathList = Lists.newLinkedList();
-    verificationDataSet.clear();
     Random rnd = new Random();
     for (int i = 0; i < fileCount; i++) {
       int repeatCount = ((i % 2 == 0) && keysPerFile > 0) ? rnd.nextInt(keysPerFile) : 0;
@@ -180,8 +661,10 @@ public class TestTezMerger {
     return pathList;
   }
 
-  static Path writeIFile(int keysPerFile, int repeatCount) throws IOException {
+  static Path writeIFile(int keysPerFile, int repeatCount) throws
+      IOException {
     TreeMultimap<Integer, Long> dataSet = createDataForIFile(keysPerFile, repeatCount);
+    LOG.info("DataSet size : " + dataSet.size());
     Path path = new Path(workDir + "/src", "data_" + System.nanoTime() + ".out");
     FSDataOutputStream out = localFs.create(path);
     //create IFile with RLE
@@ -202,7 +685,7 @@ public class TestTezMerger {
   /**
    * Generate data set for ifile.  Create repeated keys if needed.
    *
-   * @param keyCount approximate number of keys to be created
+   * @param keyCount    approximate number of keys to be created
    * @param repeatCount number of times a key should be repeated
    * @return
    */
@@ -210,9 +693,9 @@ public class TestTezMerger {
     TreeMultimap<Integer, Long> dataSet = TreeMultimap.create();
     Random rnd = new Random();
     for (int i = 0; i < keyCount; i++) {
-      if (repeatCount > 0 && (rnd.nextInt(keyCount) % 2  == 0)) {
+      if (repeatCount > 0 && (rnd.nextInt(keyCount) % 2 == 0)) {
         //repeat this key
-        for(int j = 0; j < repeatCount; j++) {
+        for (int j = 0; j < repeatCount; j++) {
           IntWritable key = new IntWritable(rnd.nextInt(keyCount));
           LongWritable value = new LongWritable(System.nanoTime());
           dataSet.put(key.get(), value.get());
@@ -234,7 +717,6 @@ public class TestTezMerger {
     return dataSet;
   }
 
-
   private static class Reporter implements Progressable {
     @Override
     public void progress() {