You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/01/06 10:46:00 UTC

tez git commit: TEZ-1767. Enable RLE in reducer side merge codepath (Rajesh Balamohan)

Repository: tez
Updated Branches:
  refs/heads/master 976074473 -> 9c385681f


TEZ-1767. Enable RLE in reducer side merge codepath (Rajesh Balamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9c385681
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9c385681
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9c385681

Branch: refs/heads/master
Commit: 9c385681fb19a0a8ea380bcacf7554ac4faa86b7
Parents: 9760744
Author: Rajesh Balamohan <rb...@hortonworks.com>
Authored: Tue Jan 6 15:15:23 2015 +0530
Committer: Rajesh Balamohan <rb...@hortonworks.com>
Committed: Tue Jan 6 15:15:23 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../processor/reduce/ReduceProcessor.java       |   6 +
 .../common/sort/impl/PipelinedSorter.java       |  16 ++
 .../library/common/sort/impl/TezMerger.java     |  82 ++++++-
 .../sort/impl/TezRawKeyValueIterator.java       |   8 +
 .../common/sort/impl/dflt/DefaultSorter.java    |   6 +
 .../input/OrderedGroupedInputLegacy.java        |   5 +
 .../library/common/sort/impl/TestTezMerger.java | 244 +++++++++++++++++++
 8 files changed, 358 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/9c385681/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c41ec49..41fb11a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1767. Enable RLE in reducer side merge codepath.
   TEZ-1837. Restrict usage of Environment variables to main methods.
   TEZ-1867. Create new central dispatcher for Tez AM
   TEZ-1844. Shouldn't invoke system.exit in local mode when AM is failed to start.

http://git-wip-us.apache.org/repos/asf/tez/blob/9c385681/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index 9469236..1a6a3a4 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -291,6 +291,12 @@ public class ReduceProcessor extends MRTask {
       public Progress getProgress() {
         return rawIter.getProgress();
       }
+
+      @Override
+      public boolean isSameKey() throws IOException {
+        return rawIter.isSameKey();
+      }
+
       public DataInputBuffer getValue() throws IOException {
         return rawIter.getValue();
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/9c385681/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index e353c00..ae654b4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -657,6 +657,11 @@ public class PipelinedSorter extends ExternalSorter {
       return progress;
     }
 
+    @Override
+    public boolean isSameKey() throws IOException {
+      throw new UnsupportedOperationException("Not yet supported");
+    }
+
     public int getPartition() {
       final int partition = kvmeta.get(span.offsetFor(kvindex) + PARTITION);
       return partition;
@@ -766,6 +771,12 @@ public class PipelinedSorter extends ExternalSorter {
     public Progress getProgress() {
       return new Progress();
     }
+
+    @Override
+    public boolean isSameKey() throws IOException {
+      return iter.isSameKey();
+    }
+
     public boolean next() throws IOException {
       if(dirty || iter.next()) { 
         int prefix = iter.getPartition();
@@ -916,6 +927,11 @@ public class PipelinedSorter extends ExternalSorter {
       return new Progress();
     }
 
+    @Override
+    public boolean isSameKey() throws IOException {
+      throw new UnsupportedOperationException("isSameKey is not supported");
+    }
+
     public TezRawKeyValueIterator filter(int partition) {
       partIter.reset(partition);
       return partIter;

http://git-wip-us.apache.org/repos/asf/tez/blob/9c385681/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index 217e63a..3c8e66a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.util.PriorityQueue;
@@ -46,7 +47,7 @@ import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader.KeyState;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
-
+import org.apache.tez.runtime.library.utils.BufferUtils;
 
 /**
  * Merger is an utility class used by the Map and Reduce tasks for merging
@@ -55,7 +56,7 @@ import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 @SuppressWarnings({"unchecked", "rawtypes"})
-public class TezMerger {  
+public class TezMerger {
   private static final Log LOG = LogFactory.getLog(TezMerger.class);
 
   
@@ -166,10 +167,10 @@ public class TezMerger {
     return new MergeQueue(conf, fs, segments, comparator, reporter,
                            sortSegments, codec, considerFinalMergeForProgress).
                                          merge(keyClass, valueClass,
-                                               mergeFactor, tmpDir,
-                                               readsCounter, writesCounter,
-                                               bytesReadCounter,
-                                               mergePhase);
+                                             mergeFactor, tmpDir,
+                                             readsCounter, writesCounter,
+                                             bytesReadCounter,
+                                             mergePhase);
   }
 
   public static <K extends Object, V extends Object>
@@ -199,14 +200,23 @@ public class TezMerger {
                  Progressable progressable, long recordsBeforeProgress) 
   throws IOException {
     long recordCtr = 0;
+    long count = 0;
     while(records.next()) {
-      writer.append(records.getKey(), records.getValue());
+      if (records.isSameKey()) {
+        writer.append(IFile.REPEAT_KEY, records.getValue());
+        count++;
+      } else {
+        writer.append(records.getKey(), records.getValue());
+      }
       
       if (((recordCtr++) % recordsBeforeProgress) == 0) {
         progressable.progress();
       }
     }
-}
+    if ((count > 0) && LOG.isDebugEnabled()) {
+      LOG.debug("writeFile SAME_KEY count=" + count);
+    }
+  }
 
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
@@ -407,6 +417,9 @@ public class TezMerger {
       }
     };
 
+    KeyState hasNext;
+    DataOutputBuffer prevKey = new DataOutputBuffer();
+
     public MergeQueue(Configuration conf, FileSystem fs, 
                       Path[] inputs, boolean deleteInputs,
                       CompressionCodec codec, boolean ifileReadAhead,
@@ -478,20 +491,49 @@ public class TezMerger {
 
     private void adjustPriorityQueue(Segment reader) throws IOException{
       long startPos = reader.getPosition();
-      KeyState hasNext = reader.readRawKey();
+      if (hasNext != null && hasNext != KeyState.SAME_KEY) {
+        key.reset();
+        // TODO: This copy can be an unwanted operation when all keys are unique. Revisit this
+        // when we have better stats.
+        BufferUtils.copy(key, prevKey);
+      }
+      hasNext = reader.readRawKey();
       long endPos = reader.getPosition();
       totalBytesProcessed += endPos - startPos;
       mergeProgress.set(totalBytesProcessed * progPerByte);
       if (hasNext == KeyState.NEW_KEY) {
         adjustTop();
+        compareKeyWithNextTopKey(reader);
       } else if(hasNext == KeyState.NO_KEY) {
         pop();
         reader.close();
+        compareKeyWithNextTopKey(null);
       } else if(hasNext == KeyState.SAME_KEY) {
         // do not rebalance the priority queue
       }
     }
 
+    /**
+     * Checks whether the previous key is the same as the next top segment's key.
+     * This is used to detect a key that is spread across multiple segments.
+     *
+     * @param current the segment that was just read, or null if it was popped from the queue
+     * @throws IOException
+     */
+    void compareKeyWithNextTopKey(Segment current) throws IOException {
+      Segment nextTop = top();
+      if (nextTop != current) {
+        //we have a different file. Compare it with previous key
+        DataInputBuffer nextKey = nextTop.getKey();
+        int compare = compare(nextKey, prevKey);
+        if (compare == 0) {
+          //Same key is available in the next segment.
+          hasNext = KeyState.SAME_KEY;
+        }
+        nextKey.reset();
+      }
+    }
+
     public boolean next() throws IOException {
       if (size() == 0)
         return false;
@@ -528,6 +570,16 @@ public class TezMerger {
       return true;
     }
 
+    int compare(DataInputBuffer buf1, DataOutputBuffer buf2) {
+      byte[] b1 = buf1.getData();
+      byte[] b2 = buf2.getData();
+      int s1 = buf1.getPosition();
+      int s2 = 0;
+      int l1 = buf1.getLength();
+      int l2 = buf2.getLength();
+      return comparator.compare(b1, s1, (l1 - s1), b2, s2, l2);
+    }
+
     protected boolean lessThan(Object a, Object b) {
       DataInputBuffer key1 = ((Segment)a).getKey();
       DataInputBuffer key2 = ((Segment)b).getKey();
@@ -832,8 +884,13 @@ public class TezMerger {
       return mergeProgress;
     }
 
+    @Override
+    public boolean isSameKey() throws IOException {
+      return (hasNext != null) && (hasNext == KeyState.SAME_KEY);
+    }
+
   }
-  
+
   private static class EmptyIterator implements TezRawKeyValueIterator {
     final Progress progress;
 
@@ -865,5 +922,10 @@ public class TezMerger {
     public Progress getProgress() {
       return progress;
     }
+
+    @Override
+    public boolean isSameKey() throws IOException {
+      throw new UnsupportedOperationException("isSameKey is not supported");
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/9c385681/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java
index 3a2c2bf..2ae07cb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java
@@ -67,4 +67,12 @@ public interface TezRawKeyValueIterator {
    * indicating the bytes processed by the iterator so far
    */
   Progress getProgress();
+
+  /**
+   * Whether the current key is the same as the previous key.
+   *
+   * @return true if the current key is the same as the previous key
+   * @throws IOException
+   */
+  boolean isSameKey() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/9c385681/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index f872e1f..773eaba 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -984,6 +984,12 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
     public Progress getProgress() {
       return null;
     }
+
+    @Override
+    public boolean isSameKey() throws IOException {
+      throw new UnsupportedOperationException("isSameKey is not supported");
+    }
+
     public void close() { }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/9c385681/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedInputLegacy.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedInputLegacy.java
index e2960f8..6ae156a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedInputLegacy.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedInputLegacy.java
@@ -66,6 +66,11 @@ public class OrderedGroupedInputLegacy extends OrderedGroupedKVInput {
           progress.complete();
           return progress;
         }
+
+        @Override
+        public boolean isSameKey() throws IOException {
+          throw new UnsupportedOperationException("isSameKey is not supported");
+        }
       };
     }
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/9c385681/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
new file mode 100644
index 0000000..ac17d8d
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.sort.impl;
+
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.TreeMultimap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.TestMergeManager;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+public class TestTezMerger {
+
+  private static final Log LOG = LogFactory.getLog(TestTezMerger.class);
+
+  private static Configuration defaultConf = new Configuration();
+  private static FileSystem localFs = null;
+  private static Path workDir = null;
+
+  //store the generated data for final verification
+  private static ListMultimap<Integer, Long> verificationDataSet = LinkedListMultimap.create();
+
+  static {
+    defaultConf.set("fs.defaultFS", "file:///");
+    try {
+      localFs = FileSystem.getLocal(defaultConf);
+      workDir = new Path(
+          new Path(System.getProperty("test.build.data", "/tmp")), TestTezMerger.class.getName())
+          .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+      LOG.info("Using workDir: " + workDir);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    defaultConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
+    defaultConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, LongWritable.class.getName());
+    Path baseDir = new Path(workDir, TestMergeManager.class.getName());
+    String localDirs = baseDir.toString();
+    defaultConf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs);
+  }
+
+  @AfterClass
+  public static void cleanup() throws Exception {
+    localFs.delete(workDir, true);
+  }
+
+
+  @Test
+  public void testMerge() throws Exception {
+    /**
+     * test with number of files, keys per file and mergefactor
+     */
+
+    //empty file
+    merge(1, 0, 1);
+    merge(100, 0, 5);
+
+    //small files
+    merge(2, 10, 2);
+    merge(1, 10, 1);
+    merge(5, 10, 3);
+    merge(200, 10, 100);
+
+    //bigger files
+    merge(5, 100, 5);
+    merge(5, 1000, 5);
+    merge(5, 1000, 10);
+    merge(5, 1000, 100);
+  }
+
+  private void merge(int fileCount, int keysPerFile, int mergeFactor) throws Exception {
+    List<Path> pathList = createIFiles(fileCount, keysPerFile);
+
+    //Merge datasets
+    TezMerger merger = new TezMerger();
+    TezRawKeyValueIterator records = merger.merge(defaultConf, localFs, IntWritable.class,
+        LongWritable.class, null, false, 0, 1024, pathList.toArray(new Path[pathList.size()]),
+        true, mergeFactor, new Path(workDir, "tmp_" + System.nanoTime()),
+        ConfigUtils.getIntermediateInputKeyComparator(defaultConf), new Reporter(), null, null,
+        null,
+        new Progress());
+
+    //Verify the merged data is correct
+    Map<Integer, Integer> dataMap = Maps.newHashMap();
+    int pk = -1;
+    while (records.next()) {
+      DataInputBuffer key = records.getKey();
+      DataInputBuffer value = records.getValue();
+
+      IntWritable k = new IntWritable();
+      k.readFields(key);
+      LongWritable v = new LongWritable();
+      v.readFields(value);
+
+      if (records.isSameKey()) {
+        LOG.info("\tSame Key : key=" + k.get() + ", val=" + v.get());
+        //More than one value should be present for this key in the source data
+        Assert.assertTrue(verificationDataSet.get(k.get()).size() > 1);
+        //Ensure this is same as the previous key we saw
+        Assert.assertTrue(pk == k.get());
+      } else {
+        LOG.info("key=" + k.get() + ", val=" + v.get());
+      }
+      pk = k.get();
+
+      int keyCount = (dataMap.containsKey(k.get())) ? (dataMap.get(k.get()) + 1) : 1;
+      dataMap.put(k.get(), keyCount);
+    }
+
+    //Verify if the number of distinct entries is the same in source and the test
+    Assert.assertTrue("dataMap=" + dataMap.keySet().size() + ", verificationSet=" +
+        verificationDataSet.keySet().size(),
+        dataMap.keySet().size() == verificationDataSet.keySet().size());
+
+    //Verify with source data
+    for (Integer key : verificationDataSet.keySet()) {
+      Assert.assertTrue("Data size for " + key + " not matching with source; dataSize:" + dataMap
+              .get(key).intValue() + ", source:" + verificationDataSet.get(key).size(),
+          dataMap.get(key).intValue() == verificationDataSet.get(key).size());
+    }
+
+    //Verify if every key has the same number of repeated items in the source dataset as well
+    for (Map.Entry<Integer, Integer> entry : dataMap.entrySet()) {
+      Assert.assertTrue(entry.getKey() + "", verificationDataSet.get(entry.getKey()).size() == entry
+          .getValue());
+    }
+
+    LOG.info("******************");
+  }
+
+  private List<Path> createIFiles(int fileCount, int keysPerFile)
+      throws IOException {
+    List<Path> pathList = Lists.newLinkedList();
+    verificationDataSet.clear();
+    Random rnd = new Random();
+    for (int i = 0; i < fileCount; i++) {
+      int repeatCount = ((i % 2 == 0) && keysPerFile > 0) ? rnd.nextInt(keysPerFile) : 0;
+      Path ifilePath = writeIFile(keysPerFile, repeatCount);
+      pathList.add(ifilePath);
+    }
+    return pathList;
+  }
+
+  static Path writeIFile(int keysPerFile, int repeatCount) throws IOException {
+    TreeMultimap<Integer, Long> dataSet = createDataForIFile(keysPerFile, repeatCount);
+    Path path = new Path(workDir + "/src", "data_" + System.nanoTime() + ".out");
+    FSDataOutputStream out = localFs.create(path);
+    //create IFile with RLE
+    IFile.Writer writer = new IFile.Writer(defaultConf, out, IntWritable.class
+        , LongWritable.class, null, null, null, true);
+
+    for (Integer key : dataSet.keySet()) {
+      for (Long value : dataSet.get(key)) {
+        writer.append(new IntWritable(key), new LongWritable(value));
+        verificationDataSet.put(key, value);
+      }
+    }
+    writer.close();
+    out.close();
+    return path;
+  }
+
+  /**
+   * Generates a data set for an IFile, creating repeated keys if needed.
+   *
+   * @param keyCount approximate number of keys to be created
+   * @param repeatCount number of times a key should be repeated
+   * @return the generated data set
+   */
+  static TreeMultimap<Integer, Long> createDataForIFile(int keyCount, int repeatCount) {
+    TreeMultimap<Integer, Long> dataSet = TreeMultimap.create();
+    Random rnd = new Random();
+    for (int i = 0; i < keyCount; i++) {
+      if (repeatCount > 0 && (rnd.nextInt(keyCount) % 2  == 0)) {
+        //repeat this key
+        for(int j = 0; j < repeatCount; j++) {
+          IntWritable key = new IntWritable(rnd.nextInt(keyCount));
+          LongWritable value = new LongWritable(System.nanoTime());
+          dataSet.put(key.get(), value.get());
+        }
+        i += repeatCount;
+        LOG.info("Repeated key count=" + (repeatCount));
+      } else {
+        IntWritable key = new IntWritable(rnd.nextInt(keyCount));
+        LongWritable value = new LongWritable(System.nanoTime());
+        dataSet.put(key.get(), value.get());
+      }
+    }
+    for (Integer key : dataSet.keySet()) {
+      for (Long value : dataSet.get(key)) {
+        LOG.info("Key=" + key + ", val=" + value);
+      }
+    }
+    LOG.info("=============");
+    return dataSet;
+  }
+
+
+  private static class Reporter implements Progressable {
+    @Override
+    public void progress() {
+    }
+  }
+
+}