You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/05/19 14:56:09 UTC

[incubator-uniffle] branch branch-0.7 updated: Revert "[#886] fix(mr): MR Client may lost data or throw exception when rss.storage.type without MEMORY. (#887)"

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch branch-0.7
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/branch-0.7 by this push:
     new 90539fbc Revert "[#886] fix(mr): MR Client may lost data or throw exception when rss.storage.type without MEMORY. (#887)"
90539fbc is described below

commit 90539fbcb7b085ca4e5453223e501b2e067af72e
Author: roryqi <ro...@tencent.com>
AuthorDate: Fri May 19 22:55:29 2023 +0800

    Revert "[#886] fix(mr): MR Client may lost data or throw exception when rss.storage.type without MEMORY. (#887)"
    
    This reverts commit 4423b434bbff02c2bb1342aebdd2951dc26c9aaf.
---
 .../hadoop/mapred/SortWriteBufferManager.java      |  12 +--
 .../hadoop/mapred/SortWriteBufferManagerTest.java  | 105 +--------------------
 2 files changed, 7 insertions(+), 110 deletions(-)

diff --git a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
index c12d81d3..cb76bef2 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
@@ -262,6 +262,12 @@ public class SortWriteBufferManager<K, V> {
       sendBuffersToServers();
     }
     long start = System.currentTimeMillis();
+    long commitDuration = 0;
+    if (!isMemoryShuffleEnabled) {
+      long s = System.currentTimeMillis();
+      sendCommit();
+      commitDuration = System.currentTimeMillis() - s;
+    }
     while (true) {
       // if failed when send data to shuffle server, mark task as failed
       if (failedBlockIds.size() > 0) {
@@ -287,12 +293,6 @@ public class SortWriteBufferManager<K, V> {
         throw new RssException(errorMsg);
       }
     }
-    long commitDuration = 0;
-    if (!isMemoryShuffleEnabled) {
-      long s = System.currentTimeMillis();
-      sendCommit();
-      commitDuration = System.currentTimeMillis() - s;
-    }
 
     start = System.currentTimeMillis();
     shuffleWriteClient.reportShuffleResult(partitionToServers, appId, 0,
diff --git a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
index 40173fca..1e07335b 100644
--- a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
+++ b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
@@ -17,13 +17,11 @@
 
 package org.apache.hadoop.mapred;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -260,91 +258,9 @@ public class SortWriteBufferManagerTest {
     assertTrue(manager.getWaitSendBuffers().isEmpty());
   }
 
-  @Test
-  public void testCommitBlocksWhenMemoryShuffleDisabled() throws Exception {
-    JobConf jobConf = new JobConf(new Configuration());
-    SerializationFactory serializationFactory = new SerializationFactory(jobConf);
-    MockShuffleWriteClient client = new MockShuffleWriteClient();
-    client.setMode(3);
-    Map<Integer, List<ShuffleServerInfo>> partitionToServers = JavaUtils.newConcurrentMap();
-    Set<Long> successBlocks = Sets.newConcurrentHashSet();
-    Set<Long> failedBlocks = Sets.newConcurrentHashSet();
-    Counters.Counter mapOutputByteCounter = new Counters.Counter();
-    Counters.Counter mapOutputRecordCounter = new Counters.Counter();
-    SortWriteBufferManager<BytesWritable, BytesWritable> manager;
-    manager = new SortWriteBufferManager<BytesWritable, BytesWritable>(
-        10240,
-        1L,
-        10,
-        serializationFactory.getSerializer(BytesWritable.class),
-        serializationFactory.getSerializer(BytesWritable.class),
-        WritableComparator.get(BytesWritable.class),
-        0.9,
-        "test",
-        client,
-        500,
-        5 * 1000,
-        partitionToServers,
-        successBlocks,
-        failedBlocks,
-        mapOutputByteCounter,
-        mapOutputRecordCounter,
-        1,
-        100,
-        1,
-        false,
-        5,
-        0.2f,
-        1024000L,
-        new RssConf());
-    Random random = new Random();
-    for (int i = 0; i < 1000; i++) {
-      byte[] key = new byte[20];
-      byte[] value = new byte[1024];
-      random.nextBytes(key);
-      random.nextBytes(value);
-      int partitionId = random.nextInt(50);
-      manager.addRecord(partitionId, new BytesWritable(key), new BytesWritable(value));
-    }
-    manager.waitSendFinished();
-    assertTrue(manager.getWaitSendBuffers().isEmpty());
-    // When MEMOEY storage type is disable, all blocks should flush.
-    assertEquals(client.mockedShuffleServer.getFinishBlockSize(), client.mockedShuffleServer.getFlushBlockSize());
-  }
-
-  class MockShuffleServer {
-
-    // All methods of MockShuffle are thread safe, because send-thread may do something in concurrent way.
-    private List<ShuffleBlockInfo> cachedBlockInfos = new ArrayList<>();
-    private List<ShuffleBlockInfo> flushBlockInfos = new ArrayList<>();
-    private List<Long> finishedBlockInfos = new ArrayList<>();
-
-    public synchronized void finishShuffle() {
-      flushBlockInfos.addAll(cachedBlockInfos);
-    }
-
-    public synchronized void addCachedBlockInfos(List<ShuffleBlockInfo> shuffleBlockInfoList) {
-      cachedBlockInfos.addAll(shuffleBlockInfoList);
-    }
-
-    public synchronized void addFinishedBlockInfos(List<Long> shuffleBlockInfoList) {
-      finishedBlockInfos.addAll(shuffleBlockInfoList);
-    }
-
-    public synchronized int getFlushBlockSize() {
-      return flushBlockInfos.size();
-    }
-
-    public synchronized int getFinishBlockSize() {
-      return finishedBlockInfos.size();
-    }
-  }
-
   class MockShuffleWriteClient implements ShuffleWriteClient {
 
     int mode = 0;
-    MockShuffleServer mockedShuffleServer = new MockShuffleServer();
-    int committedMaps = 0;
 
     public void setMode(int mode) {
       this.mode = mode;
@@ -358,15 +274,6 @@ public class SortWriteBufferManagerTest {
       } else if (mode == 1) {
         return new SendShuffleDataResult(Sets.newHashSet(2L), Sets.newHashSet(1L));
       } else {
-        if (mode == 3) {
-          try {
-            Thread.sleep(10);
-            mockedShuffleServer.addCachedBlockInfos(shuffleBlockInfoList);
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new RssException(e);
-          }
-        }
         Set<Long> successBlockIds = Sets.newHashSet();
         for (ShuffleBlockInfo blockInfo : shuffleBlockInfoList) {
           successBlockIds.add(blockInfo.getBlockId());
@@ -397,13 +304,6 @@ public class SortWriteBufferManagerTest {
 
     @Override
     public boolean sendCommit(Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps) {
-      if (mode == 3) {
-        committedMaps++;
-        if (committedMaps >= numMaps) {
-          mockedShuffleServer.finishShuffle();
-        }
-        return true;
-      }
       return false;
     }
 
@@ -425,10 +325,7 @@ public class SortWriteBufferManagerTest {
     @Override
     public void reportShuffleResult(Map<Integer, List<ShuffleServerInfo>> partitionToServers, String appId,
         int shuffleId, long taskAttemptId, Map<Integer, List<Long>> partitionToBlockIds, int bitmapNum) {
-      if (mode == 3) {
-        mockedShuffleServer.addFinishedBlockInfos(
-            partitionToBlockIds.values().stream().flatMap(it -> it.stream()).collect(Collectors.toList()));
-      }
+
     }
 
     @Override