You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2020/08/13 09:01:55 UTC
[tez] branch master updated: TEZ-4207: Provide approximate number
of input records to be processed in UnorderedKVInput (Rajesh Balamohan,
reviewed by Ashutosh Chauhan)
This is an automated email from the ASF dual-hosted git repository.
rbalamohan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 8f7209f TEZ-4207: Provide approximate number of input records to be processed in UnorderedKVInput (Rajesh Balamohan, reviewed by Ashutosh Chauhan)
8f7209f is described below
commit 8f7209fcbb9f5a74dcea9e83cb53279a58e57aff
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Thu Aug 13 14:30:29 2020 +0530
TEZ-4207: Provide approximate number of input records to be processed in UnorderedKVInput (Rajesh Balamohan, reviewed by Ashutosh Chauhan)
---
.../java/org/apache/tez/common/counters/TaskCounter.java | 9 ++++++++-
.../common/shuffle/impl/ShuffleInputEventHandlerImpl.java | 4 ++++
.../library/common/shuffle/impl/ShuffleManager.java | 15 ++++++++++++++-
.../common/writers/UnorderedPartitionedKVWriter.java | 3 +++
tez-runtime-library/src/main/proto/ShufflePayloads.proto | 1 +
.../common/writers/TestUnorderedPartitionedKVWriter.java | 11 +++++++++++
6 files changed, 41 insertions(+), 2 deletions(-)
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
index e604f37..2ee82a3 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
@@ -98,7 +98,14 @@ public enum TaskCounter {
* Used by MROutput, OnFileSortedOutput, and OnFileUnorderedKVOutput
*/
OUTPUT_RECORDS,
-
+
+ /**
+ * Approximate number of input records expected to be processed. The estimate is updated as
+ * events keep arriving from inputs.
+ * TODO: Currently supported for broadcast data only.
+ */
+ APPROXIMATE_INPUT_RECORDS,
+
/**
* Represent the number of large records in the output - typically, records which are
* spilled directly
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
index 7ad1389..e924876 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
@@ -190,7 +190,11 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
numDmeEventsNoData.getAndIncrement();
shuffleManager.addCompletedInputWithNoData(srcAttemptIdentifier.expand(0));
return;
+ } else {
+ shuffleManager.updateApproximateInputRecords(shufflePayload.getNumRecord());
}
+ } else {
+ shuffleManager.updateApproximateInputRecords(shufflePayload.getNumRecord());
}
CompositeInputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dme.getTargetIndex(), 1, dme.getVersion(),
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 8ae4f60..742fc18 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -189,6 +189,9 @@ public class ShuffleManager implements FetcherCallback {
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+ private long inputRecordsFromEvents;
+ private long eventsReceived;
+ private final TezCounter approximateInputRecords;
private final TezCounter shuffledInputsCounter;
private final TezCounter failedShufflesCounter;
private final TezCounter bytesShuffledCounter;
@@ -222,7 +225,8 @@ public class ShuffleManager implements FetcherCallback {
CompressionCodec codec, FetchedInputAllocator inputAllocator) throws IOException {
this.inputContext = inputContext;
this.numInputs = numInputs;
-
+
+ this.approximateInputRecords = inputContext.getCounters().findCounter(TaskCounter.APPROXIMATE_INPUT_RECORDS);
this.shuffledInputsCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
this.failedShufflesCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
this.bytesShuffledCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
@@ -335,6 +339,15 @@ public class ShuffleManager implements FetcherCallback {
+ httpConnectionParams.toString() + ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce);
}
+ /**
+ * Folds the record count reported by one incoming event into the
+ * APPROXIMATE_INPUT_RECORDS counter.
+ *
+ * The counter is set to (records seen so far * numInputs) / (events counted so far). The
+ * multiplication is performed before the division so that the per-event average is not
+ * truncated to a whole number before it is scaled up by numInputs.
+ *
+ * @param delta number of records reported by the event; values less than or equal to zero are
+ * ignored and do not count as a received event
+ */
+ public void updateApproximateInputRecords(int delta) {
+ if (delta <= 0) {
+ return;
+ }
+ inputRecordsFromEvents += delta;
+ eventsReceived++;
+ // Multiply first: (total / events) * numInputs would drop the fractional part of the average.
+ approximateInputRecords.setValue((inputRecordsFromEvents * numInputs) / eventsReceived);
+ }
+
public void run() throws IOException {
Preconditions.checkState(inputManager != null, "InputManager must be configured");
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index ffce5c6..d9467af 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -902,6 +902,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
outputContext.notifyProgress();
DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
.newBuilder();
+ if (numPartitions == 1) {
+ payloadBuilder.setNumRecord((int) outputRecordsCounter.getValue());
+ }
String host = getHost();
if (emptyPartitions.cardinality() != 0) {
diff --git a/tez-runtime-library/src/main/proto/ShufflePayloads.proto b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
index 0d13710..5cbd18a 100644
--- a/tez-runtime-library/src/main/proto/ShufflePayloads.proto
+++ b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
@@ -30,6 +30,7 @@ message DataMovementEventPayloadProto {
optional bool pipelined = 7; // Related to pipelined shuffle
optional bool last_event = 8; // Related to pipelined shuffle
optional int32 spill_id = 9; // Related to pipelined shuffle.
+ optional int32 num_record = 10;
}
message DataProto {
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index f4e99ec..c39bf3f 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -59,6 +59,7 @@ import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter.SpillInfo;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB;
import org.roaringbitmap.RoaringBitmap;
@@ -1171,6 +1172,16 @@ public class TestUnorderedPartitionedKVWriter {
if (numPartitions == 1) {
assertEquals(true, kvWriter.skipBuffers);
+
+ // VM & DME events
+ assertEquals(2, events.size());
+ Event event1 = events.get(1);
+ assertTrue(event1 instanceof CompositeDataMovementEvent);
+ CompositeDataMovementEvent dme = (CompositeDataMovementEvent) event1;
+ ByteBuffer bb = dme.getUserPayload();
+ ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload =
+ ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb));
+ assertEquals(kvWriter.outputRecordsCounter.getValue(), shufflePayload.getNumRecord());
}
int recordsPerBuffer = sizePerBuffer / sizePerRecordWithOverhead;