You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in this plain-text rendering.)
Posted to commits@tez.apache.org by rb...@apache.org on 2015/01/14 02:09:59 UTC
tez git commit: TEZ-1913. Reduce deserialize cost in ValuesIterator (Rajesh Balamohan)
Repository: tez
Updated Branches:
refs/heads/master 832616598 -> 8a491f869
TEZ-1913. Reduce deserialize cost in ValuesIterator (Rajesh Balamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8a491f86
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8a491f86
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8a491f86
Branch: refs/heads/master
Commit: 8a491f869496d55792820dccc99b6a4898c31df4
Parents: 8326165
Author: Rajesh Balamohan <rb...@hortonworks.com>
Authored: Wed Jan 14 06:39:12 2015 +0530
Committer: Rajesh Balamohan <rb...@hortonworks.com>
Committed: Wed Jan 14 06:39:12 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../runtime/library/common/ValuesIterator.java | 14 ++-
.../common/sort/impl/PipelinedSorter.java | 4 +-
.../common/sort/impl/dflt/DefaultSorter.java | 2 +-
.../library/common/TestValuesIterator.java | 113 +++++++++++++++----
5 files changed, 103 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/8a491f86/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f246a4b..25eff15 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-1913. Reduce deserialize cost in ValuesIterator.
TEZ-1917. Examples should extend TezExampleBase.
TEZ-1892. Add hashCode and equals for Vertex/VertexGroup/Edge/GroupInputEdge.
TEZ-1904. Fix findbugs warnings in tez-runtime-library module.
http://git-wip-us.apache.org/repos/asf/tez/blob/8a491f86/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
index 117f354..c4c38fa 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
@@ -181,11 +181,15 @@ public class ValuesIterator<KEY,VALUE> {
more = in.next();
if (more) {
DataInputBuffer nextKeyBytes = in.getKey();
- keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(),
- nextKeyBytes.getLength() - nextKeyBytes.getPosition());
- nextKey = keyDeserializer.deserialize(nextKey);
- // TODO Is a counter increment required here ?
- hasMoreValues = key != null && (comparator.compare(key, nextKey) == 0);
+ if (!in.isSameKey()) {
+ keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(),
+ nextKeyBytes.getLength() - nextKeyBytes.getPosition());
+ nextKey = keyDeserializer.deserialize(nextKey);
+ // TODO Is a counter increment required here ?
+ hasMoreValues = key != null && (comparator.compare(key, nextKey) == 0);
+ } else {
+ hasMoreValues = in.isSameKey();
+ }
} else {
hasMoreValues = false;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/8a491f86/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 2fef83b..e44d943 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -659,7 +659,7 @@ public class PipelinedSorter extends ExternalSorter {
@Override
public boolean isSameKey() throws IOException {
- throw new UnsupportedOperationException("Not yet supported");
+ return false;
}
public int getPartition() {
@@ -926,7 +926,7 @@ public class PipelinedSorter extends ExternalSorter {
@Override
public boolean isSameKey() throws IOException {
- throw new UnsupportedOperationException("isSameKey is not supported");
+ return false;
}
public TezRawKeyValueIterator filter(int partition) {
http://git-wip-us.apache.org/repos/asf/tez/blob/8a491f86/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 29ffba1..f9e6935 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -987,7 +987,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
@Override
public boolean isSameKey() throws IOException {
- throw new UnsupportedOperationException("isSameKey is not supported");
+ return false;
}
public void close() { }
http://git-wip-us.apache.org/repos/asf/tez/blob/8a491f86/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
index 7d48056..9794ac8 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
@@ -1,13 +1,16 @@
package org.apache.tez.runtime.library.common;
-import com.google.common.collect.Ordering;
-import com.google.common.collect.TreeMultimap;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
import java.nio.ByteBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
@@ -45,6 +48,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.mockito.internal.util.collections.Sets;
import java.io.IOException;
import java.math.BigInteger;
@@ -55,6 +59,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.TreeMap;
import static org.junit.Assert.assertFalse;
@@ -103,7 +108,7 @@ public class TestValuesIterator {
int mergeFactor;
//For storing original data
- final TreeMultimap<Writable, Writable> sortedDataMap;
+ final ListMultimap<Writable, Writable> originalData;
TezRawKeyValueIterator rawKeyValueIterator;
@@ -131,12 +136,12 @@ public class TestValuesIterator {
this.correctComparator =
(correctComparator == null) ? this.comparator : getComparator(correctComparator);
this.expectedTestResult = testResult;
- sortedDataMap = TreeMultimap.create(this.correctComparator, (java.util.Comparator) Ordering.natural());
+ originalData = LinkedListMultimap.create();
setupConf(serializationClassName);
}
private void setupConf(String serializationClassName) throws IOException {
- mergeFactor = Math.max(2, rnd.nextInt(100));
+ mergeFactor = 2;
conf = new Configuration();
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, mergeFactor);
if (serializationClassName != null) {
@@ -158,40 +163,90 @@ public class TestValuesIterator {
@After
public void cleanup() throws Exception {
fs.delete(baseDir, true);
- sortedDataMap.clear();
+ originalData.clear();
}
@Test(timeout = 20000)
public void testIteratorWithInMemoryReader() throws IOException {
ValuesIterator iterator = createIterator(true);
- testIterator(iterator);
+ verifyIteratorData(iterator);
}
@Test(timeout = 20000)
public void testIteratorWithIFileReader() throws IOException {
ValuesIterator iterator = createIterator(false);
- testIterator(iterator);
+ verifyIteratorData(iterator);
}
+ @Test(timeout = 20000)
+ public void testIteratorWithIFileReaderEmptyPartitions() throws IOException {
+ ValuesIterator iterator = createEmptyIterator(false);
+ assert(iterator.moveToNext() == false);
+
+ iterator = createEmptyIterator(true);
+ assert(iterator.moveToNext() == false);
+ }
+
+ private ValuesIterator createEmptyIterator(boolean inMemory) throws IOException {
+ if (!inMemory) {
+ streamPaths = new Path[0];
+ //This will return EmptyIterator
+ rawKeyValueIterator =
+ TezMerger.merge(conf, fs, keyClass, valClass, null,
+ false, -1, 1024, streamPaths, false, mergeFactor, tmpDir, comparator,
+ new ProgressReporter(), null, null, null, null);
+ } else {
+ List<TezMerger.Segment> segments = Lists.newLinkedList();
+ //This will return EmptyIterator
+ rawKeyValueIterator =
+ TezMerger.merge(conf, fs, keyClass, valClass, segments, mergeFactor, tmpDir,
+ comparator, new ProgressReporter(), new GenericCounter("readsCounter", "y"),
+ new GenericCounter("writesCounter", "y1"),
+ new GenericCounter("bytesReadCounter", "y2"), new Progress());
+ }
+ return new ValuesIterator(rawKeyValueIterator, comparator,
+ keyClass, valClass, conf, (TezCounter) new GenericCounter("inputKeyCounter", "y3"),
+ (TezCounter) new GenericCounter("inputValueCounter", "y4"));
+ }
+
+
/**
* Tests whether data in valuesIterator matches with sorted input data set.
*
* @param valuesIterator
* @throws IOException
*/
- private void testIterator(ValuesIterator valuesIterator) throws IOException {
- Iterator<Writable> oriKeySet = sortedDataMap.keySet().iterator();
+ private void verifyIteratorData(ValuesIterator valuesIterator) throws IOException {
boolean result = true;
- while (valuesIterator.moveToNext()) {
- Writable key = (Writable) valuesIterator.getKey();
- assertTrue(oriKeySet.hasNext());
- Writable ori = oriKeySet.next();
- if (!key.equals(ori)) {
+
+ //sort original data based on comparator
+ ListMultimap<Writable, Writable> sortedMap =
+ new ImmutableListMultimap.Builder<Writable, Writable>()
+ .orderKeysBy(this.correctComparator).putAll
+ (originalData).build();
+
+ Set<Map.Entry<Writable, Writable>> oriKeySet = Sets.newSet();
+ oriKeySet.addAll(sortedMap.entries());
+
+ //Iterate through sorted data and valuesIterator for verification
+ for (Map.Entry<Writable, Writable> entry : oriKeySet) {
+
+ assertTrue(valuesIterator.moveToNext());
+
+ Writable oriKey = entry.getKey();
+ //Verify if the key and the original key are same
+ if (!oriKey.equals((Writable) valuesIterator.getKey())) {
result = false;
break;
}
- for (Object val : valuesIterator.getValues()) {
- if (!sortedDataMap.get(key).contains(val)) {
+
+ //Verify values
+ Iterator<Writable> vItr = valuesIterator.getValues().iterator();
+ for (Writable val : sortedMap.get(oriKey)) {
+ assertTrue(vItr.hasNext());
+
+ //Verify if the values are same
+ if (!val.equals((Writable) vItr.next())) {
result = false;
break;
}
@@ -299,22 +354,32 @@ public class TestValuesIterator {
private Path[] createFiles() throws IOException {
int numberOfStreams = Math.max(2, rnd.nextInt(10));
+ mergeFactor = Math.max(mergeFactor, numberOfStreams);
LOG.info("No of streams : " + numberOfStreams);
Path[] paths = new Path[numberOfStreams];
for (int i = 0; i < numberOfStreams; i++) {
paths[i] = new Path(baseDir, "ifile_" + i + ".out");
- IFile.Writer writer =
- new IFile.Writer(conf, fs, paths[i], keyClass, valClass, null,
- null, null);
+ FSDataOutputStream out = fs.create(paths[i]);
+ //write data with RLE
+ IFile.Writer writer = new IFile.Writer(conf, out, keyClass, valClass, null, null, null, true);
Map<Writable, Writable> data = createData();
- //write data
+
for (Map.Entry<Writable, Writable> entry : data.entrySet()) {
writer.append(entry.getKey(), entry.getValue());
+ originalData.put(entry.getKey(), entry.getValue());
+ if (rnd.nextInt() % 2 == 0) {
+ for (int j = 0; j < rnd.nextInt(100); j++) {
+ //add some duplicate keys
+ writer.append(entry.getKey(), entry.getValue());
+ originalData.put(entry.getKey(), entry.getValue());
+ }
+ }
}
LOG.info("Wrote " + data.size() + " in " + paths[i]);
data.clear();
writer.close();
+ out.close();
}
return paths;
}
@@ -326,7 +391,7 @@ public class TestValuesIterator {
* @throws IOException
*/
public List<TezMerger.Segment> createInMemStreams() throws IOException {
- int numberOfStreams = Math.max(2, rnd.nextInt(5));
+ int numberOfStreams = Math.max(2, rnd.nextInt(10));
LOG.info("No of streams : " + numberOfStreams);
SerializationFactory serializationFactory = new SerializationFactory(conf);
@@ -359,6 +424,7 @@ public class TestValuesIterator {
keyIn.reset(keyBuf.getData(), 0, keyBuf.getLength());
valIn.reset(valBuf.getData(), 0, valBuf.getLength());
writer.append(keyIn, valIn);
+ originalData.put(entry.getKey(), entry.getValue());
keyBuf.reset();
valBuf.reset();
keyIn.reset();
@@ -392,11 +458,12 @@ public class TestValuesIterator {
Writable key = createData(keyClass);
Writable value = createData(valClass);
map.put(key, value);
- sortedDataMap.put(key, value);
+ //sortedDataMap.put(key, value);
}
return map;
}
+
private Writable createData(Class c) {
if (c.getName().equalsIgnoreCase(BytesWritable.class.getName())) {
return new BytesWritable(new BigInteger(256, rnd).toString().getBytes());