You are viewing a plain-text version of this content. The canonical link to it (a hyperlink on the original page) was not preserved in this version.
Posted to commits@tez.apache.org by rb...@apache.org on 2019/10/07 06:57:42 UTC

[tez] branch master updated: TEZ-4075: Reimplement tez.runtime.transfer.data-via-events.enabled (#48) (Contributed by Richard Zhang)

This is an automated email from the ASF dual-hosted git repository.

rbalamohan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new de019d5  TEZ-4075: Reimplement tez.runtime.transfer.data-via-events.enabled (#48) (Contributed by Richard Zhang)
de019d5 is described below

commit de019d5e31ba72659dd88a3d8fe5db8b4af0a3cd
Author: rbalamohan <rb...@apache.org>
AuthorDate: Mon Oct 7 12:27:38 2019 +0530

    TEZ-4075: Reimplement tez.runtime.transfer.data-via-events.enabled (#48) (Contributed by Richard Zhang)
    
    * TEZ-4075: Reimplement tez.runtime.transfer.data-via-events.enabled
    
    * TEZ-4075: Reimplement tez.runtime.transfer.data-via-events.enabled
---
 .../apache/tez/common/counters/TaskCounter.java    | 10 +++-
 .../library/api/TezRuntimeConfiguration.java       | 14 +++++
 .../library/common/shuffle/ShuffleUtils.java       |  3 +-
 .../shuffle/impl/ShuffleInputEventHandlerImpl.java | 49 ++++++++++++++-
 .../common/shuffle/impl/ShuffleManager.java        | 46 ++++++++++++++
 .../writers/BaseUnorderedPartitionedKVWriter.java  |  6 ++
 .../writers/UnorderedPartitionedKVWriter.java      | 63 ++++++++++++++++---
 .../runtime/library/output/UnorderedKVOutput.java  |  2 +
 .../output/UnorderedPartitionedKVOutput.java       |  2 +
 .../src/main/proto/ShufflePayloads.proto           |  2 +
 .../writers/TestUnorderedPartitionedKVWriter.java  | 70 ++++++++++++++--------
 .../TestUnorderedPartitionedKVOutputConfig.java    |  8 +++
 .../test/java/org/apache/tez/test/TestTezJobs.java | 55 +++++++++++++++++
 13 files changed, 293 insertions(+), 37 deletions(-)

diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
index 80424c7..e604f37 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
@@ -220,5 +220,13 @@ public enum TaskCounter {
    *
    * Represented in milliseconds
    */
-  LAST_EVENT_RECEIVED
+  LAST_EVENT_RECEIVED,
+
+
+  /**
+   * The size of the data that is transmitted via event.
+   *
+   * Represented in number of bytes
+   */
+  DATA_BYTES_VIA_EVENT
 }
\ No newline at end of file
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index 86792e2..d4532c9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -484,6 +484,18 @@ public class TezRuntimeConfiguration {
           "empty.partitions.info-via-events.enabled";
   public static final boolean TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT = true;
 
+  @Private
+  public static final String TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED =
+          TEZ_RUNTIME_PREFIX + "transfer.data-via-events.enabled";
+  @Private
+  public static final boolean TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED_DEFAULT = false;
+
+  @Private
+  public static final String TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE =
+          TEZ_RUNTIME_PREFIX + "transfer.data-via-events.max-size";
+  @Private
+  public static final int TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE_DEFAULT = 512;
+
   /**
    * If the shuffle input is on the local host bypass the http fetch and access the files directly
    */
@@ -619,6 +631,8 @@ public class TezRuntimeConfiguration {
     tezRuntimeKeys.add(TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
     tezRuntimeKeys.add(TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED);
     tezRuntimeKeys.add(TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT);
+    tezRuntimeKeys.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED);
+    tezRuntimeKeys.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE);
     tezRuntimeKeys.add(TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS);
     tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH);
     tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index df4281a..00f3745 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -263,7 +263,8 @@ public class ShuffleUtils {
     sb.append("host: " + dmProto.getHost()).append(", ");
     sb.append("port: " + dmProto.getPort()).append(", ");
     sb.append("pathComponent: " + dmProto.getPathComponent()).append(", ");
-    sb.append("runDuration: " + dmProto.getRunDuration());
+    sb.append("runDuration: " + dmProto.getRunDuration()).append(", ");
+    sb.append("hasDataInEvent: " + dmProto.hasData());
     sb.append("]");
     return sb.toString();
   }
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
index 542ec34..7ad1389 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
@@ -43,7 +43,11 @@ import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataProto;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+import org.apache.tez.runtime.library.common.shuffle.DiskFetchedInput;
+import org.apache.tez.runtime.library.common.shuffle.MemoryFetchedInput;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 
@@ -166,6 +170,9 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
 
   private void processDataMovementEvent(DataMovementEvent dme, DataMovementEventPayloadProto shufflePayload, BitSet emptyPartitionsBitSet) throws IOException {
     int srcIndex = dme.getSourceIndex();
+
+    String hostIdentifier = shufflePayload.getHost() + ":" + shufflePayload.getPort();
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("DME srcIdx: " + srcIndex + ", targetIndex: " + dme.getTargetIndex()
           + ", attemptNum: " + dme.getVersion() + ", payload: " + ShuffleUtils
@@ -189,7 +196,47 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
     CompositeInputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dme.getTargetIndex(), 1, dme.getVersion(),
         shufflePayload, (useSharedInputs && srcIndex == 0));
 
-    shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(), srcAttemptIdentifier, srcIndex);
+    if (shufflePayload.hasData()) {
+      DataProto dataProto = shufflePayload.getData();
+
+      FetchedInput fetchedInput =
+          inputAllocator.allocate(dataProto.getRawLength(),
+              dataProto.getCompressedLength(), srcAttemptIdentifier);
+      moveDataToFetchedInput(dataProto, fetchedInput, hostIdentifier);
+      shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput);
+
+      LOG.debug("Payload via DME : " + srcAttemptIdentifier);
+    } else {
+      shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(),
+              srcAttemptIdentifier, srcIndex);
+    }
+  }
+
+  private void moveDataToFetchedInput(DataProto dataProto,
+      FetchedInput fetchedInput, String hostIdentifier) throws IOException {
+    switch (fetchedInput.getType()) {
+    case DISK:
+      ShuffleUtils
+          .shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(),
+              hostIdentifier, dataProto.getData().newInput(),
+              dataProto.getCompressedLength(),
+              dataProto.getUncompressedLength(), LOG,
+              fetchedInput.getInputAttemptIdentifier(), ifileReadAhead,
+              ifileReadAheadLength, true);
+      break;
+    case MEMORY:
+      ShuffleUtils
+          .shuffleToMemory(((MemoryFetchedInput) fetchedInput).getBytes(),
+              dataProto.getData().newInput(), dataProto.getRawLength(),
+              dataProto.getCompressedLength(),
+              codec, ifileReadAhead, ifileReadAheadLength, LOG,
+              fetchedInput.getInputAttemptIdentifier());
+      break;
+    case WAIT:
+    default:
+      throw new TezUncheckedException("Unexpected type: "
+          + fetchedInput.getType());
+    }
   }
 
   private void processCompositeRoutedDataMovementEvent(CompositeRoutedDataMovementEvent crdme, DataMovementEventPayloadProto shufflePayload, BitSet emptyPartitionsBitSet) throws IOException {
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index ba8592f..5e03f08 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -665,6 +665,52 @@ public class ShuffleManager implements FetcherCallback {
     }
   }
 
