You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/05/19 12:18:40 UTC
[incubator-uniffle] branch branch-0.7 updated: [#886] fix(mr): MR Client may lost data or throw exception when rss.storage.type without MEMORY. (#887)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch branch-0.7
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/branch-0.7 by this push:
new 4423b434 [#886] fix(mr): MR Client may lost data or throw exception when rss.storage.type without MEMORY. (#887)
4423b434 is described below
commit 4423b434bbff02c2bb1342aebdd2951dc26c9aaf
Author: zhengchenyu <zh...@163.com>
AuthorDate: Fri May 19 20:12:55 2023 +0800
[#886] fix(mr): MR Client may lost data or throw exception when rss.storage.type without MEMORY. (#887)
### What changes were proposed in this pull request?
Make sure the commit (finishShuffle) is sent only after all shuffle data has been sent.
### Why are the changes needed?
If rss.storage.type does not include MEMORY, the commit could be sent before all shuffle data had been sent, so some data would never be flushed.
### How was this patch tested?
I tested in two modes:
* Tez local debug mode
* MR on yarn mode
Added a new unit test.
Co-authored-by: zhengchenyu001 <zh...@ke.com>
---
.../hadoop/mapred/SortWriteBufferManager.java | 12 +--
.../hadoop/mapred/SortWriteBufferManagerTest.java | 105 ++++++++++++++++++++-
2 files changed, 110 insertions(+), 7 deletions(-)
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
index cb76bef2..c12d81d3 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
@@ -262,12 +262,6 @@ public class SortWriteBufferManager<K, V> {
sendBuffersToServers();
}
long start = System.currentTimeMillis();
- long commitDuration = 0;
- if (!isMemoryShuffleEnabled) {
- long s = System.currentTimeMillis();
- sendCommit();
- commitDuration = System.currentTimeMillis() - s;
- }
while (true) {
// if failed when send data to shuffle server, mark task as failed
if (failedBlockIds.size() > 0) {
@@ -293,6 +287,12 @@ public class SortWriteBufferManager<K, V> {
throw new RssException(errorMsg);
}
}
+ long commitDuration = 0;
+ if (!isMemoryShuffleEnabled) {
+ long s = System.currentTimeMillis();
+ sendCommit();
+ commitDuration = System.currentTimeMillis() - s;
+ }
start = System.currentTimeMillis();
shuffleWriteClient.reportShuffleResult(partitionToServers, appId, 0,
diff --git a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
index 1e07335b..40173fca 100644
--- a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
+++ b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
@@ -17,11 +17,13 @@
package org.apache.hadoop.mapred;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -258,9 +260,91 @@ public class SortWriteBufferManagerTest {
assertTrue(manager.getWaitSendBuffers().isEmpty());
}
+ @Test
+ public void testCommitBlocksWhenMemoryShuffleDisabled() throws Exception {
+ JobConf jobConf = new JobConf(new Configuration());
+ SerializationFactory serializationFactory = new SerializationFactory(jobConf);
+ MockShuffleWriteClient client = new MockShuffleWriteClient();
+ client.setMode(3);
+ Map<Integer, List<ShuffleServerInfo>> partitionToServers = JavaUtils.newConcurrentMap();
+ Set<Long> successBlocks = Sets.newConcurrentHashSet();
+ Set<Long> failedBlocks = Sets.newConcurrentHashSet();
+ Counters.Counter mapOutputByteCounter = new Counters.Counter();
+ Counters.Counter mapOutputRecordCounter = new Counters.Counter();
+ SortWriteBufferManager<BytesWritable, BytesWritable> manager;
+ manager = new SortWriteBufferManager<BytesWritable, BytesWritable>(
+ 10240,
+ 1L,
+ 10,
+ serializationFactory.getSerializer(BytesWritable.class),
+ serializationFactory.getSerializer(BytesWritable.class),
+ WritableComparator.get(BytesWritable.class),
+ 0.9,
+ "test",
+ client,
+ 500,
+ 5 * 1000,
+ partitionToServers,
+ successBlocks,
+ failedBlocks,
+ mapOutputByteCounter,
+ mapOutputRecordCounter,
+ 1,
+ 100,
+ 1,
+ false,
+ 5,
+ 0.2f,
+ 1024000L,
+ new RssConf());
+ Random random = new Random();
+ for (int i = 0; i < 1000; i++) {
+ byte[] key = new byte[20];
+ byte[] value = new byte[1024];
+ random.nextBytes(key);
+ random.nextBytes(value);
+ int partitionId = random.nextInt(50);
+ manager.addRecord(partitionId, new BytesWritable(key), new BytesWritable(value));
+ }
+ manager.waitSendFinished();
+ assertTrue(manager.getWaitSendBuffers().isEmpty());
+ // When the MEMORY storage type is disabled, all blocks should be flushed.
+ assertEquals(client.mockedShuffleServer.getFinishBlockSize(), client.mockedShuffleServer.getFlushBlockSize());
+ }
+
+ class MockShuffleServer {
+
+ // All methods of MockShuffleServer are synchronized because send threads may call them concurrently.
+ private List<ShuffleBlockInfo> cachedBlockInfos = new ArrayList<>();
+ private List<ShuffleBlockInfo> flushBlockInfos = new ArrayList<>();
+ private List<Long> finishedBlockInfos = new ArrayList<>();
+
+ public synchronized void finishShuffle() {
+ flushBlockInfos.addAll(cachedBlockInfos);
+ }
+
+ public synchronized void addCachedBlockInfos(List<ShuffleBlockInfo> shuffleBlockInfoList) {
+ cachedBlockInfos.addAll(shuffleBlockInfoList);
+ }
+
+ public synchronized void addFinishedBlockInfos(List<Long> shuffleBlockInfoList) {
+ finishedBlockInfos.addAll(shuffleBlockInfoList);
+ }
+
+ public synchronized int getFlushBlockSize() {
+ return flushBlockInfos.size();
+ }
+
+ public synchronized int getFinishBlockSize() {
+ return finishedBlockInfos.size();
+ }
+ }
+
class MockShuffleWriteClient implements ShuffleWriteClient {
int mode = 0;
+ MockShuffleServer mockedShuffleServer = new MockShuffleServer();
+ int committedMaps = 0;
public void setMode(int mode) {
this.mode = mode;
@@ -274,6 +358,15 @@ public class SortWriteBufferManagerTest {
} else if (mode == 1) {
return new SendShuffleDataResult(Sets.newHashSet(2L), Sets.newHashSet(1L));
} else {
+ if (mode == 3) {
+ try {
+ Thread.sleep(10);
+ mockedShuffleServer.addCachedBlockInfos(shuffleBlockInfoList);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RssException(e);
+ }
+ }
Set<Long> successBlockIds = Sets.newHashSet();
for (ShuffleBlockInfo blockInfo : shuffleBlockInfoList) {
successBlockIds.add(blockInfo.getBlockId());
@@ -304,6 +397,13 @@ public class SortWriteBufferManagerTest {
@Override
public boolean sendCommit(Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps) {
+ if (mode == 3) {
+ committedMaps++;
+ if (committedMaps >= numMaps) {
+ mockedShuffleServer.finishShuffle();
+ }
+ return true;
+ }
return false;
}
@@ -325,7 +425,10 @@ public class SortWriteBufferManagerTest {
@Override
public void reportShuffleResult(Map<Integer, List<ShuffleServerInfo>> partitionToServers, String appId,
int shuffleId, long taskAttemptId, Map<Integer, List<Long>> partitionToBlockIds, int bitmapNum) {
-
+ if (mode == 3) {
+ mockedShuffleServer.addFinishedBlockInfos(
+ partitionToBlockIds.values().stream().flatMap(it -> it.stream()).collect(Collectors.toList()));
+ }
}
@Override