You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/25 06:52:05 UTC
git commit: TEZ-972. Shuffle Phase - optimize memory usage of empty
partition data in DataMovementEvent. Contributed by Rajesh Balamohan.
Repository: incubator-tez
Updated Branches:
refs/heads/master 0bf327c28 -> 762f322de
TEZ-972. Shuffle Phase - optimize memory usage of empty partition data
in DataMovementEvent. Contributed by Rajesh Balamohan.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/762f322d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/762f322d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/762f322d
Branch: refs/heads/master
Commit: 762f322deef34e4b02d9df2a380784ceba359a13
Parents: 0bf327c
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Mar 24 22:51:20 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Mar 24 22:51:20 2014 -0700
----------------------------------------------------------------------
.../java/org/apache/tez/common/TezUtils.java | 27 +++++++++
.../org/apache/tez/common/TestTezUtils.java | 60 ++++++++++++++++++++
.../shuffle/impl/ShuffleInputEventHandler.java | 4 +-
.../library/output/OnFileSortedOutput.java | 11 ++--
.../library/output/OnFileUnorderedKVOutput.java | 8 ++-
.../impl/ShuffleInputEventHandlerImpl.java | 4 +-
6 files changed, 104 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/762f322d/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
index 2979d5f..e1fb5df 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
@@ -24,6 +24,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
+import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
@@ -298,4 +299,30 @@ public class TezUtils {
return base + "_" + addend;
}
}
+
+  /** Decodes the {@link #toByteArray(BitSet)} layout; a null array yields an empty BitSet. */
+  public static BitSet fromByteArray(byte[] bytes) {
+    BitSet bits = new BitSet();
+    if (bytes == null) {
+      return bits;
+    }
+    for (int i = 0, numBits = bytes.length * 8; i < numBits; i++) {
+      if ((bytes[bytes.length - (i / 8) - 1] & (1 << (i % 8))) != 0) {
+        bits.set(i);
+      }
+    }
+    return bits;
+  }
+
+  /** Encodes into length()/8+1 bytes, last byte = bits 0-7 (bit i -> mask 1<<(i%8)); null -> null. */
+  public static byte[] toByteArray(BitSet bits) {
+    if (bits == null) {
+      return null;
+    }
+    byte[] bytes = new byte[bits.length() / 8 + 1];
+    for (int i = bits.nextSetBit(0); i >= 0; i = bits.nextSetBit(i + 1)) {
+      bytes[bytes.length - (i / 8) - 1] |= 1 << (i % 8);
+    }
+    return bytes;
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/762f322d/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
index ef4efe1..a994f8a 100644
--- a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
+++ b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
@@ -18,6 +18,8 @@
package org.apache.tez.common;
import java.io.IOException;
+import java.util.BitSet;
+import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
@@ -61,6 +63,64 @@ public class TestTezUtils {
Assert.assertTrue(cleaned.matches("\\w+"));
}
+  @Test
+  public void testBitSetToByteArray() {
+    BitSet bitSet = createBitSet(0);
+    byte[] bytes = TezUtils.toByteArray(bitSet);
+    Assert.assertEquals((bitSet.length() / 8) + 1, bytes.length);
+
+    bitSet = createBitSet(1000);
+    bytes = TezUtils.toByteArray(bitSet);
+    Assert.assertEquals((bitSet.length() / 8) + 1, bytes.length);
+  }
+
+  @Test
+  public void testBitSetFromByteArray() {
+    BitSet bitSet = createBitSet(0);
+    byte[] bytes = TezUtils.toByteArray(bitSet);
+    Assert.assertEquals(bitSet.cardinality(), TezUtils.fromByteArray(bytes).cardinality());
+    Assert.assertEquals(bitSet, TezUtils.fromByteArray(bytes));
+
+    bitSet = createBitSet(1);
+    bytes = TezUtils.toByteArray(bitSet);
+    Assert.assertEquals(bitSet.cardinality(), TezUtils.fromByteArray(bytes).cardinality());
+    Assert.assertEquals(bitSet, TezUtils.fromByteArray(bytes));
+
+    bitSet = createBitSet(1000);
+    bytes = TezUtils.toByteArray(bitSet);
+    Assert.assertEquals(bitSet.cardinality(), TezUtils.fromByteArray(bytes).cardinality());
+    Assert.assertEquals(bitSet, TezUtils.fromByteArray(bytes));
+  }
+
+  @Test
+  public void testBitSetConversion() {
+    for (int i = 0; i < 16; i++) {
+      BitSet bitSet = createBitSetWithSingleEntry(i);
+      byte[] bytes = TezUtils.toByteArray(bitSet);
+
+      BitSet deserialized = TezUtils.fromByteArray(bytes);
+      Assert.assertEquals(bitSet, deserialized);
+      Assert.assertEquals(bitSet.cardinality(), deserialized.cardinality());
+      Assert.assertEquals(1, deserialized.cardinality());
+    }
+  }
+
+  private BitSet createBitSet(int size) { // sets ~10% of bits in [0, size)
+    BitSet bitSet = new BitSet();
+    int bitsToEnable = (int) (size * 0.1);
+    Random rnd = new Random(size); // fixed seed so any failing case is reproducible
+    for (int i = 0; i < bitsToEnable; i++) {
+      bitSet.set(rnd.nextInt(size));
+    }
+    return bitSet;
+  }
+
+  private BitSet createBitSetWithSingleEntry(int bitToSet) { // exactly one bit set
+    BitSet single = new BitSet();
+    single.set(bitToSet);
+    return single;
+  }
+
private Configuration getConf() {
Configuration conf = new Configuration(false);
conf.set("test1", "value1");
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/762f322d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
index 00d9678..f11575b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -20,6 +20,7 @@ package org.apache.tez.runtime.library.common.shuffle.impl;
import java.io.IOException;
import java.net.URI;
+import java.util.BitSet;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -87,7 +88,8 @@ public class ShuffleInputEventHandler {
if (shufflePayload.hasEmptyPartitions()) {
try {
byte[] emptyPartitions = TezUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions());
- if (emptyPartitions[partitionId] == 1) {
+ BitSet emptyPartitionsBitSet = TezUtils.fromByteArray(emptyPartitions); // decode bitmap written via TezUtils.toByteArray
+ if (emptyPartitionsBitSet.get(partitionId)) { // set bit => this partition produced no data; indices past the last set bit read as false
LOG.info("Source partition: " + partitionId + " did not generate any data. Not fetching.");
scheduler.copySucceeded(srcAttemptIdentifier, null, 0, 0, 0, null);
return;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/762f322d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index dfd238c..c8f2b22 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -19,6 +19,7 @@ package org.apache.tez.runtime.library.output;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.BitSet;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -154,20 +155,20 @@ public class OnFileSortedOutput implements LogicalOutput {
if (sendEmptyPartitionDetails) {
Path indexFile = sorter.getMapOutput().getOutputIndexFile();
TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf);
- //TODO: replace with BitSet in JDK 1.7 (no support for valueOf, toByteArray in 1.6)
- byte[] partitionDetails = new byte[numOutputs];
+ BitSet emptyPartitionDetails = new BitSet(); // bit i set => partition i has no data
int emptyPartitions = 0;
for(int i=0;i<spillRecord.size();i++) {
TezIndexRecord indexRecord = spillRecord.getIndex(i);
if (!indexRecord.hasData()) {
- partitionDetails[i] = 1;
+ emptyPartitionDetails.set(i);
emptyPartitions++;
}
}
if (emptyPartitions > 0) {
- ByteString emptyPartitionsBytesString = TezUtils.compressByteArrayToByteString(partitionDetails);
+ byte[] emptyPartitionsBytes = TezUtils.toByteArray(emptyPartitionDetails);
+ ByteString emptyPartitionsBytesString = TezUtils.compressByteArrayToByteString(emptyPartitionsBytes);
payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString);
- LOG.info("EmptyPartition bitsetSize=" + partitionDetails.length + ", numOutputs="
+ LOG.info("EmptyPartition bitsetSize=" + emptyPartitionsBytes.length + ", numOutputs=" // uncompressed size; cardinality() only repeated emptyPartitions
+ numOutputs + ", emptyPartitions=" + emptyPartitions
+ ", compressedSize=" + emptyPartitionsBytesString.size());
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/762f322d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
index 867ccab..91ea94a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -19,6 +19,7 @@
package org.apache.tez.runtime.library.output;
import java.nio.ByteBuffer;
+import java.util.BitSet;
import java.util.Collections;
import java.util.List;
@@ -128,9 +129,10 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
// Set the list of empty partitions - single partition on this case.
if (!outputGenerated) {
LOG.info("No output was generated");
- byte[] emptyPartitions = new byte[1];
- emptyPartitions[0] = 1;
- ByteString emptyPartitionsBytesString = TezUtils.compressByteArrayToByteString(emptyPartitions);
+ BitSet emptyPartitions = new BitSet();
+ emptyPartitions.set(0); // the only partition (index 0) is empty
+ ByteString emptyPartitionsBytesString =
+ TezUtils.compressByteArrayToByteString(TezUtils.toByteArray(emptyPartitions));
payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString);
}
if (outputGenerated) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/762f322d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
index 6f54b81..09cd1ea 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
@@ -20,6 +20,7 @@
package org.apache.tez.runtime.library.shuffle.common.impl;
import java.io.IOException;
+import java.util.BitSet;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -101,7 +102,8 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
if (shufflePayload.hasEmptyPartitions()) {
byte[] emptyPartitions = TezUtils.decompressByteStringToByteArray(shufflePayload
.getEmptyPartitions());
- if (emptyPartitions[srcIndex] == 1) {
+ BitSet emptyPartitionsBitSet = TezUtils.fromByteArray(emptyPartitions); // decode bitmap written via TezUtils.toByteArray
+ if (emptyPartitionsBitSet.get(srcIndex)) { // set bit => source partition produced no data
InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dme.getTargetIndex(),
dme.getVersion());
LOG.info("Source partition: " + srcIndex + " did not generate any data. Not fetching.");