You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/23 01:11:03 UTC

git commit: TEZ-970. Consolidate Shuffle payload to have a single means of indicating absence of a partition. (sseth)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 8c6cab4a2 -> 7f3051b1f


TEZ-970. Consolidate Shuffle payload to have a single means of
indicating absence of a partition. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/7f3051b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/7f3051b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/7f3051b1

Branch: refs/heads/master
Commit: 7f3051b1f368cdee7568f8f12e8e2cd814e1ae17
Parents: 8c6cab4
Author: Siddharth Seth <ss...@apache.org>
Authored: Sat Mar 22 17:10:44 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Sat Mar 22 17:10:44 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/tez/common/TezUtils.java    | 18 +++++++-
 .../shuffle/impl/ShuffleInputEventHandler.java  | 18 +++-----
 .../library/output/OnFileSortedOutput.java      | 18 ++------
 .../library/output/OnFileUnorderedKVOutput.java |  9 +++-
 .../impl/ShuffleInputEventHandlerImpl.java      | 46 ++++++++++----------
 .../src/main/proto/ShufflePayloads.proto        |  3 +-
 .../output/TestOnFileUnorderedKVOutput.java     |  3 +-
 7 files changed, 60 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7f3051b1/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
index c0f5528..c7f2faf 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
@@ -33,6 +33,7 @@ import java.util.zip.DeflaterOutputStream;
 import java.util.zip.Inflater;
 import java.util.zip.InflaterInputStream;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -222,6 +223,45 @@ public class TezUtils {
     return output;
   }
 
