You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/05/19 12:13:01 UTC

[incubator-uniffle] branch master updated: [#886] fix(mr): MR client may lose data or throw an exception when rss.storage.type does not include MEMORY. (#887)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c62784f [#886] fix(mr): MR client may lose data or throw an exception when rss.storage.type does not include MEMORY. (#887)
9c62784f is described below

commit 9c62784f0115898a6f476ac58ac9a56dc8c915c3
Author: zhengchenyu <zh...@163.com>
AuthorDate: Fri May 19 20:12:55 2023 +0800

    [#886] fix(mr): MR client may lose data or throw an exception when rss.storage.type does not include MEMORY. (#887)
    
    ### What changes were proposed in this pull request?
    
    Make sure finishShuffle (the commit) is sent only after all shuffle data has been sent and acknowledged.
    
    ### Why are the changes needed?
    
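    If the storage type does not include MEMORY, the commit could previously be sent before all blocks had been delivered, so some data would never be flushed.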
    
    ### How was this patch tested?
    
    Tested in two modes:
    * Tez local debug mode
    * MR on yarn mode
    
    Added a new unit test.
    
    Co-authored-by: zhengchenyu001 <zh...@ke.com>
---
 .../hadoop/mapred/SortWriteBufferManager.java      |  12 +--
 .../hadoop/mapred/SortWriteBufferManagerTest.java  | 105 ++++++++++++++++++++-
 2 files changed, 110 insertions(+), 7 deletions(-)

diff --git a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
index 6e1dc171..b9e7f676 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
@@ -260,12 +260,6 @@ public class SortWriteBufferManager<K, V> {
       sendBuffersToServers();
     }
     long start = System.currentTimeMillis();
-    long commitDuration = 0;
-    if (!isMemoryShuffleEnabled) {
-      long s = System.currentTimeMillis();
-      sendCommit();
-      commitDuration = System.currentTimeMillis() - s;
-    }
     while (true) {
       // if failed when send data to shuffle server, mark task as failed
       if (failedBlockIds.size() > 0) {
@@ -291,6 +285,12 @@ public class SortWriteBufferManager<K, V> {
         throw new RssException(errorMsg);
       }
     }
+    long commitDuration = 0;
+    if (!isMemoryShuffleEnabled) {
+      long s = System.currentTimeMillis();
+      sendCommit();
+      commitDuration = System.currentTimeMillis() - s;
+    }
 
     start = System.currentTimeMillis();
     shuffleWriteClient.reportShuffleResult(partitionToServers, appId, 0,
diff --git a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
index f59bc925..d0b87245 100644
--- a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
+++ b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
@@ -17,11 +17,13 @@
 
 package org.apache.hadoop.mapred;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
@@ -261,9 +263,91 @@ public class SortWriteBufferManagerTest {
     assertTrue(manager.getWaitSendBuffers().isEmpty());
   }
 
+  @Test
+  public void testCommitBlocksWhenMemoryShuffleDisabled() throws Exception {
+    JobConf jobConf = new JobConf(new Configuration());
+    SerializationFactory serializationFactory = new SerializationFactory(jobConf);
+    MockShuffleWriteClient client = new MockShuffleWriteClient();
+    client.setMode(3);
+    Map<Integer, List<ShuffleServerInfo>> partitionToServers = JavaUtils.newConcurrentMap();
+    Set<Long> successBlocks = Sets.newConcurrentHashSet();
+    Set<Long> failedBlocks = Sets.newConcurrentHashSet();
+    Counters.Counter mapOutputByteCounter = new Counters.Counter();
+    Counters.Counter mapOutputRecordCounter = new Counters.Counter();
+    SortWriteBufferManager<BytesWritable, BytesWritable> manager;
+    manager = new SortWriteBufferManager<BytesWritable, BytesWritable>(
+        10240,
+        1L,
+        10,
+        serializationFactory.getSerializer(BytesWritable.class),
+        serializationFactory.getSerializer(BytesWritable.class),
+        WritableComparator.get(BytesWritable.class),
+        0.9,
+        "test",
+        client,
+        500,
+        5 * 1000,
+        partitionToServers,
+        successBlocks,
+        failedBlocks,
+        mapOutputByteCounter,
+        mapOutputRecordCounter,
+        1,
+        100,
+        1,
+        false,
+        5,
+        0.2f,
+        1024000L,
+        new RssConf());
+    Random random = new Random();
+    for (int i = 0; i < 1000; i++) {
+      byte[] key = new byte[20];
+      byte[] value = new byte[1024];
+      random.nextBytes(key);
+      random.nextBytes(value);
+      int partitionId = random.nextInt(50);
+      manager.addRecord(partitionId, new BytesWritable(key), new BytesWritable(value));
+    }
+    manager.waitSendFinished();
+    assertTrue(manager.getWaitSendBuffers().isEmpty());
+    // When MEMOEY storage type is disable, all blocks should flush.
+    assertEquals(client.mockedShuffleServer.getFinishBlockSize(), client.mockedShuffleServer.getFlushBlockSize());
+  }
+
+  class MockShuffleServer {
+
+    // All methods of MockShuffle are thread safe, because send-thread may do something in concurrent way.
+    private List<ShuffleBlockInfo> cachedBlockInfos = new ArrayList<>();
+    private List<ShuffleBlockInfo> flushBlockInfos = new ArrayList<>();
+    private List<Long> finishedBlockInfos = new ArrayList<>();
+
+    public synchronized void finishShuffle() {
+      flushBlockInfos.addAll(cachedBlockInfos);
+    }
+
+    public synchronized void addCachedBlockInfos(List<ShuffleBlockInfo> shuffleBlockInfoList) {
+      cachedBlockInfos.addAll(shuffleBlockInfoList);
+    }
+
+    public synchronized void addFinishedBlockInfos(List<Long> shuffleBlockInfoList) {
+      finishedBlockInfos.addAll(shuffleBlockInfoList);
+    }
+
+    public synchronized int getFlushBlockSize() {
+      return flushBlockInfos.size();
+    }
+
+    public synchronized int getFinishBlockSize() {
+      return finishedBlockInfos.size();
+    }
+  }
+
   class MockShuffleWriteClient implements ShuffleWriteClient {
 
     int mode = 0;
+    MockShuffleServer mockedShuffleServer = new MockShuffleServer();
+    int committedMaps = 0;
 
     public void setMode(int mode) {
       this.mode = mode;
@@ -277,6 +361,15 @@ public class SortWriteBufferManagerTest {
       } else if (mode == 1) {
         return new SendShuffleDataResult(Sets.newHashSet(2L), Sets.newHashSet(1L));
       } else {
+        if (mode == 3) {
+          try {
+            Thread.sleep(10);
+            mockedShuffleServer.addCachedBlockInfos(shuffleBlockInfoList);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RssException(e);
+          }
+        }
         Set<Long> successBlockIds = Sets.newHashSet();
         for (ShuffleBlockInfo blockInfo : shuffleBlockInfoList) {
           successBlockIds.add(blockInfo.getBlockId());
@@ -308,6 +401,13 @@ public class SortWriteBufferManagerTest {
 
     @Override
     public boolean sendCommit(Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps) {
+      if (mode == 3) {
+        committedMaps++;
+        if (committedMaps >= numMaps) {
+          mockedShuffleServer.finishShuffle();
+        }
+        return true;
+      }
       return false;
     }
 
@@ -329,7 +429,10 @@ public class SortWriteBufferManagerTest {
     @Override
     public void reportShuffleResult(Map<Integer, List<ShuffleServerInfo>> partitionToServers, String appId,
         int shuffleId, long taskAttemptId, Map<Integer, List<Long>> partitionToBlockIds, int bitmapNum) {
-
+      if (mode == 3) {
+        mockedShuffleServer.addFinishedBlockInfos(
+            partitionToBlockIds.values().stream().flatMap(it -> it.stream()).collect(Collectors.toList()));
+      }
     }
 
     @Override