You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/05/19 14:56:09 UTC
[incubator-uniffle] branch branch-0.7 updated: Revert "[#886] fix(mr): MR Client may lost data or throw exception when rss.storage.type without MEMORY. (#887)"
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch branch-0.7
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/branch-0.7 by this push:
new 90539fbc Revert "[#886] fix(mr): MR Client may lost data or throw exception when rss.storage.type without MEMORY. (#887)"
90539fbc is described below
commit 90539fbcb7b085ca4e5453223e501b2e067af72e
Author: roryqi <ro...@tencent.com>
AuthorDate: Fri May 19 22:55:29 2023 +0800
Revert "[#886] fix(mr): MR Client may lost data or throw exception when rss.storage.type without MEMORY. (#887)"
This reverts commit 4423b434bbff02c2bb1342aebdd2951dc26c9aaf.
---
.../hadoop/mapred/SortWriteBufferManager.java | 12 +--
.../hadoop/mapred/SortWriteBufferManagerTest.java | 105 +--------------------
2 files changed, 7 insertions(+), 110 deletions(-)
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
index c12d81d3..cb76bef2 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
@@ -262,6 +262,12 @@ public class SortWriteBufferManager<K, V> {
sendBuffersToServers();
}
long start = System.currentTimeMillis();
+ long commitDuration = 0;
+ if (!isMemoryShuffleEnabled) {
+ long s = System.currentTimeMillis();
+ sendCommit();
+ commitDuration = System.currentTimeMillis() - s;
+ }
while (true) {
// if failed when send data to shuffle server, mark task as failed
if (failedBlockIds.size() > 0) {
@@ -287,12 +293,6 @@ public class SortWriteBufferManager<K, V> {
throw new RssException(errorMsg);
}
}
- long commitDuration = 0;
- if (!isMemoryShuffleEnabled) {
- long s = System.currentTimeMillis();
- sendCommit();
- commitDuration = System.currentTimeMillis() - s;
- }
start = System.currentTimeMillis();
shuffleWriteClient.reportShuffleResult(partitionToServers, appId, 0,
diff --git a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
index 40173fca..1e07335b 100644
--- a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
+++ b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
@@ -17,13 +17,11 @@
package org.apache.hadoop.mapred;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.function.Supplier;
-import java.util.stream.Collectors;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -260,91 +258,9 @@ public class SortWriteBufferManagerTest {
assertTrue(manager.getWaitSendBuffers().isEmpty());
}
- @Test
- public void testCommitBlocksWhenMemoryShuffleDisabled() throws Exception {
- JobConf jobConf = new JobConf(new Configuration());
- SerializationFactory serializationFactory = new SerializationFactory(jobConf);
- MockShuffleWriteClient client = new MockShuffleWriteClient();
- client.setMode(3);
- Map<Integer, List<ShuffleServerInfo>> partitionToServers = JavaUtils.newConcurrentMap();
- Set<Long> successBlocks = Sets.newConcurrentHashSet();
- Set<Long> failedBlocks = Sets.newConcurrentHashSet();
- Counters.Counter mapOutputByteCounter = new Counters.Counter();
- Counters.Counter mapOutputRecordCounter = new Counters.Counter();
- SortWriteBufferManager<BytesWritable, BytesWritable> manager;
- manager = new SortWriteBufferManager<BytesWritable, BytesWritable>(
- 10240,
- 1L,
- 10,
- serializationFactory.getSerializer(BytesWritable.class),
- serializationFactory.getSerializer(BytesWritable.class),
- WritableComparator.get(BytesWritable.class),
- 0.9,
- "test",
- client,
- 500,
- 5 * 1000,
- partitionToServers,
- successBlocks,
- failedBlocks,
- mapOutputByteCounter,
- mapOutputRecordCounter,
- 1,
- 100,
- 1,
- false,
- 5,
- 0.2f,
- 1024000L,
- new RssConf());
- Random random = new Random();
- for (int i = 0; i < 1000; i++) {
- byte[] key = new byte[20];
- byte[] value = new byte[1024];
- random.nextBytes(key);
- random.nextBytes(value);
- int partitionId = random.nextInt(50);
- manager.addRecord(partitionId, new BytesWritable(key), new BytesWritable(value));
- }
- manager.waitSendFinished();
- assertTrue(manager.getWaitSendBuffers().isEmpty());
- // When MEMOEY storage type is disable, all blocks should flush.
- assertEquals(client.mockedShuffleServer.getFinishBlockSize(), client.mockedShuffleServer.getFlushBlockSize());
- }
-
- class MockShuffleServer {
-
- // All methods of MockShuffle are thread safe, because send-thread may do something in concurrent way.
- private List<ShuffleBlockInfo> cachedBlockInfos = new ArrayList<>();
- private List<ShuffleBlockInfo> flushBlockInfos = new ArrayList<>();
- private List<Long> finishedBlockInfos = new ArrayList<>();
-
- public synchronized void finishShuffle() {
- flushBlockInfos.addAll(cachedBlockInfos);
- }
-
- public synchronized void addCachedBlockInfos(List<ShuffleBlockInfo> shuffleBlockInfoList) {
- cachedBlockInfos.addAll(shuffleBlockInfoList);
- }
-
- public synchronized void addFinishedBlockInfos(List<Long> shuffleBlockInfoList) {
- finishedBlockInfos.addAll(shuffleBlockInfoList);
- }
-
- public synchronized int getFlushBlockSize() {
- return flushBlockInfos.size();
- }
-
- public synchronized int getFinishBlockSize() {
- return finishedBlockInfos.size();
- }
- }
-
class MockShuffleWriteClient implements ShuffleWriteClient {
int mode = 0;
- MockShuffleServer mockedShuffleServer = new MockShuffleServer();
- int committedMaps = 0;
public void setMode(int mode) {
this.mode = mode;
@@ -358,15 +274,6 @@ public class SortWriteBufferManagerTest {
} else if (mode == 1) {
return new SendShuffleDataResult(Sets.newHashSet(2L), Sets.newHashSet(1L));
} else {
- if (mode == 3) {
- try {
- Thread.sleep(10);
- mockedShuffleServer.addCachedBlockInfos(shuffleBlockInfoList);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RssException(e);
- }
- }
Set<Long> successBlockIds = Sets.newHashSet();
for (ShuffleBlockInfo blockInfo : shuffleBlockInfoList) {
successBlockIds.add(blockInfo.getBlockId());
@@ -397,13 +304,6 @@ public class SortWriteBufferManagerTest {
@Override
public boolean sendCommit(Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps) {
- if (mode == 3) {
- committedMaps++;
- if (committedMaps >= numMaps) {
- mockedShuffleServer.finishShuffle();
- }
- return true;
- }
return false;
}
@@ -425,10 +325,7 @@ public class SortWriteBufferManagerTest {
@Override
public void reportShuffleResult(Map<Integer, List<ShuffleServerInfo>> partitionToServers, String appId,
int shuffleId, long taskAttemptId, Map<Integer, List<Long>> partitionToBlockIds, int bitmapNum) {
- if (mode == 3) {
- mockedShuffleServer.addFinishedBlockInfos(
- partitionToBlockIds.values().stream().flatMap(it -> it.stream()).collect(Collectors.toList()));
- }
+
}
@Override