You are viewing a plain-text version of this content. The hyperlink to its canonical version ("here") was not preserved in this copy.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/01/06 10:46:00 UTC
tez git commit: TEZ-1767. Enable RLE in reducer side merge codepath
(Rajesh Balamohan)
Repository: tez
Updated Branches:
refs/heads/master 976074473 -> 9c385681f
TEZ-1767. Enable RLE in reducer side merge codepath (Rajesh Balamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9c385681
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9c385681
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9c385681
Branch: refs/heads/master
Commit: 9c385681fb19a0a8ea380bcacf7554ac4faa86b7
Parents: 9760744
Author: Rajesh Balamohan <rb...@hortonworks.com>
Authored: Tue Jan 6 15:15:23 2015 +0530
Committer: Rajesh Balamohan <rb...@hortonworks.com>
Committed: Tue Jan 6 15:15:23 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../processor/reduce/ReduceProcessor.java | 6 +
.../common/sort/impl/PipelinedSorter.java | 16 ++
.../library/common/sort/impl/TezMerger.java | 82 ++++++-
.../sort/impl/TezRawKeyValueIterator.java | 8 +
.../common/sort/impl/dflt/DefaultSorter.java | 6 +
.../input/OrderedGroupedInputLegacy.java | 5 +
.../library/common/sort/impl/TestTezMerger.java | 244 +++++++++++++++++++
8 files changed, 358 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/9c385681/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c41ec49..41fb11a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-1767. Enable RLE in reducer side merge codepath.
TEZ-1837. Restrict usage of Environment variables to main methods.
TEZ-1867. Create new central dispatcher for Tez AM
TEZ-1844. Shouldn't invoke system.exit in local mode when AM is failed to start.
http://git-wip-us.apache.org/repos/asf/tez/blob/9c385681/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index 9469236..1a6a3a4 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -291,6 +291,12 @@ public class ReduceProcessor extends MRTask {
public Progress getProgress() {
return rawIter.getProgress();
}
+
+ @Override
+ public boolean isSameKey() throws IOException {
+ return rawIter.isSameKey();
+ }
+
public DataInputBuffer getValue() throws IOException {
return rawIter.getValue();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/9c385681/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index e353c00..ae654b4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -657,6 +657,11 @@ public class PipelinedSorter extends ExternalSorter {
return progress;
}
+ @Override
+ public boolean isSameKey() throws IOException {
+ throw new UnsupportedOperationException("Not yet supported");
+ }
+
public int getPartition() {
final int partition = kvmeta.get(span.offsetFor(kvindex) + PARTITION);
return partition;
@@ -766,6 +771,12 @@ public class PipelinedSorter extends ExternalSorter {
public Progress getProgress() {
return new Progress();
}
+
+ @Override
+ public boolean isSameKey() throws IOException {
+ return iter.isSameKey();
+ }
+
public boolean next() throws IOException {
if(dirty || iter.next()) {
int prefix = iter.getPartition();
@@ -916,6 +927,11 @@ public class PipelinedSorter extends ExternalSorter {
return new Progress();
}
+ @Override
+ public boolean isSameKey() throws IOException {
+ throw new UnsupportedOperationException("isSameKey is not supported");
+ }
+
public TezRawKeyValueIterator filter(int partition) {
partIter.reset(partition);
return partIter;
http://git-wip-us.apache.org/repos/asf/tez/blob/9c385681/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index 217e63a..3c8e66a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.PriorityQueue;
@@ -46,7 +47,7 @@ import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader.KeyState;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
-
+import org.apache.tez.runtime.library.utils.BufferUtils;
/**
* Merger is an utility class used by the Map and Reduce tasks for merging
@@ -55,7 +56,7 @@ import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
@InterfaceAudience.Private
@InterfaceStability.Unstable
@SuppressWarnings({"unchecked", "rawtypes"})
-public class TezMerger {
+public class TezMerger {
private static final Log LOG = LogFactory.getLog(TezMerger.class);
@@ -166,10 +167,10 @@ public class TezMerger {
return new MergeQueue(conf, fs, segments, comparator, reporter,
sortSegments, codec, considerFinalMergeForProgress).
merge(keyClass, valueClass,
- mergeFactor, tmpDir,
- readsCounter, writesCounter,
- bytesReadCounter,
- mergePhase);
+ mergeFactor, tmpDir,
+ readsCounter, writesCounter,
+ bytesReadCounter,
+ mergePhase);
}
public static <K extends Object, V extends Object>
@@ -199,14 +200,23 @@ public class TezMerger {
Progressable progressable, long recordsBeforeProgress)
throws IOException {
long recordCtr = 0;
+ long count = 0;
while(records.next()) {
- writer.append(records.getKey(), records.getValue());
+ if (records.isSameKey()) {
+ writer.append(IFile.REPEAT_KEY, records.getValue());
+ count++;
+ } else {
+ writer.append(records.getKey(), records.getValue());
+ }
if (((recordCtr++) % recordsBeforeProgress) == 0) {
progressable.progress();
}
}
-}
+ if ((count > 0) && LOG.isDebugEnabled()) {
+ LOG.debug("writeFile SAME_KEY count=" + count);
+ }
+ }
@InterfaceAudience.Private
@InterfaceStability.Unstable
@@ -407,6 +417,9 @@ public class TezMerger {
}
};
+ KeyState hasNext;
+ DataOutputBuffer prevKey = new DataOutputBuffer();
+
public MergeQueue(Configuration conf, FileSystem fs,
Path[] inputs, boolean deleteInputs,
CompressionCodec codec, boolean ifileReadAhead,
@@ -478,20 +491,49 @@ public class TezMerger {
private void adjustPriorityQueue(Segment reader) throws IOException{
long startPos = reader.getPosition();
- KeyState hasNext = reader.readRawKey();
+ if (hasNext != null && hasNext != KeyState.SAME_KEY) {
+ key.reset();
+ // TODO: This copy can be an unwanted operation when all keys are unique. Revisit this
+ // when we have better stats.
+ BufferUtils.copy(key, prevKey);
+ }
+ hasNext = reader.readRawKey();
long endPos = reader.getPosition();
totalBytesProcessed += endPos - startPos;
mergeProgress.set(totalBytesProcessed * progPerByte);
if (hasNext == KeyState.NEW_KEY) {
adjustTop();
+ compareKeyWithNextTopKey(reader);
} else if(hasNext == KeyState.NO_KEY) {
pop();
reader.close();
+ compareKeyWithNextTopKey(null);
} else if(hasNext == KeyState.SAME_KEY) {
// do not rebalance the priority queue
}
}
+ /**
+ * Check whether the key of the segment now at the top of the queue equals prevKey and, if so,
+ * set hasNext to SAME_KEY so that a key spread across multiple segments is reported as repeated.
+ * NOTE(review): assumes top() returns null on an empty queue, so the null-after-pop case skips the compare.
+ * @param current segment whose next key was just read, or null when a segment was just popped
+ * @throws IOException as declared; presumably from accessing the top segment's key — confirm
+ */
+ void compareKeyWithNextTopKey(Segment current) throws IOException {
+ Segment nextTop = top();
+ if (nextTop != current) {
+ // top segment changed (or current was popped): compare its key with prevKey
+ DataInputBuffer nextKey = nextTop.getKey();
+ int compare = compare(nextKey, prevKey);
+ if (compare == 0) {
+ //Same key is available in the next segment.
+ hasNext = KeyState.SAME_KEY;
+ }
+ nextKey.reset();
+ }
+ }
+
public boolean next() throws IOException {
if (size() == 0)
return false;
@@ -528,6 +570,16 @@ public class TezMerger {
return true;
}
+ int compare(DataInputBuffer buf1, DataOutputBuffer buf2) {
+ byte[] b1 = buf1.getData();
+ byte[] b2 = buf2.getData();
+ int s1 = buf1.getPosition();
+ int s2 = 0;
+ int l1 = buf1.getLength();
+ int l2 = buf2.getLength();
+ return comparator.compare(b1, s1, (l1 - s1), b2, s2, l2);
+ }
+
protected boolean lessThan(Object a, Object b) {
DataInputBuffer key1 = ((Segment)a).getKey();
DataInputBuffer key2 = ((Segment)b).getKey();
@@ -832,8 +884,13 @@ public class TezMerger {
return mergeProgress;
}
+ @Override
+ public boolean isSameKey() throws IOException {
+ return (hasNext != null) && (hasNext == KeyState.SAME_KEY);
+ }
+
}
-
+
private static class EmptyIterator implements TezRawKeyValueIterator {
final Progress progress;
@@ -865,5 +922,10 @@ public class TezMerger {
public Progress getProgress() {
return progress;
}
+
+ @Override
+ public boolean isSameKey() throws IOException {
+ throw new UnsupportedOperationException("isSameKey is not supported");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/9c385681/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java
index 3a2c2bf..2ae07cb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java
@@ -67,4 +67,12 @@ public interface TezRawKeyValueIterator {
* indicating the bytes processed by the iterator so far
*/
Progress getProgress();
+
+ /**
+ * Whether the current key is the same as the key of the previous record.
+ * Iterators that do not support this check may throw UnsupportedOperationException.
+ * @return true if the current record's key repeats the previous record's key
+ * @throws IOException if determining the key state requires I/O that fails
+ */
+ boolean isSameKey() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/9c385681/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index f872e1f..773eaba 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -984,6 +984,12 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
public Progress getProgress() {
return null;
}
+
+ @Override
+ public boolean isSameKey() throws IOException {
+ throw new UnsupportedOperationException("isSameKey is not supported");
+ }
+
public void close() { }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/9c385681/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedInputLegacy.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedInputLegacy.java
index e2960f8..6ae156a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedInputLegacy.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedInputLegacy.java
@@ -66,6 +66,11 @@ public class OrderedGroupedInputLegacy extends OrderedGroupedKVInput {
progress.complete();
return progress;
}
+
+ @Override
+ public boolean isSameKey() throws IOException {
+ throw new UnsupportedOperationException("isSameKey is not supported");
+ }
};
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/9c385681/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
new file mode 100644
index 0000000..ac17d8d
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.sort.impl;
+
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.TreeMultimap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.TestMergeManager;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+public class TestTezMerger {
+
+ private static final Log LOG = LogFactory.getLog(TestTezMerger.class);
+
+ private static Configuration defaultConf = new Configuration();
+ private static FileSystem localFs = null;
+ private static Path workDir = null;
+
+ //store the generated data for final verification
+ private static ListMultimap<Integer, Long> verificationDataSet = LinkedListMultimap.create();
+
+ static {
+ defaultConf.set("fs.defaultFS", "file:///");
+ try {
+ localFs = FileSystem.getLocal(defaultConf);
+ workDir = new Path(
+ new Path(System.getProperty("test.build.data", "/tmp")), TestTezMerger.class.getName())
+ .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+ LOG.info("Using workDir: " + workDir);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ defaultConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
+ defaultConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, LongWritable.class.getName());
+ Path baseDir = new Path(workDir, TestMergeManager.class.getName());
+ String localDirs = baseDir.toString();
+ defaultConf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs);
+ }
+
+ @AfterClass
+ public static void cleanup() throws Exception {
+ localFs.delete(workDir, true);
+ }
+
+
+ @Test
+ public void testMerge() throws Exception {
+ /**
+ * test with number of files, keys per file and mergefactor
+ */
+
+ //empty file
+ merge(1, 0, 1);
+ merge(100, 0, 5);
+
+ //small files
+ merge(2, 10, 2);
+ merge(1, 10, 1);
+ merge(5, 10, 3);
+ merge(200, 10, 100);
+
+ //bigger files
+ merge(5, 100, 5);
+ merge(5, 1000, 5);
+ merge(5, 1000, 10);
+ merge(5, 1000, 100);
+ }
+
+ private void merge(int fileCount, int keysPerFile, int mergeFactor) throws Exception {
+ List<Path> pathList = createIFiles(fileCount, keysPerFile);
+
+ //Merge datasets
+ TezMerger merger = new TezMerger();
+ TezRawKeyValueIterator records = merger.merge(defaultConf, localFs, IntWritable.class,
+ LongWritable.class, null, false, 0, 1024, pathList.toArray(new Path[pathList.size()]),
+ true, mergeFactor, new Path(workDir, "tmp_" + System.nanoTime()),
+ ConfigUtils.getIntermediateInputKeyComparator(defaultConf), new Reporter(), null, null,
+ null,
+ new Progress());
+
+ //Verify the merged data is correct
+ Map<Integer, Integer> dataMap = Maps.newHashMap();
+ int pk = -1;
+ while (records.next()) {
+ DataInputBuffer key = records.getKey();
+ DataInputBuffer value = records.getValue();
+
+ IntWritable k = new IntWritable();
+ k.readFields(key);
+ LongWritable v = new LongWritable();
+ v.readFields(value);
+
+ if (records.isSameKey()) {
+ LOG.info("\tSame Key : key=" + k.get() + ", val=" + v.get());
+ //More than one key should be present in the source data
+ Assert.assertTrue(verificationDataSet.get(k.get()).size() > 1);
+ //Ensure this is same as the previous key we saw
+ Assert.assertTrue(pk == k.get());
+ } else {
+ LOG.info("key=" + k.get() + ", val=" + v.get());
+ }
+ pk = k.get();
+
+ int keyCount = (dataMap.containsKey(k.get())) ? (dataMap.get(k.get()) + 1) : 1;
+ dataMap.put(k.get(), keyCount);
+ }
+
+ //Verify if the number of distinct entries is the same in source and the test
+ Assert.assertTrue("dataMap=" + dataMap.keySet().size() + ", verificationSet=" +
+ verificationDataSet.keySet().size(),
+ dataMap.keySet().size() == verificationDataSet.keySet().size());
+
+ //Verify with source data
+ for (Integer key : verificationDataSet.keySet()) {
+ Assert.assertTrue("Data size for " + key + " not matching with source; dataSize:" + dataMap
+ .get(key).intValue() + ", source:" + verificationDataSet.get(key).size(),
+ dataMap.get(key).intValue() == verificationDataSet.get(key).size());
+ }
+
+ //Verify if every key has the same number of repeated items in the source dataset as well
+ for (Map.Entry<Integer, Integer> entry : dataMap.entrySet()) {
+ Assert.assertTrue(entry.getKey() + "", verificationDataSet.get(entry.getKey()).size() == entry
+ .getValue());
+ }
+
+ LOG.info("******************");
+ }
+
+ private List<Path> createIFiles(int fileCount, int keysPerFile)
+ throws IOException {
+ List<Path> pathList = Lists.newLinkedList();
+ verificationDataSet.clear();
+ Random rnd = new Random();
+ for (int i = 0; i < fileCount; i++) {
+ int repeatCount = ((i % 2 == 0) && keysPerFile > 0) ? rnd.nextInt(keysPerFile) : 0;
+ Path ifilePath = writeIFile(keysPerFile, repeatCount);
+ pathList.add(ifilePath);
+ }
+ return pathList;
+ }
+
+ static Path writeIFile(int keysPerFile, int repeatCount) throws IOException {
+ TreeMultimap<Integer, Long> dataSet = createDataForIFile(keysPerFile, repeatCount);
+ Path path = new Path(workDir + "/src", "data_" + System.nanoTime() + ".out");
+ FSDataOutputStream out = localFs.create(path);
+ //create IFile with RLE
+ IFile.Writer writer = new IFile.Writer(defaultConf, out, IntWritable.class
+ , LongWritable.class, null, null, null, true);
+
+ for (Integer key : dataSet.keySet()) {
+ for (Long value : dataSet.get(key)) {
+ writer.append(new IntWritable(key), new LongWritable(value));
+ verificationDataSet.put(key, value);
+ }
+ }
+ writer.close();
+ out.close();
+ return path;
+ }
+
+ /**
+ * Generate data set for ifile. Create repeated keys if needed.
+ *
+ * @param keyCount approximate number of keys to be created
+ * @param repeatCount number of times a key should be repeated
+ * @return
+ */
+ static TreeMultimap<Integer, Long> createDataForIFile(int keyCount, int repeatCount) {
+ TreeMultimap<Integer, Long> dataSet = TreeMultimap.create();
+ Random rnd = new Random();
+ for (int i = 0; i < keyCount; i++) {
+ if (repeatCount > 0 && (rnd.nextInt(keyCount) % 2 == 0)) {
+ //repeat this key
+ for(int j = 0; j < repeatCount; j++) {
+ IntWritable key = new IntWritable(rnd.nextInt(keyCount));
+ LongWritable value = new LongWritable(System.nanoTime());
+ dataSet.put(key.get(), value.get());
+ }
+ i += repeatCount;
+ LOG.info("Repeated key count=" + (repeatCount));
+ } else {
+ IntWritable key = new IntWritable(rnd.nextInt(keyCount));
+ LongWritable value = new LongWritable(System.nanoTime());
+ dataSet.put(key.get(), value.get());
+ }
+ }
+ for (Integer key : dataSet.keySet()) {
+ for (Long value : dataSet.get(key)) {
+ LOG.info("Key=" + key + ", val=" + value);
+ }
+ }
+ LOG.info("=============");
+ return dataSet;
+ }
+
+
+ private static class Reporter implements Progressable {
+ @Override
+ public void progress() {
+ }
+ }
+
+}