Posted to issues@uniffle.apache.org by GitBox <gi...@apache.org> on 2022/09/28 07:39:29 UTC

[GitHub] [incubator-uniffle] zuston opened a new pull request, #249: Introduce data cleanup mechanism on stage level

zuston opened a new pull request, #249:
URL: https://github.com/apache/incubator-uniffle/pull/249

   
   ### What changes were proposed in this pull request?
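   Introduce a data cleanup mechanism at the stage level, so that the shuffle data of a single stage can be removed from the shuffle servers once that shuffle is unregistered, instead of only when the whole app is purged.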
   
   
   ### Why are the changes needed?
   This PR reduces disk usage on the shuffle servers. For example:
   1. Some Spark ML jobs run many stages, and the shuffle data of finished stages stays on the shuffle servers even though it is no longer used.
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   UTs


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#issuecomment-1272276700

   One question: why do we add the concept of `PurgeEvent`? What's the advantage? Could we just add a shuffle-level deletion method to `StorageManager`?



[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r982489452


##########
server/src/main/java/org/apache/uniffle/server/event/AppPurgeEvent.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.event;
+
+import java.util.List;
+
+public class AppPurgeEvent extends PurgeEvent {

Review Comment:
   `PurgeEvent` is the event for removing the resources of an app or of one of its stages. It has two subclasses, `AppPurgeEvent` and `ShufflePurgeEvent`:
   1. `AppPurgeEvent` removes all of an app's resources.
   2. `ShufflePurgeEvent` removes the resources of one shuffle stage of the app.
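A minimal sketch of how such a hierarchy lets a single queue and consumer handle both cleanup scopes, which is one plausible advantage over a separate `StorageManager` method. All fields, the `PurgeEventConsumer` class, and the `app:`/`shuffle:` log strings are hypothetical; the real classes in the PR may carry different data, and a real consumer would delete files instead of recording strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical fields: the real PurgeEvent classes in the PR may carry different data.
abstract class PurgeEvent {
  private final String appId;
  private final String user;
  private final List<Integer> shuffleIds;

  PurgeEvent(String appId, String user, List<Integer> shuffleIds) {
    this.appId = appId;
    this.user = user;
    this.shuffleIds = shuffleIds == null ? Collections.<Integer>emptyList() : shuffleIds;
  }

  String getAppId() { return appId; }
  String getUser() { return user; }
  List<Integer> getShuffleIds() { return shuffleIds; }
}

/** Removes every resource that belongs to an application. */
class AppPurgeEvent extends PurgeEvent {
  AppPurgeEvent(String appId, String user) {
    super(appId, user, null);
  }
}

/** Removes the resources of selected shuffles (stages) of an application. */
class ShufflePurgeEvent extends PurgeEvent {
  ShufflePurgeEvent(String appId, String user, List<Integer> shuffleIds) {
    super(appId, user, shuffleIds);
  }
}

/** One consumer drains all purge events and picks the deletion scope from the event type. */
class PurgeEventConsumer {
  private final BlockingQueue<PurgeEvent> queue = new LinkedBlockingQueue<>();
  final List<String> log = new ArrayList<>();  // records what would be deleted

  void submit(PurgeEvent event) {
    queue.add(event);
  }

  /** Processes every queued event; a server would run this on a background thread. */
  void drain() {
    PurgeEvent event;
    while ((event = queue.poll()) != null) {
      if (event instanceof AppPurgeEvent) {
        log.add("app:" + event.getAppId());
      } else if (event instanceof ShufflePurgeEvent) {
        for (int shuffleId : event.getShuffleIds()) {
          log.add("shuffle:" + event.getAppId() + "/" + shuffleId);
        }
      }
    }
  }
}
```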




[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r990756575


##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -591,6 +593,45 @@ public void close() {
     dataTransferPool.shutdownNow();
   }
 
+  @Override
+  public void unregisterShuffle(String appId, int shuffleId) {
+    RssUnregisterShuffleRequest request = new RssUnregisterShuffleRequest(appId, shuffleId);
+    List<Callable<Void>> callableList = Lists.newArrayList();
+
+    shuffleServerInfoSet.stream().forEach(shuffleServerInfo -> {
+          callableList.add(() -> {
+            try {
+              ShuffleServerClient client =
+                  ShuffleServerClientFactory.getInstance().getShuffleServerClient(clientType, shuffleServerInfo);
+              RssUnregisterShuffleResponse response = client.unregisterShuffle(request);
+              if (response.getStatusCode() != ResponseStatusCode.SUCCESS) {
+                LOG.warn("Failed to unregister shuffle to " + shuffleServerInfo);
+              }
+            } catch (Exception e) {
+              LOG.warn("Error happened when unregistering to " + shuffleServerInfo, e);
+            }
+            return null;
+          });
+        }
+    );
+
+    try {
+      ExecutorService executorService =
+          Executors.newFixedThreadPool(
+              Math.min(10, shuffleServerInfoSet.size()),

Review Comment:
   `10` is an empirical value. The request is lightweight, so I think 10 threads are enough. There is no need to introduce an extra config option that adds to what users have to understand.
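A sketch of the fan-out pattern in the hunk above, with the pool size and timeout passed in as parameters. `UnregisterFanOut`, `unregisterAll`, and the `sendRequest` predicate (a stand-in for the real RPC) are hypothetical. One edge case worth checking in the real code: if `shuffleServerInfoSet` can be empty, `Math.min(10, shuffleServerInfoSet.size())` is `0`, and `Executors.newFixedThreadPool(0)` throws `IllegalArgumentException`; the sketch returns early in that case. It also shuts the per-call pool down in `finally`, since the truncated hunk does not show whether that happens.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

class UnregisterFanOut {
  /**
   * Sends one unregister request per server in parallel and returns the servers whose
   * request failed, threw, or did not finish before the timeout. maxThreads must be positive.
   * `sendRequest` is a hypothetical stand-in for the RPC; it returns true on success.
   */
  static List<String> unregisterAll(List<String> servers, Predicate<String> sendRequest,
                                    int maxThreads, long timeoutSec) {
    List<String> failed = new ArrayList<>();
    if (servers.isEmpty()) {
      return failed;  // Executors.newFixedThreadPool(0) would throw IllegalArgumentException
    }
    List<Callable<Boolean>> tasks = new ArrayList<>();
    for (String server : servers) {
      tasks.add(() -> {
        try {
          return sendRequest.test(server);
        } catch (RuntimeException e) {
          return false;  // one bad server must not affect the others
        }
      });
    }
    // Never start more threads than there are servers.
    ExecutorService pool = Executors.newFixedThreadPool(Math.min(maxThreads, servers.size()));
    try {
      // Futures come back in task order; unfinished tasks are cancelled at the deadline.
      List<Future<Boolean>> futures = pool.invokeAll(tasks, timeoutSec, TimeUnit.SECONDS);
      for (int i = 0; i < futures.size(); i++) {
        Future<Boolean> future = futures.get(i);
        boolean ok;
        try {
          ok = !future.isCancelled() && future.get();
        } catch (ExecutionException e) {
          ok = false;
        }
        if (!ok) {
          failed.add(servers.get(i));
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();  // keep the interrupt visible to the caller
      failed.clear();
      failed.addAll(servers);  // outcome unknown, so no server is reported as confirmed
    } finally {
      pool.shutdownNow();  // the pool is created per call, so release its threads
    }
    return failed;
  }
}
```

With `maxThreads = 10` and `timeoutSec = 10` this mirrors the literals in the hunk. Returning the failed servers instead of only logging them leaves the caller free to decide whether a retry is worthwhile.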




[GitHub] [incubator-uniffle] zuston commented on pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#issuecomment-1260774904

   > > PTAL @jerqi. Unregistering is currently scoped to Spark; MR will be supported in follow-up commits.
   > 
   > Why does MR need stage-level shuffle cleanup? MR doesn't have multiple stages.
   
   Oh yes. My fault.



[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r990618831


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java:
##########
@@ -33,7 +32,7 @@ public class LocalFileDeleteHandler implements ShuffleDeleteHandler {
   @Override
   public void delete(String[] storageBasePaths, String appId, String user) {
     for (String basePath : storageBasePaths) {
-      String shufflePath = ShuffleStorageUtils.getFullShuffleDataFolder(basePath, appId);
+      final String shufflePath = basePath;

Review Comment:
   OK. How about renaming it to `shuffleDataStoredPath`?
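With this change the handler no longer derives the app folder itself: each entry in `storageBasePaths` is treated as the exact path to delete, so the caller decides whether that is an app folder or a single shuffle folder. A minimal sketch under that reading, using `java.nio.file`; the `<basePath>/<appId>/<shuffleId>` layout and the `LocalPathDeleter` helpers are assumptions, not the PR's code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

class LocalPathDeleter {
  /** Hypothetical layout for a whole app: <basePath>/<appId>. */
  static Path appPath(String basePath, String appId) {
    return Paths.get(basePath, appId);
  }

  /** Hypothetical layout for one shuffle: <basePath>/<appId>/<shuffleId>. */
  static Path shufflePath(String basePath, String appId, int shuffleId) {
    return Paths.get(basePath, appId, String.valueOf(shuffleId));
  }

  /** Recursively deletes each already-resolved path; paths that do not exist are skipped. */
  static void delete(Path... shuffleDataStoredPaths) throws IOException {
    for (Path root : shuffleDataStoredPaths) {
      if (!Files.exists(root)) {
        continue;
      }
      try (Stream<Path> walk = Files.walk(root)) {
        // Reverse order visits children before parents, so directories are empty when deleted.
        walk.sorted(Comparator.reverseOrder()).forEach(LocalPathDeleter::deleteOne);
      } catch (UncheckedIOException e) {
        throw e.getCause();  // surface the original IOException
      }
    }
  }

  private static void deleteOne(Path p) {
    try {
      Files.delete(p);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```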




[GitHub] [incubator-uniffle] jerqi commented on pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#issuecomment-1260764522

   > PTAL @jerqi. Unregistering is currently scoped to Spark; MR will be supported in follow-up commits.
   
   Why does MR need stage-level shuffle cleanup? MR doesn't have multiple stages.



[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r990741288


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleDeleteHandler.java:
##########
@@ -39,33 +38,35 @@ public HdfsShuffleDeleteHandler(Configuration hadoopConf) {
 
   @Override
   public void delete(String[] storageBasePaths, String appId, String user) {
-    Path path = new Path(ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePaths[0], appId));
-    boolean isSuccess = false;
-    int times = 0;
-    int retryMax = 5;
-    long start = System.currentTimeMillis();
-    LOG.info("Try delete shuffle data in HDFS for appId[{}] of user[{}] with {}",appId, user, path);
-    while (!isSuccess && times < retryMax) {
-      try {
-        FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem(user, path, hadoopConf);
-        fileSystem.delete(path, true);
-        isSuccess = true;
-      } catch (Exception e) {
-        times++;
-        LOG.warn("Can't delete shuffle data for appId[" + appId + "] with " + times + " times", e);
+    for (String deletePath : storageBasePaths) {
+      final Path path = new Path(deletePath);
+      boolean isSuccess = false;
+      int times = 0;
+      int retryMax = 5;

Review Comment:
   Why do we use `5`?
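On the `5` question, one option is to make the attempt count (and a backoff) parameters instead of a literal. A generic sketch of the bounded retry loop the HDFS handler uses; `Retry.runWithRetry` and the linear backoff are illustrative assumptions, and in the real handler the `Callable` would wrap the `fileSystem.delete(path, true)` call.

```java
import java.util.concurrent.Callable;

class Retry {
  /**
   * Runs `action` up to `maxAttempts` times, sleeping `backoffMs * attempt` between tries.
   * Returns true on the first success, false once every attempt failed or threw.
   */
  static boolean runWithRetry(Callable<Boolean> action, int maxAttempts, long backoffMs) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        if (action.call()) {
          return true;
        }
      } catch (Exception e) {
        // Swallowed in this sketch; a real handler would log the attempt number and exception.
      }
      if (attempt < maxAttempts) {
        try {
          Thread.sleep(backoffMs * attempt);  // linear backoff between attempts
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return false;
        }
      }
    }
    return false;
  }
}
```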




[GitHub] [incubator-uniffle] jerqi merged pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
jerqi merged PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249



[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r990723543


##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -591,6 +593,45 @@ public void close() {
     dataTransferPool.shutdownNow();
   }
 
+  @Override
+  public void unregisterShuffle(String appId, int shuffleId) {
+    RssUnregisterShuffleRequest request = new RssUnregisterShuffleRequest(appId, shuffleId);
+    List<Callable<Void>> callableList = Lists.newArrayList();
+
+    shuffleServerInfoSet.stream().forEach(shuffleServerInfo -> {
+          callableList.add(() -> {
+            try {
+              ShuffleServerClient client =
+                  ShuffleServerClientFactory.getInstance().getShuffleServerClient(clientType, shuffleServerInfo);
+              RssUnregisterShuffleResponse response = client.unregisterShuffle(request);
+              if (response.getStatusCode() != ResponseStatusCode.SUCCESS) {
+                LOG.warn("Failed to unregister shuffle to " + shuffleServerInfo);
+              }
+            } catch (Exception e) {
+              LOG.warn("Error happened when unregistering to " + shuffleServerInfo, e);
+            }
+            return null;
+          });
+        }
+    );
+
+    try {
+      ExecutorService executorService =
+          Executors.newFixedThreadPool(
+              Math.min(10, shuffleServerInfoSet.size()),
+              ThreadUtils.getThreadFactory("unregister-shuffle-%d")
+          );
+      List<Future<Void>> futures = executorService.invokeAll(callableList, 10, TimeUnit.SECONDS);

Review Comment:
   10 seconds? Should this be a configuration option?
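For context on the 10-second value: with the timed `ExecutorService.invokeAll(tasks, timeout, unit)`, any task that has not completed when the timeout expires is cancelled, so the timeout caps how long `unregisterShuffle` can block its caller, and the timed-out requests can be detected with `Future.isCancelled()`. A small sketch demonstrating that behavior; `TimeoutDemo` and the simulated hung task are hypothetical.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class TimeoutDemo {
  /**
   * Runs all tasks under one shared deadline. For each task, returns "done" if it
   * finished (normally or with an exception) or "timeout" if invokeAll cancelled it.
   */
  static String[] runWithDeadline(List<Callable<String>> tasks, long timeoutMs) {
    String[] outcome = new String[tasks.size()];
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
    try {
      List<Future<String>> futures = pool.invokeAll(tasks, timeoutMs, TimeUnit.MILLISECONDS);
      for (int i = 0; i < futures.size(); i++) {
        // Upon return, invokeAll has cancelled every task that had not completed.
        outcome[i] = futures.get(i).isCancelled() ? "timeout" : "done";
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      pool.shutdownNow();
    }
    return outcome;
  }
}
```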




[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r983002132


##########
server/src/main/java/org/apache/uniffle/server/event/AppPurgeEvent.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.event;
+
+import java.util.List;
+
+public class AppPurgeEvent extends PurgeEvent {

Review Comment:
   > I don't think we need to do that. These events are encapsulated for a specific consumer, and we should keep them independent.
   > 
   > By the way, speaking of events, I have an idea: we could create an abstract event interface for all specific events. To avoid creating thread pools everywhere, we could introduce an event dispatcher, and the existing event consumers could register with it. Handling everything in one place would be cleaner and would make it easier to collect metrics.
   
   Then let's add some `TODO` comments here.
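A minimal sketch of the dispatcher idea from the quoted comment: a shared `Event` marker interface, one thread pool, consumers registered per event type, and per-type counters as a hook for metrics. `Event`, `EventDispatcher`, and all method names are hypothetical; this illustrates the proposed TODO, not code from the PR.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Hypothetical marker interface shared by all server-side events. */
interface Event {}

/** One shared thread pool for every event type; consumers register per type. */
class EventDispatcher {
  private final ExecutorService executor;
  private final Map<Class<?>, Consumer<Event>> handlers = new ConcurrentHashMap<>();
  // A single dispatch point makes per-type metrics easy to collect.
  private final Map<Class<?>, AtomicLong> handledCounts = new ConcurrentHashMap<>();

  EventDispatcher(int threads) {
    this.executor = Executors.newFixedThreadPool(threads);
  }

  /** Registers the consumer for events whose exact runtime class is `type`. */
  <E extends Event> void register(Class<E> type, Consumer<? super E> consumer) {
    handlers.put(type, event -> consumer.accept(type.cast(event)));
  }

  void dispatch(Event event) {
    Consumer<Event> handler = handlers.get(event.getClass());
    if (handler == null) {
      throw new IllegalArgumentException("No consumer for " + event.getClass().getName());
    }
    executor.execute(() -> {
      handler.accept(event);
      handledCounts.computeIfAbsent(event.getClass(), k -> new AtomicLong()).incrementAndGet();
    });
  }

  long handledCount(Class<? extends Event> type) {
    AtomicLong count = handledCounts.get(type);
    return count == null ? 0 : count.get();
  }

  /** Stops accepting events and waits for already queued ones to finish. */
  boolean shutdownAndWait(long timeoutSec) {
    executor.shutdown();
    try {
      return executor.awaitTermination(timeoutSec, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```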




[GitHub] [incubator-uniffle] zuston commented on pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#issuecomment-1260698895

   PTAL @jerqi. Unregistering is currently scoped to Spark; MR will be supported in follow-up commits.



[GitHub] [incubator-uniffle] codecov-commenter commented on pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#issuecomment-1260539450

   # [Codecov](https://codecov.io/gh/apache/incubator-uniffle/pull/249?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#249](https://codecov.io/gh/apache/incubator-uniffle/pull/249?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (a71add8) into [master](https://codecov.io/gh/apache/incubator-uniffle/commit/54ddca60f6469483ff87927ea496464cf9750dbb?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (54ddca6) will **decrease** coverage by `0.50%`.
   > The diff coverage is `30.28%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master     #249      +/-   ##
   ============================================
   - Coverage     59.29%   58.78%   -0.51%     
   - Complexity     1346     1354       +8     
   ============================================
     Files           162      165       +3     
     Lines          8789     8916     +127     
     Branches        828      845      +17     
   ============================================
   + Hits           5211     5241      +30     
   - Misses         3311     3400      +89     
   - Partials        267      275       +8     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-uniffle/pull/249?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...he/uniffle/client/impl/ShuffleWriteClientImpl.java](https://codecov.io/gh/apache/incubator-uniffle/pull/249/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL2NsaWVudC9pbXBsL1NodWZmbGVXcml0ZUNsaWVudEltcGwuamF2YQ==) | `21.11% <0.00%> (-1.86%)` | :arrow_down: |
   | [...org/apache/uniffle/server/ShuffleFlushManager.java](https://codecov.io/gh/apache/incubator-uniffle/pull/249/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL3NlcnZlci9TaHVmZmxlRmx1c2hNYW5hZ2VyLmphdmE=) | `75.40% <0.00%> (-2.93%)` | :arrow_down: |
   | [...pache/uniffle/server/ShuffleServerGrpcService.java](https://codecov.io/gh/apache/incubator-uniffle/pull/249/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL3NlcnZlci9TaHVmZmxlU2VydmVyR3JwY1NlcnZpY2UuamF2YQ==) | `0.87% <0.00%> (-0.03%)` | :arrow_down: |
   | [...apache/uniffle/server/event/ShufflePurgeEvent.java](https://codecov.io/gh/apache/incubator-uniffle/pull/249/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL3NlcnZlci9ldmVudC9TaHVmZmxlUHVyZ2VFdmVudC5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...he/uniffle/server/storage/MultiStorageManager.java](https://codecov.io/gh/apache/incubator-uniffle/pull/249/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL3NlcnZlci9zdG9yYWdlL011bHRpU3RvcmFnZU1hbmFnZXIuamF2YQ==) | `37.50% <0.00%> (ø)` | |
   | [...storage/handler/impl/HdfsShuffleDeleteHandler.java](https://codecov.io/gh/apache/incubator-uniffle/pull/249/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmFnZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvdW5pZmZsZS9zdG9yYWdlL2hhbmRsZXIvaW1wbC9IZGZzU2h1ZmZsZURlbGV0ZUhhbmRsZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...e/storage/handler/impl/LocalFileDeleteHandler.java](https://codecov.io/gh/apache/incubator-uniffle/pull/249/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmFnZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvdW5pZmZsZS9zdG9yYWdlL2hhbmRsZXIvaW1wbC9Mb2NhbEZpbGVEZWxldGVIYW5kbGVyLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [.../org/apache/uniffle/server/ShuffleTaskManager.java](https://codecov.io/gh/apache/incubator-uniffle/pull/249/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL3NlcnZlci9TaHVmZmxlVGFza01hbmFnZXIuamF2YQ==) | `68.19% <16.66%> (-8.76%)` | :arrow_down: |
   | [...he/uniffle/server/storage/LocalStorageManager.java](https://codecov.io/gh/apache/incubator-uniffle/pull/249/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL3NlcnZlci9zdG9yYWdlL0xvY2FsU3RvcmFnZU1hbmFnZXIuamF2YQ==) | `59.66% <52.94%> (-1.88%)` | :arrow_down: |
   | [...che/uniffle/server/storage/HdfsStorageManager.java](https://codecov.io/gh/apache/incubator-uniffle/pull/249/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL3NlcnZlci9zdG9yYWdlL0hkZnNTdG9yYWdlTWFuYWdlci5qYXZh) | `85.00% <64.28%> (-7.00%)` | :arrow_down: |
   | ... and [5 more](https://codecov.io/gh/apache/incubator-uniffle/pull/249/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   

[GitHub] [incubator-uniffle] zuston commented on pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#issuecomment-1275828375

   Thanks for your review @jerqi 



[GitHub] [incubator-uniffle] zuston commented on pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#issuecomment-1274398389

   Changelog of the latest commit:
   1. Extracted the unregister thread pool size and request timeout (in seconds) into config entries:
   
   `spark.rss.client.unregister.thread.pool.size` and `spark.rss.client.unregister.request.timeout.sec`
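A sketch of how the two new options might be read with fallbacks. The defaults of `10` threads and `10` seconds are an assumption based on the values hard-coded in the earlier revision of `unregisterShuffle`; the actual defaults are defined in the PR. `UnregisterOptions` and the plain `Map` standing in for the Spark configuration are hypothetical.

```java
import java.util.Map;

/** Hypothetical holder for the two client options; defaults mirror the earlier hard-coded values. */
class UnregisterOptions {
  static final String THREAD_POOL_SIZE_KEY = "spark.rss.client.unregister.thread.pool.size";
  static final String TIMEOUT_SEC_KEY = "spark.rss.client.unregister.request.timeout.sec";

  final int threadPoolSize;
  final long requestTimeoutSec;

  UnregisterOptions(Map<String, String> conf) {
    this.threadPoolSize = positiveInt(conf, THREAD_POOL_SIZE_KEY, 10);
    this.requestTimeoutSec = positiveInt(conf, TIMEOUT_SEC_KEY, 10);
  }

  /** Reads a positive integer, falling back to the default when the key is absent. */
  private static int positiveInt(Map<String, String> conf, String key, int defaultValue) {
    String raw = conf.get(key);
    if (raw == null) {
      return defaultValue;
    }
    int value = Integer.parseInt(raw.trim());
    if (value <= 0) {
      throw new IllegalArgumentException(key + " must be positive, got " + value);
    }
    return value;
  }

  /** Never start more threads than there are shuffle servers to contact, and at least one. */
  int effectivePoolSize(int serverCount) {
    return Math.max(1, Math.min(threadPoolSize, serverCount));
  }
}
```

Like any other Spark property, they would be set on submission, e.g. `--conf spark.rss.client.unregister.thread.pool.size=4`.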



[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r990757285


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java:
##########
@@ -33,7 +32,7 @@ public class LocalFileDeleteHandler implements ShuffleDeleteHandler {
   @Override
   public void delete(String[] storageBasePaths, String appId, String user) {
     for (String basePath : storageBasePaths) {
-      String shufflePath = ShuffleStorageUtils.getFullShuffleDataFolder(basePath, appId);
+      final String shufflePath = basePath;

Review Comment:
   Done



##########
server/src/main/java/org/apache/uniffle/server/event/AppPurgeEvent.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.event;
+
+import java.util.List;
+
+public class AppPurgeEvent extends PurgeEvent {

Review Comment:
   Done




[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r990614675


##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -591,6 +593,45 @@ public void close() {
     dataTransferPool.shutdownNow();
   }
 
+  @Override
+  public void unregisterShuffle(String appId, int shuffleId) {
+    RssUnregisterShuffleRequest request = new RssUnregisterShuffleRequest(appId, shuffleId);
+    List<Callable<Void>> callableList = Lists.newArrayList();
+
+    shuffleServerInfoSet.stream().forEach(shuffleServerInfo -> {
+          callableList.add(() -> {
+            try {
+              ShuffleServerClient client =
+                  ShuffleServerClientFactory.getInstance().getShuffleServerClient(clientType, shuffleServerInfo);
+              RssUnregisterShuffleResponse response = client.unregisterShuffle(request);
+              if (response.getStatusCode() != ResponseStatusCode.SUCCESS) {
+                LOG.warn("Failed to unregister shuffle to " + shuffleServerInfo);
+              }
+            } catch (Exception e) {
+              LOG.warn("Error happened when unregistering to " + shuffleServerInfo, e);
+            }
+            return null;
+          });
+        }
+    );
+
+    try {
+      ExecutorService executorService =
+          Executors.newFixedThreadPool(
+              Math.min(10, shuffleServerInfoSet.size()),

Review Comment:
   Why do we use `10`?





[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r990725274


##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -591,6 +593,45 @@ public void close() {
     dataTransferPool.shutdownNow();
   }
 
+  @Override
+  public void unregisterShuffle(String appId, int shuffleId) {
+    RssUnregisterShuffleRequest request = new RssUnregisterShuffleRequest(appId, shuffleId);
+    List<Callable<Void>> callableList = Lists.newArrayList();
+
+    shuffleServerInfoSet.stream().forEach(shuffleServerInfo -> {
+          callableList.add(() -> {
+            try {
+              ShuffleServerClient client =
+                  ShuffleServerClientFactory.getInstance().getShuffleServerClient(clientType, shuffleServerInfo);
+              RssUnregisterShuffleResponse response = client.unregisterShuffle(request);
+              if (response.getStatusCode() != ResponseStatusCode.SUCCESS) {
+                LOG.warn("Failed to unregister shuffle to " + shuffleServerInfo);
+              }
+            } catch (Exception e) {
+              LOG.warn("Error happened when unregistering to " + shuffleServerInfo, e);
+            }
+            return null;
+          });
+        }
+    );
+
+    try {
+      ExecutorService executorService =
+          Executors.newFixedThreadPool(
+              Math.min(10, shuffleServerInfoSet.size()),
+              ThreadUtils.getThreadFactory("unregister-shuffle-%d")
+          );
+      List<Future<Void>> futures = executorService.invokeAll(callableList, 10, TimeUnit.SECONDS);

Review Comment:
   Yes.



##########
integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.io.File;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.shuffle.RssSparkConfig;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import scala.Tuple2;
+
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.apache.uniffle.common.config.RssBaseConf.RSS_STORAGE_BASE_PATH;

Review Comment:
   Got it.





[GitHub] [incubator-uniffle] jerqi commented on pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#issuecomment-1272202841

   I will take a look at this PR ASAP.




[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r991774740


##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -591,6 +593,45 @@ public void close() {
     dataTransferPool.shutdownNow();
   }
 
+  @Override
+  public void unregisterShuffle(String appId, int shuffleId) {
+    RssUnregisterShuffleRequest request = new RssUnregisterShuffleRequest(appId, shuffleId);
+    List<Callable<Void>> callableList = Lists.newArrayList();
+
+    shuffleServerInfoSet.stream().forEach(shuffleServerInfo -> {
+          callableList.add(() -> {
+            try {
+              ShuffleServerClient client =
+                  ShuffleServerClientFactory.getInstance().getShuffleServerClient(clientType, shuffleServerInfo);
+              RssUnregisterShuffleResponse response = client.unregisterShuffle(request);
+              if (response.getStatusCode() != ResponseStatusCode.SUCCESS) {
+                LOG.warn("Failed to unregister shuffle to " + shuffleServerInfo);
+              }
+            } catch (Exception e) {
+              LOG.warn("Error happened when unregistering to " + shuffleServerInfo, e);
+            }
+            return null;
+          });
+        }
+    );
+
+    try {
+      ExecutorService executorService =
+          Executors.newFixedThreadPool(
+              Math.min(10, shuffleServerInfoSet.size()),
+              ThreadUtils.getThreadFactory("unregister-shuffle-%d")
+          );
+      List<Future<Void>> futures = executorService.invokeAll(callableList, 10, TimeUnit.SECONDS);

Review Comment:
   Sometimes the timeout may not be enough. We should provide a default value based on our experience, because we usually can't anticipate every way users will run this.





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r982997349


##########
server/src/main/java/org/apache/uniffle/server/event/AppPurgeEvent.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.event;
+
+import java.util.List;
+
+public class AppPurgeEvent extends PurgeEvent {

Review Comment:
   Should we unify the ShuffleFlushDataEvent and PurgeEvent?





[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r983004719


##########
server/src/main/java/org/apache/uniffle/server/event/AppPurgeEvent.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.event;
+
+import java.util.List;
+
+public class AppPurgeEvent extends PurgeEvent {

Review Comment:
   It's OK.





[GitHub] [incubator-uniffle] zuston commented on pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#issuecomment-1263690925

   Gentle ping @jerqi: if you have time, could you help review this? I should have some time for quick fixes during the National Day holiday.




[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r982489452


##########
server/src/main/java/org/apache/uniffle/server/event/AppPurgeEvent.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.event;
+
+import java.util.List;
+
+public class AppPurgeEvent extends PurgeEvent {

Review Comment:
   AppPurgeEvent is the event for removing the resources of an app, or of one of the app's stages.
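A minimal sketch of how a `PurgeEvent` base with an `AppPurgeEvent` subclass could model both scopes, assuming the subclass carries an optional list of shuffle IDs (the `java.util.List` import in the diff hints at something similar). The field names, constructors and the "empty list means whole app" convention are hypothetical, not the PR's actual code.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical base class: one type for every cleanup request the server queues.
abstract class PurgeEvent {
  private final String appId;
  private final String user;
  // Empty list means "purge the whole app"; otherwise only these shuffles (stages).
  private final List<Integer> shuffleIds;

  protected PurgeEvent(String appId, String user, List<Integer> shuffleIds) {
    this.appId = appId;
    this.user = user;
    this.shuffleIds = shuffleIds == null
        ? Collections.<Integer>emptyList()
        : Collections.unmodifiableList(shuffleIds);
  }

  public String getAppId() { return appId; }
  public String getUser() { return user; }
  public List<Integer> getShuffleIds() { return shuffleIds; }
  public boolean isAppLevel() { return shuffleIds.isEmpty(); }
}

// Covers both cases: a whole app, or selected stages (shuffles) of it.
class AppPurgeEvent extends PurgeEvent {
  AppPurgeEvent(String appId, String user) {
    super(appId, user, null);
  }

  AppPurgeEvent(String appId, String user, List<Integer> shuffleIds) {
    super(appId, user, shuffleIds);
  }
}
```

With this shape, `new AppPurgeEvent(appId, user)` would request app-level cleanup, while passing a list of shuffle IDs would limit it to those stages.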





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r990615381


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java:
##########
@@ -33,7 +32,7 @@ public class LocalFileDeleteHandler implements ShuffleDeleteHandler {
   @Override
   public void delete(String[] storageBasePaths, String appId, String user) {
     for (String basePath : storageBasePaths) {
-      String shufflePath = ShuffleStorageUtils.getFullShuffleDataFolder(basePath, appId);
+      final String shufflePath = basePath;

Review Comment:
   Should we rename `storageBasePaths`?
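After this change the loop variable is already a full path (`final String shufflePath = basePath;`), so the name `storageBasePaths` no longer describes what the array holds. A minimal sketch of a delete routine whose parameter is named for its contents, assuming each entry is a directory to be removed recursively. The class and method names are hypothetical, and it uses plain `java.nio.file` rather than whatever utility the handler actually relies on.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

class LocalShuffleDataDeleter {
  // The argument holds full app- or shuffle-level directories (not storage base
  // paths), so it is named after what it actually contains.
  static int delete(String[] shuffleDataPaths) {
    int deleted = 0;
    for (String shufflePath : shuffleDataPaths) {
      Path root = Paths.get(shufflePath);
      if (!Files.exists(root)) {
        continue; // Nothing was written under this path on this disk.
      }
      // Delete deepest entries first so each directory is empty when removed.
      try (Stream<Path> entries = Files.walk(root)) {
        entries.sorted(Comparator.reverseOrder()).forEach(p -> {
          try {
            Files.delete(p);
          } catch (IOException e) {
            throw new UncheckedIOException(e);
          }
        });
        deleted++;
      } catch (IOException | UncheckedIOException e) {
        System.err.println("Failed to delete " + shufflePath + ": " + e);
      }
    }
    return deleted;
  }
}
```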





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r991773965


##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -591,6 +593,45 @@ public void close() {
     dataTransferPool.shutdownNow();
   }
 
+  @Override
+  public void unregisterShuffle(String appId, int shuffleId) {
+    RssUnregisterShuffleRequest request = new RssUnregisterShuffleRequest(appId, shuffleId);
+    List<Callable<Void>> callableList = Lists.newArrayList();
+
+    shuffleServerInfoSet.stream().forEach(shuffleServerInfo -> {
+          callableList.add(() -> {
+            try {
+              ShuffleServerClient client =
+                  ShuffleServerClientFactory.getInstance().getShuffleServerClient(clientType, shuffleServerInfo);
+              RssUnregisterShuffleResponse response = client.unregisterShuffle(request);
+              if (response.getStatusCode() != ResponseStatusCode.SUCCESS) {
+                LOG.warn("Failed to unregister shuffle to " + shuffleServerInfo);
+              }
+            } catch (Exception e) {
+              LOG.warn("Error happened when unregistering to " + shuffleServerInfo, e);
+            }
+            return null;
+          });
+        }
+    );
+
+    try {
+      ExecutorService executorService =
+          Executors.newFixedThreadPool(
+              Math.min(10, shuffleServerInfoSet.size()),

Review Comment:
   Em... we had better use a configuration option.
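A minimal sketch of what a configurable pool size could look like. The option name `rss.client.unregister.thread.pool.size` and the plain `Map` standing in for Uniffle's real config classes are hypothetical. It also guards an edge case in the quoted code: with an empty `shuffleServerInfoSet`, `Math.min(10, 0)` is 0, and `Executors.newFixedThreadPool(0)` throws `IllegalArgumentException`.

```java
import java.util.Map;

class UnregisterPoolSizing {
  // Hypothetical key and default; not an existing Uniffle option.
  static final String POOL_SIZE_KEY = "rss.client.unregister.thread.pool.size";
  static final int DEFAULT_POOL_SIZE = 10;

  static int poolSize(Map<String, String> conf, int serverCount) {
    int configured = DEFAULT_POOL_SIZE;
    String raw = conf.get(POOL_SIZE_KEY);
    if (raw != null) {
      configured = Integer.parseInt(raw.trim());
    }
    if (configured <= 0) {
      throw new IllegalArgumentException(POOL_SIZE_KEY + " must be positive, got " + configured);
    }
    // Never spawn more threads than servers, and never ask for 0 threads,
    // because Executors.newFixedThreadPool(0) throws IllegalArgumentException.
    return Math.max(1, Math.min(configured, serverCount));
  }
}
```

Keeping the current 10 as the default means nothing changes for users who never set the option.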





[GitHub] [incubator-uniffle] zuston commented on pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#issuecomment-1275539665

   Gentle ping @jerqi 




[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r982361935


##########
server/src/main/java/org/apache/uniffle/server/event/AppPurgeEvent.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.event;
+
+import java.util.List;
+
+public class AppPurgeEvent extends PurgeEvent {

Review Comment:
   What's the relation between ShuffleFlushEvent and PurgeEvent?





[GitHub] [incubator-uniffle] zuston commented on pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#issuecomment-1272278548

   > One question: why do we add the concept of PurgeEvent? What's the advantage? Could we just add a shuffle-level deletion method to StorageManager?
   
   Two reasons:
   1. It lets clearResourceThread handle both kinds of events: stage-level and app-level data cleanup.
   2. I don't want to introduce an extra delete method in StorageManager, because most of the deletion logic is the same.
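A minimal sketch of reason 1, assuming a single worker thread drains one queue and handles app-level and stage-level requests through the same code path. `PurgeRequest`, `ResourceCleaner` and the string log that replaces real memory and storage cleanup are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical event: an empty shuffleIds list means app-level cleanup.
class PurgeRequest {
  final String appId;
  final List<Integer> shuffleIds;

  PurgeRequest(String appId, List<Integer> shuffleIds) {
    this.appId = appId;
    this.shuffleIds = shuffleIds;
  }

  boolean isAppLevel() {
    return shuffleIds.isEmpty();
  }
}

class ResourceCleaner {
  private final BlockingQueue<PurgeRequest> queue = new LinkedBlockingQueue<>();
  // Stands in for the real memory and storage cleanup so the behaviour is observable.
  final List<String> purged = Collections.synchronizedList(new ArrayList<>());

  void submit(PurgeRequest request) {
    queue.add(request);
  }

  // Shared deletion path: app-level and stage-level events differ only in scope.
  void handle(PurgeRequest request) {
    if (request.isAppLevel()) {
      purged.add("app:" + request.appId);
      return;
    }
    for (int shuffleId : request.shuffleIds) {
      purged.add("shuffle:" + request.appId + "/" + shuffleId);
    }
  }

  // A single daemon thread drains the queue, in the role of a clear-resource thread.
  Thread start() {
    Thread worker = new Thread(() -> {
      try {
        while (!Thread.currentThread().isInterrupted()) {
          handle(queue.take());
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // Exit quietly on shutdown.
      }
    }, "clear-resource-thread");
    worker.setDaemon(true);
    worker.start();
    return worker;
  }
}
```

The design point is that adding stage-level cleanup needs no second thread or second delete method; only the scope carried by the event changes.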


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r990757233


##########
integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.io.File;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.shuffle.RssSparkConfig;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import scala.Tuple2;
+
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.apache.uniffle.common.config.RssBaseConf.RSS_STORAGE_BASE_PATH;

Review Comment:
   done
   



##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -591,6 +593,45 @@ public void close() {
     dataTransferPool.shutdownNow();
   }
 
+  @Override
+  public void unregisterShuffle(String appId, int shuffleId) {
+    RssUnregisterShuffleRequest request = new RssUnregisterShuffleRequest(appId, shuffleId);
+    List<Callable<Void>> callableList = Lists.newArrayList();
+
+    shuffleServerInfoSet.stream().forEach(shuffleServerInfo -> {
+          callableList.add(() -> {
+            try {
+              ShuffleServerClient client =
+                  ShuffleServerClientFactory.getInstance().getShuffleServerClient(clientType, shuffleServerInfo);
+              RssUnregisterShuffleResponse response = client.unregisterShuffle(request);
+              if (response.getStatusCode() != ResponseStatusCode.SUCCESS) {
+                LOG.warn("Failed to unregister shuffle to " + shuffleServerInfo);
+              }
+            } catch (Exception e) {
+              LOG.warn("Error happened when unregistering to " + shuffleServerInfo, e);
+            }
+            return null;
+          });
+        }
+    );
+
+    try {
+      ExecutorService executorService =
+          Executors.newFixedThreadPool(
+              Math.min(10, shuffleServerInfoSet.size()),
+              ThreadUtils.getThreadFactory("unregister-shuffle-%d")
+          );
+      List<Future<Void>> futures = executorService.invokeAll(callableList, 10, TimeUnit.SECONDS);
+      for (Future<Void> future : futures) {
+        if (!future.isDone()) {
+          future.cancel(true);
+        }
+      }
+    } catch (InterruptedException ie) {
+      LOG.warn("Unregister shuffle is interrupted", ie);
+    }

Review Comment:
   done
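A sketch of the same fan-out pattern with two adjustments, assuming the executor is created per call as in the quoted code. First, per the `ExecutorService.invokeAll(tasks, timeout, unit)` contract, tasks that have not completed when it returns are already cancelled, so every returned future is done and the extra `cancel(true)` loop has no effect. Second, the quoted code never shuts the pool down; here `shutdownNow()` runs in `finally`. `UnregisterFanOut` and `runAll` are illustrative names only.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class UnregisterFanOut {
  // Runs one call per server in parallel; returns how many finished before the timeout.
  static int runAll(List<Callable<Void>> calls, int poolSize, long timeoutMs) {
    if (calls.isEmpty()) {
      return 0; // No servers: avoid newFixedThreadPool(0), which throws.
    }
    ExecutorService executor = Executors.newFixedThreadPool(Math.min(poolSize, calls.size()));
    int finished = 0;
    try {
      // invokeAll cancels every task still running when the timeout expires,
      // so all returned futures are done (either completed or cancelled).
      List<Future<Void>> futures = executor.invokeAll(calls, timeoutMs, TimeUnit.MILLISECONDS);
      for (Future<Void> future : futures) {
        if (!future.isCancelled()) {
          finished++;
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // Preserve the interrupt for the caller.
    } finally {
      executor.shutdownNow(); // Release the per-call threads.
    }
    return finished;
  }
}
```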





[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r990756333


##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -591,6 +593,45 @@ public void close() {
     dataTransferPool.shutdownNow();
   }
 
+  @Override
+  public void unregisterShuffle(String appId, int shuffleId) {
+    RssUnregisterShuffleRequest request = new RssUnregisterShuffleRequest(appId, shuffleId);
+    List<Callable<Void>> callableList = Lists.newArrayList();
+
+    shuffleServerInfoSet.stream().forEach(shuffleServerInfo -> {
+          callableList.add(() -> {
+            try {
+              ShuffleServerClient client =
+                  ShuffleServerClientFactory.getInstance().getShuffleServerClient(clientType, shuffleServerInfo);
+              RssUnregisterShuffleResponse response = client.unregisterShuffle(request);
+              if (response.getStatusCode() != ResponseStatusCode.SUCCESS) {
+                LOG.warn("Failed to unregister shuffle to " + shuffleServerInfo);
+              }
+            } catch (Exception e) {
+              LOG.warn("Error happened when unregistering to " + shuffleServerInfo, e);
+            }
+            return null;
+          });
+        }
+    );
+
+    try {
+      ExecutorService executorService =
+          Executors.newFixedThreadPool(
+              Math.min(10, shuffleServerInfoSet.size()),
+              ThreadUtils.getThreadFactory("unregister-shuffle-%d")
+          );
+      List<Future<Void>> futures = executorService.invokeAll(callableList, 10, TimeUnit.SECONDS);

Review Comment:
   This is the default timeout. Since the request is lightweight, I think 10s is enough. There is no need to introduce an extra config that users would then have to understand.
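If the timeout were exposed as an option, as jerqi suggests, the current 10 seconds could remain the default, so users who never touch it see no change. A minimal sketch using `java.util.Properties`; the key `rss.client.unregister.request.timeout.sec` is hypothetical.

```java
import java.time.Duration;
import java.util.Properties;

class UnregisterTimeout {
  // Hypothetical option name; the quoted code hard-codes 10 seconds.
  static final String KEY = "rss.client.unregister.request.timeout.sec";
  static final long DEFAULT_SECONDS = 10L;

  static Duration resolve(Properties props) {
    // Fall back to the experience-based default when the option is unset.
    String raw = props.getProperty(KEY, Long.toString(DEFAULT_SECONDS));
    long seconds = Long.parseLong(raw.trim());
    if (seconds <= 0) {
      throw new IllegalArgumentException(KEY + " must be positive, got " + seconds);
    }
    return Duration.ofSeconds(seconds);
  }
}
```

The resolved value could then be passed as `invokeAll(callableList, timeout.toMillis(), TimeUnit.MILLISECONDS)`.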





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r990741145


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java:
##########
@@ -33,7 +32,7 @@ public class LocalFileDeleteHandler implements ShuffleDeleteHandler {
   @Override
   public void delete(String[] storageBasePaths, String appId, String user) {
     for (String basePath : storageBasePaths) {
-      String shufflePath = ShuffleStorageUtils.getFullShuffleDataFolder(basePath, appId);
+      final String shufflePath = basePath;

Review Comment:
   Ok





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r990722899


##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -591,6 +593,45 @@ public void close() {
     dataTransferPool.shutdownNow();
   }
 
+  @Override
+  public void unregisterShuffle(String appId, int shuffleId) {
+    RssUnregisterShuffleRequest request = new RssUnregisterShuffleRequest(appId, shuffleId);
+    List<Callable<Void>> callableList = Lists.newArrayList();
+
+    shuffleServerInfoSet.stream().forEach(shuffleServerInfo -> {
+          callableList.add(() -> {
+            try {
+              ShuffleServerClient client =
+                  ShuffleServerClientFactory.getInstance().getShuffleServerClient(clientType, shuffleServerInfo);
+              RssUnregisterShuffleResponse response = client.unregisterShuffle(request);
+              if (response.getStatusCode() != ResponseStatusCode.SUCCESS) {
+                LOG.warn("Failed to unregister shuffle to " + shuffleServerInfo);
+              }
+            } catch (Exception e) {
+              LOG.warn("Error happened when unregistering to " + shuffleServerInfo, e);
+            }
+            return null;
+          });
+        }
+    );
+
+    try {
+      ExecutorService executorService =
+          Executors.newFixedThreadPool(
+              Math.min(10, shuffleServerInfoSet.size()),
+              ThreadUtils.getThreadFactory("unregister-shuffle-%d")
+          );
+      List<Future<Void>> futures = executorService.invokeAll(callableList, 10, TimeUnit.SECONDS);
+      for (Future<Void> future : futures) {
+        if (!future.isDone()) {
+          future.cancel(true);
+        }
+      }
+    } catch (InterruptedException ie) {
+      LOG.warn("Unregister shuffle is interrupted", ie);
+    }

Review Comment:
   Should we shut down the thread pool here?
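Below is a minimal sketch of releasing a per-call pool in a `finally` block. It assumes the pool is created inside the method, as in the diff. `PerCallPool` and `runAndRelease` are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a pool created for one call is always released before returning.
final class PerCallPool {
  private PerCallPool() {}

  /** Runs the tasks and returns the (already shut down) pool so callers can inspect it. */
  static ExecutorService runAndRelease(List<Callable<Void>> tasks, int poolSize, long timeoutSec) {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    try {
      pool.invokeAll(tasks, timeoutSec, TimeUnit.SECONDS);
    } catch (InterruptedException ie) {
      // Keep the interrupt visible to the caller instead of swallowing it.
      Thread.currentThread().interrupt();
    } finally {
      // The idle core threads of a fixed pool never time out by default,
      // so without this each call would leave up to poolSize threads behind.
      pool.shutdownNow();
    }
    return pool;
  }
}
```

An alternative design is a single pool that the client owns and shuts down in `close()`, the way `dataTransferPool` already is. That avoids creating threads on every unregister call, at the cost of keeping idle threads around.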





[GitHub] [incubator-uniffle] zuston commented on pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#issuecomment-1272840610

   Updated. 




[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r983001435


##########
server/src/main/java/org/apache/uniffle/server/event/AppPurgeEvent.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.event;
+
+import java.util.List;
+
+public class AppPurgeEvent extends PurgeEvent {

Review Comment:
   I don't think we need to do so. These events are encapsulations for specific consumers, and we should keep them independent.
   
   By the way, speaking of events, I have an idea: we could create an abstract event interface for all specific events. To avoid creating thread pools everywhere, we could introduce an event dispatcher, and the existing event consumers could be registered with it. Handling everything in one place would be cleaner and would make it easier to collect metrics.
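The dispatcher idea could look roughly like the sketch below. Everything here is hypothetical: `Event`, `ShuffleCleanupEvent`, `EventDispatcher`, and the per-type counters. The sketch only illustrates two points: consumers register with one shared executor, and there is a single place to count dispatched events for metrics.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical common interface for all concrete events.
interface Event {}

// Hypothetical concrete event for stage-level cleanup.
final class ShuffleCleanupEvent implements Event {
  final String appId;
  final int shuffleId;

  ShuffleCleanupEvent(String appId, int shuffleId) {
    this.appId = appId;
    this.shuffleId = shuffleId;
  }
}

// Hypothetical dispatcher: one shared executor instead of one pool per consumer.
final class EventDispatcher implements AutoCloseable {
  private final ExecutorService executor;
  private final Map<Class<? extends Event>, List<Consumer<Event>>> consumers = new ConcurrentHashMap<>();
  // Per-type dispatch counters: a single place to hang metrics.
  private final Map<Class<? extends Event>, AtomicLong> dispatched = new ConcurrentHashMap<>();

  EventDispatcher(int threads) {
    this.executor = Executors.newFixedThreadPool(threads);
  }

  <E extends Event> void register(Class<E> type, Consumer<? super E> consumer) {
    consumers.computeIfAbsent(type, t -> new CopyOnWriteArrayList<>())
        .add(e -> consumer.accept(type.cast(e)));
  }

  void dispatch(Event event) {
    dispatched.computeIfAbsent(event.getClass(), t -> new AtomicLong()).incrementAndGet();
    List<Consumer<Event>> targets = consumers.getOrDefault(event.getClass(), Collections.emptyList());
    for (Consumer<Event> consumer : targets) {
      executor.execute(() -> consumer.accept(event));
    }
  }

  long dispatchedCount(Class<? extends Event> type) {
    AtomicLong count = dispatched.get(type);
    return count == null ? 0L : count.get();
  }

  @Override
  public void close() {
    // Let queued events finish, then stop the shared threads.
    executor.shutdown();
    try {
      executor.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
  }
}
```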





[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r991821983


##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -591,6 +593,45 @@ public void close() {
     dataTransferPool.shutdownNow();
   }
 
+  @Override
+  public void unregisterShuffle(String appId, int shuffleId) {
+    RssUnregisterShuffleRequest request = new RssUnregisterShuffleRequest(appId, shuffleId);
+    List<Callable<Void>> callableList = Lists.newArrayList();
+
+    shuffleServerInfoSet.stream().forEach(shuffleServerInfo -> {
+          callableList.add(() -> {
+            try {
+              ShuffleServerClient client =
+                  ShuffleServerClientFactory.getInstance().getShuffleServerClient(clientType, shuffleServerInfo);
+              RssUnregisterShuffleResponse response = client.unregisterShuffle(request);
+              if (response.getStatusCode() != ResponseStatusCode.SUCCESS) {
+                LOG.warn("Failed to unregister shuffle to " + shuffleServerInfo);
+              }
+            } catch (Exception e) {
+              LOG.warn("Error happened when unregistering to " + shuffleServerInfo, e);
+            }
+            return null;
+          });
+        }
+    );
+
+    try {
+      ExecutorService executorService =
+          Executors.newFixedThreadPool(
+              Math.min(10, shuffleServerInfoSet.size()),

Review Comment:
   OK. I will extract a config entry.
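Extracting the hard-coded values might look like the sketch below. The key names and the plain `Map<String, String>` config source are assumptions for illustration, not Uniffle's actual configuration API. The defaults mirror the `10` threads and `10` seconds in the diff.

```java
import java.util.Map;

// Hypothetical holder for the two unregister settings.
final class UnregisterSettings {
  static final String POOL_SIZE_KEY = "rss.client.unregister.thread.pool.size"; // assumed key
  static final String TIMEOUT_SEC_KEY = "rss.client.unregister.request.timeout.sec"; // assumed key
  static final int DEFAULT_POOL_SIZE = 10;
  static final long DEFAULT_TIMEOUT_SEC = 10L;

  final int poolSize;
  final long timeoutSec;

  private UnregisterSettings(int poolSize, long timeoutSec) {
    this.poolSize = poolSize;
    this.timeoutSec = timeoutSec;
  }

  /** Reads both settings, falling back to the defaults when a key is absent. */
  static UnregisterSettings from(Map<String, String> conf) {
    int pool = Integer.parseInt(conf.getOrDefault(POOL_SIZE_KEY, String.valueOf(DEFAULT_POOL_SIZE)));
    long timeout = Long.parseLong(conf.getOrDefault(TIMEOUT_SEC_KEY, String.valueOf(DEFAULT_TIMEOUT_SEC)));
    if (pool <= 0 || timeout <= 0) {
      throw new IllegalArgumentException("pool size and timeout must be positive");
    }
    return new UnregisterSettings(pool, timeout);
  }
}
```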



##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -591,6 +593,45 @@ public void close() {
     dataTransferPool.shutdownNow();
   }
 
+  @Override
+  public void unregisterShuffle(String appId, int shuffleId) {
+    RssUnregisterShuffleRequest request = new RssUnregisterShuffleRequest(appId, shuffleId);
+    List<Callable<Void>> callableList = Lists.newArrayList();
+
+    shuffleServerInfoSet.stream().forEach(shuffleServerInfo -> {
+          callableList.add(() -> {
+            try {
+              ShuffleServerClient client =
+                  ShuffleServerClientFactory.getInstance().getShuffleServerClient(clientType, shuffleServerInfo);
+              RssUnregisterShuffleResponse response = client.unregisterShuffle(request);
+              if (response.getStatusCode() != ResponseStatusCode.SUCCESS) {
+                LOG.warn("Failed to unregister shuffle to " + shuffleServerInfo);
+              }
+            } catch (Exception e) {
+              LOG.warn("Error happened when unregistering to " + shuffleServerInfo, e);
+            }
+            return null;
+          });
+        }
+    );
+
+    try {
+      ExecutorService executorService =
+          Executors.newFixedThreadPool(
+              Math.min(10, shuffleServerInfoSet.size()),
+              ThreadUtils.getThreadFactory("unregister-shuffle-%d")
+          );
+      List<Future<Void>> futures = executorService.invokeAll(callableList, 10, TimeUnit.SECONDS);

Review Comment:
   ditto





[GitHub] [incubator-uniffle] jerqi commented on pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#issuecomment-1274027138

   > > One question: why do we add the concept of PurgeEvent? What's the advantage? Could we just add a shuffle-level deletion method to StorageManager?
   > 
   > Two reasons.
   > 
   > 1. Let clearResourceThread handle both kinds of events: stage-level and app-level data cleanup.
   > 2. I don't want to introduce an extra delete method in StorageManager, because most of the deletion logic is the same.
   
   Actually, we still add a method `removeResourcesByShuffleIds`. Let me think it over.
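The first reason can be illustrated with a small hypothetical hierarchy. App-level and shuffle-level purges share one base type and one queue, so a single worker drains both. The names (`PurgeRequest`, `AppPurge`, `ShufflePurge`, `PurgeWorker`) are illustrative and are not the PR's classes. The worker only records the scope it would delete.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical base type: which app, and optionally which shuffles, to purge.
abstract class PurgeRequest {
  final String appId;
  final List<Integer> shuffleIds; // empty for an app-level purge

  PurgeRequest(String appId, List<Integer> shuffleIds) {
    this.appId = appId;
    this.shuffleIds = Collections.unmodifiableList(new ArrayList<>(shuffleIds));
  }
}

final class AppPurge extends PurgeRequest {
  AppPurge(String appId) {
    super(appId, Collections.emptyList());
  }
}

final class ShufflePurge extends PurgeRequest {
  ShufflePurge(String appId, List<Integer> shuffleIds) {
    super(appId, shuffleIds);
  }
}

// Hypothetical single consumer: both kinds share one queue and one code path.
final class PurgeWorker {
  final BlockingQueue<PurgeRequest> queue = new LinkedBlockingQueue<>();
  final List<String> purgedScopes = new ArrayList<>(); // records what would be deleted

  /** Drains queued requests; a real worker would loop on queue.take() in its own thread. */
  void drain() {
    PurgeRequest req;
    while ((req = queue.poll()) != null) {
      if (req instanceof AppPurge) {
        purgedScopes.add(req.appId);
      } else {
        for (int shuffleId : req.shuffleIds) {
          purgedScopes.add(req.appId + "/" + shuffleId);
        }
      }
    }
  }
}
```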




[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r990619070


##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -591,6 +593,45 @@ public void close() {
     dataTransferPool.shutdownNow();
   }
 
+  @Override
+  public void unregisterShuffle(String appId, int shuffleId) {
+    RssUnregisterShuffleRequest request = new RssUnregisterShuffleRequest(appId, shuffleId);
+    List<Callable<Void>> callableList = Lists.newArrayList();
+
+    shuffleServerInfoSet.stream().forEach(shuffleServerInfo -> {
+          callableList.add(() -> {
+            try {
+              ShuffleServerClient client =
+                  ShuffleServerClientFactory.getInstance().getShuffleServerClient(clientType, shuffleServerInfo);
+              RssUnregisterShuffleResponse response = client.unregisterShuffle(request);
+              if (response.getStatusCode() != ResponseStatusCode.SUCCESS) {
+                LOG.warn("Failed to unregister shuffle to " + shuffleServerInfo);
+              }
+            } catch (Exception e) {
+              LOG.warn("Error happened when unregistering to " + shuffleServerInfo, e);
+            }
+            return null;
+          });
+        }
+    );
+
+    try {
+      ExecutorService executorService =
+          Executors.newFixedThreadPool(
+              Math.min(10, shuffleServerInfoSet.size()),

Review Comment:
   Hmm. Maybe we should use `Runtime.getRuntime().availableProcessors()`, but I'm not sure about this.
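Whatever the sizing rule, note that `Executors.newFixedThreadPool` throws `IllegalArgumentException` when `nThreads <= 0`. As a result, `Math.min(10, shuffleServerInfoSet.size())` fails when the set is empty. The hypothetical helper below caps the pool by a configured maximum, the server count, and `Runtime.getRuntime().availableProcessors()`, and never goes below one thread.

```java
// Hypothetical helper for sizing the unregister pool.
final class UnregisterPoolSizing {
  private UnregisterPoolSizing() {}

  /**
   * Smallest of the configured cap, the number of target servers, and the
   * CPU count, but never below 1 (newFixedThreadPool rejects nThreads <= 0).
   */
  static int poolSize(int configuredMax, int serverCount, int cpus) {
    int size = Math.min(configuredMax, Math.min(serverCount, cpus));
    return Math.max(1, size);
  }

  /** Same as above, using the JVM's reported processor count. */
  static int poolSize(int configuredMax, int serverCount) {
    return poolSize(configuredMax, serverCount, Runtime.getRuntime().availableProcessors());
  }
}
```

Unregister calls are I/O-bound RPCs, so the CPU count is only a rough cap here. Returning early when there are no servers to notify is another way to avoid the zero-size case.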





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r990615381


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java:
##########
@@ -33,7 +32,7 @@ public class LocalFileDeleteHandler implements ShuffleDeleteHandler {
   @Override
   public void delete(String[] storageBasePaths, String appId, String user) {
     for (String basePath : storageBasePaths) {
-      String shufflePath = ShuffleStorageUtils.getFullShuffleDataFolder(basePath, appId);
+      final String shufflePath = basePath;

Review Comment:
   Why do we modify the name of `storageBasePaths`?
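For context, the diff moves path resolution to the caller. `delete` now treats each entry of `storageBasePaths` as the exact directory to remove, instead of appending `appId` through `ShuffleStorageUtils.getFullShuffleDataFolder`. Below is a standalone sketch of that contract using `java.nio.file`. The `<base>/<appId>/<shuffleId>` layout and all names here are assumptions for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical sketch of the "caller resolves, handler deletes" contract.
final class ResolvedPathDelete {
  private ResolvedPathDelete() {}

  // Caller side (assumed layout): <base>/<appId> for a whole app.
  static String appPath(String base, String appId) {
    return Paths.get(base, appId).toString();
  }

  // Caller side (assumed layout): <base>/<appId>/<shuffleId> for one shuffle.
  static String shufflePath(String base, String appId, int shuffleId) {
    return Paths.get(base, appId, String.valueOf(shuffleId)).toString();
  }

  // Handler side: each entry is already the directory to remove; nothing is appended.
  static void delete(String[] resolvedPaths) {
    for (String p : resolvedPaths) {
      Path root = Paths.get(p);
      if (!Files.exists(root)) {
        continue;
      }
      // Reverse lexicographic order visits children before their parent directories.
      try (Stream<Path> walk = Files.walk(root)) {
        walk.sorted(Comparator.reverseOrder()).forEach(path -> path.toFile().delete());
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
  }
}
```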





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r990723989


##########
integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.io.File;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.shuffle.RssSparkConfig;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import scala.Tuple2;
+
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.apache.uniffle.common.config.RssBaseConf.RSS_STORAGE_BASE_PATH;

Review Comment:
   We should avoid the `static import`. Elsewhere, we don't use `static import` for configuration options, so we should be consistent with that.





[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r990753147


##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -591,6 +593,45 @@ public void close() {
     dataTransferPool.shutdownNow();
   }
 
+  @Override
+  public void unregisterShuffle(String appId, int shuffleId) {
+    RssUnregisterShuffleRequest request = new RssUnregisterShuffleRequest(appId, shuffleId);
+    List<Callable<Void>> callableList = Lists.newArrayList();
+
+    shuffleServerInfoSet.stream().forEach(shuffleServerInfo -> {
+          callableList.add(() -> {
+            try {
+              ShuffleServerClient client =
+                  ShuffleServerClientFactory.getInstance().getShuffleServerClient(clientType, shuffleServerInfo);
+              RssUnregisterShuffleResponse response = client.unregisterShuffle(request);
+              if (response.getStatusCode() != ResponseStatusCode.SUCCESS) {
+                LOG.warn("Failed to unregister shuffle to " + shuffleServerInfo);
+              }
+            } catch (Exception e) {
+              LOG.warn("Error happened when unregistering to " + shuffleServerInfo, e);
+            }
+            return null;
+          });
+        }
+    );
+
+    try {
+      ExecutorService executorService =
+          Executors.newFixedThreadPool(
+              Math.min(10, shuffleServerInfoSet.size()),
+              ThreadUtils.getThreadFactory("unregister-shuffle-%d")
+          );
+      List<Future<Void>> futures = executorService.invokeAll(callableList, 10, TimeUnit.SECONDS);
+      for (Future<Void> future : futures) {
+        if (!future.isDone()) {
+          future.cancel(true);
+        }
+      }
+    } catch (InterruptedException ie) {
+      LOG.warn("Unregister shuffle is interrupted", ie);
+    }

Review Comment:
   Yes. Got it.





[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #249: Introduce data cleanup mechanism on stage level

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r990755901


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleDeleteHandler.java:
##########
@@ -39,33 +38,35 @@ public HdfsShuffleDeleteHandler(Configuration hadoopConf) {
 
   @Override
   public void delete(String[] storageBasePaths, String appId, String user) {
-    Path path = new Path(ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePaths[0], appId));
-    boolean isSuccess = false;
-    int times = 0;
-    int retryMax = 5;
-    long start = System.currentTimeMillis();
-    LOG.info("Try delete shuffle data in HDFS for appId[{}] of user[{}] with {}",appId, user, path);
-    while (!isSuccess && times < retryMax) {
-      try {
-        FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem(user, path, hadoopConf);
-        fileSystem.delete(path, true);
-        isSuccess = true;
-      } catch (Exception e) {
-        times++;
-        LOG.warn("Can't delete shuffle data for appId[" + appId + "] with " + times + " times", e);
+    for (String deletePath : storageBasePaths) {
+      final Path path = new Path(deletePath);
+      boolean isSuccess = false;
+      int times = 0;
+      int retryMax = 5;

Review Comment:
   This is the original value. I don't know the reason for it.
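The retry loop that the diff moves inside the per-path loop can be sketched on its own. Each path gets up to `retryMax` attempts, and a persistent failure on one path does not stop the others. `Deleter` is a hypothetical stand-in for `FileSystem.delete(path, true)`. The optional backoff is an assumption and is not present in the shown code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for FileSystem.delete(path, true).
interface Deleter {
  void delete(String path) throws Exception;
}

final class RetryingDelete {
  private RetryingDelete() {}

  /** Tries each path up to retryMax times; returns the paths that still failed. */
  static List<String> deleteAll(String[] paths, Deleter deleter, int retryMax, long backoffMs) {
    List<String> failed = new ArrayList<>();
    for (String path : paths) {
      boolean isSuccess = false;
      int times = 0;
      while (!isSuccess && times < retryMax) {
        try {
          deleter.delete(path);
          isSuccess = true;
        } catch (Exception e) {
          times++;
          if (times < retryMax && backoffMs > 0) {
            try {
              Thread.sleep(backoffMs); // assumed backoff, not shown in the diff
            } catch (InterruptedException ie) {
              Thread.currentThread().interrupt();
              break; // stop retrying this path once interrupted
            }
          }
        }
      }
      if (!isSuccess) {
        failed.add(path);
      }
    }
    return failed;
  }
}
```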


