You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/19 10:22:09 UTC

git commit: TEZ-938. Avoid fetching empty partitions when the OnFileSortedOutput, ShuffledMergedInput pair is used. Contributed by Rajesh Balamohan.

Repository: incubator-tez
Updated Branches:
  refs/heads/master 44ea005b5 -> 8d4b8e9a5


TEZ-938. Avoid fetching empty partitions when the OnFileSortedOutput,
ShuffledMergedInput pair is used. Contributed by Rajesh Balamohan.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8d4b8e9a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8d4b8e9a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8d4b8e9a

Branch: refs/heads/master
Commit: 8d4b8e9a5b6cac168cd793b93e0940494c11ebc4
Parents: 44ea005
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Mar 19 02:20:25 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Mar 19 02:20:25 2014 -0700

----------------------------------------------------------------------
 .../org/apache/tez/common/TezJobConfig.java     |  4 ++
 .../shuffle/impl/ShuffleInputEventHandler.java  | 21 +++++++-
 .../common/shuffle/impl/ShuffleScheduler.java   | 18 ++++---
 .../common/sort/impl/TezIndexRecord.java        |  5 ++
 .../library/output/OnFileSortedOutput.java      | 54 ++++++++++++++++++--
 .../impl/ShuffleInputEventHandlerImpl.java      | 14 +++++
 .../src/main/proto/ShufflePayloads.proto        |  1 +
 7 files changed, 104 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d4b8e9a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
