Posted to issues@uniffle.apache.org by GitBox <gi...@apache.org> on 2022/07/16 10:44:27 UTC

[GitHub] [incubator-uniffle] zuston opened a new pull request, #59: Sending commit concurrently in client side

zuston opened a new pull request, #59:
URL: https://github.com/apache/incubator-uniffle/pull/59

   
   ### What changes were proposed in this pull request?
   Send commit requests to the shuffle servers concurrently on the client side.
   
   
   ### Why are the changes needed?
   I found that when using the `LOCALFILE` storageType, waiting for the commit takes too much time. To speed this up, the commits can be sent concurrently using a thread pool.
   
   
   ### Does this PR introduce _any_ user-facing change?
   1. Introduces the config `rss.client.commit.sender.pool.size`; by default, the pool size equals the number of assigned shuffle servers.
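
As a rough sketch of how such a default could be resolved, mirroring the `commitSenderPoolSize == -1 ? shuffleServerInfoSet.size() : commitSenderPoolSize` expression in this PR's diff: the class and method names below are hypothetical, and the guard for an empty server set is an added assumption (it is not in the PR), included because `ForkJoinPool` rejects a parallelism of zero.

```java
// Hypothetical helper, not part of the PR; it only illustrates the -1 sentinel.
final class CommitPoolSizing {
    // Sentinel meaning "not configured", matching the default value in this PR.
    static final int UNSET = -1;

    // Returns the parallelism to use for the commit pool.
    static int resolvePoolSize(int configured, int serverCount) {
        if (configured == UNSET) {
            // Assumption: fall back to at least 1, because new ForkJoinPool(0)
            // throws IllegalArgumentException.
            return Math.max(1, serverCount);
        }
        return configured;
    }

    public static void main(String[] args) {
        System.out.println(resolvePoolSize(UNSET, 3)); // unset: follows the server count
        System.out.println(resolvePoolSize(8, 3));     // explicit value wins
    }
}
```

With this shape, an explicit setting always takes precedence, and the server count is used only when the user leaves the config untouched.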
   
   
   ### How was this patch tested?
   No tests needed.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#discussion_r922690489


##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -247,43 +257,58 @@ public SendShuffleDataResult sendShuffleData(String appId, List<ShuffleBlockInfo
     return new SendShuffleDataResult(successBlockIds, failedBlockIds);
   }
 
+  /**
+   * This method will wait until all shuffle data have been flushed
+   * to durable storage in assigned shuffle servers.
+   * @param shuffleServerInfoSet
+   * @param appId
+   * @param shuffleId
+   * @param numMaps
+   * @return
+   */
   @Override
   public boolean sendCommit(Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps) {
+    ForkJoinPool forkJoinPool = new ForkJoinPool(
+        commitSenderPoolSize == -1 ? shuffleServerInfoSet.size() : commitSenderPoolSize
+    );
     AtomicInteger successfulCommit = new AtomicInteger(0);
-    shuffleServerInfoSet.stream().forEach(ssi -> {
-      RssSendCommitRequest request = new RssSendCommitRequest(appId, shuffleId);
-      String errorMsg = "Failed to commit shuffle data to " + ssi + " for shuffleId[" + shuffleId + "]";
-      long startTime = System.currentTimeMillis();
-      try {
-        RssSendCommitResponse response = getShuffleServerClient(ssi).sendCommit(request);
-        if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
-          int commitCount = response.getCommitCount();
-          LOG.info("Successfully sendCommit for appId[" + appId + "], shuffleId[" + shuffleId
-              + "] to ShuffleServer[" + ssi.getId() + "], cost "
-              + (System.currentTimeMillis() - startTime) + " ms, got committed maps["
-              + commitCount + "], map number of stage is " + numMaps);
-          if (commitCount >= numMaps) {
-            RssFinishShuffleResponse rfsResponse =
-                getShuffleServerClient(ssi).finishShuffle(new RssFinishShuffleRequest(appId, shuffleId));
-            if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
-              String msg = "Failed to finish shuffle to " + ssi + " for shuffleId[" + shuffleId
-                  + "] with statusCode " + rfsResponse.getStatusCode();
-              LOG.error(msg);
-              throw new Exception(msg);
-            } else {
-              LOG.info("Successfully finish shuffle to " + ssi + " for shuffleId[" + shuffleId + "]");
+    forkJoinPool.submit(() -> {
+      shuffleServerInfoSet.parallelStream().forEach(ssi -> {
+        RssSendCommitRequest request = new RssSendCommitRequest(appId, shuffleId);
+        String errorMsg = "Failed to commit shuffle data to " + ssi + " for shuffleId[" + shuffleId + "]";
+        long startTime = System.currentTimeMillis();
+        try {
+          RssSendCommitResponse response = getShuffleServerClient(ssi).sendCommit(request);
+          if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+            int commitCount = response.getCommitCount();
+            LOG.info("Successfully sendCommit for appId[" + appId + "], shuffleId[" + shuffleId
+                + "] to ShuffleServer[" + ssi.getId() + "], cost "
+                + (System.currentTimeMillis() - startTime) + " ms, got committed maps["
+                + commitCount + "], map number of stage is " + numMaps);
+            if (commitCount >= numMaps) {
+              RssFinishShuffleResponse rfsResponse =
+                  getShuffleServerClient(ssi).finishShuffle(new RssFinishShuffleRequest(appId, shuffleId));
+              if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
+                String msg = "Failed to finish shuffle to " + ssi + " for shuffleId[" + shuffleId
+                    + "] with statusCode " + rfsResponse.getStatusCode();
+                LOG.error(msg);
+                throw new Exception(msg);
+              } else {
+                LOG.info("Successfully finish shuffle to " + ssi + " for shuffleId[" + shuffleId + "]");
+              }
             }
+          } else {
+            String msg = errorMsg + " with statusCode " + response.getStatusCode();
+            LOG.error(msg);
+            throw new Exception(msg);
           }
-        } else {
-          String msg = errorMsg + " with statusCode " + response.getStatusCode();
-          LOG.error(msg);
-          throw new Exception(msg);
+          successfulCommit.incrementAndGet();
+        } catch (Exception e) {
+          LOG.error(errorMsg, e);
         }
-        successfulCommit.incrementAndGet();
-      } catch (Exception e) {
-        LOG.error(errorMsg, e);
-      }
-    });
+      });
+    }).join();
+    forkJoinPool.shutdownNow();

Review Comment:
   > It looks like either `shutdownNow()` or `shutdown()` is OK because of the `join` operation
   
   I'm OK with using `shutdownNow()`.
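
To illustrate why either call works here, a minimal sketch: `join()` blocks until the submitted task, including every element of the parallel stream, has finished, so by the time the pool is shut down there is nothing left to cancel. The class name, `commitAll`, and the string server IDs are hypothetical stand-ins for `sendCommit` and `ShuffleServerInfo`. The sketch also relies on the commonly observed behavior that a parallel stream started inside a `ForkJoinPool` task runs on that pool.

```java
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the commit logic; not the PR's actual class.
final class JoinThenShutdown {
    // Returns how many "commits" completed before the pool was shut down.
    static int commitAll(Set<String> servers, int poolSize) {
        ForkJoinPool pool = new ForkJoinPool(poolSize);
        AtomicInteger successful = new AtomicInteger(0);
        // join() returns only after every element of the stream has been processed.
        pool.submit(() -> servers.parallelStream().forEach(server -> {
            // Stand-in for the real RPC to one shuffle server.
            successful.incrementAndGet();
        })).join();
        // All work is already done, so shutdown() and shutdownNow() behave the same here.
        pool.shutdownNow();
        return successful.get();
    }
}
```

Because the counter is read only after `join()` returns, it already reflects every server in the set.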





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#discussion_r922693189


##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -247,43 +257,62 @@ public SendShuffleDataResult sendShuffleData(String appId, List<ShuffleBlockInfo
     return new SendShuffleDataResult(successBlockIds, failedBlockIds);
   }
 
+  /**
+   * This method will wait until all shuffle data have been flushed
+   * to durable storage in assigned shuffle servers.
+   * @param shuffleServerInfoSet
+   * @param appId
+   * @param shuffleId
+   * @param numMaps
+   * @return
+   */
   @Override
   public boolean sendCommit(Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps) {
+    ForkJoinPool forkJoinPool = new ForkJoinPool(
+        dataCommitPoolSize == -1 ? shuffleServerInfoSet.size() : dataCommitPoolSize
+    );
     AtomicInteger successfulCommit = new AtomicInteger(0);
-    shuffleServerInfoSet.stream().forEach(ssi -> {
-      RssSendCommitRequest request = new RssSendCommitRequest(appId, shuffleId);
-      String errorMsg = "Failed to commit shuffle data to " + ssi + " for shuffleId[" + shuffleId + "]";
-      long startTime = System.currentTimeMillis();
-      try {
-        RssSendCommitResponse response = getShuffleServerClient(ssi).sendCommit(request);
-        if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
-          int commitCount = response.getCommitCount();
-          LOG.info("Successfully sendCommit for appId[" + appId + "], shuffleId[" + shuffleId
-              + "] to ShuffleServer[" + ssi.getId() + "], cost "
-              + (System.currentTimeMillis() - startTime) + " ms, got committed maps["
-              + commitCount + "], map number of stage is " + numMaps);
-          if (commitCount >= numMaps) {
-            RssFinishShuffleResponse rfsResponse =
-                getShuffleServerClient(ssi).finishShuffle(new RssFinishShuffleRequest(appId, shuffleId));
-            if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
-              String msg = "Failed to finish shuffle to " + ssi + " for shuffleId[" + shuffleId
-                  + "] with statusCode " + rfsResponse.getStatusCode();
+    try {
+      forkJoinPool.submit(() -> {
+        shuffleServerInfoSet.parallelStream().forEach(ssi -> {
+          RssSendCommitRequest request = new RssSendCommitRequest(appId, shuffleId);
+          String errorMsg = "Failed to commit shuffle data to " + ssi + " for shuffleId[" + shuffleId + "]";
+          long startTime = System.currentTimeMillis();
+          try {
+            RssSendCommitResponse response = getShuffleServerClient(ssi).sendCommit(request);
+            if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+              int commitCount = response.getCommitCount();
+              LOG.info("Successfully sendCommit for appId[" + appId + "], shuffleId[" + shuffleId
+                  + "] to ShuffleServer[" + ssi.getId() + "], cost "
+                  + (System.currentTimeMillis() - startTime) + " ms, got committed maps["
+                  + commitCount + "], map number of stage is " + numMaps);
+              if (commitCount >= numMaps) {
+                RssFinishShuffleResponse rfsResponse =
+                    getShuffleServerClient(ssi).finishShuffle(new RssFinishShuffleRequest(appId, shuffleId));
+                if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
+                  String msg = "Failed to finish shuffle to " + ssi + " for shuffleId[" + shuffleId
+                      + "] with statusCode " + rfsResponse.getStatusCode();
+                  LOG.error(msg);
+                  throw new Exception(msg);
+                } else {
+                  LOG.info("Successfully finish shuffle to " + ssi + " for shuffleId[" + shuffleId + "]");
+                }
+              }
+            } else {
+              String msg = errorMsg + " with statusCode " + response.getStatusCode();
               LOG.error(msg);
               throw new Exception(msg);
-            } else {
-              LOG.info("Successfully finish shuffle to " + ssi + " for shuffleId[" + shuffleId + "]");
             }
+            successfulCommit.incrementAndGet();
+          } catch (Exception e) {
+            LOG.error(errorMsg, e);
           }
-        } else {
-          String msg = errorMsg + " with statusCode " + response.getStatusCode();
-          LOG.error(msg);
-          throw new Exception(msg);
-        }
-        successfulCommit.incrementAndGet();
-      } catch (Exception e) {
-        LOG.error(errorMsg, e);
-      }
-    });
+        });
+      }).join();
+    } catch (Exception e) {

Review Comment:
   Should we use 
   ```
   finally {
      forkJoinPool.shutdownNow();
   }
   ```
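
A minimal sketch of that suggestion, assuming the pool is created per call as in the diff above: placing `shutdownNow()` in `finally` releases the worker threads even when `join()` rethrows a failure from the task. `ShutdownInFinally` and `runAndClose` are hypothetical names used only for illustration.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical illustration; not the PR's actual code.
final class ShutdownInFinally {
    // Runs the work on a short-lived pool and returns the pool so callers can inspect its state.
    static ForkJoinPool runAndClose(Runnable work) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            // join() rethrows any unchecked exception raised by the task.
            pool.submit(work).join();
        } catch (Exception e) {
            System.err.println("Task failed: " + e);
        } finally {
            // Runs on both the success path and the failure path.
            pool.shutdownNow();
        }
        return pool;
    }
}
```

Without the `finally`, an exception escaping `join()` would skip the shutdown call and leave the pool's threads alive until they time out.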





[GitHub] [incubator-uniffle] zuston commented on pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#issuecomment-1186189559

   > 
   
   If we close the `ForkJoinPool` within the scope of the method, I think it's OK.




[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#discussion_r922687287


##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java:
##########
@@ -55,6 +55,12 @@ public class RssMRConfig {
           MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_TRANSFER_POOL_SIZE;
   public static final int RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE =
           RssClientConfig.RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE;
+  // Commit sender pool size

Review Comment:
   Could you remove this comment? It doesn't give any extra information that helps me understand the code.





[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#discussion_r922693721


##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -247,43 +257,62 @@ public SendShuffleDataResult sendShuffleData(String appId, List<ShuffleBlockInfo
     return new SendShuffleDataResult(successBlockIds, failedBlockIds);
   }
 
+  /**
+   * This method will wait until all shuffle data have been flushed
+   * to durable storage in assigned shuffle servers.
+   * @param shuffleServerInfoSet
+   * @param appId
+   * @param shuffleId
+   * @param numMaps
+   * @return
+   */
   @Override
   public boolean sendCommit(Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps) {
+    ForkJoinPool forkJoinPool = new ForkJoinPool(
+        dataCommitPoolSize == -1 ? shuffleServerInfoSet.size() : dataCommitPoolSize
+    );
     AtomicInteger successfulCommit = new AtomicInteger(0);
-    shuffleServerInfoSet.stream().forEach(ssi -> {
-      RssSendCommitRequest request = new RssSendCommitRequest(appId, shuffleId);
-      String errorMsg = "Failed to commit shuffle data to " + ssi + " for shuffleId[" + shuffleId + "]";
-      long startTime = System.currentTimeMillis();
-      try {
-        RssSendCommitResponse response = getShuffleServerClient(ssi).sendCommit(request);
-        if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
-          int commitCount = response.getCommitCount();
-          LOG.info("Successfully sendCommit for appId[" + appId + "], shuffleId[" + shuffleId
-              + "] to ShuffleServer[" + ssi.getId() + "], cost "
-              + (System.currentTimeMillis() - startTime) + " ms, got committed maps["
-              + commitCount + "], map number of stage is " + numMaps);
-          if (commitCount >= numMaps) {
-            RssFinishShuffleResponse rfsResponse =
-                getShuffleServerClient(ssi).finishShuffle(new RssFinishShuffleRequest(appId, shuffleId));
-            if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
-              String msg = "Failed to finish shuffle to " + ssi + " for shuffleId[" + shuffleId
-                  + "] with statusCode " + rfsResponse.getStatusCode();
+    try {
+      forkJoinPool.submit(() -> {
+        shuffleServerInfoSet.parallelStream().forEach(ssi -> {
+          RssSendCommitRequest request = new RssSendCommitRequest(appId, shuffleId);
+          String errorMsg = "Failed to commit shuffle data to " + ssi + " for shuffleId[" + shuffleId + "]";
+          long startTime = System.currentTimeMillis();
+          try {
+            RssSendCommitResponse response = getShuffleServerClient(ssi).sendCommit(request);
+            if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+              int commitCount = response.getCommitCount();
+              LOG.info("Successfully sendCommit for appId[" + appId + "], shuffleId[" + shuffleId
+                  + "] to ShuffleServer[" + ssi.getId() + "], cost "
+                  + (System.currentTimeMillis() - startTime) + " ms, got committed maps["
+                  + commitCount + "], map number of stage is " + numMaps);
+              if (commitCount >= numMaps) {
+                RssFinishShuffleResponse rfsResponse =
+                    getShuffleServerClient(ssi).finishShuffle(new RssFinishShuffleRequest(appId, shuffleId));
+                if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
+                  String msg = "Failed to finish shuffle to " + ssi + " for shuffleId[" + shuffleId
+                      + "] with statusCode " + rfsResponse.getStatusCode();
+                  LOG.error(msg);
+                  throw new Exception(msg);
+                } else {
+                  LOG.info("Successfully finish shuffle to " + ssi + " for shuffleId[" + shuffleId + "]");
+                }
+              }
+            } else {
+              String msg = errorMsg + " with statusCode " + response.getStatusCode();
               LOG.error(msg);
               throw new Exception(msg);
-            } else {
-              LOG.info("Successfully finish shuffle to " + ssi + " for shuffleId[" + shuffleId + "]");
             }
+            successfulCommit.incrementAndGet();
+          } catch (Exception e) {
+            LOG.error(errorMsg, e);
           }
-        } else {
-          String msg = errorMsg + " with statusCode " + response.getStatusCode();
-          LOG.error(msg);
-          throw new Exception(msg);
-        }
-        successfulCommit.incrementAndGet();
-      } catch (Exception e) {
-        LOG.error(errorMsg, e);
-      }
-    });
+        });
+      }).join();
+    } catch (Exception e) {

Review Comment:
   My fault.





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#discussion_r922680037


##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -88,11 +88,12 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
   private int replicaRead;
   private boolean replicaSkipEnabled;
   private int dataTranferPoolSize;
+  private int commitSenderPoolSize = -1;
   private final ForkJoinPool dataTransferPool;
 
   public ShuffleWriteClientImpl(String clientType, int retryMax, long retryIntervalMax, int heartBeatThreadNum,
                                 int replica, int replicaWrite, int replicaRead, boolean replicaSkipEnabled,
-                                int dataTranferPoolSize) {
+                                int dataTranferPoolSize, int commitSenderPoolSize) {

Review Comment:
   We prefer the following code style:
   ```
   public ShuffleWriteClientImpl(
          String clientType, 
          int retryMax,
          long retryIntervalMax,
          int heartBeatThreadNum,
          int replica,
          int replicaWrite,
          int replicaRead,
          boolean replicaSkipEnabled,
          int dataTranferPoolSize,
          int commitSenderPoolSize) {
   ```





[GitHub] [incubator-uniffle] zuston commented on pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#issuecomment-1186193954

   > We don't recommend using the storageType `LOCALFILE` because it performs poorly, but the improvement is OK with me.
   
   The performance of `LOCALFILE` looks better than ESS. Since there is no need to wait for data to be flushed to disk, [MEMORY_LOCALFILE](url) will be even better.




[GitHub] [incubator-uniffle] jerqi merged pull request #59: Send commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
jerqi merged PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59




[GitHub] [incubator-uniffle] jerqi commented on pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#issuecomment-1186193816

   > > 
   > 
   > If we close the `ForkJoinPool` within the scope of the method, I think it's OK.
   
   Ok




[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#discussion_r922680296


##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java:
##########
@@ -55,6 +55,12 @@ public class RssMRConfig {
           MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_TRANSFER_POOL_SIZE;
   public static final int RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE =
           RssClientConfig.RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE;
+  // Commit sender pool size
+  public static final String RSS_COMMIT_SENDER_POOL_SIZE =
+      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_COMMIT_SENDER_POOL_SIZE;
+  public static final int RSS_COMMIT_SENDER_POOL_SIZE_DEFAULT_VALUE =
+      RssClientConfig.RSS_COMMIT_SENDER_POOL_SIZE_DEFAULT_VALUE;

Review Comment:
   The name's style should be consistent with `data_transfer_pool_size`. How about `data_commit_pool_size`?





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#discussion_r922677763


##########
client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java:
##########
@@ -36,6 +38,8 @@ public class RssClientConfig {
   public static final boolean RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE = true;
   public static final String RSS_DATA_TRANSFER_POOL_SIZE = "rss.client.data.transfer.pool.size";
   public static final int RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE = Runtime.getRuntime().availableProcessors();
+  public static final String RSS_COMMIT_SENDER_POOL_SIZE = "rss.client.commit.sender.pool.size";
+  public static final int RSS_COMMIT_SENDER_POOL_SIZE_DEFAULT_VALUE = -1;

Review Comment:
   Why is the default value `-1`?





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#discussion_r922689217


##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -247,43 +257,58 @@ public SendShuffleDataResult sendShuffleData(String appId, List<ShuffleBlockInfo
     return new SendShuffleDataResult(successBlockIds, failedBlockIds);
   }
 
+  /**
+   * This method will wait until all shuffle data have been flushed
+   * to durable storage in assigned shuffle servers.
+   * @param shuffleServerInfoSet
+   * @param appId
+   * @param shuffleId
+   * @param numMaps
+   * @return
+   */
   @Override
   public boolean sendCommit(Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps) {
+    ForkJoinPool forkJoinPool = new ForkJoinPool(
+        commitSenderPoolSize == -1 ? shuffleServerInfoSet.size() : commitSenderPoolSize
+    );
     AtomicInteger successfulCommit = new AtomicInteger(0);
-    shuffleServerInfoSet.stream().forEach(ssi -> {
-      RssSendCommitRequest request = new RssSendCommitRequest(appId, shuffleId);
-      String errorMsg = "Failed to commit shuffle data to " + ssi + " for shuffleId[" + shuffleId + "]";
-      long startTime = System.currentTimeMillis();
-      try {
-        RssSendCommitResponse response = getShuffleServerClient(ssi).sendCommit(request);
-        if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
-          int commitCount = response.getCommitCount();
-          LOG.info("Successfully sendCommit for appId[" + appId + "], shuffleId[" + shuffleId
-              + "] to ShuffleServer[" + ssi.getId() + "], cost "
-              + (System.currentTimeMillis() - startTime) + " ms, got committed maps["
-              + commitCount + "], map number of stage is " + numMaps);
-          if (commitCount >= numMaps) {
-            RssFinishShuffleResponse rfsResponse =
-                getShuffleServerClient(ssi).finishShuffle(new RssFinishShuffleRequest(appId, shuffleId));
-            if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
-              String msg = "Failed to finish shuffle to " + ssi + " for shuffleId[" + shuffleId
-                  + "] with statusCode " + rfsResponse.getStatusCode();
-              LOG.error(msg);
-              throw new Exception(msg);
-            } else {
-              LOG.info("Successfully finish shuffle to " + ssi + " for shuffleId[" + shuffleId + "]");
+    forkJoinPool.submit(() -> {
+      shuffleServerInfoSet.parallelStream().forEach(ssi -> {
+        RssSendCommitRequest request = new RssSendCommitRequest(appId, shuffleId);
+        String errorMsg = "Failed to commit shuffle data to " + ssi + " for shuffleId[" + shuffleId + "]";
+        long startTime = System.currentTimeMillis();
+        try {
+          RssSendCommitResponse response = getShuffleServerClient(ssi).sendCommit(request);
+          if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+            int commitCount = response.getCommitCount();
+            LOG.info("Successfully sendCommit for appId[" + appId + "], shuffleId[" + shuffleId
+                + "] to ShuffleServer[" + ssi.getId() + "], cost "
+                + (System.currentTimeMillis() - startTime) + " ms, got committed maps["
+                + commitCount + "], map number of stage is " + numMaps);
+            if (commitCount >= numMaps) {
+              RssFinishShuffleResponse rfsResponse =
+                  getShuffleServerClient(ssi).finishShuffle(new RssFinishShuffleRequest(appId, shuffleId));
+              if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
+                String msg = "Failed to finish shuffle to " + ssi + " for shuffleId[" + shuffleId
+                    + "] with statusCode " + rfsResponse.getStatusCode();
+                LOG.error(msg);
+                throw new Exception(msg);
+              } else {
+                LOG.info("Successfully finish shuffle to " + ssi + " for shuffleId[" + shuffleId + "]");
+              }
             }
+          } else {
+            String msg = errorMsg + " with statusCode " + response.getStatusCode();
+            LOG.error(msg);
+            throw new Exception(msg);
           }
-        } else {
-          String msg = errorMsg + " with statusCode " + response.getStatusCode();
-          LOG.error(msg);
-          throw new Exception(msg);
+          successfulCommit.incrementAndGet();
+        } catch (Exception e) {
+          LOG.error(errorMsg, e);
         }
-        successfulCommit.incrementAndGet();
-      } catch (Exception e) {
-        LOG.error(errorMsg, e);
-      }
-    });
+      });
+    }).join();
+    forkJoinPool.shutdownNow();

Review Comment:
   Should we use `try`/`finally` here, as below?
   ```
   try {
   
   } finally {
     forkJoinPool.shutdownNow();
   }
   ```
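
A minimal sketch of the `try`/`finally` shape suggested above, written as a standalone class rather than as the PR's actual code. `ShutdownInFinallySketch`, `runAndShutdown`, and the server names are hypothetical; only `ForkJoinPool`, `submit`, `join`, and `shutdownNow` come from the diff.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

final class ShutdownInFinallySketch {

  // Runs `work` on `pool`, waits for it, and always shuts the pool down.
  static void runAndShutdown(ForkJoinPool pool, Runnable work) {
    try {
      // join() rethrows any unchecked exception raised by the task.
      pool.submit(work).join();
    } finally {
      // Reached on both the normal and the exceptional path.
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    List<String> servers = Arrays.asList("server-1", "server-2", "server-3");
    ForkJoinPool pool = new ForkJoinPool(servers.size());
    runAndShutdown(pool, () ->
        servers.parallelStream().forEach(s -> System.out.println("commit sent to " + s)));
    System.out.println("pool shut down: " + pool.isShutdown());
  }
}
```

With this shape the pool is released even if the submitted task fails, at the cost of one extra nesting level in `sendCommit`.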





[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#discussion_r922688197


##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -247,43 +257,58 @@ public SendShuffleDataResult sendShuffleData(String appId, List<ShuffleBlockInfo
     return new SendShuffleDataResult(successBlockIds, failedBlockIds);
   }
 
+  /**
+   * This method will wait until all shuffle data have been flushed
+   * to durable storage in assigned shuffle servers.
+   * @param shuffleServerInfoSet
+   * @param appId
+   * @param shuffleId
+   * @param numMaps
+   * @return
+   */
   @Override
   public boolean sendCommit(Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps) {
+    ForkJoinPool forkJoinPool = new ForkJoinPool(
+        commitSenderPoolSize == -1 ? shuffleServerInfoSet.size() : commitSenderPoolSize
+    );
     AtomicInteger successfulCommit = new AtomicInteger(0);
-    shuffleServerInfoSet.stream().forEach(ssi -> {
-      RssSendCommitRequest request = new RssSendCommitRequest(appId, shuffleId);
-      String errorMsg = "Failed to commit shuffle data to " + ssi + " for shuffleId[" + shuffleId + "]";
-      long startTime = System.currentTimeMillis();
-      try {
-        RssSendCommitResponse response = getShuffleServerClient(ssi).sendCommit(request);
-        if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
-          int commitCount = response.getCommitCount();
-          LOG.info("Successfully sendCommit for appId[" + appId + "], shuffleId[" + shuffleId
-              + "] to ShuffleServer[" + ssi.getId() + "], cost "
-              + (System.currentTimeMillis() - startTime) + " ms, got committed maps["
-              + commitCount + "], map number of stage is " + numMaps);
-          if (commitCount >= numMaps) {
-            RssFinishShuffleResponse rfsResponse =
-                getShuffleServerClient(ssi).finishShuffle(new RssFinishShuffleRequest(appId, shuffleId));
-            if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
-              String msg = "Failed to finish shuffle to " + ssi + " for shuffleId[" + shuffleId
-                  + "] with statusCode " + rfsResponse.getStatusCode();
-              LOG.error(msg);
-              throw new Exception(msg);
-            } else {
-              LOG.info("Successfully finish shuffle to " + ssi + " for shuffleId[" + shuffleId + "]");
+    forkJoinPool.submit(() -> {
+      shuffleServerInfoSet.parallelStream().forEach(ssi -> {
+        RssSendCommitRequest request = new RssSendCommitRequest(appId, shuffleId);
+        String errorMsg = "Failed to commit shuffle data to " + ssi + " for shuffleId[" + shuffleId + "]";
+        long startTime = System.currentTimeMillis();
+        try {
+          RssSendCommitResponse response = getShuffleServerClient(ssi).sendCommit(request);
+          if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+            int commitCount = response.getCommitCount();
+            LOG.info("Successfully sendCommit for appId[" + appId + "], shuffleId[" + shuffleId
+                + "] to ShuffleServer[" + ssi.getId() + "], cost "
+                + (System.currentTimeMillis() - startTime) + " ms, got committed maps["
+                + commitCount + "], map number of stage is " + numMaps);
+            if (commitCount >= numMaps) {
+              RssFinishShuffleResponse rfsResponse =
+                  getShuffleServerClient(ssi).finishShuffle(new RssFinishShuffleRequest(appId, shuffleId));
+              if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
+                String msg = "Failed to finish shuffle to " + ssi + " for shuffleId[" + shuffleId
+                    + "] with statusCode " + rfsResponse.getStatusCode();
+                LOG.error(msg);
+                throw new Exception(msg);
+              } else {
+                LOG.info("Successfully finish shuffle to " + ssi + " for shuffleId[" + shuffleId + "]");
+              }
             }
+          } else {
+            String msg = errorMsg + " with statusCode " + response.getStatusCode();
+            LOG.error(msg);
+            throw new Exception(msg);
           }
-        } else {
-          String msg = errorMsg + " with statusCode " + response.getStatusCode();
-          LOG.error(msg);
-          throw new Exception(msg);
+          successfulCommit.incrementAndGet();
+        } catch (Exception e) {
+          LOG.error(errorMsg, e);
         }
-        successfulCommit.incrementAndGet();
-      } catch (Exception e) {
-        LOG.error(errorMsg, e);
-      }
-    });
+      });
+    }).join();
+    forkJoinPool.shutdownNow();

Review Comment:
   Hmm. As far as I know, `shutdownNow` will not actually affect performance.





[GitHub] [incubator-uniffle] jerqi commented on pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#issuecomment-1186210476

   > Besides, I think I can submit a new PR to let `registerShuffleServer` do the same optimization.
   
   We'd better have performance tests first. `registerShuffleServer` may not cost much time, so the optimization would have less effect there.




[GitHub] [incubator-uniffle] jerqi commented on pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#issuecomment-1186188461

   https://github.com/apache/incubator-uniffle/commit/4b5389f2b3a8b382a7409813d5c811985c9c0825
   In this PR, we replaced `parallelStream` with `stream`, which may have been a bad choice. The method `registerShuffleServer` uses `stream`, too. Could using `parallelStream` in `registerShuffleServer` improve performance? Would it create too many ForkJoinPools?
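
On the question of creating too many pools: a parallel stream started from an ordinary thread runs on the JVM-wide common pool, so `parallelStream` by itself creates no new pool. A new pool appears only when the code constructs one per call, as the `sendCommit` change in this PR does. A minimal sketch of the alternative, one long-lived pool reused across calls (similar in spirit to the existing `dataTransferPool` field), might look like the following. `SharedPoolSketch`, `callAll`, and the pool size of 4 are hypothetical, and running a parallel stream inside a custom `ForkJoinPool` relies on widely used but not formally specified JDK behavior.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;

final class SharedPoolSketch {

  // One long-lived pool shared by every call; the size 4 is an arbitrary placeholder.
  private static final ForkJoinPool SHARED_POOL = new ForkJoinPool(4);

  // Hypothetical stand-in for a per-server RPC loop (register, commit, ...).
  // Returns the set of servers that were processed.
  static Set<String> callAll(List<String> servers) {
    Set<String> done = ConcurrentHashMap.newKeySet();
    // The parallel stream is started from inside a task of SHARED_POOL,
    // so no new pool is constructed per call.
    SHARED_POOL.submit(() -> servers.parallelStream().forEach(done::add)).join();
    return done;
  }

  public static void main(String[] args) {
    System.out.println(callAll(Arrays.asList("server-1", "server-2")).size());
    System.out.println(callAll(Arrays.asList("server-3")).size());
  }
}
```

The trade-off is that a shared pool needs a fixed size chosen up front, whereas the per-call pool in the PR can be sized by the number of assigned servers.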




[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#discussion_r922679181


##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -247,43 +249,57 @@ public SendShuffleDataResult sendShuffleData(String appId, List<ShuffleBlockInfo
     return new SendShuffleDataResult(successBlockIds, failedBlockIds);
   }
 
+  /**
+   * This method will wait until all shuffle data have been spilled

Review Comment:
   `spilled` -> `flushed`.





[GitHub] [incubator-uniffle] jerqi commented on pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#issuecomment-1186231072

   Could you update the documentation, since this PR introduces a user-facing change?




[GitHub] [incubator-uniffle] jerqi commented on pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#issuecomment-1186167641

   Do you have performance tests? I doubt this PR improves performance, because in my opinion the bottleneck of the commit operation is on the shuffle server.
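
Even if each server is slow to commit, the client can still save time by waiting on all servers at once instead of one after another. The sketch below illustrates only that overlap, using a simulated delay. `CommitOverlapSketch`, `fakeCommit`, and the 50 ms latency are hypothetical and say nothing about real Uniffle timings.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

final class CommitOverlapSketch {

  // Simulated server-side commit latency; 50 ms is an arbitrary placeholder.
  static void fakeCommit(String server) {
    try {
      Thread.sleep(50);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  // Waits for each server in turn, like the original stream().forEach loop.
  static long sequentialMillis(List<String> servers) {
    long start = System.nanoTime();
    servers.forEach(CommitOverlapSketch::fakeCommit);
    return (System.nanoTime() - start) / 1_000_000;
  }

  // Waits for the servers concurrently on a pool sized by the server count.
  static long concurrentMillis(List<String> servers) {
    ForkJoinPool pool = new ForkJoinPool(servers.size());
    long start = System.nanoTime();
    try {
      pool.submit(() -> servers.parallelStream().forEach(CommitOverlapSketch::fakeCommit)).join();
    } finally {
      pool.shutdownNow();
    }
    return (System.nanoTime() - start) / 1_000_000;
  }

  public static void main(String[] args) {
    List<String> servers = Arrays.asList("s1", "s2", "s3", "s4", "s5", "s6", "s7", "s8");
    System.out.println("sequential ms: " + sequentialMillis(servers));
    System.out.println("concurrent ms: " + concurrentMillis(servers));
  }
}
```

Whether the real commit path behaves this way depends on how much of the wait is independent per server, which is what a performance test on a cluster would show.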




[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#discussion_r922680037


##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -88,11 +88,12 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
   private int replicaRead;
   private boolean replicaSkipEnabled;
   private int dataTranferPoolSize;
+  private int commitSenderPoolSize = -1;
   private final ForkJoinPool dataTransferPool;
 
   public ShuffleWriteClientImpl(String clientType, int retryMax, long retryIntervalMax, int heartBeatThreadNum,
                                 int replica, int replicaWrite, int replicaRead, boolean replicaSkipEnabled,
-                                int dataTranferPoolSize) {
+                                int dataTranferPoolSize, int commitSenderPoolSize) {

Review Comment:
   We prefer the code style as below
   ```
   public ShuffleWriteClientImpl(
          String clientType, 
          int retryMax,
          long retryIntervalMax,
          int heartBeatThreadNum,
          int replica,
          int replicaWrite,
          int replicaRead,
          boolean replicaSkipEnabled,
          int dataTranferPoolSize,
          int commitSenderPoolSize
   ```





[GitHub] [incubator-uniffle] zuston commented on pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#issuecomment-1186174012

   Yes, I tested it.
   I used 1000 executors (1g/1 core each) to run a 1 TB TeraSort.
   
   In localfile mode, it took 7.3 min.
   With this PR applied, it took 6.1 min.
   
   @jerqi 




[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#discussion_r922680296


##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java:
##########
@@ -55,6 +55,12 @@ public class RssMRConfig {
           MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_TRANSFER_POOL_SIZE;
   public static final int RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE =
           RssClientConfig.RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE;
+  // Commit sender pool size
+  public static final String RSS_COMMIT_SENDER_POOL_SIZE =
+      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_COMMIT_SENDER_POOL_SIZE;
+  public static final int RSS_COMMIT_SENDER_POOL_SIZE_DEFAULT_VALUE =
+      RssClientConfig.RSS_COMMIT_SENDER_POOL_SIZE_DEFAULT_VALUE;

Review Comment:
   The name should be consistent with `data_transfer_pool_size`. How about `data_commit_pool_size`?
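
Whatever the option ends up being called, the diff gives it a default of `-1`, which `sendCommit` treats as "size the pool by the number of assigned servers". A minimal sketch of that resolution follows. `CommitPoolSizeSketch` and `resolvePoolSize` are hypothetical, and the clamp to 1 is an added assumption that is not in the PR: `ForkJoinPool(int)` throws `IllegalArgumentException` for a parallelism of zero or less, which an empty server set would otherwise produce.

```java
final class CommitPoolSizeSketch {

  // Sentinel taken from the diff: -1 means "use the number of servers".
  static final int UNSET = -1;

  // Hypothetical helper; the PR inlines the ternary directly in sendCommit.
  static int resolvePoolSize(int configured, int serverCount) {
    int size = configured == UNSET ? serverCount : configured;
    // Assumption beyond the PR: never return a value ForkJoinPool would reject.
    return Math.max(1, size);
  }

  public static void main(String[] args) {
    System.out.println(resolvePoolSize(UNSET, 5)); // falls back to the server count
    System.out.println(resolvePoolSize(3, 5));     // explicit setting wins
    System.out.println(resolvePoolSize(UNSET, 0)); // clamped for an empty server set
  }
}
```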





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#discussion_r922679689


##########
client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java:
##########
@@ -17,6 +17,8 @@
 
 package org.apache.uniffle.client.util;
 
+import org.apache.hadoop.io.OutputBuffer;

Review Comment:
   Why do we need this?





[GitHub] [incubator-uniffle] jerqi commented on pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#issuecomment-1186185358

   Could you update the documentation for this feature?




[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#discussion_r922690680


##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -247,43 +257,58 @@ public SendShuffleDataResult sendShuffleData(String appId, List<ShuffleBlockInfo
     return new SendShuffleDataResult(successBlockIds, failedBlockIds);
   }
 
+  /**
+   * This method will wait until all shuffle data have been flushed
+   * to durable storage in assigned shuffle servers.
+   * @param shuffleServerInfoSet
+   * @param appId
+   * @param shuffleId
+   * @param numMaps
+   * @return
+   */
   @Override
   public boolean sendCommit(Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps) {
+    ForkJoinPool forkJoinPool = new ForkJoinPool(
+        commitSenderPoolSize == -1 ? shuffleServerInfoSet.size() : commitSenderPoolSize
+    );
     AtomicInteger successfulCommit = new AtomicInteger(0);
-    shuffleServerInfoSet.stream().forEach(ssi -> {
-      RssSendCommitRequest request = new RssSendCommitRequest(appId, shuffleId);
-      String errorMsg = "Failed to commit shuffle data to " + ssi + " for shuffleId[" + shuffleId + "]";
-      long startTime = System.currentTimeMillis();
-      try {
-        RssSendCommitResponse response = getShuffleServerClient(ssi).sendCommit(request);
-        if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
-          int commitCount = response.getCommitCount();
-          LOG.info("Successfully sendCommit for appId[" + appId + "], shuffleId[" + shuffleId
-              + "] to ShuffleServer[" + ssi.getId() + "], cost "
-              + (System.currentTimeMillis() - startTime) + " ms, got committed maps["
-              + commitCount + "], map number of stage is " + numMaps);
-          if (commitCount >= numMaps) {
-            RssFinishShuffleResponse rfsResponse =
-                getShuffleServerClient(ssi).finishShuffle(new RssFinishShuffleRequest(appId, shuffleId));
-            if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
-              String msg = "Failed to finish shuffle to " + ssi + " for shuffleId[" + shuffleId
-                  + "] with statusCode " + rfsResponse.getStatusCode();
-              LOG.error(msg);
-              throw new Exception(msg);
-            } else {
-              LOG.info("Successfully finish shuffle to " + ssi + " for shuffleId[" + shuffleId + "]");
+    forkJoinPool.submit(() -> {
+      shuffleServerInfoSet.parallelStream().forEach(ssi -> {
+        RssSendCommitRequest request = new RssSendCommitRequest(appId, shuffleId);
+        String errorMsg = "Failed to commit shuffle data to " + ssi + " for shuffleId[" + shuffleId + "]";
+        long startTime = System.currentTimeMillis();
+        try {
+          RssSendCommitResponse response = getShuffleServerClient(ssi).sendCommit(request);
+          if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+            int commitCount = response.getCommitCount();
+            LOG.info("Successfully sendCommit for appId[" + appId + "], shuffleId[" + shuffleId
+                + "] to ShuffleServer[" + ssi.getId() + "], cost "
+                + (System.currentTimeMillis() - startTime) + " ms, got committed maps["
+                + commitCount + "], map number of stage is " + numMaps);
+            if (commitCount >= numMaps) {
+              RssFinishShuffleResponse rfsResponse =
+                  getShuffleServerClient(ssi).finishShuffle(new RssFinishShuffleRequest(appId, shuffleId));
+              if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
+                String msg = "Failed to finish shuffle to " + ssi + " for shuffleId[" + shuffleId
+                    + "] with statusCode " + rfsResponse.getStatusCode();
+                LOG.error(msg);
+                throw new Exception(msg);
+              } else {
+                LOG.info("Successfully finish shuffle to " + ssi + " for shuffleId[" + shuffleId + "]");
+              }
             }
+          } else {
+            String msg = errorMsg + " with statusCode " + response.getStatusCode();
+            LOG.error(msg);
+            throw new Exception(msg);
           }
-        } else {
-          String msg = errorMsg + " with statusCode " + response.getStatusCode();
-          LOG.error(msg);
-          throw new Exception(msg);
+          successfulCommit.incrementAndGet();
+        } catch (Exception e) {
+          LOG.error(errorMsg, e);
         }
-        successfulCommit.incrementAndGet();
-      } catch (Exception e) {
-        LOG.error(errorMsg, e);
-      }
-    });
+      });
+    }).join();
+    forkJoinPool.shutdownNow();

Review Comment:
   I'm just worried that the ForkJoinPool won't be shut down if an exception is thrown.
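
The concern can be reproduced in isolation: `ForkJoinTask.join()` rethrows an unchecked exception raised by the task, so a `shutdownNow()` placed on the next line is skipped. The sketch below is an illustration, not the PR's code, and `JoinRethrowSketch` and `leaksPoolOnFailure` are hypothetical names. In the PR the per-server body catches `Exception`, so only failures outside that inner `try` would take this path.

```java
import java.util.concurrent.ForkJoinPool;

final class JoinRethrowSketch {

  // Returns true if the pool was still running after the task failed.
  static boolean leaksPoolOnFailure() {
    ForkJoinPool pool = new ForkJoinPool(2);
    try {
      pool.submit((Runnable) () -> {
        throw new IllegalStateException("commit failed");
      }).join();
      pool.shutdownNow(); // skipped: join() rethrows before this line runs
    } catch (RuntimeException e) {
      boolean leaked = !pool.isShutdown();
      pool.shutdownNow(); // clean up so the sketch itself does not leak
      return leaked;
    }
    return false;
  }

  public static void main(String[] args) {
    System.out.println("pool left running after failure: " + leaksPoolOnFailure());
  }
}
```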





[GitHub] [incubator-uniffle] zuston commented on pull request #59: Send commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#issuecomment-1186377325

   Done @jerqi 




[GitHub] [incubator-uniffle] codecov-commenter commented on pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#issuecomment-1186152260

   # [Codecov](https://codecov.io/gh/apache/incubator-uniffle/pull/59?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#59](https://codecov.io/gh/apache/incubator-uniffle/pull/59?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (a1105a3) into [master](https://codecov.io/gh/apache/incubator-uniffle/commit/e48f74ee657eca1c6dc05d558d88525c01733c3a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (e48f74e) will **decrease** coverage by `1.03%`.
   > The diff coverage is `8.57%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master      #59      +/-   ##
   ============================================
   - Coverage     55.21%   54.17%   -1.04%     
   + Complexity     1111     1035      -76     
   ============================================
     Files           148      139       -9     
     Lines          7953     7585     -368     
     Branches        760      730      -30     
   ============================================
   - Hits           4391     4109     -282     
   + Misses         3321     3243      -78     
   + Partials        241      233       -8     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-uniffle/pull/59?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../java/org/apache/hadoop/mapreduce/RssMRConfig.java](https://codecov.io/gh/apache/incubator-uniffle/pull/59/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50LW1yL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9oYWRvb3AvbWFwcmVkdWNlL1Jzc01SQ29uZmlnLmphdmE=) | `87.50% <ø> (ø)` | |
   | [...n/java/org/apache/hadoop/mapreduce/RssMRUtils.java](https://codecov.io/gh/apache/incubator-uniffle/pull/59/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50LW1yL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9oYWRvb3AvbWFwcmVkdWNlL1Jzc01SVXRpbHMuamF2YQ==) | `31.70% <0.00%> (-0.40%)` | :arrow_down: |
   | [...e/uniffle/client/factory/ShuffleClientFactory.java](https://codecov.io/gh/apache/incubator-uniffle/pull/59/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL2NsaWVudC9mYWN0b3J5L1NodWZmbGVDbGllbnRGYWN0b3J5LmphdmE=) | `0.00% <ø> (ø)` | |
   | [...rg/apache/uniffle/client/util/RssClientConfig.java](https://codecov.io/gh/apache/incubator-uniffle/pull/59/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL2NsaWVudC91dGlsL1Jzc0NsaWVudENvbmZpZy5qYXZh) | `0.00% <ø> (ø)` | |
   | [...he/uniffle/client/impl/ShuffleWriteClientImpl.java](https://codecov.io/gh/apache/incubator-uniffle/pull/59/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL2NsaWVudC9pbXBsL1NodWZmbGVXcml0ZUNsaWVudEltcGwuamF2YQ==) | `26.15% <8.82%> (+0.16%)` | :arrow_up: |
   | [...storage/handler/impl/DataSkippableReadHandler.java](https://codecov.io/gh/apache/incubator-uniffle/pull/59/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmFnZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvdW5pZmZsZS9zdG9yYWdlL2hhbmRsZXIvaW1wbC9EYXRhU2tpcHBhYmxlUmVhZEhhbmRsZXIuamF2YQ==) | `81.25% <0.00%> (-3.13%)` | :arrow_down: |
   | [...org/apache/uniffle/server/ShuffleFlushManager.java](https://codecov.io/gh/apache/incubator-uniffle/pull/59/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL3NlcnZlci9TaHVmZmxlRmx1c2hNYW5hZ2VyLmphdmE=) | `76.70% <0.00%> (-1.71%)` | :arrow_down: |
   | [...e/uniffle/server/storage/SingleStorageManager.java](https://codecov.io/gh/apache/incubator-uniffle/pull/59/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL3NlcnZlci9zdG9yYWdlL1NpbmdsZVN0b3JhZ2VNYW5hZ2VyLmphdmE=) | `65.57% <0.00%> (-1.64%)` | :arrow_down: |
   | [.../apache/uniffle/coordinator/ClientConfManager.java](https://codecov.io/gh/apache/incubator-uniffle/pull/59/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29vcmRpbmF0b3Ivc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3VuaWZmbGUvY29vcmRpbmF0b3IvQ2xpZW50Q29uZk1hbmFnZXIuamF2YQ==) | `91.54% <0.00%> (-1.41%)` | :arrow_down: |
   | ... and [9 more](https://codecov.io/gh/apache/incubator-uniffle/pull/59/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-uniffle/pull/59?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-uniffle/pull/59?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [e48f74e...a1105a3](https://codecov.io/gh/apache/incubator-uniffle/pull/59?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#discussion_r922689373


##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -247,43 +257,58 @@ public SendShuffleDataResult sendShuffleData(String appId, List<ShuffleBlockInfo
     return new SendShuffleDataResult(successBlockIds, failedBlockIds);
   }
 
+  /**
+   * This method will wait until all shuffle data have been flushed
+   * to durable storage in assigned shuffle servers.
+   * @param shuffleServerInfoSet
+   * @param appId
+   * @param shuffleId
+   * @param numMaps
+   * @return
+   */
   @Override
   public boolean sendCommit(Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps) {
+    ForkJoinPool forkJoinPool = new ForkJoinPool(
+        commitSenderPoolSize == -1 ? shuffleServerInfoSet.size() : commitSenderPoolSize
+    );
     AtomicInteger successfulCommit = new AtomicInteger(0);
-    shuffleServerInfoSet.stream().forEach(ssi -> {
-      RssSendCommitRequest request = new RssSendCommitRequest(appId, shuffleId);
-      String errorMsg = "Failed to commit shuffle data to " + ssi + " for shuffleId[" + shuffleId + "]";
-      long startTime = System.currentTimeMillis();
-      try {
-        RssSendCommitResponse response = getShuffleServerClient(ssi).sendCommit(request);
-        if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
-          int commitCount = response.getCommitCount();
-          LOG.info("Successfully sendCommit for appId[" + appId + "], shuffleId[" + shuffleId
-              + "] to ShuffleServer[" + ssi.getId() + "], cost "
-              + (System.currentTimeMillis() - startTime) + " ms, got committed maps["
-              + commitCount + "], map number of stage is " + numMaps);
-          if (commitCount >= numMaps) {
-            RssFinishShuffleResponse rfsResponse =
-                getShuffleServerClient(ssi).finishShuffle(new RssFinishShuffleRequest(appId, shuffleId));
-            if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
-              String msg = "Failed to finish shuffle to " + ssi + " for shuffleId[" + shuffleId
-                  + "] with statusCode " + rfsResponse.getStatusCode();
-              LOG.error(msg);
-              throw new Exception(msg);
-            } else {
-              LOG.info("Successfully finish shuffle to " + ssi + " for shuffleId[" + shuffleId + "]");
+    forkJoinPool.submit(() -> {
+      shuffleServerInfoSet.parallelStream().forEach(ssi -> {
+        RssSendCommitRequest request = new RssSendCommitRequest(appId, shuffleId);
+        String errorMsg = "Failed to commit shuffle data to " + ssi + " for shuffleId[" + shuffleId + "]";
+        long startTime = System.currentTimeMillis();
+        try {
+          RssSendCommitResponse response = getShuffleServerClient(ssi).sendCommit(request);
+          if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+            int commitCount = response.getCommitCount();
+            LOG.info("Successfully sendCommit for appId[" + appId + "], shuffleId[" + shuffleId
+                + "] to ShuffleServer[" + ssi.getId() + "], cost "
+                + (System.currentTimeMillis() - startTime) + " ms, got committed maps["
+                + commitCount + "], map number of stage is " + numMaps);
+            if (commitCount >= numMaps) {
+              RssFinishShuffleResponse rfsResponse =
+                  getShuffleServerClient(ssi).finishShuffle(new RssFinishShuffleRequest(appId, shuffleId));
+              if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
+                String msg = "Failed to finish shuffle to " + ssi + " for shuffleId[" + shuffleId
+                    + "] with statusCode " + rfsResponse.getStatusCode();
+                LOG.error(msg);
+                throw new Exception(msg);
+              } else {
+                LOG.info("Successfully finish shuffle to " + ssi + " for shuffleId[" + shuffleId + "]");
+              }
             }
+          } else {
+            String msg = errorMsg + " with statusCode " + response.getStatusCode();
+            LOG.error(msg);
+            throw new Exception(msg);
           }
-        } else {
-          String msg = errorMsg + " with statusCode " + response.getStatusCode();
-          LOG.error(msg);
-          throw new Exception(msg);
+          successfulCommit.incrementAndGet();
+        } catch (Exception e) {
+          LOG.error(errorMsg, e);
         }
-        successfulCommit.incrementAndGet();
-      } catch (Exception e) {
-        LOG.error(errorMsg, e);
-      }
-    });
+      });
+    }).join();
+    forkJoinPool.shutdownNow();

Review Comment:
   https://stackoverflow.com/questions/70934558/forkjoinpoolshutdown-vs-forkjoinpoolshutdownnow-after-forkjoinpooljoin





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#discussion_r922690489


##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -247,43 +257,58 @@ public SendShuffleDataResult sendShuffleData(String appId, List<ShuffleBlockInfo
     return new SendShuffleDataResult(successBlockIds, failedBlockIds);
   }
 
+  /**
+   * This method will wait until all shuffle data have been flushed
+   * to durable storage in assigned shuffle servers.
+   * @param shuffleServerInfoSet
+   * @param appId
+   * @param shuffleId
+   * @param numMaps
+   * @return
+   */
   @Override
   public boolean sendCommit(Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps) {
+    ForkJoinPool forkJoinPool = new ForkJoinPool(
+        commitSenderPoolSize == -1 ? shuffleServerInfoSet.size() : commitSenderPoolSize
+    );
     AtomicInteger successfulCommit = new AtomicInteger(0);
-    shuffleServerInfoSet.stream().forEach(ssi -> {
-      RssSendCommitRequest request = new RssSendCommitRequest(appId, shuffleId);
-      String errorMsg = "Failed to commit shuffle data to " + ssi + " for shuffleId[" + shuffleId + "]";
-      long startTime = System.currentTimeMillis();
-      try {
-        RssSendCommitResponse response = getShuffleServerClient(ssi).sendCommit(request);
-        if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
-          int commitCount = response.getCommitCount();
-          LOG.info("Successfully sendCommit for appId[" + appId + "], shuffleId[" + shuffleId
-              + "] to ShuffleServer[" + ssi.getId() + "], cost "
-              + (System.currentTimeMillis() - startTime) + " ms, got committed maps["
-              + commitCount + "], map number of stage is " + numMaps);
-          if (commitCount >= numMaps) {
-            RssFinishShuffleResponse rfsResponse =
-                getShuffleServerClient(ssi).finishShuffle(new RssFinishShuffleRequest(appId, shuffleId));
-            if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
-              String msg = "Failed to finish shuffle to " + ssi + " for shuffleId[" + shuffleId
-                  + "] with statusCode " + rfsResponse.getStatusCode();
-              LOG.error(msg);
-              throw new Exception(msg);
-            } else {
-              LOG.info("Successfully finish shuffle to " + ssi + " for shuffleId[" + shuffleId + "]");
+    forkJoinPool.submit(() -> {
+      shuffleServerInfoSet.parallelStream().forEach(ssi -> {
+        RssSendCommitRequest request = new RssSendCommitRequest(appId, shuffleId);
+        String errorMsg = "Failed to commit shuffle data to " + ssi + " for shuffleId[" + shuffleId + "]";
+        long startTime = System.currentTimeMillis();
+        try {
+          RssSendCommitResponse response = getShuffleServerClient(ssi).sendCommit(request);
+          if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+            int commitCount = response.getCommitCount();
+            LOG.info("Successfully sendCommit for appId[" + appId + "], shuffleId[" + shuffleId
+                + "] to ShuffleServer[" + ssi.getId() + "], cost "
+                + (System.currentTimeMillis() - startTime) + " ms, got committed maps["
+                + commitCount + "], map number of stage is " + numMaps);
+            if (commitCount >= numMaps) {
+              RssFinishShuffleResponse rfsResponse =
+                  getShuffleServerClient(ssi).finishShuffle(new RssFinishShuffleRequest(appId, shuffleId));
+              if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
+                String msg = "Failed to finish shuffle to " + ssi + " for shuffleId[" + shuffleId
+                    + "] with statusCode " + rfsResponse.getStatusCode();
+                LOG.error(msg);
+                throw new Exception(msg);
+              } else {
+                LOG.info("Successfully finish shuffle to " + ssi + " for shuffleId[" + shuffleId + "]");
+              }
             }
+          } else {
+            String msg = errorMsg + " with statusCode " + response.getStatusCode();
+            LOG.error(msg);
+            throw new Exception(msg);
           }
-        } else {
-          String msg = errorMsg + " with statusCode " + response.getStatusCode();
-          LOG.error(msg);
-          throw new Exception(msg);
+          successfulCommit.incrementAndGet();
+        } catch (Exception e) {
+          LOG.error(errorMsg, e);
         }
-        successfulCommit.incrementAndGet();
-      } catch (Exception e) {
-        LOG.error(errorMsg, e);
-      }
-    });
+      });
+    }).join();
+    forkJoinPool.shutdownNow();

Review Comment:
   > It looks like either `shutdownNow` or `shutdown` is fine here, because of the `join`.
   
   I'm fine with using `shutdownNow()`.





[GitHub] [incubator-uniffle] zuston commented on pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#issuecomment-1186175099

   > Do you have performance tests? I guess this PR can't improve the performance, because in my opinion the bottleneck of the `commit operation` is on the shuffle server.
   
   As far as I know, the spill-to-disk event has to be triggered from the client side. So if the previous trigger is blocked, the next one will not be triggered.
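
The argument above is that each commit call blocks the client until the server has flushed, so sending them one after another adds the waits up. A toy model of that effect, assuming a fixed flush time per server: `BlockingCommitSketch`, `fakeBlockingCommit`, and the timings are hypothetical and are not Uniffle code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class BlockingCommitSketch {

  // Hypothetical stand-in for a commit RPC that blocks until the server has flushed.
  static void fakeBlockingCommit(long flushMillis) {
    try {
      Thread.sleep(flushMillis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for commit", e);
    }
  }

  // One after another: each wait starts only after the previous call has returned.
  static long sequentialMillis(int servers, long flushMillis) {
    long start = System.nanoTime();
    for (int i = 0; i < servers; i++) {
      fakeBlockingCommit(flushMillis);
    }
    return (System.nanoTime() - start) / 1_000_000;
  }

  // All at once (servers must be >= 1): the waits overlap instead of adding up.
  static long concurrentMillis(int servers, long flushMillis) {
    ExecutorService pool = Executors.newFixedThreadPool(servers);
    try {
      long start = System.nanoTime();
      CompletableFuture<?>[] pending = new CompletableFuture<?>[servers];
      for (int i = 0; i < servers; i++) {
        pending[i] = CompletableFuture.runAsync(() -> fakeBlockingCommit(flushMillis), pool);
      }
      CompletableFuture.allOf(pending).join();
      return (System.nanoTime() - start) / 1_000_000;
    } finally {
      pool.shutdown();
    }
  }
}
```

Whether a real job sees a comparable gain depends on how much of the commit time is client-side waiting rather than server-side work, which is the point under discussion here.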




[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#discussion_r922677763


##########
client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java:
##########
@@ -36,6 +38,8 @@ public class RssClientConfig {
   public static final boolean RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE = true;
   public static final String RSS_DATA_TRANSFER_POOL_SIZE = "rss.client.data.transfer.pool.size";
   public static final int RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE = Runtime.getRuntime().availableProcessors();
+  public static final String RSS_COMMIT_SENDER_POOL_SIZE = "rss.client.commit.sender.pool.size";
+  public static final int RSS_COMMIT_SENDER_POOL_SIZE_DEFAULT_VALUE = -1;

Review Comment:
   Why is the default value `-1`?
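
Judging from the `sendCommit` hunk quoted elsewhere in this thread, `-1` appears to serve as a "not configured" sentinel: the pool is then sized to the number of assigned shuffle servers. A minimal sketch of that resolution rule follows. The helper `resolvePoolSize` and the clamp to 1 are assumptions added for illustration and are not part of the PR.

```java
final class CommitPoolSizeSketch {

  // Sentinel taken from the diff: -1 means "no explicit size configured".
  static final int UNSET = -1;

  // Hypothetical helper, not part of the PR.
  static int resolvePoolSize(int configured, int serverCount) {
    int size = (configured == UNSET) ? serverCount : configured;
    // Clamping to 1 is an added assumption: ForkJoinPool rejects a parallelism of 0 or less.
    return Math.max(1, size);
  }
}
```

The clamp matters only for an empty server set, where `new ForkJoinPool(0)` would throw `IllegalArgumentException`.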





[GitHub] [incubator-uniffle] jerqi commented on pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#issuecomment-1186178976

   We don't recommend using the storageType `LOCALFILE`, because it performs poorly.




[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#discussion_r922687386


##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java:
##########
@@ -55,6 +55,12 @@ public class RssMRConfig {
           MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_TRANSFER_POOL_SIZE;
   public static final int RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE =
           RssClientConfig.RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE;
+  // Commit sender pool size

Review Comment:
   Got it.





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#discussion_r922687334


##########
client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -81,6 +81,7 @@ public class RssShuffleManager implements ShuffleManager {
   private final int dataReplicaRead;
   private final boolean dataReplicaSkipEnabled;
   private final int dataTransferPoolSize;
+  private final int commitSenderPoolSize;

Review Comment:
   ditto.



##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java:
##########
@@ -91,11 +91,13 @@ public static ShuffleWriteClient createShuffleClient(JobConf jobConf) {
         RssMRConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE);
     int dataTransferPoolSize = jobConf.getInt(RssMRConfig.RSS_DATA_TRANSFER_POOL_SIZE,
         RssMRConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
+    int commitSenderPoolSize = jobConf.getInt(RssMRConfig.RSS_DATA_COMMIT_POOL_SIZE,

Review Comment:
   Could we modify this variable name?



##########
client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java:
##########
@@ -36,9 +36,10 @@ public static ShuffleClientFactory getInstance() {
 
   public ShuffleWriteClient createShuffleWriteClient(
       String clientType, int retryMax, long retryIntervalMax, int heartBeatThreadNum,
-      int replica, int replicaWrite, int replicaRead, boolean replicaSkipEnabled, int dataTransferPoolSize) {
+      int replica, int replicaWrite, int replicaRead, boolean replicaSkipEnabled, int dataTransferPoolSize,
+      int commitSenderPoolSize) {
     return new ShuffleWriteClientImpl(clientType, retryMax, retryIntervalMax, heartBeatThreadNum,
-      replica, replicaWrite, replicaRead, replicaSkipEnabled, dataTransferPoolSize);
+      replica, replicaWrite, replicaRead, replicaSkipEnabled, dataTransferPoolSize, commitSenderPoolSize);

Review Comment:
   ditto.



##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -247,43 +257,58 @@ public SendShuffleDataResult sendShuffleData(String appId, List<ShuffleBlockInfo
     return new SendShuffleDataResult(successBlockIds, failedBlockIds);
   }
 
+  /**
+   * This method will wait until all shuffle data have been flushed
+   * to durable storage in assigned shuffle servers.
+   * @param shuffleServerInfoSet
+   * @param appId
+   * @param shuffleId
+   * @param numMaps
+   * @return
+   */
   @Override
   public boolean sendCommit(Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps) {
+    ForkJoinPool forkJoinPool = new ForkJoinPool(
+        commitSenderPoolSize == -1 ? shuffleServerInfoSet.size() : commitSenderPoolSize
+    );
     AtomicInteger successfulCommit = new AtomicInteger(0);
-    shuffleServerInfoSet.stream().forEach(ssi -> {
-      RssSendCommitRequest request = new RssSendCommitRequest(appId, shuffleId);
-      String errorMsg = "Failed to commit shuffle data to " + ssi + " for shuffleId[" + shuffleId + "]";
-      long startTime = System.currentTimeMillis();
-      try {
-        RssSendCommitResponse response = getShuffleServerClient(ssi).sendCommit(request);
-        if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
-          int commitCount = response.getCommitCount();
-          LOG.info("Successfully sendCommit for appId[" + appId + "], shuffleId[" + shuffleId
-              + "] to ShuffleServer[" + ssi.getId() + "], cost "
-              + (System.currentTimeMillis() - startTime) + " ms, got committed maps["
-              + commitCount + "], map number of stage is " + numMaps);
-          if (commitCount >= numMaps) {
-            RssFinishShuffleResponse rfsResponse =
-                getShuffleServerClient(ssi).finishShuffle(new RssFinishShuffleRequest(appId, shuffleId));
-            if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
-              String msg = "Failed to finish shuffle to " + ssi + " for shuffleId[" + shuffleId
-                  + "] with statusCode " + rfsResponse.getStatusCode();
-              LOG.error(msg);
-              throw new Exception(msg);
-            } else {
-              LOG.info("Successfully finish shuffle to " + ssi + " for shuffleId[" + shuffleId + "]");
+    forkJoinPool.submit(() -> {
+      shuffleServerInfoSet.parallelStream().forEach(ssi -> {
+        RssSendCommitRequest request = new RssSendCommitRequest(appId, shuffleId);
+        String errorMsg = "Failed to commit shuffle data to " + ssi + " for shuffleId[" + shuffleId + "]";
+        long startTime = System.currentTimeMillis();
+        try {
+          RssSendCommitResponse response = getShuffleServerClient(ssi).sendCommit(request);
+          if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+            int commitCount = response.getCommitCount();
+            LOG.info("Successfully sendCommit for appId[" + appId + "], shuffleId[" + shuffleId
+                + "] to ShuffleServer[" + ssi.getId() + "], cost "
+                + (System.currentTimeMillis() - startTime) + " ms, got committed maps["
+                + commitCount + "], map number of stage is " + numMaps);
+            if (commitCount >= numMaps) {
+              RssFinishShuffleResponse rfsResponse =
+                  getShuffleServerClient(ssi).finishShuffle(new RssFinishShuffleRequest(appId, shuffleId));
+              if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
+                String msg = "Failed to finish shuffle to " + ssi + " for shuffleId[" + shuffleId
+                    + "] with statusCode " + rfsResponse.getStatusCode();
+                LOG.error(msg);
+                throw new Exception(msg);
+              } else {
+                LOG.info("Successfully finish shuffle to " + ssi + " for shuffleId[" + shuffleId + "]");
+              }
             }
+          } else {
+            String msg = errorMsg + " with statusCode " + response.getStatusCode();
+            LOG.error(msg);
+            throw new Exception(msg);
           }
-        } else {
-          String msg = errorMsg + " with statusCode " + response.getStatusCode();
-          LOG.error(msg);
-          throw new Exception(msg);
+          successfulCommit.incrementAndGet();
+        } catch (Exception e) {
+          LOG.error(errorMsg, e);
         }
-        successfulCommit.incrementAndGet();
-      } catch (Exception e) {
-        LOG.error(errorMsg, e);
-      }
-    });
+      });
+    }).join();
+    forkJoinPool.shutdownNow();

Review Comment:
   We added the call to `shutdownNow`. Maybe we should run the performance tests again.



##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -88,11 +88,20 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
   private int replicaRead;
   private boolean replicaSkipEnabled;
   private int dataTranferPoolSize;
+  private int commitSenderPoolSize = -1;
   private final ForkJoinPool dataTransferPool;
 
-  public ShuffleWriteClientImpl(String clientType, int retryMax, long retryIntervalMax, int heartBeatThreadNum,
-                                int replica, int replicaWrite, int replicaRead, boolean replicaSkipEnabled,
-                                int dataTranferPoolSize) {
+  public ShuffleWriteClientImpl(
+      String clientType,
+      int retryMax,
+      long retryIntervalMax,
+      int heartBeatThreadNum,
+      int replica,
+      int replicaWrite,
+      int replicaRead,
+      boolean replicaSkipEnabled,
+      int dataTranferPoolSize,
+      int commitSenderPoolSize) {

Review Comment:
   ditto.





[GitHub] [incubator-uniffle] zuston commented on pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#issuecomment-1186209099

   Besides, I think I can submit a new PR to apply the same optimization to `registerShuffleServer`.




[GitHub] [incubator-uniffle] jerqi commented on pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#issuecomment-1186183399

   > Yes. I tested with 1000 executors (1g/1core each) running a 1TB terasort.
   > 
   > In localfile mode it took 7.3 min. With this PR applied it took 6.1 min.
   > 
   > @jerqi
   
   Please put the performance test results into the `Why are the changes needed?` section.
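
For reference, the two timings quoted above (7.3 min before, 6.1 min after) work out to roughly a 16% shorter run, assuming they are directly comparable single runs:

$$
\frac{7.3 - 6.1}{7.3} = \frac{1.2}{7.3} \approx 0.164
$$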




[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#discussion_r922690226


##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -247,43 +257,58 @@ public SendShuffleDataResult sendShuffleData(String appId, List<ShuffleBlockInfo
     return new SendShuffleDataResult(successBlockIds, failedBlockIds);
   }
 
+  /**
+   * This method will wait until all shuffle data have been flushed
+   * to durable storage in assigned shuffle servers.
+   * @param shuffleServerInfoSet
+   * @param appId
+   * @param shuffleId
+   * @param numMaps
+   * @return
+   */
   @Override
   public boolean sendCommit(Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps) {
+    ForkJoinPool forkJoinPool = new ForkJoinPool(
+        commitSenderPoolSize == -1 ? shuffleServerInfoSet.size() : commitSenderPoolSize
+    );
     AtomicInteger successfulCommit = new AtomicInteger(0);
-    shuffleServerInfoSet.stream().forEach(ssi -> {
-      RssSendCommitRequest request = new RssSendCommitRequest(appId, shuffleId);
-      String errorMsg = "Failed to commit shuffle data to " + ssi + " for shuffleId[" + shuffleId + "]";
-      long startTime = System.currentTimeMillis();
-      try {
-        RssSendCommitResponse response = getShuffleServerClient(ssi).sendCommit(request);
-        if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
-          int commitCount = response.getCommitCount();
-          LOG.info("Successfully sendCommit for appId[" + appId + "], shuffleId[" + shuffleId
-              + "] to ShuffleServer[" + ssi.getId() + "], cost "
-              + (System.currentTimeMillis() - startTime) + " ms, got committed maps["
-              + commitCount + "], map number of stage is " + numMaps);
-          if (commitCount >= numMaps) {
-            RssFinishShuffleResponse rfsResponse =
-                getShuffleServerClient(ssi).finishShuffle(new RssFinishShuffleRequest(appId, shuffleId));
-            if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
-              String msg = "Failed to finish shuffle to " + ssi + " for shuffleId[" + shuffleId
-                  + "] with statusCode " + rfsResponse.getStatusCode();
-              LOG.error(msg);
-              throw new Exception(msg);
-            } else {
-              LOG.info("Successfully finish shuffle to " + ssi + " for shuffleId[" + shuffleId + "]");
+    forkJoinPool.submit(() -> {
+      shuffleServerInfoSet.parallelStream().forEach(ssi -> {
+        RssSendCommitRequest request = new RssSendCommitRequest(appId, shuffleId);
+        String errorMsg = "Failed to commit shuffle data to " + ssi + " for shuffleId[" + shuffleId + "]";
+        long startTime = System.currentTimeMillis();
+        try {
+          RssSendCommitResponse response = getShuffleServerClient(ssi).sendCommit(request);
+          if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+            int commitCount = response.getCommitCount();
+            LOG.info("Successfully sendCommit for appId[" + appId + "], shuffleId[" + shuffleId
+                + "] to ShuffleServer[" + ssi.getId() + "], cost "
+                + (System.currentTimeMillis() - startTime) + " ms, got committed maps["
+                + commitCount + "], map number of stage is " + numMaps);
+            if (commitCount >= numMaps) {
+              RssFinishShuffleResponse rfsResponse =
+                  getShuffleServerClient(ssi).finishShuffle(new RssFinishShuffleRequest(appId, shuffleId));
+              if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
+                String msg = "Failed to finish shuffle to " + ssi + " for shuffleId[" + shuffleId
+                    + "] with statusCode " + rfsResponse.getStatusCode();
+                LOG.error(msg);
+                throw new Exception(msg);
+              } else {
+                LOG.info("Successfully finish shuffle to " + ssi + " for shuffleId[" + shuffleId + "]");
+              }
             }
+          } else {
+            String msg = errorMsg + " with statusCode " + response.getStatusCode();
+            LOG.error(msg);
+            throw new Exception(msg);
           }
-        } else {
-          String msg = errorMsg + " with statusCode " + response.getStatusCode();
-          LOG.error(msg);
-          throw new Exception(msg);
+          successfulCommit.incrementAndGet();
+        } catch (Exception e) {
+          LOG.error(errorMsg, e);
         }
-        successfulCommit.incrementAndGet();
-      } catch (Exception e) {
-        LOG.error(errorMsg, e);
-      }
-    });
+      });
+    }).join();
+    forkJoinPool.shutdownNow();

Review Comment:
   It looks like either `shutdownNow` or `shutdown` is fine here, because `join` has already waited for the submitted task to finish.
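
A minimal sketch of why the two calls behave the same at this point, assuming (as the diff does) that a parallel stream started inside a ForkJoinPool task runs on that pool. The class name, the item count, and the pool size of 4 are made up for illustration and are not from the PR. `join()` returns only after `forEach` has visited every element, so nothing is left for `shutdownNow()` to cancel:

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class JoinThenShutdownSketch {

  // Runs `items` simulated commits on a dedicated pool, waits with join(),
  // then shuts the pool down abruptly or gracefully. Returns how many ran.
  static int run(int items, boolean abrupt) {
    List<Integer> work = IntStream.range(0, items).boxed().collect(Collectors.toList());
    ForkJoinPool pool = new ForkJoinPool(4); // illustrative size only
    AtomicInteger done = new AtomicInteger(0);
    pool.submit(() -> {
      // Stand-in for the per-server sendCommit call in the PR.
      work.parallelStream().forEach(i -> done.incrementAndGet());
    }).join(); // blocks until forEach has processed every element
    if (abrupt) {
      pool.shutdownNow();
    } else {
      pool.shutdown();
    }
    return done.get();
  }
}
```

Under these assumptions, `run(100, true)` and `run(100, false)` both report all 100 items as processed, since the work is complete before either shutdown call is reached.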



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#discussion_r922690680


##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -247,43 +257,58 @@ public SendShuffleDataResult sendShuffleData(String appId, List<ShuffleBlockInfo
     return new SendShuffleDataResult(successBlockIds, failedBlockIds);
   }
 
+  /**
+   * This method will wait until all shuffle data have been flushed
+   * to durable storage in assigned shuffle servers.
+   * @param shuffleServerInfoSet
+   * @param appId
+   * @param shuffleId
+   * @param numMaps
+   * @return
+   */
   @Override
   public boolean sendCommit(Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps) {
+    ForkJoinPool forkJoinPool = new ForkJoinPool(
+        commitSenderPoolSize == -1 ? shuffleServerInfoSet.size() : commitSenderPoolSize
+    );
     AtomicInteger successfulCommit = new AtomicInteger(0);
-    shuffleServerInfoSet.stream().forEach(ssi -> {
-      RssSendCommitRequest request = new RssSendCommitRequest(appId, shuffleId);
-      String errorMsg = "Failed to commit shuffle data to " + ssi + " for shuffleId[" + shuffleId + "]";
-      long startTime = System.currentTimeMillis();
-      try {
-        RssSendCommitResponse response = getShuffleServerClient(ssi).sendCommit(request);
-        if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
-          int commitCount = response.getCommitCount();
-          LOG.info("Successfully sendCommit for appId[" + appId + "], shuffleId[" + shuffleId
-              + "] to ShuffleServer[" + ssi.getId() + "], cost "
-              + (System.currentTimeMillis() - startTime) + " ms, got committed maps["
-              + commitCount + "], map number of stage is " + numMaps);
-          if (commitCount >= numMaps) {
-            RssFinishShuffleResponse rfsResponse =
-                getShuffleServerClient(ssi).finishShuffle(new RssFinishShuffleRequest(appId, shuffleId));
-            if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
-              String msg = "Failed to finish shuffle to " + ssi + " for shuffleId[" + shuffleId
-                  + "] with statusCode " + rfsResponse.getStatusCode();
-              LOG.error(msg);
-              throw new Exception(msg);
-            } else {
-              LOG.info("Successfully finish shuffle to " + ssi + " for shuffleId[" + shuffleId + "]");
+    forkJoinPool.submit(() -> {
+      shuffleServerInfoSet.parallelStream().forEach(ssi -> {
+        RssSendCommitRequest request = new RssSendCommitRequest(appId, shuffleId);
+        String errorMsg = "Failed to commit shuffle data to " + ssi + " for shuffleId[" + shuffleId + "]";
+        long startTime = System.currentTimeMillis();
+        try {
+          RssSendCommitResponse response = getShuffleServerClient(ssi).sendCommit(request);
+          if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+            int commitCount = response.getCommitCount();
+            LOG.info("Successfully sendCommit for appId[" + appId + "], shuffleId[" + shuffleId
+                + "] to ShuffleServer[" + ssi.getId() + "], cost "
+                + (System.currentTimeMillis() - startTime) + " ms, got committed maps["
+                + commitCount + "], map number of stage is " + numMaps);
+            if (commitCount >= numMaps) {
+              RssFinishShuffleResponse rfsResponse =
+                  getShuffleServerClient(ssi).finishShuffle(new RssFinishShuffleRequest(appId, shuffleId));
+              if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
+                String msg = "Failed to finish shuffle to " + ssi + " for shuffleId[" + shuffleId
+                    + "] with statusCode " + rfsResponse.getStatusCode();
+                LOG.error(msg);
+                throw new Exception(msg);
+              } else {
+                LOG.info("Successfully finish shuffle to " + ssi + " for shuffleId[" + shuffleId + "]");
+              }
             }
+          } else {
+            String msg = errorMsg + " with statusCode " + response.getStatusCode();
+            LOG.error(msg);
+            throw new Exception(msg);
           }
-        } else {
-          String msg = errorMsg + " with statusCode " + response.getStatusCode();
-          LOG.error(msg);
-          throw new Exception(msg);
+          successfulCommit.incrementAndGet();
+        } catch (Exception e) {
+          LOG.error(errorMsg, e);
         }
-        successfulCommit.incrementAndGet();
-      } catch (Exception e) {
-        LOG.error(errorMsg, e);
-      }
-    });
+      });
+    }).join();
+    forkJoinPool.shutdownNow();

Review Comment:
   My only concern is that an exception could prevent the ForkJoinPool from being shut down.
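
One way to address this, sketched under assumptions: wrap the `join()` in `try`/`finally` so the pool is shut down even when the task throws. `runAndShutdown` and the class name are hypothetical and not part of the PR. In the diff, failures inside the `try` block of the `forEach` lambda are already caught and logged, so this mainly guards against anything that escapes it.

```java
import java.util.concurrent.ForkJoinPool;

public class ShutdownInFinallySketch {

  // Hypothetical helper: runs the task on the given pool, waits for it, and
  // always shuts the pool down, whether join() returns normally or rethrows.
  static void runAndShutdown(ForkJoinPool pool, Runnable task) {
    try {
      pool.submit(task).join();
    } finally {
      pool.shutdownNow();
    }
  }
}
```

With this shape, a failure from the task still propagates to the caller, but the pool's threads are not left behind.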





[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #59: Sending commit concurrently in client side

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#discussion_r922692419


##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########

Review Comment:
   Got it.


