You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/07/18 07:49:20 UTC
[incubator-uniffle] branch master updated: [#972 ] fix(tez): Add output mapOutputByteCounter metrics (#1016)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 976ae9a2 [#972 ] fix(tez): Add output mapOutputByteCounter metrics (#1016)
976ae9a2 is described below
commit 976ae9a244c63c08ced79c331ba93186f9ef0d69
Author: bin41215 <74...@users.noreply.github.com>
AuthorDate: Tue Jul 18 15:49:13 2023 +0800
[#972 ] fix(tez): Add output mapOutputByteCounter metrics (#1016)
### What changes were proposed in this pull request?
Add the output `mapOutputByteCounter` metric.
### Why are the changes needed?
Adding this metric fixes the reduction of the number of tasks.
Fix: #972
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests (UT).
---
.../common/sort/buffer/WriteBufferManager.java | 7 +-
.../library/common/sort/impl/RssSorter.java | 3 +-
.../library/common/sort/impl/RssUnSorter.java | 3 +-
.../common/sort/buffer/WriteBufferManagerTest.java | 118 +++++++++++++++++----
4 files changed, 107 insertions(+), 24 deletions(-)
diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
index d2713f03..748e01d9 100644
--- a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
@@ -38,6 +38,7 @@ import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.tez.common.RssTezUtils;
+import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezVertexID;
@@ -95,6 +96,7 @@ public class WriteBufferManager<K, V> {
private final RssConf rssConf;
private final int shuffleId;
private final boolean isNeedSorted;
+ private final TezCounter mapOutputByteCounter;
/** WriteBufferManager */
public WriteBufferManager(
@@ -121,7 +123,8 @@ public class WriteBufferManager<K, V> {
long sendCheckTimeout,
int bitmapSplitNum,
int shuffleId,
- boolean isNeedSorted) {
+ boolean isNeedSorted,
+ TezCounter mapOutputByteCounter) {
this.tezTaskAttemptID = tezTaskAttemptID;
this.maxMemSize = maxMemSize;
this.appId = appId;
@@ -147,6 +150,7 @@ public class WriteBufferManager<K, V> {
this.rssConf = rssConf;
this.shuffleId = shuffleId;
this.isNeedSorted = isNeedSorted;
+ this.mapOutputByteCounter = mapOutputByteCounter;
this.sendExecutorService =
Executors.newFixedThreadPool(1, ThreadUtils.getThreadFactory("send-thread"));
}
@@ -192,6 +196,7 @@ public class WriteBufferManager<K, V> {
&& inSendListBytes.get() <= maxMemSize * sendThreshold) {
sendBuffersToServers();
}
+ mapOutputByteCounter.increment(length);
}
private void sendBufferToServers(WriteBuffer<K, V> buffer) {
diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
index 8e8aed3d..1dbfae2f 100644
--- a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
@@ -167,7 +167,8 @@ public class RssSorter extends ExternalSorter {
sendCheckTimeout,
bitmapSplitNum,
shuffleId,
- true);
+ true,
+ mapOutputByteCounter);
LOG.info("Initialized WriteBufferManager.");
}
diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
index c955bd24..13d1bf76 100644
--- a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
@@ -165,7 +165,8 @@ public class RssUnSorter extends ExternalSorter {
sendCheckTimeout,
bitmapSplitNum,
shuffleId,
- false);
+ false,
+ mapOutputByteCounter);
LOG.info("Initialized WriteBufferManager.");
}
diff --git a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
index c7fb9463..281dbb97 100644
--- a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
+++ b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
@@ -17,6 +17,7 @@
package org.apache.tez.runtime.library.common.sort.buffer;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -28,16 +29,29 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.mapred.JobConf;
import org.apache.tez.common.RssTezUtils;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.output.OutputTestHelpers;
+import org.apache.tez.runtime.library.output.RssOrderedPartitionedKVOutputTest;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.api.ShuffleWriteClient;
@@ -57,14 +71,14 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class WriteBufferManagerTest {
@Test
- public void testWriteException() throws IOException, InterruptedException {
+ public void testWriteException(@TempDir File tmpDir) throws IOException, InterruptedException {
TezTaskAttemptID tezTaskAttemptID =
TezTaskAttemptID.fromString("attempt_1681717153064_3770270_1_00_000000_0");
- long maxMemSize = 10240;
- String appId = "application_1681717153064_3770270";
- long taskAttemptId = 0;
- Set<Long> successBlockIds = Sets.newConcurrentHashSet();
- Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
+ final long maxMemSize = 10240;
+ final String appId = "application_1681717153064_3770270";
+ final long taskAttemptId = 0;
+ final Set<Long> successBlockIds = Sets.newConcurrentHashSet();
+ final Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
MockShuffleWriteClient writeClient = new MockShuffleWriteClient();
RawComparator comparator = WritableComparator.get(BytesWritable.class);
long maxSegmentSize = 3 * 1024;
@@ -86,6 +100,23 @@ public class WriteBufferManagerTest {
int bitmapSplitNum = 1;
int shuffleId = getShuffleId(tezTaskAttemptID, 1, 2);
+ Configuration conf = new Configuration();
+ FileSystem localFs = FileSystem.getLocal(conf);
+ Path workingDir =
+ new Path(
+ System.getProperty(
+ "test.build.data", System.getProperty("java.io.tmpdir", tmpDir.toString())),
+ RssOrderedPartitionedKVOutputTest.class.getName())
+ .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
+ conf.set(
+ TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, HashPartitioner.class.getName());
+ conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, workingDir.toString());
+ OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, workingDir);
+ TezCounter mapOutputByteCounter =
+ outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
+
WriteBufferManager<BytesWritable, BytesWritable> bufferManager =
new WriteBufferManager(
tezTaskAttemptID,
@@ -111,7 +142,8 @@ public class WriteBufferManagerTest {
sendCheckTimeout,
bitmapSplitNum,
shuffleId,
- true);
+ true,
+ mapOutputByteCounter);
Random random = new Random();
for (int i = 0; i < 1000; i++) {
@@ -122,6 +154,8 @@ public class WriteBufferManagerTest {
bufferManager.addRecord(1, new BytesWritable(key), new BytesWritable(value));
}
+ assertEquals(1052000, mapOutputByteCounter.getValue());
+
boolean isException = false;
try {
bufferManager.waitSendFinished();
@@ -132,14 +166,14 @@ public class WriteBufferManagerTest {
}
@Test
- public void testWriteNormal() throws IOException, InterruptedException {
+ public void testWriteNormal(@TempDir File tmpDir) throws IOException, InterruptedException {
TezTaskAttemptID tezTaskAttemptID =
TezTaskAttemptID.fromString("attempt_1681717153064_3770270_1_00_000000_0");
- long maxMemSize = 10240;
- String appId = "appattempt_1681717153064_3770270_000001";
- long taskAttemptId = 0;
- Set<Long> successBlockIds = Sets.newConcurrentHashSet();
- Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
+ final long maxMemSize = 10240;
+ final String appId = "appattempt_1681717153064_3770270_000001";
+ final long taskAttemptId = 0;
+ final Set<Long> successBlockIds = Sets.newConcurrentHashSet();
+ final Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
MockShuffleWriteClient writeClient = new MockShuffleWriteClient();
writeClient.setMode(2);
RawComparator comparator = WritableComparator.get(BytesWritable.class);
@@ -162,6 +196,23 @@ public class WriteBufferManagerTest {
int bitmapSplitNum = 1;
int shuffleId = getShuffleId(tezTaskAttemptID, 1, 2);
+ Configuration conf = new Configuration();
+ FileSystem localFs = FileSystem.getLocal(conf);
+ Path workingDir =
+ new Path(
+ System.getProperty(
+ "test.build.data", System.getProperty("java.io.tmpdir", tmpDir.toString())),
+ RssOrderedPartitionedKVOutputTest.class.getName())
+ .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
+ conf.set(
+ TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, HashPartitioner.class.getName());
+ conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, workingDir.toString());
+ OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, workingDir);
+ TezCounter mapOutputByteCounter =
+ outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
+
WriteBufferManager<BytesWritable, BytesWritable> bufferManager =
new WriteBufferManager(
tezTaskAttemptID,
@@ -187,7 +238,8 @@ public class WriteBufferManagerTest {
sendCheckTimeout,
bitmapSplitNum,
shuffleId,
- true);
+ true,
+ mapOutputByteCounter);
Random random = new Random();
for (int i = 0; i < 1000; i++) {
@@ -198,6 +250,8 @@ public class WriteBufferManagerTest {
int partitionId = random.nextInt(50);
bufferManager.addRecord(partitionId, new BytesWritable(key), new BytesWritable(value));
}
+
+ assertEquals(1052000, mapOutputByteCounter.getValue());
bufferManager.waitSendFinished();
assertTrue(bufferManager.getWaitSendBuffers().isEmpty());
@@ -208,6 +262,8 @@ public class WriteBufferManagerTest {
random.nextBytes(value);
bufferManager.addRecord(i, new BytesWritable(key), new BytesWritable(value));
}
+
+ assertEquals(1175900, mapOutputByteCounter.getValue());
assert (1 == bufferManager.getWaitSendBuffers().size());
assert (4928 == bufferManager.getWaitSendBuffers().get(0).getDataLength());
@@ -216,14 +272,15 @@ public class WriteBufferManagerTest {
}
@Test
- public void testCommitBlocksWhenMemoryShuffleDisabled() throws IOException, InterruptedException {
+ public void testCommitBlocksWhenMemoryShuffleDisabled(@TempDir File tmpDir)
+ throws IOException, InterruptedException {
TezTaskAttemptID tezTaskAttemptID =
TezTaskAttemptID.fromString("attempt_1681717153064_3770270_1_00_000000_0");
- long maxMemSize = 10240;
- String appId = "application_1681717153064_3770270";
- long taskAttemptId = 0;
- Set<Long> successBlockIds = Sets.newConcurrentHashSet();
- Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
+ final long maxMemSize = 10240;
+ final String appId = "application_1681717153064_3770270";
+ final long taskAttemptId = 0;
+ final Set<Long> successBlockIds = Sets.newConcurrentHashSet();
+ final Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
MockShuffleWriteClient writeClient = new MockShuffleWriteClient();
writeClient.setMode(3);
RawComparator comparator = WritableComparator.get(BytesWritable.class);
@@ -245,6 +302,23 @@ public class WriteBufferManagerTest {
int bitmapSplitNum = 1;
int shuffleId = getShuffleId(tezTaskAttemptID, 1, 2);
+ Configuration conf = new Configuration();
+ FileSystem localFs = FileSystem.getLocal(conf);
+ Path workingDir =
+ new Path(
+ System.getProperty(
+ "test.build.data", System.getProperty("java.io.tmpdir", tmpDir.toString())),
+ RssOrderedPartitionedKVOutputTest.class.getName())
+ .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
+ conf.set(
+ TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, HashPartitioner.class.getName());
+ conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, workingDir.toString());
+ OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, workingDir);
+ TezCounter mapOutputByteCounter =
+ outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
+
WriteBufferManager<BytesWritable, BytesWritable> bufferManager =
new WriteBufferManager(
tezTaskAttemptID,
@@ -270,7 +344,8 @@ public class WriteBufferManagerTest {
sendCheckTimeout,
bitmapSplitNum,
shuffleId,
- true);
+ true,
+ mapOutputByteCounter);
Random random = new Random();
for (int i = 0; i < 10000; i++) {
@@ -283,6 +358,7 @@ public class WriteBufferManagerTest {
}
bufferManager.waitSendFinished();
+ assertEquals(10520000, mapOutputByteCounter.getValue());
assertTrue(bufferManager.getWaitSendBuffers().isEmpty());
assertEquals(
writeClient.mockedShuffleServer.getFinishBlockSize(),