+  /**
+   * Compresses the given bytes with a {@link Deflater} at {@link Deflater#BEST_COMPRESSION}.
+   *
+   * @param inBytes the bytes to compress
+   * @return the deflate-compressed bytes
+   * @throws IOException if writing to the compression stream fails
+   */
+  public static ByteString compressByteArrayToByteString(byte[] inBytes) throws IOException {
+    ByteString.Output os = ByteString.newOutput();
+    // DeflaterOutputStream does not end() a Deflater that was passed in to it, so release the
+    // Deflater's native zlib memory explicitly instead of leaving it to finalization.
+    Deflater deflater = new Deflater(Deflater.BEST_COMPRESSION);
+    try {
+      DeflaterOutputStream compressOs = new DeflaterOutputStream(os, deflater);
+      compressOs.write(inBytes);
+      compressOs.finish();
+    } finally {
+      deflater.end();
+    }
+    return os.toByteString();
+  }
+
+  /**
+   * Decompresses bytes produced by {@link #compressByteArrayToByteString(byte[])}.
+   *
+   * @param byteString the deflate-compressed bytes
+   * @return the decompressed bytes
+   * @throws IOException if the input cannot be read or is not valid deflate data
+   */
+  public static byte[] decompressByteStringToByteArray(ByteString byteString) throws IOException {
+    InflaterInputStream in = new InflaterInputStream(byteString.newInput());
+    try {
+      return IOUtils.toByteArray(in);
+    } finally {
+      // The stream created its own Inflater; closing it releases that Inflater's native memory.
+      in.close();
+    }
+  }
+
   public static void updateLoggers(String addend) throws FileNotFoundException {
     String containerLogDir = null;
 
@@ -260,5 +300,4 @@ public class TezUtils {
       return base + "_" + addend;
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7f3051b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
index 54bb55e..00d9678 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -21,11 +21,10 @@ package org.apache.tez.runtime.library.common.shuffle.impl;
 import java.io.IOException;
 import java.net.URI;
 import java.util.List;
-import java.util.zip.InflaterInputStream;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TezInputContext;
@@ -85,16 +84,13 @@ public class ShuffleInputEventHandler {
       maxMapRuntime = duration;
       scheduler.informMaxMapRunTime(maxMapRuntime);
     }
-    if (shufflePayload.getData().hasEmptyPartitions()) {
+    if (shufflePayload.hasEmptyPartitions()) {
       try {
-        InflaterInputStream in = new
-            InflaterInputStream(shufflePayload.getData().getEmptyPartitions().newInput());
-        byte[] emptyPartition = IOUtils.toByteArray(in);
-        if (emptyPartition[partitionId] == 1) {
-            LOG.info("Got empty payload : PartitionId : " +
-                      partitionId + " : " + emptyPartition[partitionId]);
-            scheduler.copySucceeded(srcAttemptIdentifier, null, 0, 0, 0, null);
-            return;
+        byte[] emptyPartitions = TezUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions());
+        if (emptyPartitions[partitionId] == 1) {
+          LOG.info("Source partition: " + partitionId + " did not generate any data. Not fetching.");
+          scheduler.copySucceeded(srcAttemptIdentifier, null, 0, 0, 0, null);
+          return;
         }
       } catch (IOException e) {
         throw new TezUncheckedException("Unable to set " +

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7f3051b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index 8c64ed0..3dd41e4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -22,8 +22,6 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.zip.Deflater;
-import java.util.zip.DeflaterOutputStream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,7 +43,6 @@ import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
 import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
 import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter;
 import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
-import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
 
@@ -154,20 +151,11 @@ public class OnFileSortedOutput implements LogicalOutput {
         }
       }
       if (emptyPartitions > 0) {
-        //compress the data
-        ByteString.Output os = ByteString.newOutput();
-        DeflaterOutputStream compressOs = new DeflaterOutputStream(os,
-            new Deflater(Deflater.BEST_COMPRESSION));
-        compressOs.write(partitionDetails);
-        compressOs.finish();
-
-        ShuffleUserPayloads.DataProto.Builder dataProtoBuilder = ShuffleUserPayloads.DataProto.newBuilder();
-        dataProtoBuilder.setEmptyPartitions(os.toByteString());
-        payloadBuilder.setData(dataProtoBuilder.build());
-
+        ByteString emptyPartitionsBytesString = TezUtils.compressByteArrayToByteString(partitionDetails);
+        payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString);
         LOG.info("EmptyPartition bitsetSize=" + partitionDetails.length + ", numOutputs="
                 + numOutputs + ", emptyPartitions=" + emptyPartitions
-              + ", compressedSize=" + os.toByteString().size());
+              + ", compressedSize=" + emptyPartitionsBytesString.size());
       }
     }
     payloadBuilder.setHost(host);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7f3051b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
index 6cc6554..867ccab 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -125,7 +125,14 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
         .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
     int shufflePort = ShuffleUtils
         .deserializeShuffleProviderMetaData(shuffleMetadata);
-    payloadBuilder.setOutputGenerated(outputGenerated);
+    // This output has a single partition: flag it as empty when no output was generated.
+    if (!outputGenerated) {
+      LOG.info("No output was generated");
+      byte[] emptyPartitions = new byte[1];
+      emptyPartitions[0] = 1; // 1 marks the partition as empty for the consumer
+      ByteString emptyPartitionsBytesString = TezUtils.compressByteArrayToByteString(emptyPartitions);
+      payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString);
+    }
     if (outputGenerated) {
       payloadBuilder.setHost(host);
       payloadBuilder.setPort(shufflePort);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7f3051b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
index 5f2b368..6f54b81 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
@@ -21,12 +21,11 @@ package org.apache.tez.runtime.library.shuffle.common.impl;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.zip.InflaterInputStream;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TezInputContext;
@@ -93,36 +92,38 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
     } catch (InvalidProtocolBufferException e) {
       throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
     }
+    int srcIndex = dme.getSourceIndex();
     LOG.info("Processing DataMovementEvent with srcIndex: "
-        + dme.getSourceIndex() + ", targetIndex: " + dme.getTargetIndex()
+        + srcIndex + ", targetIndex: " + dme.getTargetIndex()
         + ", attemptNum: " + dme.getVersion() + ", payload: "
         + stringify(shufflePayload));
-    if (shufflePayload.getOutputGenerated()) {
-      InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(
-          dme.getTargetIndex(), dme.getVersion(),
-          shufflePayload.getPathComponent());
-      if (shufflePayload.hasData()) {
-        DataProto dataProto = shufflePayload.getData();
-        if (dataProto.hasEmptyPartitions()) {
-          int partitionId = dme.getSourceIndex();
-          InflaterInputStream in = new
-              InflaterInputStream(shufflePayload.getData().getEmptyPartitions().newInput());
-          byte[] emptyPartition = IOUtils.toByteArray(in);
-          if (emptyPartition[partitionId] == 1) {
-              LOG.info("Got empty payload : PartitionId : " +
-                        partitionId + " : " + emptyPartition[partitionId]);
-              shuffleManager.addCompletedInputWithNoData(srcAttemptIdentifier);
-              return;
-          }
-        }
-        FetchedInput fetchedInput = inputAllocator.allocate(dataProto.getRawLength(), dataProto.getCompressedLength(), srcAttemptIdentifier);
-        moveDataToFetchedInput(dataProto, fetchedInput);
-        shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput);
-      } else {
-        shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(), srcAttemptIdentifier, dme.getSourceIndex());
-      }
-    } else {
-      shuffleManager.addCompletedInputWithNoData(new InputAttemptIdentifier(dme.getTargetIndex(), dme.getVersion()));
+
+    // Partitions flagged with 1 in the compressed empty-partitions array produced no data.
+    if (shufflePayload.hasEmptyPartitions()) {
+      byte[] emptyPartitions = TezUtils.decompressByteStringToByteArray(shufflePayload
+          .getEmptyPartitions());
+      if (emptyPartitions[srcIndex] == 1) {
+        InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dme.getTargetIndex(),
+            dme.getVersion());
+        LOG.info("Source partition: " + srcIndex + " did not generate any data. Not fetching.");
+        shuffleManager.addCompletedInputWithNoData(srcAttemptIdentifier);
+        return;
+      }
+    }
+
+    // This partition has data, even if other partitions of the same source are empty: take it
+    // from the event payload when it is inlined, otherwise register the source host/port.
+    InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dme.getTargetIndex(),
+        dme.getVersion(), shufflePayload.getPathComponent());
+    if (shufflePayload.hasData()) {
+      DataProto dataProto = shufflePayload.getData();
+      FetchedInput fetchedInput = inputAllocator.allocate(dataProto.getRawLength(),
+          dataProto.getCompressedLength(), srcAttemptIdentifier);
+      moveDataToFetchedInput(dataProto, fetchedInput);
+      shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput);
+    } else {
+      shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(),
+          srcAttemptIdentifier, srcIndex);
     }
   }
   
