You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2020/08/13 09:01:55 UTC

[tez] branch master updated: TEZ-4207: Provide approximate number of input records to be processed in UnorderedKVInput (Rajesh Balamohan, reviewed by Ashutosh Chauhan)

This is an automated email from the ASF dual-hosted git repository.

rbalamohan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f7209f  TEZ-4207: Provide approximate number of input records to be processed in UnorderedKVInput (Rajesh Balamohan, reviewed by Ashutosh Chauhan)
8f7209f is described below

commit 8f7209fcbb9f5a74dcea9e83cb53279a58e57aff
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Thu Aug 13 14:30:29 2020 +0530

    TEZ-4207: Provide approximate number of input records to be processed in UnorderedKVInput (Rajesh Balamohan, reviewed by Ashutosh Chauhan)
---
 .../java/org/apache/tez/common/counters/TaskCounter.java  |  9 ++++++++-
 .../common/shuffle/impl/ShuffleInputEventHandlerImpl.java |  4 ++++
 .../library/common/shuffle/impl/ShuffleManager.java       | 15 ++++++++++++++-
 .../common/writers/UnorderedPartitionedKVWriter.java      |  3 +++
 tez-runtime-library/src/main/proto/ShufflePayloads.proto  |  1 +
 .../common/writers/TestUnorderedPartitionedKVWriter.java  | 11 +++++++++++
 6 files changed, 41 insertions(+), 2 deletions(-)

diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
index e604f37..2ee82a3 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
@@ -98,7 +98,14 @@ public enum TaskCounter {
    * Used by MROutput, OnFileSortedOutput, and OnFileUnorderedKVOutput
    */
   OUTPUT_RECORDS,
-  
+
+  /**
+   * Approximate number of input records expected to be processed. The estimate is refined as
+   * events keep arriving from the inputs.
+   * TODO: As of now, only broadcast data is supported.
+   */
+  APPROXIMATE_INPUT_RECORDS,
+
   /**
    * Represent the number of large records in the output - typically, records which are
    * spilled directly
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
index 7ad1389..e924876 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
@@ -190,7 +190,11 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
         numDmeEventsNoData.getAndIncrement();
         shuffleManager.addCompletedInputWithNoData(srcAttemptIdentifier.expand(0));
         return;
+      } else {
+        shuffleManager.updateApproximateInputRecords(shufflePayload.getNumRecord());
       }
+    } else {
+      shuffleManager.updateApproximateInputRecords(shufflePayload.getNumRecord());
     }
 
     CompositeInputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dme.getTargetIndex(), 1, dme.getVersion(),
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 8ae4f60..742fc18 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -189,6 +189,9 @@ public class ShuffleManager implements FetcherCallback {
 
   private final AtomicBoolean isShutdown = new AtomicBoolean(false);
 
+  private long inputRecordsFromEvents;
+  private long eventsReceived;
+  private final TezCounter approximateInputRecords;
   private final TezCounter shuffledInputsCounter;
   private final TezCounter failedShufflesCounter;
   private final TezCounter bytesShuffledCounter;
@@ -222,7 +225,8 @@ public class ShuffleManager implements FetcherCallback {
       CompressionCodec codec, FetchedInputAllocator inputAllocator) throws IOException {
     this.inputContext = inputContext;
     this.numInputs = numInputs;
-    
+
+    this.approximateInputRecords = inputContext.getCounters().findCounter(TaskCounter.APPROXIMATE_INPUT_RECORDS);
     this.shuffledInputsCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
     this.failedShufflesCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
     this.bytesShuffledCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
@@ -335,6 +339,15 @@ public class ShuffleManager implements FetcherCallback {
         + httpConnectionParams.toString() + ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce);
   }
 
+  public void updateApproximateInputRecords(int delta) {
+    if (delta <= 0) {
+      return;
+    }
+    inputRecordsFromEvents += delta;
+    eventsReceived++;
+    approximateInputRecords.setValue((inputRecordsFromEvents / eventsReceived) * numInputs);
+  }
+
   public void run() throws IOException {
     Preconditions.checkState(inputManager != null, "InputManager must be configured");
 
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index ffce5c6..d9467af 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -902,6 +902,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     outputContext.notifyProgress();
     DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
         .newBuilder();
+    if (numPartitions == 1) {
+      payloadBuilder.setNumRecord((int) outputRecordsCounter.getValue());
+    }
 
     String host = getHost();
     if (emptyPartitions.cardinality() != 0) {
diff --git a/tez-runtime-library/src/main/proto/ShufflePayloads.proto b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
index 0d13710..5cbd18a 100644
--- a/tez-runtime-library/src/main/proto/ShufflePayloads.proto
+++ b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
@@ -30,6 +30,7 @@ message DataMovementEventPayloadProto {
   optional bool pipelined = 7; // Related to pipelined shuffle
   optional bool last_event = 8; // Related to pipelined shuffle
   optional int32 spill_id = 9; //  Related to pipelined shuffle.
+  optional int32 num_record = 10;
 } 
 
 message DataProto {
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index f4e99ec..c39bf3f 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -59,6 +59,7 @@ import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter.SpillInfo;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
 import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB;
 import org.roaringbitmap.RoaringBitmap;
@@ -1171,6 +1172,16 @@ public class TestUnorderedPartitionedKVWriter {
 
     if (numPartitions == 1) {
       assertEquals(true, kvWriter.skipBuffers);
+
+      // Expect two events: the VertexManagerEvent, then the CompositeDataMovementEvent
+      assertEquals(2, events.size());
+      Event event1 = events.get(1);
+      assertTrue(event1 instanceof CompositeDataMovementEvent);
+      CompositeDataMovementEvent dme = (CompositeDataMovementEvent) event1;
+      ByteBuffer bb = dme.getUserPayload();
+      ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload =
+          ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb));
+      assertEquals(kvWriter.outputRecordsCounter.getValue(), shufflePayload.getNumRecord());
     }
 
     int recordsPerBuffer = sizePerBuffer / sizePerRecordWithOverhead;