You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/07/18 07:49:20 UTC

[incubator-uniffle] branch master updated: [#972 ] fix(tez): Add output mapOutputByteCounter metrics (#1016)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 976ae9a2 [#972 ] fix(tez): Add output mapOutputByteCounter metrics (#1016)
976ae9a2 is described below

commit 976ae9a244c63c08ced79c331ba93186f9ef0d69
Author: bin41215 <74...@users.noreply.github.com>
AuthorDate: Tue Jul 18 15:49:13 2023 +0800

    [#972 ] fix(tez): Add output mapOutputByteCounter metrics (#1016)
    
    ### What changes were proposed in this pull request?
    
    Add the output mapOutputByteCounter metric.
    
    ### Why are the changes needed?
    Add this metric to fix the issue with reducing the number of tasks.
    
    Fix: #972
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    Unit tests (UT).
---
 .../common/sort/buffer/WriteBufferManager.java     |   7 +-
 .../library/common/sort/impl/RssSorter.java        |   3 +-
 .../library/common/sort/impl/RssUnSorter.java      |   3 +-
 .../common/sort/buffer/WriteBufferManagerTest.java | 118 +++++++++++++++++----
 4 files changed, 107 insertions(+), 24 deletions(-)

diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
index d2713f03..748e01d9 100644
--- a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
@@ -38,6 +38,7 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.tez.common.RssTezUtils;
+import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezVertexID;
@@ -95,6 +96,7 @@ public class WriteBufferManager<K, V> {
   private final RssConf rssConf;
   private final int shuffleId;
   private final boolean isNeedSorted;
+  private final TezCounter mapOutputByteCounter;
 
   /** WriteBufferManager */
   public WriteBufferManager(
@@ -121,7 +123,8 @@ public class WriteBufferManager<K, V> {
       long sendCheckTimeout,
       int bitmapSplitNum,
       int shuffleId,
-      boolean isNeedSorted) {
+      boolean isNeedSorted,
+      TezCounter mapOutputByteCounter) {
     this.tezTaskAttemptID = tezTaskAttemptID;
     this.maxMemSize = maxMemSize;
     this.appId = appId;
@@ -147,6 +150,7 @@ public class WriteBufferManager<K, V> {
     this.rssConf = rssConf;
     this.shuffleId = shuffleId;
     this.isNeedSorted = isNeedSorted;
+    this.mapOutputByteCounter = mapOutputByteCounter;
     this.sendExecutorService =
         Executors.newFixedThreadPool(1, ThreadUtils.getThreadFactory("send-thread"));
   }
@@ -192,6 +196,7 @@ public class WriteBufferManager<K, V> {
         && inSendListBytes.get() <= maxMemSize * sendThreshold) {
       sendBuffersToServers();
     }
+    mapOutputByteCounter.increment(length);
   }
 
   private void sendBufferToServers(WriteBuffer<K, V> buffer) {
diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
index 8e8aed3d..1dbfae2f 100644
--- a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
@@ -167,7 +167,8 @@ public class RssSorter extends ExternalSorter {
             sendCheckTimeout,
             bitmapSplitNum,
             shuffleId,
-            true);
+            true,
+            mapOutputByteCounter);
     LOG.info("Initialized WriteBufferManager.");
   }
 
diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
index c955bd24..13d1bf76 100644
--- a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
@@ -165,7 +165,8 @@ public class RssUnSorter extends ExternalSorter {
             sendCheckTimeout,
             bitmapSplitNum,
             shuffleId,
-            false);
+            false,
+            mapOutputByteCounter);
     LOG.info("Initialized WriteBufferManager.");
   }
 
diff --git a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
index c7fb9463..281dbb97 100644
--- a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
+++ b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.tez.runtime.library.common.sort.buffer;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -28,16 +29,29 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.tez.common.RssTezUtils;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.output.OutputTestHelpers;
+import org.apache.tez.runtime.library.output.RssOrderedPartitionedKVOutputTest;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.client.api.ShuffleWriteClient;
@@ -57,14 +71,14 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class WriteBufferManagerTest {
   @Test
-  public void testWriteException() throws IOException, InterruptedException {
+  public void testWriteException(@TempDir File tmpDir) throws IOException, InterruptedException {
     TezTaskAttemptID tezTaskAttemptID =
         TezTaskAttemptID.fromString("attempt_1681717153064_3770270_1_00_000000_0");
-    long maxMemSize = 10240;
-    String appId = "application_1681717153064_3770270";
-    long taskAttemptId = 0;
-    Set<Long> successBlockIds = Sets.newConcurrentHashSet();
-    Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
+    final long maxMemSize = 10240;
+    final String appId = "application_1681717153064_3770270";
+    final long taskAttemptId = 0;
+    final Set<Long> successBlockIds = Sets.newConcurrentHashSet();
+    final Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
     MockShuffleWriteClient writeClient = new MockShuffleWriteClient();
     RawComparator comparator = WritableComparator.get(BytesWritable.class);
     long maxSegmentSize = 3 * 1024;
@@ -86,6 +100,23 @@ public class WriteBufferManagerTest {
     int bitmapSplitNum = 1;
     int shuffleId = getShuffleId(tezTaskAttemptID, 1, 2);
 
+    Configuration conf = new Configuration();
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Path workingDir =
+        new Path(
+                System.getProperty(
+                    "test.build.data", System.getProperty("java.io.tmpdir", tmpDir.toString())),
+                RssOrderedPartitionedKVOutputTest.class.getName())
+            .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
+    conf.set(
+        TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, HashPartitioner.class.getName());
+    conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, workingDir.toString());
+    OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, workingDir);
+    TezCounter mapOutputByteCounter =
+        outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
+
     WriteBufferManager<BytesWritable, BytesWritable> bufferManager =
         new WriteBufferManager(
             tezTaskAttemptID,
@@ -111,7 +142,8 @@ public class WriteBufferManagerTest {
             sendCheckTimeout,
             bitmapSplitNum,
             shuffleId,
-            true);
+            true,
+            mapOutputByteCounter);
 
     Random random = new Random();
     for (int i = 0; i < 1000; i++) {
@@ -122,6 +154,8 @@ public class WriteBufferManagerTest {
       bufferManager.addRecord(1, new BytesWritable(key), new BytesWritable(value));
     }
 
+    assertEquals(1052000, mapOutputByteCounter.getValue());
+
     boolean isException = false;
     try {
       bufferManager.waitSendFinished();
@@ -132,14 +166,14 @@ public class WriteBufferManagerTest {
   }
 
   @Test
-  public void testWriteNormal() throws IOException, InterruptedException {
+  public void testWriteNormal(@TempDir File tmpDir) throws IOException, InterruptedException {
     TezTaskAttemptID tezTaskAttemptID =
         TezTaskAttemptID.fromString("attempt_1681717153064_3770270_1_00_000000_0");
-    long maxMemSize = 10240;
-    String appId = "appattempt_1681717153064_3770270_000001";
-    long taskAttemptId = 0;
-    Set<Long> successBlockIds = Sets.newConcurrentHashSet();
-    Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
+    final long maxMemSize = 10240;
+    final String appId = "appattempt_1681717153064_3770270_000001";
+    final long taskAttemptId = 0;
+    final Set<Long> successBlockIds = Sets.newConcurrentHashSet();
+    final Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
     MockShuffleWriteClient writeClient = new MockShuffleWriteClient();
     writeClient.setMode(2);
     RawComparator comparator = WritableComparator.get(BytesWritable.class);
@@ -162,6 +196,23 @@ public class WriteBufferManagerTest {
     int bitmapSplitNum = 1;
     int shuffleId = getShuffleId(tezTaskAttemptID, 1, 2);
 
+    Configuration conf = new Configuration();
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Path workingDir =
+        new Path(
+                System.getProperty(
+                    "test.build.data", System.getProperty("java.io.tmpdir", tmpDir.toString())),
+                RssOrderedPartitionedKVOutputTest.class.getName())
+            .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
+    conf.set(
+        TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, HashPartitioner.class.getName());
+    conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, workingDir.toString());
+    OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, workingDir);
+    TezCounter mapOutputByteCounter =
+        outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
+
     WriteBufferManager<BytesWritable, BytesWritable> bufferManager =
         new WriteBufferManager(
             tezTaskAttemptID,
@@ -187,7 +238,8 @@ public class WriteBufferManagerTest {
             sendCheckTimeout,
             bitmapSplitNum,
             shuffleId,
-            true);
+            true,
+            mapOutputByteCounter);
 
     Random random = new Random();
     for (int i = 0; i < 1000; i++) {
@@ -198,6 +250,8 @@ public class WriteBufferManagerTest {
       int partitionId = random.nextInt(50);
       bufferManager.addRecord(partitionId, new BytesWritable(key), new BytesWritable(value));
     }
+
+    assertEquals(1052000, mapOutputByteCounter.getValue());
     bufferManager.waitSendFinished();
     assertTrue(bufferManager.getWaitSendBuffers().isEmpty());
 
@@ -208,6 +262,8 @@ public class WriteBufferManagerTest {
       random.nextBytes(value);
       bufferManager.addRecord(i, new BytesWritable(key), new BytesWritable(value));
     }
+
+    assertEquals(1175900, mapOutputByteCounter.getValue());
     assert (1 == bufferManager.getWaitSendBuffers().size());
     assert (4928 == bufferManager.getWaitSendBuffers().get(0).getDataLength());
 
@@ -216,14 +272,15 @@ public class WriteBufferManagerTest {
   }
 
   @Test
-  public void testCommitBlocksWhenMemoryShuffleDisabled() throws IOException, InterruptedException {
+  public void testCommitBlocksWhenMemoryShuffleDisabled(@TempDir File tmpDir)
+      throws IOException, InterruptedException {
     TezTaskAttemptID tezTaskAttemptID =
         TezTaskAttemptID.fromString("attempt_1681717153064_3770270_1_00_000000_0");
-    long maxMemSize = 10240;
-    String appId = "application_1681717153064_3770270";
-    long taskAttemptId = 0;
-    Set<Long> successBlockIds = Sets.newConcurrentHashSet();
-    Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
+    final long maxMemSize = 10240;
+    final String appId = "application_1681717153064_3770270";
+    final long taskAttemptId = 0;
+    final Set<Long> successBlockIds = Sets.newConcurrentHashSet();
+    final Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
     MockShuffleWriteClient writeClient = new MockShuffleWriteClient();
     writeClient.setMode(3);
     RawComparator comparator = WritableComparator.get(BytesWritable.class);
@@ -245,6 +302,23 @@ public class WriteBufferManagerTest {
     int bitmapSplitNum = 1;
     int shuffleId = getShuffleId(tezTaskAttemptID, 1, 2);
 
+    Configuration conf = new Configuration();
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Path workingDir =
+        new Path(
+                System.getProperty(
+                    "test.build.data", System.getProperty("java.io.tmpdir", tmpDir.toString())),
+                RssOrderedPartitionedKVOutputTest.class.getName())
+            .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
+    conf.set(
+        TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, HashPartitioner.class.getName());
+    conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, workingDir.toString());
+    OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, workingDir);
+    TezCounter mapOutputByteCounter =
+        outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
+
     WriteBufferManager<BytesWritable, BytesWritable> bufferManager =
         new WriteBufferManager(
             tezTaskAttemptID,
@@ -270,7 +344,8 @@ public class WriteBufferManagerTest {
             sendCheckTimeout,
             bitmapSplitNum,
             shuffleId,
-            true);
+            true,
+            mapOutputByteCounter);
 
     Random random = new Random();
     for (int i = 0; i < 10000; i++) {
@@ -283,6 +358,7 @@ public class WriteBufferManagerTest {
     }
     bufferManager.waitSendFinished();
 
+    assertEquals(10520000, mapOutputByteCounter.getValue());
     assertTrue(bufferManager.getWaitSendBuffers().isEmpty());
     assertEquals(
         writeClient.mockedShuffleServer.getFinishBlockSize(),