@@ -153,7 +154,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
   private String stringify(DataMovementEventPayloadProto dmProto) {
     StringBuilder sb = new StringBuilder();
     sb.append("[");
-    sb.append("outputGenerated: " + dmProto.getOutputGenerated()).append(", ");
+    sb.append("hasEmptyPartitions: ").append(dmProto.hasEmptyPartitions()).append(", ");
     sb.append("host: " + dmProto.getHost()).append(", ");
     sb.append("port: " + dmProto.getPort()).append(", ");
     sb.append("pathComponent: " + dmProto.getPathComponent()).append(", ");

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7f3051b1/tez-runtime-library/src/main/proto/ShufflePayloads.proto
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/proto/ShufflePayloads.proto b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
index ad676bf..9c711bb 100644
--- a/tez-runtime-library/src/main/proto/ShufflePayloads.proto
+++ b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
@@ -21,7 +21,7 @@ option java_outer_classname = "ShuffleUserPayloads";
 option java_generate_equals_and_hash = true;
 
 message DataMovementEventPayloadProto {
-  optional bool output_generated = 1 [default = true];
+  optional bytes empty_partitions = 1;
   optional string host = 2;
   optional int32 port = 3;
   optional string path_component = 4;
@@ -33,7 +33,6 @@ message DataProto {
   optional int32 raw_length = 1;
   optional int32 compressed_length = 2;
   optional bytes data = 3;
-  optional bytes empty_partitions = 4;
 }
 
 message InputInformationEventPayloadProto {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7f3051b1/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
index 64a3bdf..064cbbc 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -21,6 +21,7 @@ package org.apache.tez.runtime.library.output;
 import static org.mockito.Mockito.mock;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -144,7 +145,7 @@ public class TestOnFileUnorderedKVOutput {
     DataMovementEventPayloadProto shufflePayload = DataMovementEventPayloadProto
         .parseFrom(dmEvent.getUserPayload());
 
-    assertTrue(shufflePayload.getOutputGenerated());
+    assertFalse(shufflePayload.hasEmptyPartitions());
     assertEquals(outputContext.getUniqueIdentifier(), shufflePayload.getPathComponent());
     assertEquals(shufflePort, shufflePayload.getPort());
     assertEquals("host", shufflePayload.getHost());