You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/10/21 11:02:22 UTC

[incubator-uniffle] branch branch-0.6 updated: [ISSUE-273][BUG] Get shuffle result failed caused by concurrent calls to registerShuffle (#274)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 30b60751 [ISSUE-273][BUG] Get shuffle result failed caused by concurrent calls to registerShuffle (#274)
30b60751 is described below

commit 30b6075104a878071c07f73772a6b186e752c713
Author: Xianming Lei <31...@users.noreply.github.com>
AuthorDate: Fri Oct 21 16:15:00 2022 +0800

    [ISSUE-273][BUG] Get shuffle result failed caused by concurrent calls to registerShuffle (#274)
    
    What changes were proposed in this pull request?
    For issue #273, scope the remoteStorage variable to the registerShuffle method instead of keeping it as an instance field.
    
    Why are the changes needed?
    When registerShuffle is called concurrently, the calls share the remoteStorage instance field. This problem causes getShuffleResult to fail, which eventually causes the task to fail.
    
    Does this PR introduce any user-facing change?
    No
    
    How was this patch tested?
    Existing UT
    
    Co-authored-by: leixianming <le...@didiglobal.com>
---
 .../org/apache/spark/shuffle/RssShuffleManager.java     | 17 +++++++----------
 .../org/apache/spark/shuffle/RssShuffleManager.java     | 17 ++++++++---------
 .../java/org/apache/uniffle/test/GetReaderTest.java     |  1 -
 .../java/org/apache/uniffle/test/GetReaderTest.java     |  2 --
 4 files changed, 15 insertions(+), 22 deletions(-)

diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index cdd58a27..e33ccba4 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -86,7 +86,6 @@ public class RssShuffleManager implements ShuffleManager {
   private final int dataCommitPoolSize;
   private boolean heartbeatStarted = false;
   private boolean dynamicConfEnabled = false;
-  private RemoteStorageInfo remoteStorage;
   private ThreadPoolExecutor threadPoolExecutor;
   private EventLoop eventLoop = new EventLoop<AddBlockEvent>("ShuffleDataQueue") {
 
@@ -211,9 +210,10 @@ public class RssShuffleManager implements ShuffleManager {
     }
 
     String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
-    remoteStorage = new RemoteStorageInfo(sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), ""));
-    remoteStorage = ClientUtils.fetchRemoteStorage(
-        appId, remoteStorage, dynamicConfEnabled, storageType, shuffleWriteClient);
+    RemoteStorageInfo defaultRemoteStorage = new RemoteStorageInfo(
+        sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), ""));
+    RemoteStorageInfo remoteStorage = ClientUtils.fetchRemoteStorage(
+        appId, defaultRemoteStorage, dynamicConfEnabled, storageType, shuffleWriteClient);
 
     int partitionNumPerRange = sparkConf.get(RssSparkConfig.RSS_PARTITION_NUM_PER_RANGE);
 
