You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this text.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/01/14 02:09:59 UTC

tez git commit: TEZ-1913. Reduce deserialize cost in ValuesIterator (Rajesh Balamohan)

Repository: tez
Updated Branches:
  refs/heads/master 832616598 -> 8a491f869


TEZ-1913. Reduce deserialize cost in ValuesIterator (Rajesh Balamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8a491f86
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8a491f86
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8a491f86

Branch: refs/heads/master
Commit: 8a491f869496d55792820dccc99b6a4898c31df4
Parents: 8326165
Author: Rajesh Balamohan <rb...@hortonworks.com>
Authored: Wed Jan 14 06:39:12 2015 +0530
Committer: Rajesh Balamohan <rb...@hortonworks.com>
Committed: Wed Jan 14 06:39:12 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../runtime/library/common/ValuesIterator.java  |  14 ++-
 .../common/sort/impl/PipelinedSorter.java       |   4 +-
 .../common/sort/impl/dflt/DefaultSorter.java    |   2 +-
 .../library/common/TestValuesIterator.java      | 113 +++++++++++++++----
 5 files changed, 103 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/8a491f86/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f246a4b..25eff15 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1913. Reduce deserialize cost in ValuesIterator.
   TEZ-1917. Examples should extend TezExampleBase.
   TEZ-1892. Add hashCode and equals for Vertex/VertexGroup/Edge/GroupInputEdge.
   TEZ-1904. Fix findbugs warnings in tez-runtime-library module.

http://git-wip-us.apache.org/repos/asf/tez/blob/8a491f86/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
index 117f354..c4c38fa 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
@@ -181,11 +181,15 @@ public class ValuesIterator<KEY,VALUE> {
     more = in.next();
     if (more) {      
       DataInputBuffer nextKeyBytes = in.getKey();
-     keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(),
-         nextKeyBytes.getLength() - nextKeyBytes.getPosition());
-      nextKey = keyDeserializer.deserialize(nextKey);
-      // TODO Is a counter increment required here ?
-      hasMoreValues = key != null && (comparator.compare(key, nextKey) == 0);
+      if (!in.isSameKey()) {
+        keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(),
+            nextKeyBytes.getLength() - nextKeyBytes.getPosition());
+        nextKey = keyDeserializer.deserialize(nextKey);
+        // TODO Is a counter increment required here ?
+        hasMoreValues = key != null && (comparator.compare(key, nextKey) == 0);
+      } else {
+        hasMoreValues = in.isSameKey();
+      }
     } else {
       hasMoreValues = false;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/8a491f86/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 2fef83b..e44d943 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -659,7 +659,7 @@ public class PipelinedSorter extends ExternalSorter {
 
     @Override
     public boolean isSameKey() throws IOException {
-      throw new UnsupportedOperationException("Not yet supported");
+      return false;
     }
 
     public int getPartition() {
@@ -926,7 +926,7 @@ public class PipelinedSorter extends ExternalSorter {
 
     @Override
     public boolean isSameKey() throws IOException {
-      throw new UnsupportedOperationException("isSameKey is not supported");
+      return false;
     }
 
     public TezRawKeyValueIterator filter(int partition) {

http://git-wip-us.apache.org/repos/asf/tez/blob/8a491f86/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 29ffba1..f9e6935 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -987,7 +987,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
 
     @Override
     public boolean isSameKey() throws IOException {
-      throw new UnsupportedOperationException("isSameKey is not supported");
+      return false;
     }
 
     public void close() { }

http://git-wip-us.apache.org/repos/asf/tez/blob/8a491f86/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
index 7d48056..9794ac8 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
@@ -1,13 +1,16 @@
 package org.apache.tez.runtime.library.common;
 
-import com.google.common.collect.Ordering;
-import com.google.common.collect.TreeMultimap;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
 
 import java.nio.ByteBuffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
@@ -45,6 +48,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.mockito.internal.util.collections.Sets;
 
 import java.io.IOException;
 import java.math.BigInteger;
@@ -55,6 +59,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.TreeMap;
 
 import static org.junit.Assert.assertFalse;
@@ -103,7 +108,7 @@ public class TestValuesIterator {
 
   int mergeFactor;
   //For storing original data
-  final TreeMultimap<Writable, Writable> sortedDataMap;
+  final ListMultimap<Writable, Writable> originalData;
 
   TezRawKeyValueIterator rawKeyValueIterator;
 
@@ -131,12 +136,12 @@ public class TestValuesIterator {
     this.correctComparator =
         (correctComparator == null) ? this.comparator : getComparator(correctComparator);
     this.expectedTestResult = testResult;
-    sortedDataMap = TreeMultimap.create(this.correctComparator, (java.util.Comparator) Ordering.natural());
+    originalData = LinkedListMultimap.create();
     setupConf(serializationClassName);
   }
 
   private void setupConf(String serializationClassName) throws IOException {
-    mergeFactor = Math.max(2, rnd.nextInt(100));
+    mergeFactor = 2;
     conf = new Configuration();
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, mergeFactor);
     if (serializationClassName != null) {
@@ -158,40 +163,90 @@ public class TestValuesIterator {
   @After
   public void cleanup() throws Exception {
     fs.delete(baseDir, true);
-    sortedDataMap.clear();
+    originalData.clear();
   }
 
   @Test(timeout = 20000)
   public void testIteratorWithInMemoryReader() throws IOException {
     ValuesIterator iterator = createIterator(true);
-    testIterator(iterator);
+    verifyIteratorData(iterator);
   }
 
   @Test(timeout = 20000)
   public void testIteratorWithIFileReader() throws IOException {
     ValuesIterator iterator = createIterator(false);
-    testIterator(iterator);
+    verifyIteratorData(iterator);
   }
 
+  @Test(timeout = 20000)
+  public void testIteratorWithIFileReaderEmptyPartitions() throws IOException {
+    ValuesIterator iterator = createEmptyIterator(false);
+    assert(iterator.moveToNext() == false);
+
+    iterator = createEmptyIterator(true);
+    assert(iterator.moveToNext() == false);
+  }
+
+  private ValuesIterator createEmptyIterator(boolean inMemory) throws IOException {
+    if (!inMemory) {
+      streamPaths = new Path[0];
+      //This will return EmptyIterator
+      rawKeyValueIterator =
+          TezMerger.merge(conf, fs, keyClass, valClass, null,
+              false, -1, 1024, streamPaths, false, mergeFactor, tmpDir, comparator,
+              new ProgressReporter(), null, null, null, null);
+    } else {
+      List<TezMerger.Segment> segments = Lists.newLinkedList();
+      //This will return EmptyIterator
+      rawKeyValueIterator =
+          TezMerger.merge(conf, fs, keyClass, valClass, segments, mergeFactor, tmpDir,
+              comparator, new ProgressReporter(), new GenericCounter("readsCounter", "y"),
+              new GenericCounter("writesCounter", "y1"),
+              new GenericCounter("bytesReadCounter", "y2"), new Progress());
+    }
+    return new ValuesIterator(rawKeyValueIterator, comparator,
+        keyClass, valClass, conf, (TezCounter) new GenericCounter("inputKeyCounter", "y3"),
+        (TezCounter) new GenericCounter("inputValueCounter", "y4"));
+  }
+
+
   /**
    * Tests whether data in valuesIterator matches with sorted input data set.
    *
    * @param valuesIterator
    * @throws IOException
    */
-  private void testIterator(ValuesIterator valuesIterator) throws IOException {
-    Iterator<Writable> oriKeySet = sortedDataMap.keySet().iterator();
+  private void verifyIteratorData(ValuesIterator valuesIterator) throws IOException {
     boolean result = true;
-    while (valuesIterator.moveToNext()) {
-      Writable key = (Writable) valuesIterator.getKey();
-      assertTrue(oriKeySet.hasNext());
-      Writable ori = oriKeySet.next();
-      if (!key.equals(ori)) {
+
+    //sort original data based on comparator
+    ListMultimap<Writable, Writable> sortedMap =
+        new ImmutableListMultimap.Builder<Writable, Writable>()
+            .orderKeysBy(this.correctComparator).putAll
+            (originalData).build();
+
+    Set<Map.Entry<Writable, Writable>> oriKeySet = Sets.newSet();
+    oriKeySet.addAll(sortedMap.entries());
+
+    //Iterate through sorted data and valuesIterator for verification
+    for (Map.Entry<Writable, Writable> entry : oriKeySet) {
+
+      assertTrue(valuesIterator.moveToNext());
+
+      Writable oriKey = entry.getKey();
+      //Verify if the key and the original key are same
+      if (!oriKey.equals((Writable) valuesIterator.getKey())) {
         result = false;
         break;
       }
-      for (Object val : valuesIterator.getValues()) {
-        if (!sortedDataMap.get(key).contains(val)) {
+
+      //Verify values
+      Iterator<Writable> vItr = valuesIterator.getValues().iterator();
+      for (Writable val : sortedMap.get(oriKey)) {
+        assertTrue(vItr.hasNext());
+
+        //Verify if the values are same
+        if (!val.equals((Writable) vItr.next())) {
           result = false;
           break;
         }
@@ -299,22 +354,32 @@ public class TestValuesIterator {
 
   private Path[] createFiles() throws IOException {
     int numberOfStreams = Math.max(2, rnd.nextInt(10));
+    mergeFactor = Math.max(mergeFactor, numberOfStreams);
     LOG.info("No of streams : " + numberOfStreams);
 
     Path[] paths = new Path[numberOfStreams];
     for (int i = 0; i < numberOfStreams; i++) {
       paths[i] = new Path(baseDir, "ifile_" + i + ".out");
-      IFile.Writer writer =
-          new IFile.Writer(conf, fs, paths[i], keyClass, valClass, null,
-              null, null);
+      FSDataOutputStream out = fs.create(paths[i]);
+      //write data with RLE
+      IFile.Writer writer = new IFile.Writer(conf, out, keyClass, valClass, null, null, null, true);
       Map<Writable, Writable> data = createData();
-      //write data
+
       for (Map.Entry<Writable, Writable> entry : data.entrySet()) {
         writer.append(entry.getKey(), entry.getValue());
+        originalData.put(entry.getKey(), entry.getValue());
+        if (rnd.nextInt() % 2 == 0) {
+          for (int j = 0; j < rnd.nextInt(100); j++) {
+            //add some duplicate keys
+            writer.append(entry.getKey(), entry.getValue());
+            originalData.put(entry.getKey(), entry.getValue());
+          }
+        }
       }
       LOG.info("Wrote " + data.size() + " in " + paths[i]);
       data.clear();
       writer.close();
+      out.close();
     }
     return paths;
   }
@@ -326,7 +391,7 @@ public class TestValuesIterator {
    * @throws IOException
    */
   public List<TezMerger.Segment> createInMemStreams() throws IOException {
-    int numberOfStreams = Math.max(2, rnd.nextInt(5));
+    int numberOfStreams = Math.max(2, rnd.nextInt(10));
     LOG.info("No of streams : " + numberOfStreams);
 
     SerializationFactory serializationFactory = new SerializationFactory(conf);
@@ -359,6 +424,7 @@ public class TestValuesIterator {
         keyIn.reset(keyBuf.getData(), 0, keyBuf.getLength());
         valIn.reset(valBuf.getData(), 0, valBuf.getLength());
         writer.append(keyIn, valIn);
+        originalData.put(entry.getKey(), entry.getValue());
         keyBuf.reset();
         valBuf.reset();
         keyIn.reset();
@@ -392,11 +458,12 @@ public class TestValuesIterator {
       Writable key = createData(keyClass);
       Writable value = createData(valClass);
       map.put(key, value);
-      sortedDataMap.put(key, value);
+      //sortedDataMap.put(key, value);
     }
     return map;
   }
 
+
   private Writable createData(Class c) {
     if (c.getName().equalsIgnoreCase(BytesWritable.class.getName())) {
       return new BytesWritable(new BigInteger(256, rnd).toString().getBytes());