You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/25 06:52:05 UTC

git commit: TEZ-972. Shuffle Phase - optimize memory usage of empty partition data in DataMovementEvent. Contributed by Rajesh Balamohan.

Repository: incubator-tez
Updated Branches:
  refs/heads/master 0bf327c28 -> 762f322de


TEZ-972. Shuffle Phase - optimize memory usage of empty partition data
in DataMovementEvent. Contributed by Rajesh Balamohan.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/762f322d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/762f322d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/762f322d

Branch: refs/heads/master
Commit: 762f322deef34e4b02d9df2a380784ceba359a13
Parents: 0bf327c
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Mar 24 22:51:20 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Mar 24 22:51:20 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/tez/common/TezUtils.java    | 27 +++++++++
 .../org/apache/tez/common/TestTezUtils.java     | 60 ++++++++++++++++++++
 .../shuffle/impl/ShuffleInputEventHandler.java  |  4 +-
 .../library/output/OnFileSortedOutput.java      | 11 ++--
 .../library/output/OnFileUnorderedKVOutput.java |  8 ++-
 .../impl/ShuffleInputEventHandlerImpl.java      |  4 +-
 6 files changed, 104 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/762f322d/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
index 2979d5f..e1fb5df 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
@@ -24,6 +24,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
+import java.util.BitSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
@@ -298,4 +299,30 @@ public class TezUtils {
       return base + "_" + addend;
     }
   }