index 61a1b64..a45753b 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -332,6 +332,10 @@ public class TezJobConfig {
   public static final String TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS = 
       "tez.runtime.intermediate-input.key.secondary.comparator.class";
 
+  public static final String TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED =
+      "tez.runtime.empty.partitions.info-via-events.enabled";
+  public static final boolean TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT = true;
+
   public static final String TEZ_RUNTIME_BROADCAST_DATA_VIA_EVENTS_ENABLED = "tez.runtime.broadcast.data-via-events.enabled";
   public static final boolean TEZ_RUNTIME_BROADCAST_DATA_VIA_EVENTS_ENABLED_DEFAULT = false;
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d4b8e9a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
index 2676a19..54bb55e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -18,9 +18,12 @@
 
 package org.apache.tez.runtime.library.common.shuffle.impl;
 
+import java.io.IOException;
 import java.net.URI;
 import java.util.List;
+import java.util.zip.InflaterInputStream;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -75,7 +78,6 @@ public class ShuffleInputEventHandler {
     InputAttemptIdentifier srcAttemptIdentifier = 
         new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(), shufflePayload.getPathComponent());
     LOG.info("DataMovementEvent baseUri:" + baseUri + ", src: " + srcAttemptIdentifier);
-    scheduler.addKnownMapOutput(shufflePayload.getHost(), partitionId, baseUri.toString(), srcAttemptIdentifier);
     
     // TODO NEWTEZ See if this duration hack can be removed.
     int duration = shufflePayload.getRunDuration();
@@ -83,6 +85,23 @@ public class ShuffleInputEventHandler {
       maxMapRuntime = duration;
       scheduler.informMaxMapRunTime(maxMapRuntime);
     }
+    if (shufflePayload.getData().hasEmptyPartitions()) {
+      try {
+        InflaterInputStream in = new
+            InflaterInputStream(shufflePayload.getData().getEmptyPartitions().newInput());
+        byte[] emptyPartition = IOUtils.toByteArray(in);
+        if (emptyPartition[partitionId] == 1) {
+            LOG.info("Got empty payload : PartitionId : " +
+                      partitionId + " : " + emptyPartition[partitionId]);
+            scheduler.copySucceeded(srcAttemptIdentifier, null, 0, 0, 0, null);
+            return;
+        }
+      } catch (IOException e) {
+        throw new TezUncheckedException("Unable to set " +
+                "the empty partition to succeeded", e);
+      }
+    }
+    scheduler.addKnownMapOutput(shufflePayload.getHost(), partitionId, baseUri.toString(), srcAttemptIdentifier);
   }
   
   private void processTaskFailedEvent(InputFailedEvent ifEvent) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d4b8e9a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index ad197f1..f1f12ef 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -157,10 +157,19 @@ class ShuffleScheduler {
                                          ) throws IOException {
     String taskIdentifier = TezRuntimeUtils.getTaskAttemptIdentifier(srcAttemptIdentifier.getInputIdentifier().getInputIndex(), srcAttemptIdentifier.getAttemptNumber());
     failureCounts.remove(taskIdentifier);
-    hostFailures.remove(host.getHostName());
+    if (host != null) {
+      hostFailures.remove(host.getHostName());
+    }
     
     if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex())) {
-      output.commit();
+      if (output != null) {
+        output.commit();
+        if (output.getType() == Type.DISK) {
+          bytesShuffledToDisk.increment(bytesCompressed);
+        } else {
+          bytesShuffledToMem.increment(bytesCompressed);
+        }
+      }
       setInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex());
       shuffledMapsCounter.increment(1);
       if (--remainingMaps == 0) {
@@ -173,11 +182,6 @@ class ShuffleScheduler {
       logProgress();
       reduceShuffleBytes.increment(bytesCompressed);
       reduceBytesDecompressed.increment(bytesDecompressed);
-      if (output.getType() == Type.DISK) {
-        bytesShuffledToDisk.increment(bytesCompressed);
-      } else {
-        bytesShuffledToMem.increment(bytesCompressed);
-      }
       if (LOG.isDebugEnabled()) {
         LOG.debug("src task: "
             + TezRuntimeUtils.getTaskAttemptIdentifier(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d4b8e9a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java
index 95ae8eb..f6fd802 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java
@@ -42,4 +42,9 @@ public class TezIndexRecord {
   public long getPartLength() {
     return partLength;
   }
+
+  public boolean hasData() {
+    //TEZ-941 - Avoid writing out empty partitions: a rawLength of exactly 2 is treated as "no data" (presumably only end-of-stream marker bytes - TODO confirm against the IFile writer)
+    return !(rawLength == 2);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d4b8e9a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index 7ed5c21..8c64ed0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -22,8 +22,13 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezUtils;
@@ -36,12 +41,16 @@ import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
 import org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
 import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter;
 import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
 
 import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
 
 /**
  * <code>OnFileSortedOutput</code> is an {@link LogicalOutput} which sorts key/value pairs 
@@ -49,16 +58,17 @@ import com.google.common.collect.Lists;
  */
 public class OnFileSortedOutput implements LogicalOutput {
 
+  private static final Log LOG = LogFactory.getLog(OnFileSortedOutput.class);
+
   protected ExternalSorter sorter;
   protected Configuration conf;
   protected int numOutputs;
   protected TezOutputContext outputContext;
   private long startTime;
   private long endTime;
-  
+  private boolean sendEmptyPartitionDetails;
   private final AtomicBoolean isStarted = new AtomicBoolean(false);
-  
-    
+
   @Override
   public List<Event> initialize(TezOutputContext outputContext)
       throws IOException {
@@ -76,7 +86,10 @@ public class OnFileSortedOutput implements LogicalOutput {
     } else {
       sorter = new DefaultSorter();
     }
-    
+
+    sendEmptyPartitionDetails = this.conf.getBoolean(
+        TezJobConfig.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
+        TezJobConfig.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);
     sorter.initialize(outputContext, conf, numOutputs);
     return Collections.emptyList();
   }
@@ -126,6 +139,37 @@ public class OnFileSortedOutput implements LogicalOutput {
 
     DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
         .newBuilder();
+
+    if (sendEmptyPartitionDetails) {
+      Path indexFile = sorter.getMapOutput().getOutputIndexFile();
+      TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf);
+      //TODO: replace with BitSet in JDK 1.7 (no support for valueOf, toByteArray in 1.6)
+      byte[] partitionDetails = new byte[numOutputs];
+      int emptyPartitions = 0;
+      for(int i=0;i<spillRecord.size();i++) {
+        TezIndexRecord indexRecord = spillRecord.getIndex(i);
+        if (!indexRecord.hasData()) {
+          partitionDetails[i] = 1;
+          emptyPartitions++;
+        }
+      }
+      if (emptyPartitions > 0) {
+        //compress the data
+        ByteString.Output os = ByteString.newOutput();
+        DeflaterOutputStream compressOs = new DeflaterOutputStream(os,
+            new Deflater(Deflater.BEST_COMPRESSION));
+        compressOs.write(partitionDetails);
+        compressOs.finish();
+
+        ShuffleUserPayloads.DataProto.Builder dataProtoBuilder = ShuffleUserPayloads.DataProto.newBuilder();
+        dataProtoBuilder.setEmptyPartitions(os.toByteString());
+        payloadBuilder.setData(dataProtoBuilder.build());
+
+        LOG.info("EmptyPartition bitsetSize=" + partitionDetails.length + ", numOutputs="
+                + numOutputs + ", emptyPartitions=" + emptyPartitions
+              + ", compressedSize=" + os.toByteString().size());
+      }
+    }
     payloadBuilder.setHost(host);
     payloadBuilder.setPort(shufflePort);
     payloadBuilder.setPathComponent(outputContext.getUniqueIdentifier());
@@ -149,4 +193,4 @@ public class OnFileSortedOutput implements LogicalOutput {
 
     return events;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d4b8e9a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
index 63957e9..5f2b368 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
@@ -21,7 +21,9 @@ package org.apache.tez.runtime.library.shuffle.common.impl;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.zip.InflaterInputStream;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -101,6 +103,18 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
           shufflePayload.getPathComponent());
       if (shufflePayload.hasData()) {
         DataProto dataProto = shufflePayload.getData();
+        if (dataProto.hasEmptyPartitions()) {
+          int partitionId = dme.getSourceIndex();
+          InflaterInputStream in = new
+              InflaterInputStream(shufflePayload.getData().getEmptyPartitions().newInput());
+          byte[] emptyPartition = IOUtils.toByteArray(in);
+          if (emptyPartition[partitionId] == 1) {
+              LOG.info("Got empty payload : PartitionId : " +
+                        partitionId + " : " + emptyPartition[partitionId]);
+              shuffleManager.addCompletedInputWithNoData(srcAttemptIdentifier);
+              return;
+          }
+        }
         FetchedInput fetchedInput = inputAllocator.allocate(dataProto.getRawLength(), dataProto.getCompressedLength(), srcAttemptIdentifier);
         moveDataToFetchedInput(dataProto, fetchedInput);
         shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d4b8e9a/tez-runtime-library/src/main/proto/ShufflePayloads.proto
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/proto/ShufflePayloads.proto b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
index b4ae332..ad676bf 100644
--- a/tez-runtime-library/src/main/proto/ShufflePayloads.proto
+++ b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
@@ -33,6 +33,7 @@ message DataProto {
   optional int32 raw_length = 1;
   optional int32 compressed_length = 2;
   optional bytes data = 3;
+  optional bytes empty_partitions = 4;
 }
 
 message InputInformationEventPayloadProto {