You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/23 01:11:03 UTC
git commit: TEZ-970. Consolidate Shuffle payload to have a single
means of indicating absence of a partition. (sseth)
Repository: incubator-tez
Updated Branches:
refs/heads/master 8c6cab4a2 -> 7f3051b1f
TEZ-970. Consolidate Shuffle payload to have a single means of
indicating absence of a partition. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/7f3051b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/7f3051b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/7f3051b1
Branch: refs/heads/master
Commit: 7f3051b1f368cdee7568f8f12e8e2cd814e1ae17
Parents: 8c6cab4
Author: Siddharth Seth <ss...@apache.org>
Authored: Sat Mar 22 17:10:44 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Sat Mar 22 17:10:44 2014 -0700
----------------------------------------------------------------------
.../java/org/apache/tez/common/TezUtils.java | 18 +++++++-
.../shuffle/impl/ShuffleInputEventHandler.java | 18 +++-----
.../library/output/OnFileSortedOutput.java | 18 ++------
.../library/output/OnFileUnorderedKVOutput.java | 9 +++-
.../impl/ShuffleInputEventHandlerImpl.java | 46 ++++++++++----------
.../src/main/proto/ShufflePayloads.proto | 3 +-
.../output/TestOnFileUnorderedKVOutput.java | 3 +-
7 files changed, 60 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7f3051b1/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
index c0f5528..c7f2faf 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
@@ -33,6 +33,7 @@ import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -222,6 +223,22 @@ public class TezUtils {
return output;
}
+ public static ByteString compressByteArrayToByteString(byte[] inBytes) throws IOException {
+ ByteString.Output os = ByteString.newOutput();
+ DeflaterOutputStream compressOs = new DeflaterOutputStream(os, new Deflater(
+ Deflater.BEST_COMPRESSION));
+ compressOs.write(inBytes);
+ compressOs.finish();
+ ByteString byteString = os.toByteString();
+ return byteString;
+ }
+
+ public static byte[] decompressByteStringToByteArray(ByteString byteString) throws IOException {
+ InflaterInputStream in = new InflaterInputStream(byteString.newInput());
+ byte[] bytes = IOUtils.toByteArray(in);
+ return bytes;
+ }
+
public static void updateLoggers(String addend) throws FileNotFoundException {
String containerLogDir = null;
@@ -260,5 +277,4 @@ public class TezUtils {
return base + "_" + addend;
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7f3051b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
index 54bb55e..00d9678 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -21,11 +21,10 @@ package org.apache.tez.runtime.library.common.shuffle.impl;
import java.io.IOException;
import java.net.URI;
import java.util.List;
-import java.util.zip.InflaterInputStream;
-import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TezInputContext;
@@ -85,16 +84,13 @@ public class ShuffleInputEventHandler {
maxMapRuntime = duration;
scheduler.informMaxMapRunTime(maxMapRuntime);
}
- if (shufflePayload.getData().hasEmptyPartitions()) {
+ if (shufflePayload.hasEmptyPartitions()) {
try {
- InflaterInputStream in = new
- InflaterInputStream(shufflePayload.getData().getEmptyPartitions().newInput());
- byte[] emptyPartition = IOUtils.toByteArray(in);
- if (emptyPartition[partitionId] == 1) {
- LOG.info("Got empty payload : PartitionId : " +
- partitionId + " : " + emptyPartition[partitionId]);
- scheduler.copySucceeded(srcAttemptIdentifier, null, 0, 0, 0, null);
- return;
+ byte[] emptyPartitions = TezUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions());
+ if (emptyPartitions[partitionId] == 1) {
+ LOG.info("Source partition: " + partitionId + " did not generate any data. Not fetching.");
+ scheduler.copySucceeded(srcAttemptIdentifier, null, 0, 0, 0, null);
+ return;
}
} catch (IOException e) {
throw new TezUncheckedException("Unable to set " +
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7f3051b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index 8c64ed0..3dd41e4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -22,8 +22,6 @@ import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.zip.Deflater;
-import java.util.zip.DeflaterOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -45,7 +43,6 @@ import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter;
import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
-import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
@@ -154,20 +151,11 @@ public class OnFileSortedOutput implements LogicalOutput {
}
}
if (emptyPartitions > 0) {
- //compress the data
- ByteString.Output os = ByteString.newOutput();
- DeflaterOutputStream compressOs = new DeflaterOutputStream(os,
- new Deflater(Deflater.BEST_COMPRESSION));
- compressOs.write(partitionDetails);
- compressOs.finish();
-
- ShuffleUserPayloads.DataProto.Builder dataProtoBuilder = ShuffleUserPayloads.DataProto.newBuilder();
- dataProtoBuilder.setEmptyPartitions(os.toByteString());
- payloadBuilder.setData(dataProtoBuilder.build());
-
+ ByteString emptyPartitionsBytesString = TezUtils.compressByteArrayToByteString(partitionDetails);
+ payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString);
LOG.info("EmptyPartition bitsetSize=" + partitionDetails.length + ", numOutputs="
+ numOutputs + ", emptyPartitions=" + emptyPartitions
- + ", compressedSize=" + os.toByteString().size());
+ + ", compressedSize=" + emptyPartitionsBytesString.size());
}
}
payloadBuilder.setHost(host);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7f3051b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
index 6cc6554..867ccab 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -125,7 +125,14 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
int shufflePort = ShuffleUtils
.deserializeShuffleProviderMetaData(shuffleMetadata);
- payloadBuilder.setOutputGenerated(outputGenerated);
+ // Set the list of empty partitions - single partition in this case.
+ if (!outputGenerated) {
+ LOG.info("No output was generated");
+ byte[] emptyPartitions = new byte[1];
+ emptyPartitions[0] = 1;
+ ByteString emptyPartitionsBytesString = TezUtils.compressByteArrayToByteString(emptyPartitions);
+ payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString);
+ }
if (outputGenerated) {
payloadBuilder.setHost(host);
payloadBuilder.setPort(shufflePort);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7f3051b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
index 5f2b368..6f54b81 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
@@ -21,12 +21,11 @@ package org.apache.tez.runtime.library.shuffle.common.impl;
import java.io.IOException;
import java.util.List;
-import java.util.zip.InflaterInputStream;
-import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TezInputContext;
@@ -93,36 +92,35 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
} catch (InvalidProtocolBufferException e) {
throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
}
+ int srcIndex = dme.getSourceIndex();
LOG.info("Processing DataMovementEvent with srcIndex: "
- + dme.getSourceIndex() + ", targetIndex: " + dme.getTargetIndex()
+ + srcIndex + ", targetIndex: " + dme.getTargetIndex()
+ ", attemptNum: " + dme.getVersion() + ", payload: "
+ stringify(shufflePayload));
- if (shufflePayload.getOutputGenerated()) {
- InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(
- dme.getTargetIndex(), dme.getVersion(),
- shufflePayload.getPathComponent());
+
+ if (shufflePayload.hasEmptyPartitions()) {
+ byte[] emptyPartitions = TezUtils.decompressByteStringToByteArray(shufflePayload
+ .getEmptyPartitions());
+ if (emptyPartitions[srcIndex] == 1) {
+ InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dme.getTargetIndex(),
+ dme.getVersion());
+ LOG.info("Source partition: " + srcIndex + " did not generate any data. Not fetching.");
+ shuffleManager.addCompletedInputWithNoData(srcAttemptIdentifier);
+ return;
+ }
+ } else {
+ InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dme.getTargetIndex(),
+ dme.getVersion(), shufflePayload.getPathComponent());
if (shufflePayload.hasData()) {
DataProto dataProto = shufflePayload.getData();
- if (dataProto.hasEmptyPartitions()) {
- int partitionId = dme.getSourceIndex();
- InflaterInputStream in = new
- InflaterInputStream(shufflePayload.getData().getEmptyPartitions().newInput());
- byte[] emptyPartition = IOUtils.toByteArray(in);
- if (emptyPartition[partitionId] == 1) {
- LOG.info("Got empty payload : PartitionId : " +
- partitionId + " : " + emptyPartition[partitionId]);
- shuffleManager.addCompletedInputWithNoData(srcAttemptIdentifier);
- return;
- }
- }
- FetchedInput fetchedInput = inputAllocator.allocate(dataProto.getRawLength(), dataProto.getCompressedLength(), srcAttemptIdentifier);
+ FetchedInput fetchedInput = inputAllocator.allocate(dataProto.getRawLength(),
+ dataProto.getCompressedLength(), srcAttemptIdentifier);
moveDataToFetchedInput(dataProto, fetchedInput);
shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput);
} else {
- shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(), srcAttemptIdentifier, dme.getSourceIndex());
+ shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(),
+ srcAttemptIdentifier, srcIndex);
}
- } else {
- shuffleManager.addCompletedInputWithNoData(new InputAttemptIdentifier(dme.getTargetIndex(), dme.getVersion()));
}
}
@@ -153,7 +151,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
private String stringify(DataMovementEventPayloadProto dmProto) {
StringBuilder sb = new StringBuilder();
sb.append("[");
- sb.append("outputGenerated: " + dmProto.getOutputGenerated()).append(", ");
+ sb.append("hasEmptyPartitions: ").append(dmProto.hasEmptyPartitions()).append(", ");
sb.append("host: " + dmProto.getHost()).append(", ");
sb.append("port: " + dmProto.getPort()).append(", ");
sb.append("pathComponent: " + dmProto.getPathComponent()).append(", ");
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7f3051b1/tez-runtime-library/src/main/proto/ShufflePayloads.proto
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/proto/ShufflePayloads.proto b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
index ad676bf..9c711bb 100644
--- a/tez-runtime-library/src/main/proto/ShufflePayloads.proto
+++ b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
@@ -21,7 +21,7 @@ option java_outer_classname = "ShuffleUserPayloads";
option java_generate_equals_and_hash = true;
message DataMovementEventPayloadProto {
- optional bool output_generated = 1 [default = true];
+ optional bytes empty_partitions = 1;
optional string host = 2;
optional int32 port = 3;
optional string path_component = 4;
@@ -33,7 +33,6 @@ message DataProto {
optional int32 raw_length = 1;
optional int32 compressed_length = 2;
optional bytes data = 3;
- optional bytes empty_partitions = 4;
}
message InputInformationEventPayloadProto {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7f3051b1/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
index 64a3bdf..064cbbc 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -21,6 +21,7 @@ package org.apache.tez.runtime.library.output;
import static org.mockito.Mockito.mock;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -144,7 +145,7 @@ public class TestOnFileUnorderedKVOutput {
DataMovementEventPayloadProto shufflePayload = DataMovementEventPayloadProto
.parseFrom(dmEvent.getUserPayload());
- assertTrue(shufflePayload.getOutputGenerated());
+ assertFalse(shufflePayload.hasEmptyPartitions());
assertEquals(outputContext.getUniqueIdentifier(), shufflePayload.getPathComponent());
assertEquals(shufflePort, shufflePayload.getPort());
assertEquals("host", shufflePayload.getHost());