+
+  public static BitSet fromByteArray(byte[] bytes) {
+    if (bytes == null) {
+      return new BitSet();
+    }
+    BitSet bits = new BitSet();
+    for (int i = 0; i < bytes.length * 8; i++) {
+      if ((bytes[(bytes.length) - (i / 8) - 1] & (1 << (i % 8))) > 0) {
+        bits.set(i);
+      }
+    }
+    return bits;
+  }
+
+  public static byte[] toByteArray(BitSet bits) {
+    if (bits == null) {
+      return null;
+    }
+    byte[] bytes = new byte[bits.length() / 8 + 1];
+    for (int i = 0; i < bits.length(); i++) {
+      if (bits.get(i)) {
+        bytes[(bytes.length) - (i / 8) - 1] |= 1 << (i % 8);
+      }
+    }
+    return bytes;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/762f322d/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
index ef4efe1..a994f8a 100644
--- a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
+++ b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
@@ -18,6 +18,8 @@
 package org.apache.tez.common;
 
 import java.io.IOException;
+import java.util.BitSet;
+import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
 import org.junit.Assert;
@@ -61,6 +63,64 @@ public class TestTezUtils {
     Assert.assertTrue(cleaned.matches("\\w+"));
   }
 
+  @Test
+  public void testBitSetToByteArray() {
+    BitSet bitSet = createBitSet(0);
+    byte[] bytes = TezUtils.toByteArray(bitSet);
+    Assert.assertTrue(bytes.length == ((bitSet.length() / 8) + 1));
+
+    bitSet = createBitSet(1000);
+    bytes = TezUtils.toByteArray(bitSet);
+    Assert.assertTrue(bytes.length == ((bitSet.length() / 8) + 1));
+  }
+
+  @Test
+  public void testBitSetFromByteArray() {
+    BitSet bitSet = createBitSet(0);
+    byte[] bytes = TezUtils.toByteArray(bitSet);
+    Assert.assertEquals(TezUtils.fromByteArray(bytes).cardinality(), bitSet.cardinality());
+    Assert.assertTrue(TezUtils.fromByteArray(bytes).equals(bitSet));
+
+    bitSet = createBitSet(1);
+    bytes = TezUtils.toByteArray(bitSet);
+    Assert.assertEquals(TezUtils.fromByteArray(bytes).cardinality(), bitSet.cardinality());
+    Assert.assertTrue(TezUtils.fromByteArray(bytes).equals(bitSet));
+    
+    bitSet = createBitSet(1000);
+    bytes = TezUtils.toByteArray(bitSet);
+    Assert.assertEquals(TezUtils.fromByteArray(bytes).cardinality(), bitSet.cardinality());
+    Assert.assertTrue(TezUtils.fromByteArray(bytes).equals(bitSet));
+  }
+
+  @Test
+  public void testBitSetConversion() {
+    for (int i = 0 ; i < 16 ; i++) {
+      BitSet bitSet = createBitSetWithSingleEntry(i);
+      byte[] bytes = TezUtils.toByteArray(bitSet);
+      
+      BitSet deseraialized = TezUtils.fromByteArray(bytes);
+      Assert.assertEquals(bitSet, deseraialized);
+      Assert.assertEquals(bitSet.cardinality(), deseraialized.cardinality());
+      Assert.assertEquals(1, deseraialized.cardinality());
+    }
+  }
+
+  private BitSet createBitSet(int size) {
+    BitSet bitSet = new BitSet();
+    int bitsToEnable = (int) (size * 0.1);
+    Random rnd = new Random();
+    for(int i = 0;i < bitsToEnable;i++) {
+      bitSet.set(rnd.nextInt(size));
+    }
+    return bitSet;
+  }
+
+  private BitSet createBitSetWithSingleEntry(int bitToSet) {
+    BitSet bitSet = new BitSet();
+    bitSet.set(bitToSet);
+    return bitSet;
+  }
+
   private Configuration getConf() {
     Configuration conf = new Configuration(false);
     conf.set("test1", "value1");

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/762f322d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
index 00d9678..f11575b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -20,6 +20,7 @@ package org.apache.tez.runtime.library.common.shuffle.impl;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.BitSet;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -87,7 +88,8 @@ public class ShuffleInputEventHandler {
     if (shufflePayload.hasEmptyPartitions()) {
       try {
         byte[] emptyPartitions = TezUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions());
-        if (emptyPartitions[partitionId] == 1) {
+        BitSet emptyPartitionsBitSet = TezUtils.fromByteArray(emptyPartitions);
+        if (emptyPartitionsBitSet.get(partitionId)) {
           LOG.info("Source partition: " + partitionId + " did not generate any data. Not fetching.");
           scheduler.copySucceeded(srcAttemptIdentifier, null, 0, 0, 0, null);
           return;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/762f322d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index dfd238c..c8f2b22 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -19,6 +19,7 @@ package org.apache.tez.runtime.library.output;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -154,20 +155,20 @@ public class OnFileSortedOutput implements LogicalOutput {
     if (sendEmptyPartitionDetails) {
       Path indexFile = sorter.getMapOutput().getOutputIndexFile();
       TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf);
-      //TODO: replace with BitSet in JDK 1.7 (no support for valueOf, toByteArray in 1.6)
-      byte[] partitionDetails = new byte[numOutputs];
+      BitSet emptyPartitionDetails = new BitSet();
       int emptyPartitions = 0;
       for(int i=0;i<spillRecord.size();i++) {
         TezIndexRecord indexRecord = spillRecord.getIndex(i);
         if (!indexRecord.hasData()) {
-          partitionDetails[i] = 1;
+          emptyPartitionDetails.set(i);
           emptyPartitions++;
         }
       }
       if (emptyPartitions > 0) {
-        ByteString emptyPartitionsBytesString = TezUtils.compressByteArrayToByteString(partitionDetails);
+        ByteString emptyPartitionsBytesString =
+            TezUtils.compressByteArrayToByteString(TezUtils.toByteArray(emptyPartitionDetails));
         payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString);
-        LOG.info("EmptyPartition bitsetSize=" + partitionDetails.length + ", numOutputs="
+        LOG.info("EmptyPartition bitsetSize=" + emptyPartitionDetails.cardinality() + ", numOutputs="
                 + numOutputs + ", emptyPartitions=" + emptyPartitions
               + ", compressedSize=" + emptyPartitionsBytesString.size());
       }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/762f322d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
index 867ccab..91ea94a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -19,6 +19,7 @@
 package org.apache.tez.runtime.library.output;
 
 import java.nio.ByteBuffer;
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.List;
 
@@ -128,9 +129,10 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
     // Set the list of empty partitions - single partition on this case.
     if (!outputGenerated) {
       LOG.info("No output was generated");
-      byte[] emptyPartitions = new byte[1];
-      emptyPartitions[0] = 1;
-      ByteString emptyPartitionsBytesString = TezUtils.compressByteArrayToByteString(emptyPartitions);
+      BitSet emptyPartitions = new BitSet();
+      emptyPartitions.set(0);
+      ByteString emptyPartitionsBytesString =
+          TezUtils.compressByteArrayToByteString(TezUtils.toByteArray(emptyPartitions));
       payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString);
     }
     if (outputGenerated) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/762f322d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
index 6f54b81..09cd1ea 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
@@ -20,6 +20,7 @@
 package org.apache.tez.runtime.library.shuffle.common.impl;
 
 import java.io.IOException;
+import java.util.BitSet;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -101,7 +102,8 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
     if (shufflePayload.hasEmptyPartitions()) {
       byte[] emptyPartitions = TezUtils.decompressByteStringToByteArray(shufflePayload
           .getEmptyPartitions());
-      if (emptyPartitions[srcIndex] == 1) {
+      BitSet emptyPartionsBitSet = TezUtils.fromByteArray(emptyPartitions);
+      if (emptyPartionsBitSet.get(srcIndex)) {
         InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dme.getTargetIndex(),
             dme.getVersion());
         LOG.info("Source partition: " + srcIndex + " did not generate any data. Not fetching.");