+  public void addCompletedInputWithData(
+      InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput)
+      throws IOException {
+    //InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+    int inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Received Data via Event: " + srcAttemptIdentifier + " to "
+          + fetchedInput.getType());
+    }
+    // Count irrespective of whether this is a copy of an already fetched input
+    lock.lock();
+    try {
+      lastProgressTime = System.currentTimeMillis();
+    } finally {
+      lock.unlock();
+    }
+
+    boolean committed = false;
+    if (!completedInputSet.get(inputIdentifier)) {
+      synchronized (completedInputSet) {
+        if (!completedInputSet.get(inputIdentifier)) {
+          fetchedInput.commit();
+          committed = true;
+          if (!srcAttemptIdentifier.canRetrieveInputInChunks()) {
+            registerCompletedInput(fetchedInput);
+          } else {
+            registerCompletedInputForPipelinedShuffle(srcAttemptIdentifier,
+                fetchedInput);
+          }
+        }
+      }
+    }
+    if (!committed) {
+      fetchedInput.abort(); // If this fails, the fetcher may attempt another
+      // abort.
+    } else {
+      lock.lock();
+      try {
+        // Signal the wakeLoop to check for termination.
+        wakeLoop.signal();
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+
   protected synchronized  void updateEventReceivedTime() {
     long relativeTime = System.currentTimeMillis() - startTime;
     if (firstEventReceived.getValue() == 0) {
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
index 30d1adb..3467c82 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
@@ -101,6 +101,11 @@ public abstract class BaseUnorderedPartitionedKVWriter extends KeyValuesWriter {
    */
   protected final TezCounter numAdditionalSpillsCounter;
 
+  /**
+   * Represents the number of bytes that is transmitted via the event.
+   */
+  protected final TezCounter dataViaEventSize;
+
   @SuppressWarnings("unchecked")
   public BaseUnorderedPartitionedKVWriter(OutputContext outputContext, Configuration conf, int numOutputs) {
     this.outputContext = outputContext;
@@ -122,6 +127,7 @@ public abstract class BaseUnorderedPartitionedKVWriter extends KeyValuesWriter {
     additionalSpillBytesWritternCounter = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
     additionalSpillBytesReadCounter = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
     numAdditionalSpillsCounter = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT);
+    dataViaEventSize = outputContext.getCounters().findCounter(TaskCounter.DATA_BYTES_VIA_EVENT);
     
     // compression
     if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) {
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 29478dc..7165205 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.TezCommonUtils;
@@ -70,6 +71,7 @@ import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
 import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -128,6 +130,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
   // uncompressed size for each partition
   private final long[] sizePerPartition;
   private volatile long spilledSize = 0;
+  private boolean dataViaEventsEnabled;
+  private int dataViaEventsMaxSize;
 
   static final ThreadLocal<Deflater> deflater = new ThreadLocal<Deflater>() {
 
@@ -210,6 +214,15 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     this.pipelinedShuffle = pipelinedShuffleConf && !isFinalMergeEnabled;
     this.finalEvents = Lists.newLinkedList();
 
+    this.dataViaEventsEnabled = conf.getBoolean(
+            TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED,
+            TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED_DEFAULT);
+
+    // No max cap on size (intentional)
+    this.dataViaEventsMaxSize = conf.getInt(
+            TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE,
+            TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE_DEFAULT);
+
     if (availableMemoryBytes == 0) {
       Preconditions.checkArgument(((numPartitions == 1) && !pipelinedShuffle), "availableMemory "
           + "can be set to 0 only when numPartitions=1 and " + TezRuntimeConfiguration
@@ -270,14 +283,11 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     outputLargeRecordsCounter = outputContext.getCounters().findCounter(
         TaskCounter.OUTPUT_LARGE_RECORDS);
 
-
-
     indexFileSizeEstimate = numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
 
     if (numPartitions == 1 && !pipelinedShuffle) {
       //special case, where in only one partition is available.
       finalOutPath = outputFileHandler.getOutputFileForWrite();
-      finalIndexPath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate);
       skipBuffers = true;
       writer = new IFile.Writer(conf, rfs, finalOutPath, keyClass, valClass,
           codec, outputRecordsCounter, outputRecordBytesCounter);
@@ -298,7 +308,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
         + ", pipelinedShuffle=" + pipelinedShuffle
         + ", isFinalMergeEnabled=" + isFinalMergeEnabled
         + ", numPartitions=" + numPartitions
-        + ", reportPartitionStats=" + reportPartitionStats);
+        + ", reportPartitionStats=" + reportPartitionStats
+        + ", dataViaEventsEnabled=" + dataViaEventsEnabled
+        + ", dataViaEventsMaxSize=" + dataViaEventsMaxSize
+    );
   }
 
   private static final int ALLOC_OVERHEAD = 64;
@@ -684,6 +697,21 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     return reqBytes;
   }
 
+  private boolean canSendDataOverDME() {
+    return (writer != null) && (dataViaEventsEnabled)
+            && (writer.getCompressedLength() <= dataViaEventsMaxSize);
+  }
+
+  private byte[] readDataForDME() throws IOException {
+    // TODO: Not introducing a caching layer in IFile yet.
+    byte[] buf = null;
+    try (FSDataInputStream inStream = rfs.open(finalOutPath)) {
+        buf = new byte[(int) writer.getCompressedLength()];
+        IOUtils.readFully(inStream, buf, 0, (int) writer.getCompressedLength());
+    }
+    return buf;
+  }
+
   @Override
   public List<Event> close() throws IOException, InterruptedException {
     // In case there are buffers to be spilled, schedule spilling
@@ -721,10 +749,6 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
           writer.close();
           long rawLen = writer.getRawLength();
           long compLen = writer.getCompressedLength();
-          TezIndexRecord rec = new TezIndexRecord(0, rawLen, compLen);
-          TezSpillRecord sr = new TezSpillRecord(1);
-          sr.putIndex(rec, 0);
-          sr.writeToFile(finalIndexPath, conf);
 
           BitSet emptyPartitions = new BitSet();
           if (outputRecordsCounter.getValue() == 0) {
@@ -742,8 +766,17 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
             fileOutputBytesCounter.increment(compLen + indexFileSizeEstimate);
           }
           eventList.add(generateVMEvent());
+
+          if (!canSendDataOverDME()) {
+            TezIndexRecord rec = new TezIndexRecord(0, rawLen, compLen);
+            TezSpillRecord sr = new TezSpillRecord(1);
+            sr.putIndex(rec, 0);
+            finalIndexPath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate);
+            sr.writeToFile(finalIndexPath, conf);
+          }
           eventList.add(generateDMEvent(false, -1, false, outputContext
-              .getUniqueIdentifier(), emptyPartitions));
+                  .getUniqueIdentifier(), emptyPartitions));
+
           return eventList;
         }
 
@@ -856,6 +889,18 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       payloadBuilder.setLastEvent(isLastSpill);
     }
 
+    if (canSendDataOverDME()) {
+      ShuffleUserPayloads.DataProto.Builder dataProtoBuilder = ShuffleUserPayloads.DataProto.newBuilder();
+      dataProtoBuilder.setData(ByteString.copyFrom(readDataForDME()));
+      dataProtoBuilder.setRawLength((int) this.writer.getRawLength());
+
+      dataProtoBuilder.setCompressedLength((int) this.writer.getCompressedLength());
+      payloadBuilder.setData(dataProtoBuilder.build());
+
+      this.dataViaEventSize.increment(this.writer.getCompressedLength());
+      LOG.debug("payload packed in DME, dataSize: " + this.writer.getCompressedLength());
+    }
+
     ByteBuffer payload = payloadBuilder.build().toByteString().asReadOnlyByteBuffer();
     return CompositeDataMovementEvent.create(0, numPartitions, payload);
   }
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index c987024..50e1e8b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -167,6 +167,8 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
index 94312f7..5c56083 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
@@ -140,6 +140,8 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput {
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT);
diff --git a/tez-runtime-library/src/main/proto/ShufflePayloads.proto b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
index 0a4f4a6..0d13710 100644
--- a/tez-runtime-library/src/main/proto/ShufflePayloads.proto
+++ b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
@@ -26,6 +26,7 @@ message DataMovementEventPayloadProto {
   optional int32 port = 3;
   optional string path_component = 4;
   optional int32 run_duration = 5;
+  optional DataProto data = 6;
   optional bool pipelined = 7; // Related to pipelined shuffle
   optional bool last_event = 8; // Related to pipelined shuffle
   optional int32 spill_id = 9; //  Related to pipelined shuffle.
@@ -35,6 +36,7 @@ message DataProto {
   optional int32 raw_length = 1;
   optional int32 compressed_length = 2;
   optional bytes data = 3;
+  optional int32 uncompressed_length = 4;
 }
 
 message InputInformationEventPayloadProto {
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index 83bde7b..4fa0311 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -35,6 +35,7 @@ import static org.mockito.Mockito.verify;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
+import java.io.InputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -264,7 +265,7 @@ public class TestUnorderedPartitionedKVWriter {
   @Test(timeout = 10000)
   public void testMultipleSpillsWithSmallBuffer() throws IOException, InterruptedException {
     // numBuffers is much higher than available threads.
-    baseTest(200, 10, null, shouldCompress, 512, 0, 9600);
+    baseTest(200, 10, null, shouldCompress, 512, 0, 9600, false);
   }
 
   @Test(timeout = 10000)
@@ -280,7 +281,9 @@ public class TestUnorderedPartitionedKVWriter {
   @Test(timeout = 10000)
   public void testNoRecords_SinglePartition() throws IOException, InterruptedException {
     // skipBuffers
-    baseTest(0, 1, null, shouldCompress, -1, 0);
+    baseTest(0, 1, null, shouldCompress, -1, 0, 2048, false);
+    // Check with data via events
+    baseTest(0, 1, null, shouldCompress, -1, 0, 2048, true);
   }
 
   @Test(timeout = 10000)
@@ -488,6 +491,7 @@ public class TestUnorderedPartitionedKVWriter {
     assertEquals(numPartitions, cdme.getCount());
     DataMovementEventPayloadProto eventProto = DataMovementEventPayloadProto.parseFrom(
         ByteString.copyFrom(cdme.getUserPayload()));
+    assertFalse(eventProto.hasData());
     BitSet emptyPartitionBits = null;
     if (partitionsWithData.cardinality() != numPartitions) {
       assertTrue(eventProto.hasEmptyPartitions());
@@ -1095,12 +1099,12 @@ public class TestUnorderedPartitionedKVWriter {
       boolean shouldCompress, int maxSingleBufferSizeBytes, int bufferMergePercent)
       throws IOException, InterruptedException {
     baseTest(numRecords, numPartitions, skippedPartitions, shouldCompress,
-        maxSingleBufferSizeBytes, bufferMergePercent, 2048);
+        maxSingleBufferSizeBytes, bufferMergePercent, 2048, false);
   }
 
   private void baseTest(int numRecords, int numPartitions, Set<Integer> skippedPartitions,
       boolean shouldCompress, int maxSingleBufferSizeBytes, int bufferMergePercent, int
-      availableMemory)
+      availableMemory, boolean dataViaEventEnabled)
           throws IOException, InterruptedException {
     PartitionerForTest partitioner = new PartitionerForTest();
     ApplicationId appId = ApplicationId.newInstance(10000000, 1);
@@ -1189,7 +1193,7 @@ public class TestUnorderedPartitionedKVWriter {
     long fileOutputBytes = fileOutputBytesCounter.getValue();
     if (numRecordsWritten > 0) {
       assertTrue(fileOutputBytes > 0);
-      if (!shouldCompress) {
+      if ((!shouldCompress) && (!dataViaEventEnabled)) {
         assertTrue(fileOutputBytes > outputRecordBytesCounter.getValue());
       }
     } else {
@@ -1262,37 +1266,53 @@ public class TestUnorderedPartitionedKVWriter {
       return;
     }
 
+    boolean isInMem= eventProto.getData().hasData();
     assertTrue(localFs.exists(outputFilePath));
-    assertTrue(localFs.exists(spillFilePath));
-    assertEquals("Incorrect output permissions", (short)0640,
-        localFs.getFileStatus(outputFilePath).getPermission().toShort());
-    assertEquals("Incorrect index permissions", (short)0640,
-        localFs.getFileStatus(spillFilePath).getPermission().toShort());
-
-    // verify no intermediate spill files have been left around
-    synchronized (kvWriter.spillInfoList) {
-      for (SpillInfo spill : kvWriter.spillInfoList) {
-        assertFalse("lingering intermediate spill file " + spill.outPath,
-            localFs.exists(spill.outPath));
+    assertEquals("Incorrect output permissions", (short) 0640,
+            localFs.getFileStatus(outputFilePath).getPermission().toShort());
+    if( !isInMem ) {
+      assertTrue(localFs.exists(spillFilePath));
+      assertEquals("Incorrect index permissions", (short) 0640,
+              localFs.getFileStatus(spillFilePath).getPermission().toShort());
+
+      // verify no intermediate spill files have been left around
+      synchronized (kvWriter.spillInfoList) {
+        for (SpillInfo spill : kvWriter.spillInfoList) {
+          assertFalse("lingering intermediate spill file " + spill.outPath,
+                  localFs.exists(spill.outPath));
+        }
       }
     }
 
     // Special case for 0 records.
-    TezSpillRecord spillRecord = new TezSpillRecord(spillFilePath, conf);
     DataInputBuffer keyBuffer = new DataInputBuffer();
     DataInputBuffer valBuffer = new DataInputBuffer();
     IntWritable keyDeser = new IntWritable();
     LongWritable valDeser = new LongWritable();
     for (int i = 0; i < numOutputs; i++) {
-      TezIndexRecord indexRecord = spillRecord.getIndex(i);
-      if (skippedPartitions != null && skippedPartitions.contains(i)) {
-        assertFalse("The Index Record for partition " + i + " should not have any data", indexRecord.hasData());
-        continue;
+      IFile.Reader reader = null;
+      InputStream inStream;
+      if (isInMem) {
+        // Read from in memory payload
+        int dataLoadSize = eventProto.getData().getData().size();
+        inStream = new ByteArrayInputStream(eventProto.getData().getData().toByteArray());
+        reader = new IFile.Reader(inStream, dataLoadSize, codec, null,
+                null, false, 0, -1);
+      } else {
+        TezSpillRecord spillRecord = new TezSpillRecord(spillFilePath, conf);
+        TezIndexRecord indexRecord = spillRecord.getIndex(i);
+        if (skippedPartitions != null && skippedPartitions.contains(i)) {
+          assertFalse("The Index Record for partition " + i + " should not have any data", indexRecord.hasData());
+          continue;
+        }
+
+        FSDataInputStream tmpStream = FileSystem.getLocal(conf).open(outputFilePath);
+        tmpStream.seek(indexRecord.getStartOffset());
+        inStream = tmpStream;
+        reader = new IFile.Reader(tmpStream, indexRecord.getPartLength(), codec, null,
+                null, false, 0, -1);
       }
-      FSDataInputStream inStream = FileSystem.getLocal(conf).open(outputFilePath);
-      inStream.seek(indexRecord.getStartOffset());
-      IFile.Reader reader = new IFile.Reader(inStream, indexRecord.getPartLength(), codec, null,
-          null, false, 0, -1);
+
       while (reader.nextRawKey(keyBuffer)) {
         reader.nextRawValue(valBuffer);
         keyDeser.readFields(keyBuffer);
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java
index 5e49b51..4bcff88 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java
@@ -83,6 +83,10 @@ public class TestUnorderedPartitionedKVOutputConfig {
             .setAdditionalConfiguration("fs.shouldExist", "fs")
             .setAdditionalConfiguration("test.key.1", "key1")
             .setAdditionalConfiguration(TezRuntimeConfiguration
+                .TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED, "true")
+            .setAdditionalConfiguration(TezRuntimeConfiguration
+                .TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE, "5120")
+            .setAdditionalConfiguration(TezRuntimeConfiguration
                 .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, "true")
             .setAdditionalConfiguration(TezRuntimeConfiguration
                 .TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, "false")
@@ -121,6 +125,10 @@ public class TestUnorderedPartitionedKVOutputConfig {
         TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT));
     assertEquals(2222,
         conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, 0));
+    assertEquals(true,
+            conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED, false));
+    assertEquals(5120,
+            conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE, 512));
     assertEquals("io", conf.get("io.shouldExist"));
     assertEquals("file", conf.get("file.shouldExist"));
     assertEquals("fs", conf.get("fs.shouldExist"));
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index 2a94d9b..b661519 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -55,6 +55,7 @@ import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.mapreduce.examples.CartesianProduct;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.cartesianproduct.CartesianProductVertexManager;
 import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
@@ -215,6 +216,60 @@ public class TestTezJobs {
     assertEquals(0, expectedResult.size());
   }
 
+  /**
+   * Runs the {@link JoinDataGen} -> {@link HashJoinExample} ("doBroadcast") -> {@link JoinValidate}
+   * pipeline with TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED set to true.
+   * @throws Exception propagated from session start/stop or any of the three pipeline runs
+   */
+  @Test(timeout = 120000)
+  public void testHashJoinExampleWithDataViaEvent() throws Exception {
+
+    Path testDir = new Path("/tmp/testHashJoinExampleDataViaEvent");
+    Path stagingDirPath = new Path("/tmp/tez-staging-dir");
+    remoteFs.mkdirs(stagingDirPath);
+    remoteFs.mkdirs(testDir);
+
+    Path dataPath1 = new Path(testDir, "inPath1");
+    Path dataPath2 = new Path(testDir, "inPath2");
+    Path expectedOutputPath = new Path(testDir, "expectedOutputPath");
+    Path outPath = new Path(testDir, "outPath");
+
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
+
+    // Enable transferring output data inside events (the feature under test in this case).
+    tezConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED, true);
+
+    TezClient tezSession = null;
+    try {
+      tezSession = TezClient.create("HashJoinExampleSession", tezConf, true);
+      tezSession.start();
+
+      JoinDataGen dataGen = new JoinDataGen();
+      String[] dataGenArgs = new String[] {
+              "-counter",
+              dataPath1.toString(), "1048576", dataPath2.toString(), "8",
+              expectedOutputPath.toString(), "2" };
+      assertEquals(0, dataGen.run(tezConf, dataGenArgs, tezSession));
+
+      HashJoinExample joinExample = new HashJoinExample();
+      String[] args = new String[] {
+              dataPath1.toString(), dataPath2.toString(), "1", outPath.toString(),
+              "doBroadcast"};
+
+      assertEquals(0, joinExample.run(tezConf, args, tezSession));
+
+      JoinValidate joinValidate = new JoinValidate();
+      String[] validateArgs = new String[] {
+              "-counter", expectedOutputPath.toString(), outPath.toString(), "3" };
+      assertEquals(0, joinValidate.run(tezConf, validateArgs, tezSession));
+    } finally {
+      if (tezSession != null) {
+        tezSession.stop();
+      }
+    }
+  }
+
   @Test(timeout = 60000)
   public void testHashJoinExampleDisableSplitGrouping() throws Exception {
     HashJoinExample hashJoinExample = new HashJoinExample();