@@ -231,7 +231,7 @@ public class RssShuffleManager implements ShuffleManager {
         ShuffleAssignmentsInfo response = shuffleWriteClient.getShuffleAssignments(
                 appId, shuffleId, dependency.partitioner().numPartitions(),
                 partitionNumPerRange, assignmentTags, requiredShuffleServerNumber);
-        registerShuffleServers(appId, shuffleId, response.getServerToPartitionRanges());
+        registerShuffleServers(appId, shuffleId, response.getServerToPartitionRanges(), remoteStorage);
         return response.getPartitionToServers();
       }, retryInterval, retryTimes);
     } catch (Throwable throwable) {
@@ -266,7 +266,8 @@ public class RssShuffleManager implements ShuffleManager {
   protected void registerShuffleServers(
       String appId,
       int shuffleId,
-      Map<ShuffleServerInfo, List<PartitionRange>> serverToPartitionRanges) {
+      Map<ShuffleServerInfo, List<PartitionRange>> serverToPartitionRanges,
+      RemoteStorageInfo remoteStorage) {
     if (serverToPartitionRanges == null || serverToPartitionRanges.isEmpty()) {
       return;
     }
@@ -471,8 +472,4 @@ public class RssShuffleManager implements ShuffleManager {
     this.appId = appId;
   }
 
-  @VisibleForTesting
-  public void setRemoteStorage(RemoteStorageInfo remoteStorage) {
-    this.remoteStorage = remoteStorage;
-  }
 }
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 6f04ccff..069c4a19 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -92,7 +92,6 @@ public class RssShuffleManager implements ShuffleManager {
   private ScheduledExecutorService heartBeatScheduledExecutorService;
   private boolean heartbeatStarted = false;
   private boolean dynamicConfEnabled = false;
-  private RemoteStorageInfo remoteStorage;
   private final EventLoop eventLoop;
   private final EventLoop defaultEventLoop = new EventLoop<AddBlockEvent>("ShuffleDataQueue") {
 
@@ -253,10 +252,10 @@ public class RssShuffleManager implements ShuffleManager {
     LOG.info("Generate application id used in rss: " + id.get());
 
     String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
-    remoteStorage = new RemoteStorageInfo(
+    RemoteStorageInfo defaultRemoteStorage = new RemoteStorageInfo(
         sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), ""));
-    remoteStorage = ClientUtils.fetchRemoteStorage(
-        id.get(), remoteStorage, dynamicConfEnabled, storageType, shuffleWriteClient);
+    RemoteStorageInfo remoteStorage = ClientUtils.fetchRemoteStorage(
+        id.get(), defaultRemoteStorage, dynamicConfEnabled, storageType, shuffleWriteClient);
 
     Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
 
@@ -275,7 +274,7 @@ public class RssShuffleManager implements ShuffleManager {
                 1,
                 assignmentTags,
                 requiredShuffleServerNumber);
-        registerShuffleServers(id.get(), shuffleId, response.getServerToPartitionRanges());
+        registerShuffleServers(id.get(), shuffleId, response.getServerToPartitionRanges(), remoteStorage);
         return response.getPartitionToServers();
       }, retryInterval, retryTimes);
     } catch (Throwable throwable) {
@@ -586,7 +585,8 @@ public class RssShuffleManager implements ShuffleManager {
       String appId,
       int shuffleId,
       Map<ShuffleServerInfo,
-      List<PartitionRange>> serverToPartitionRanges) {
+      List<PartitionRange>> serverToPartitionRanges,
+      RemoteStorageInfo remoteStorage) {
     if (serverToPartitionRanges == null || serverToPartitionRanges.isEmpty()) {
       return;
     }
@@ -708,8 +708,7 @@ public class RssShuffleManager implements ShuffleManager {
     this.id = new AtomicReference<>(appId);
   }
 
-  @VisibleForTesting
-  public void setRemoteStorage(RemoteStorageInfo remoteStorage) {
-    this.remoteStorage = remoteStorage;
+  public String getId() {
+    return id.get();
   }
 }
diff --git a/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java b/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
index 6acb4f95..9f83d29b 100644
--- a/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
+++ b/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
@@ -137,7 +137,6 @@ public class GetReaderTest extends IntegrationTestBase {
     assertNull(commonHadoopConf.get("k2"));
 
     // mock the scenario that get reader in an executor
-    rssShuffleManager.setRemoteStorage(null);
     rssShuffleReader = (RssShuffleReader) rssShuffleManager.getReader(
         rssShuffleHandle, 0, 0, mockTaskContextImpl);
     hadoopConf =  rssShuffleReader.getHadoopConf();
diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
index 827e793c..9b5ee7b2 100644
--- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
+++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
@@ -146,8 +146,6 @@ public class GetReaderTest extends IntegrationTestBase {
     assertNull(commonHadoopConf.get("k1"));
     assertNull(commonHadoopConf.get("k2"));
 
-    // mock the scenario that get reader in an executor
-    rssShuffleManager.setRemoteStorage(null);
     rssShuffleReader = (RssShuffleReader) rssShuffleManager.getReader(
         rssShuffleHandle, 0, 0, new MockTaskContext(), new TempShuffleReadMetrics());
     hadoopConf =  rssShuffleReader.getHadoopConf();