Posted to issues@uniffle.apache.org by "xianjingfeng (via GitHub)" <gi...@apache.org> on 2023/02/16 03:08:23 UTC

[GitHub] [incubator-uniffle] xianjingfeng opened a new pull request, #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

xianjingfeng opened a new pull request, #606:
URL: https://github.com/apache/incubator-uniffle/pull/606

   
   ### What changes were proposed in this pull request?
   Add decommission logic to the shuffle server.
   
   ### Why are the changes needed?
   Support shuffle server decommission. This is part of #80.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Unit tests (UT).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] kaijchen commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "kaijchen (via GitHub)" <gi...@apache.org>.
kaijchen commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1111419980


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +274,75 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public synchronized void decommission() {
+    if (isDecommissioning()) {
+      LOG.info("Shuffle Server is decommissioning. Nothing needs to be done.");
+      return;
+    }
+    if (!ServerStatus.ACTIVE.equals(serverStatus)) {
+      throw new InvalidRequestException(
+          "Shuffle Server is processing other procedures, current status:" + serverStatus);
+    }
+    serverStatus = ServerStatus.DECOMMISSIONING;
+    LOG.info("Shuffle Server is decommissioning.");
+    if (executorService == null) {
+      executorService = Executors.newSingleThreadExecutor(
+          ThreadUtils.getThreadFactory("shuffle-server-decommission-%d"));
+    }
+    decommissionFuture = executorService.submit(this::waitDecommissionFinish);
+  }
+
+  private void waitDecommissionFinish() {
+    long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+    boolean shutdownAfterDecommission = shuffleServerConf.get(SERVER_DECOMMISSION_SHUTDOWN);
+    int remainApplicationNum;
+    while (isDecommissioning()) {
+      remainApplicationNum = shuffleTaskManager.getAppIds().size();
+      if (remainApplicationNum == 0) {
+        serverStatus = ServerStatus.DECOMMISSIONED;
+        LOG.info("All applications finished. Current status is " + serverStatus);
+        if (shutdownAfterDecommission) {
+          LOG.info("Exiting...");
+          try {
+            stopServer();
+          } catch (Exception e) {
+            ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+          }
+        }
+        break;
+      }
+      LOG.info("Shuffle server is decommissioning, remaining {} applications not finished.", remainApplicationNum);
+      try {
+        Thread.sleep(checkInterval);
+      } catch (InterruptedException e) {
+        LOG.warn("Ignore the InterruptedException which should be caused by internal kill.");
+      }
+    }
+    remainApplicationNum = shuffleTaskManager.getAppIds().size();
+    if (remainApplicationNum > 0) {
+      LOG.info("Decommission exiting, remaining {} applications not finished.",
+          remainApplicationNum);
+    }
+  }
+
+  public synchronized void cancelDecommission() {
+    if (!isDecommissioning()) {
+      LOG.info("Shuffle server is not decommissioning. Nothing needs to be done.");
+      return;
+    }
+    if (decommissionFuture.cancel(true)) {
+      serverStatus = ServerStatus.ACTIVE;

Review Comment:
   What if the server status is DECOMMISSIONED?
   `isDecommissioning()` will return true and the decommission logic will be executed.





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1109276982


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();

Review Comment:
   Updated





[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1108222300


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();
+    synchronized (statusLock) {
+      checkStatusForDecommission();
+      long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+      decommissionThread = new Thread(() -> {
+        while (isDecommissioning()) {

Review Comment:
   Yeah, but you need to make sure `isDecommissioning` is properly synchronized when it is called.





[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "zuston (via GitHub)" <gi...@apache.org>.
zuston commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1108210933


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -78,6 +82,10 @@ public class ShuffleServer {
   private AtomicBoolean isHealthy = new AtomicBoolean(true);
   private GRPCMetrics grpcMetrics;
   private MetricReporter metricReporter;
+  private Thread decommissionThread;
+  private ServerStatus serverStatus = ServerStatus.NORMAL_STATUS;

Review Comment:
   This status should be marked `volatile`, since it will be used by different threads. Right?
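
A minimal sketch of the visibility concern raised here, assuming a status field that one thread writes and a polling thread reads. The class and method names are hypothetical and are not taken from the Uniffle code base; only the `volatile` keyword itself is the point.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a server whose status field is shared across threads.
class VolatileStatusSketch {
  enum Status { ACTIVE, DECOMMISSIONING }

  // volatile: a write by one thread is visible to later reads by other threads.
  private volatile Status status = Status.ACTIVE;

  Status getStatus() {
    return status;
  }

  void setStatus(Status newStatus) {
    status = newStatus;
  }

  // Starts a polling thread, flips the status from the calling thread, and
  // reports whether the poller observed the change within five seconds.
  static boolean pollerObservesChange() {
    VolatileStatusSketch server = new VolatileStatusSketch();
    CountDownLatch observed = new CountDownLatch(1);
    Thread poller = new Thread(() -> {
      while (server.getStatus() != Status.DECOMMISSIONING) {
        Thread.yield();
      }
      observed.countDown();
    });
    poller.setDaemon(true); // never keep the JVM alive because of the poller
    poller.start();
    server.setStatus(Status.DECOMMISSIONING);
    try {
      return observed.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("poller observed change: " + pollerObservesChange());
  }
}
```

Without `volatile` (or some other form of synchronization), the Java Memory Model does not guarantee that the polling thread ever sees the new value.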





[GitHub] [incubator-uniffle] kaijchen commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "kaijchen (via GitHub)" <gi...@apache.org>.
kaijchen commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1109315697


##########
server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java:
##########
@@ -79,4 +75,61 @@ public void startTest() {
     }
 
   }
+
+  public void decommissionTest(boolean shutdown) throws Exception {
+    ShuffleServerConf serverConf = createShuffleServerConf();
+    serverConf.set(SERVER_DECOMMISSION_CHECK_INTERVAL, 1000L);
+    serverConf.set(SERVER_DECOMMISSION_SHUTDOWN, shutdown);
+    ShuffleServer shuffleServer = new ShuffleServer(serverConf);
+    shuffleServer.start();
+    assertEquals(ServerStatus.NORMAL_STATUS, shuffleServer.getServerStatus());
+    // Shuffle server is not decommissioning, but we can also cancel it.
+    shuffleServer.cancelDecommission();
+    ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
+    String appId = "decommissionTest_appId_" + shutdown;
+    shuffleTaskManager.registerShuffle(appId, 0, Lists.newArrayList(), new RemoteStorageInfo("/tmp"), "");
+    shuffleServer.decommission();
+    assertEquals(ServerStatus.DECOMMISSIONING, shuffleServer.getServerStatus());
+    // Shuffle server is decommissioning, but we can also decommission it again.
+    shuffleServer.decommission();
+    shuffleServer.cancelDecommission();
+    shuffleTaskManager.removeResources(appId);
+    // Wait for 2 seconds, make sure cancel command is work.
+    Thread.sleep(2000);
+    assertEquals(ServerStatus.NORMAL_STATUS, shuffleServer.getServerStatus());
+    shuffleServer.decommission();
+    if (shutdown) {
+      Awaitility.await().timeout(10, TimeUnit.SECONDS).until(
+          () -> !shuffleServer.isRunning());
+    } else {
+      Awaitility.await().timeout(10, TimeUnit.SECONDS).until(
+          () -> ServerStatus.DECOMMISSIONED.equals(shuffleServer.getServerStatus()));
+      assertEquals(true, shuffleServer.isRunning());
+      shuffleServer.stopServer();
+    }
+  }
+
+  @Test
+  public void notShutDownAfterDecommissionTest() throws Exception {
+    decommissionTest(false);
+  }
+
+  @Test
+  public void shutDownAfterDecommissionTest() throws Exception {
+    decommissionTest(true);
+  }
+

Review Comment:
   ```suggestion
   ```



##########
server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java:
##########
@@ -79,4 +75,61 @@ public void startTest() {
     }
 
   }
+

Review Comment:
   ```suggestion
   
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
   ```





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1111053381


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +272,72 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public synchronized void decommission() {
+    if (isDecommissioning()) {
+      LOG.info("Shuffle Server is decommissioning. Nothing need to do.");
+      return;
+    }
+    if (!ServerStatus.ACTIVE.equals(serverStatus)) {
+      throw new InvalidRequestException(
+          "Shuffle Server is processing other procedures, current status:" + serverStatus);
+    }
+    serverStatus = ServerStatus.DECOMMISSIONING;
+    LOG.info("Shuffle Server is decommissioning.");
+    executorService = Executors.newSingleThreadExecutor(
+        ThreadUtils.getThreadFactory("shuffle-server-decommission-%d"));
+    executorService.submit(this::waitDecommissionFinish);
+  }
+
+  private void waitDecommissionFinish() {
+    long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+    boolean shutdownAfterDecommission = shuffleServerConf.get(SERVER_DECOMMISSION_SHUTDOWN);
+    int remainApplicationNum;
+    while (isDecommissioning()) {
+      remainApplicationNum = shuffleTaskManager.getAppIds().size();
+      if (remainApplicationNum == 0) {
+        serverStatus = ServerStatus.DECOMMISSIONED;
+        LOG.info("All applications finished. Current status is " + serverStatus);
+        if (shutdownAfterDecommission) {
+          LOG.info("Exiting...");
+          try {
+            stopServer();
+          } catch (Exception e) {
+            ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+          }
+        }
+        break;
+      }
+      LOG.info("Shuffle server is decommissioning. remain {} applications not finished.", remainApplicationNum);
+      try {
+        Thread.sleep(checkInterval);
+      } catch (InterruptedException e) {
+        LOG.warn("Ignore the InterruptedException which should be caused by internal killed.");
+      }
+    }
+    remainApplicationNum = shuffleTaskManager.getAppIds().size();
+    if (remainApplicationNum > 0) {
+      LOG.info("Decommission exiting. remain {} applications not finished.",
+          remainApplicationNum);
+    }
+  }
+
+  public synchronized void cancelDecommission() {
+    if (!isDecommissioning()) {
+      LOG.info("Shuffle server is not decommissioning. Nothing need to do.");
+      return;
+    }
+    serverStatus = ServerStatus.ACTIVE;
+    if (executorService != null) {
+      executorService.shutdownNow();
+      executorService = null;

Review Comment:
   Use `newCachedThreadPool`?





[GitHub] [incubator-uniffle] kaijchen commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "kaijchen (via GitHub)" <gi...@apache.org>.
kaijchen commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1111830630


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +274,79 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public synchronized void decommission() {
+    if (isDecommissioning()) {
+      LOG.info("Shuffle Server is decommissioning. Nothing needs to be done.");
+      return;
+    }
+    if (!ServerStatus.ACTIVE.equals(serverStatus)) {
+      throw new InvalidRequestException(
+          "Shuffle Server is processing other procedures, current status:" + serverStatus);
+    }
+    serverStatus = ServerStatus.DECOMMISSIONING;
+    LOG.info("Shuffle Server is decommissioning.");
+    if (executorService == null) {
+      executorService = Executors.newSingleThreadExecutor(
+          ThreadUtils.getThreadFactory("shuffle-server-decommission-%d"));
+    }
+    decommissionFuture = executorService.submit(this::waitDecommissionFinish);
+  }
+
+  private void waitDecommissionFinish() {
+    long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+    boolean shutdownAfterDecommission = shuffleServerConf.get(SERVER_DECOMMISSION_SHUTDOWN);
+    int remainApplicationNum;
+    while (isDecommissioning()) {
+      remainApplicationNum = shuffleTaskManager.getAppIds().size();
+      if (remainApplicationNum == 0) {
+        serverStatus = ServerStatus.DECOMMISSIONED;
+        LOG.info("All applications finished. Current status is " + serverStatus);
+        if (shutdownAfterDecommission) {
+          LOG.info("Exiting...");
+          try {
+            stopServer();
+          } catch (Exception e) {
+            ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+          }
+        }
+        break;
+      }
+      LOG.info("Shuffle server is decommissioning, remaining {} applications not finished.", remainApplicationNum);
+      try {
+        Thread.sleep(checkInterval);
+      } catch (InterruptedException e) {
+        LOG.warn("Ignore the InterruptedException which should be caused by internal kill.");

Review Comment:
   Add a break here.
   
   ```suggestion
           LOG.warn("Interrupted while waiting for decommission to finish");
           break;
   ```
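
The reason for the `break` can be sketched as follows: `Thread.sleep` clears the interrupt flag when it throws, so a loop that only logs the exception keeps polling. This is an illustrative sketch, not the PR's code; `waitUntilDrained` and the `remaining` counter are hypothetical stand-ins for the decommission loop and the application count.

```java
import java.util.concurrent.atomic.AtomicInteger;

class InterruptiblePollSketch {
  // Polls until "remaining" reaches zero. Returns true if it drained,
  // false if the polling thread was interrupted first.
  static boolean waitUntilDrained(AtomicInteger remaining, long checkIntervalMs) {
    while (remaining.get() > 0) {
      try {
        Thread.sleep(checkIntervalMs);
      } catch (InterruptedException e) {
        // Restore the flag for callers, then leave the loop instead of ignoring it.
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return true;
  }

  // Interrupts a poller that would otherwise sleep for a minute and reports
  // whether it stopped promptly without having drained.
  static boolean stopsWhenInterrupted() {
    AtomicInteger remaining = new AtomicInteger(1); // never reaches zero
    boolean[] drained = {true};
    Thread poller = new Thread(() -> drained[0] = waitUntilDrained(remaining, 60_000L));
    poller.setDaemon(true);
    poller.start();
    poller.interrupt();
    try {
      poller.join(5_000L);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
    return !poller.isAlive() && !drained[0];
  }

  public static void main(String[] args) {
    System.out.println("stopped on interrupt: " + stopsWhenInterrupted());
  }
}
```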



##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +274,79 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public synchronized void decommission() {
+    if (isDecommissioning()) {
+      LOG.info("Shuffle Server is decommissioning. Nothing needs to be done.");
+      return;
+    }
+    if (!ServerStatus.ACTIVE.equals(serverStatus)) {
+      throw new InvalidRequestException(
+          "Shuffle Server is processing other procedures, current status:" + serverStatus);
+    }
+    serverStatus = ServerStatus.DECOMMISSIONING;
+    LOG.info("Shuffle Server is decommissioning.");
+    if (executorService == null) {
+      executorService = Executors.newSingleThreadExecutor(
+          ThreadUtils.getThreadFactory("shuffle-server-decommission-%d"));
+    }
+    decommissionFuture = executorService.submit(this::waitDecommissionFinish);
+  }
+
+  private void waitDecommissionFinish() {
+    long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+    boolean shutdownAfterDecommission = shuffleServerConf.get(SERVER_DECOMMISSION_SHUTDOWN);
+    int remainApplicationNum;
+    while (isDecommissioning()) {
+      remainApplicationNum = shuffleTaskManager.getAppIds().size();
+      if (remainApplicationNum == 0) {
+        serverStatus = ServerStatus.DECOMMISSIONED;
+        LOG.info("All applications finished. Current status is " + serverStatus);
+        if (shutdownAfterDecommission) {
+          LOG.info("Exiting...");
+          try {
+            stopServer();
+          } catch (Exception e) {
+            ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+          }
+        }
+        break;
+      }
+      LOG.info("Shuffle server is decommissioning, remaining {} applications not finished.", remainApplicationNum);
+      try {
+        Thread.sleep(checkInterval);
+      } catch (InterruptedException e) {
+        LOG.warn("Ignore the InterruptedException which should be caused by internal kill.");
+      }
+    }
+    remainApplicationNum = shuffleTaskManager.getAppIds().size();
+    if (remainApplicationNum > 0) {
+      LOG.info("Decommission exiting, remaining {} applications not finished.",
+          remainApplicationNum);
+    }
+  }
+
+  public synchronized void cancelDecommission() {
+    if (!isDecommissioning()) {
+      LOG.info("Shuffle server is not decommissioning. Nothing needs to be done.");
+      return;
+    }
+    if (ServerStatus.DECOMMISSIONED.equals(serverStatus)) {
+      serverStatus = ServerStatus.ACTIVE;
+      return;
+    }
+    serverStatus = ServerStatus.ACTIVE;
+    if (decommissionFuture.cancel(true)) {
+      LOG.info("Decommission canceled.");
+    } else {
+      LOG.warn("Failed to cancel decommission.");
+    }

Review Comment:
   The result of `decommissionFuture.cancel()` no longer matters, since "cancel" always brings the server back to ACTIVE. Maybe we should call `executorService.shutdownNow()` here. @advancedxy, WDYT?
   
   ```suggestion
       if (decommissionFuture != null) {
         decommissionFuture.cancel(true);
       }
   ```
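
For comparison, a small sketch of how the two options behave in plain `java.util.concurrent`: `Future.cancel(true)` interrupts only the submitted task and leaves the executor usable, while `shutdownNow()` also makes the executor reject later submissions. The class and method names are hypothetical; which option suits the PR is left to the discussion above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

class CancelVersusShutdownSketch {
  // True if cancel(true) interrupted the running task and the same
  // executor still accepted and ran a new task afterwards.
  static boolean cancelKeepsExecutorUsable() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch interrupted = new CountDownLatch(1);
    try {
      Future<?> running = executor.submit(() -> {
        started.countDown();
        try {
          Thread.sleep(60_000L);
        } catch (InterruptedException e) {
          interrupted.countDown();
        }
      });
      started.await(5, TimeUnit.SECONDS);
      boolean cancelled = running.cancel(true);
      boolean sawInterrupt = interrupted.await(5, TimeUnit.SECONDS);
      Future<String> next = executor.submit(() -> "reused");
      return cancelled && sawInterrupt && "reused".equals(next.get(5, TimeUnit.SECONDS));
    } catch (Exception e) {
      return false;
    } finally {
      executor.shutdownNow();
    }
  }

  // True if an executor that was shut down refuses new submissions.
  static boolean shutdownNowRejectsNewWork() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.shutdownNow();
    try {
      executor.submit(() -> "never runs");
      return false;
    } catch (RejectedExecutionException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println("cancel keeps executor usable: " + cancelKeepsExecutorUsable());
    System.out.println("shutdownNow rejects new work: " + shutdownNowRejectsNewWork());
  }
}
```

With `shutdownNow()`, a later `decommission()` call would need a fresh executor, which is consistent with the earlier revision setting `executorService = null` after shutting it down.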





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1108245095


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();
+    synchronized (statusLock) {
+      checkStatusForDecommission();

Review Comment:
   It's OK for me to check only once.





[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1108431187


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();
+    synchronized (statusLock) {
+      checkStatusForDecommission();
+      long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+      decommissionThread = new Thread(() -> {
+        while (isDecommissioning()) {

Review Comment:
   > Whether marking "serverStatus" as volatile can solve this problem?

   Possibly, and it should. But what about `synchronized public isDecommissioning()`?
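
A sketch of the synchronized alternative, assuming the status is guarded by the object's intrinsic lock. All names are hypothetical. Because both the check and the transition run under the same lock, a single check inside the synchronized method is enough, and concurrent callers cannot both perform the transition.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class SynchronizedStatusSketch {
  enum Status { ACTIVE, DECOMMISSIONING }

  private Status status = Status.ACTIVE; // guarded by the intrinsic lock on "this"
  private final AtomicInteger transitions = new AtomicInteger();

  synchronized boolean isDecommissioning() {
    return status == Status.DECOMMISSIONING;
  }

  // Returns true only for the caller that actually performed the transition.
  synchronized boolean decommission() {
    if (isDecommissioning()) { // intrinsic locks are reentrant, so this is safe
      return false;
    }
    status = Status.DECOMMISSIONING;
    transitions.incrementAndGet();
    return true;
  }

  // Releases several threads at once and counts how many transitions happened.
  static int transitionsUnderContention(int threadCount) {
    SynchronizedStatusSketch server = new SynchronizedStatusSketch();
    CountDownLatch go = new CountDownLatch(1);
    Thread[] callers = new Thread[threadCount];
    for (int i = 0; i < threadCount; i++) {
      callers[i] = new Thread(() -> {
        try {
          go.await();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        server.decommission();
      });
      callers[i].start();
    }
    go.countDown();
    for (Thread caller : callers) {
      try {
        caller.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    return server.transitions.get();
  }

  public static void main(String[] args) {
    System.out.println("transitions: " + transitionsUnderContention(8));
  }
}
```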





[GitHub] [incubator-uniffle] kaijchen commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "kaijchen (via GitHub)" <gi...@apache.org>.
kaijchen commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1109317196


##########
common/src/main/java/org/apache/uniffle/common/ServerStatus.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+public enum ServerStatus {
+  UNKNOWN(-1),
+  NORMAL_STATUS(0),
+  DECOMMISSIONING(1),
+  DECOMMISSIONED(2);
+
+  private final int code;

Review Comment:
   Is `code` necessary?



##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +272,66 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public synchronized void decommission() {
+    if (isDecommissioning()) {
+      LOG.info("Shuffle Server is decommissioning. Nothing need to do.");
+      return;
+    }
+    if (!ServerStatus.NORMAL_STATUS.equals(serverStatus)) {
+      throw new InvalidRequestException(
+          "Shuffle Server is processing other procedures, current status:" + serverStatus);
+    }
+    serverStatus = ServerStatus.DECOMMISSIONING;
+    LOG.info("Shuffle Server is decommissioning.");
+    long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+    boolean shutdownAfterDecommission = shuffleServerConf.get(SERVER_DECOMMISSION_SHUTDOWN);
+    executorService = Executors.newSingleThreadExecutor(
+        ThreadUtils.getThreadFactory("shuffle-server-decommission-%d"));
+    executorService.submit(() -> {

Review Comment:
   This lambda looks quite long. Should we extract it into a method?



##########
server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java:
##########
@@ -79,4 +75,61 @@ public void startTest() {
     }
 
   }
+
+  public void decommissionTest(boolean shutdown) throws Exception {
+    ShuffleServerConf serverConf = createShuffleServerConf();
+    serverConf.set(SERVER_DECOMMISSION_CHECK_INTERVAL, 1000L);
+    serverConf.set(SERVER_DECOMMISSION_SHUTDOWN, shutdown);
+    ShuffleServer shuffleServer = new ShuffleServer(serverConf);
+    shuffleServer.start();
+    assertEquals(ServerStatus.NORMAL_STATUS, shuffleServer.getServerStatus());
+    // Shuffle server is not decommissioning, but we can also cancel it.
+    shuffleServer.cancelDecommission();
+    ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
+    String appId = "decommissionTest_appId_" + shutdown;
+    shuffleTaskManager.registerShuffle(appId, 0, Lists.newArrayList(), new RemoteStorageInfo("/tmp"), "");
+    shuffleServer.decommission();
+    assertEquals(ServerStatus.DECOMMISSIONING, shuffleServer.getServerStatus());
+    // Shuffle server is decommissioning, but we can also decommission it again.
+    shuffleServer.decommission();
+    shuffleServer.cancelDecommission();
+    shuffleTaskManager.removeResources(appId);
+    // Wait for 2 seconds, make sure cancel command is work.
+    Thread.sleep(2000);
+    assertEquals(ServerStatus.NORMAL_STATUS, shuffleServer.getServerStatus());
+    shuffleServer.decommission();
+    if (shutdown) {
+      Awaitility.await().timeout(10, TimeUnit.SECONDS).until(
+          () -> !shuffleServer.isRunning());
+    } else {
+      Awaitility.await().timeout(10, TimeUnit.SECONDS).until(
+          () -> ServerStatus.DECOMMISSIONED.equals(shuffleServer.getServerStatus()));
+      assertEquals(true, shuffleServer.isRunning());
+      shuffleServer.stopServer();
+    }
+  }
+
+  @Test
+  public void notShutDownAfterDecommissionTest() throws Exception {
+    decommissionTest(false);
+  }
+
+  @Test
+  public void shutDownAfterDecommissionTest() throws Exception {
+    decommissionTest(true);
+  }
+
+  private ShuffleServerConf createShuffleServerConf() throws Exception {
+    ShuffleServerConf serverConf = new ShuffleServerConf();
+    serverConf.setInteger(ShuffleServerConf.RPC_SERVER_PORT, 9527);
+    serverConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
+    serverConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
+    serverConf.setInteger(ShuffleServerConf.JETTY_HTTP_PORT, 9528);
+    serverConf.setString(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:0");
+    serverConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("/home/fengxj/tmp111"));

Review Comment:
   Use `@TempDir` to create a temp dir for it.
   
   ```suggestion
       serverConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] kaijchen commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "kaijchen (via GitHub)" <gi...@apache.org>.
kaijchen commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1111419980


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +274,75 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public synchronized void decommission() {
+    if (isDecommissioning()) {
+      LOG.info("Shuffle Server is decommissioning. Nothing needs to be done.");
+      return;
+    }
+    if (!ServerStatus.ACTIVE.equals(serverStatus)) {
+      throw new InvalidRequestException(
+          "Shuffle Server is processing other procedures, current status:" + serverStatus);
+    }
+    serverStatus = ServerStatus.DECOMMISSIONING;
+    LOG.info("Shuffle Server is decommissioning.");
+    if (executorService == null) {
+      executorService = Executors.newSingleThreadExecutor(
+          ThreadUtils.getThreadFactory("shuffle-server-decommission-%d"));
+    }
+    decommissionFuture = executorService.submit(this::waitDecommissionFinish);
+  }
+
+  private void waitDecommissionFinish() {
+    long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+    boolean shutdownAfterDecommission = shuffleServerConf.get(SERVER_DECOMMISSION_SHUTDOWN);
+    int remainApplicationNum;
+    while (isDecommissioning()) {
+      remainApplicationNum = shuffleTaskManager.getAppIds().size();
+      if (remainApplicationNum == 0) {
+        serverStatus = ServerStatus.DECOMMISSIONED;
+        LOG.info("All applications finished. Current status is " + serverStatus);
+        if (shutdownAfterDecommission) {
+          LOG.info("Exiting...");
+          try {
+            stopServer();
+          } catch (Exception e) {
+            ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+          }
+        }
+        break;
+      }
+      LOG.info("Shuffle server is decommissioning, remaining {} applications not finished.", remainApplicationNum);
+      try {
+        Thread.sleep(checkInterval);
+      } catch (InterruptedException e) {
+        LOG.warn("Ignore the InterruptedException which should be caused by internal kill.");
+      }
+    }
+    remainApplicationNum = shuffleTaskManager.getAppIds().size();
+    if (remainApplicationNum > 0) {
+      LOG.info("Decommission exiting, remaining {} applications not finished.",
+          remainApplicationNum);
+    }
+  }
+
+  public synchronized void cancelDecommission() {
+    if (!isDecommissioning()) {
+      LOG.info("Shuffle server is not decommissioning. Nothing needs to be done.");
+      return;
+    }
+    if (decommissionFuture.cancel(true)) {
+      serverStatus = ServerStatus.ACTIVE;

Review Comment:
   What if the server status is DECOMMISSIONED?
   In that case `isDecommissioning()` will return true and `decommissionFuture.cancel(true)` will be executed.
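
One possible way to address this concern is to allow cancellation only from the exact `DECOMMISSIONING` state. The following is a minimal sketch, not the code from the PR: the class `CancelGuardSketch`, its `Status` enum, and the `setStatus` helper are hypothetical names used for illustration only.

```java
// Hypothetical sketch: not the ShuffleServer implementation from the PR.
final class CancelGuardSketch {
  // Stand-in for the ServerStatus enum discussed in this thread.
  enum Status { ACTIVE, DECOMMISSIONING, DECOMMISSIONED }

  private Status status = Status.ACTIVE;

  synchronized void setStatus(Status newStatus) {
    status = newStatus;
  }

  synchronized Status getStatus() {
    return status;
  }

  /** Returns true only if a decommission in progress was cancelled. */
  synchronized boolean cancelDecommission() {
    if (status != Status.DECOMMISSIONING) {
      // ACTIVE: nothing to cancel. DECOMMISSIONED: the wait task has
      // already finished, so the status is left untouched.
      return false;
    }
    status = Status.ACTIVE;
    return true;
  }
}
```

With this guard, a cancel request that arrives after the server has reached `DECOMMISSIONED` is a no-op instead of silently moving the server back to `ACTIVE`.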





[GitHub] [incubator-uniffle] xianjingfeng merged pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng merged PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606




[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1108249939


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();
+    synchronized (statusLock) {
+      checkStatusForDecommission();
+      long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+      decommissionThread = new Thread(() -> {
+        while (isDecommissioning()) {

Review Comment:
   Would marking `serverStatus` as volatile solve this problem?
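
To illustrate the visibility question, here is a minimal sketch of a polling loop that reads a `volatile` status field written by another thread. It is an assumption-laden illustration, not the PR's code: `VolatileStatusSketch`, its `Status` enum, and the helper `pollerStopsAfterCancel` are hypothetical names.

```java
// Hypothetical sketch: not the ShuffleServer implementation from the PR.
final class VolatileStatusSketch {
  enum Status { ACTIVE, DECOMMISSIONING }

  // volatile: a write by one thread is visible to subsequent reads
  // of the field by other threads.
  private volatile Status status = Status.DECOMMISSIONING;

  void cancel() {
    status = Status.ACTIVE;
  }

  /** Polls until the status leaves DECOMMISSIONING or the thread is interrupted. */
  void waitWhileDecommissioning() {
    while (status == Status.DECOMMISSIONING) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  /** Starts a poller, flips the status from the calling thread, and reports whether the poller exited. */
  static boolean pollerStopsAfterCancel() {
    VolatileStatusSketch sketch = new VolatileStatusSketch();
    Thread poller = new Thread(sketch::waitWhileDecommissioning);
    poller.start();
    sketch.cancel();
    try {
      poller.join(5000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
    return !poller.isAlive();
  }
}
```

Note that `volatile` only covers visibility of single reads and writes. A compound check-then-set transition (for example, "if ACTIVE then set DECOMMISSIONING") still needs a lock or an atomic compare-and-set.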





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1108192071


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();
+    synchronized (statusLock) {
+      checkStatusForDecommission();

Review Comment:
   Avoid unnecessary lock operations.
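
The check-before-lock, re-check-under-lock pattern referred to here can be sketched as follows. This is an illustration under assumptions, not the PR's code: `DoubleCheckSketch` and its members are hypothetical, and the fast-path read is only safe because the field is declared `volatile` in this sketch.

```java
// Hypothetical sketch: not the ShuffleServer implementation from the PR.
final class DoubleCheckSketch {
  enum Status { ACTIVE, DECOMMISSIONING }

  private final Object statusLock = new Object();
  private volatile Status status = Status.ACTIVE;
  private int startedCount = 0; // how many times decommission work was started

  /** Returns true if this call started decommissioning. */
  boolean decommission() {
    // Fast path: callers that cannot proceed return without taking the lock.
    if (status != Status.ACTIVE) {
      return false;
    }
    synchronized (statusLock) {
      // Re-check: another thread may have changed the status between
      // the first check and the acquisition of the lock.
      if (status != Status.ACTIVE) {
        return false;
      }
      status = Status.DECOMMISSIONING;
      startedCount++;
      return true;
    }
  }

  int getStartedCount() {
    synchronized (statusLock) {
      return startedCount;
    }
  }
}
```

The first check is purely an optimization. Correctness comes from the second check, which runs while holding the lock.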





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1109277973


##########
common/src/main/java/org/apache/uniffle/common/ServerStatus.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+public enum ServerStatus {
+  UNKNOWN(-1),
+  NORMAL_STATUS(0),

Review Comment:
   Would `ACTIVE` be a better name?





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1109275648


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();
+    synchronized (statusLock) {
+      checkStatusForDecommission();
+      long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+      decommissionThread = new Thread(() -> {
+        while (isDecommissioning()) {

Review Comment:
   Updated





[GitHub] [incubator-uniffle] kaijchen commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "kaijchen (via GitHub)" <gi...@apache.org>.
kaijchen commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1109738738


##########
server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java:
##########
@@ -79,4 +80,53 @@ public void startTest() {
     }
 
   }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void decommissionTest(boolean shutdown) throws Exception {
+    ShuffleServerConf serverConf = createShuffleServerConf();
+    serverConf.set(SERVER_DECOMMISSION_CHECK_INTERVAL, 1000L);
+    serverConf.set(SERVER_DECOMMISSION_SHUTDOWN, shutdown);
+    ShuffleServer shuffleServer = new ShuffleServer(serverConf);
+    shuffleServer.start();
+    assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
+    // Shuffle server is not decommissioning, but we can also cancel it.
+    shuffleServer.cancelDecommission();
+    ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
+    String appId = "decommissionTest_appId_" + shutdown;
+    shuffleTaskManager.registerShuffle(appId, 0, Lists.newArrayList(), new RemoteStorageInfo("/tmp"), "");
+    shuffleServer.decommission();
+    assertEquals(ServerStatus.DECOMMISSIONING, shuffleServer.getServerStatus());
+    // Shuffle server is decommissioning, but we can also decommission it again.
+    shuffleServer.decommission();
+    shuffleServer.cancelDecommission();
+    shuffleTaskManager.removeResources(appId);
+    // Wait for 2 seconds, make sure cancel command is work.
+    Thread.sleep(2000);
+    assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
+    shuffleServer.decommission();
+    if (shutdown) {
+      Awaitility.await().timeout(10, TimeUnit.SECONDS).until(
+          () -> !shuffleServer.isRunning());
+    } else {
+      Awaitility.await().timeout(10, TimeUnit.SECONDS).until(
+          () -> ServerStatus.DECOMMISSIONED.equals(shuffleServer.getServerStatus()));
+      assertEquals(true, shuffleServer.isRunning());
+      shuffleServer.stopServer();
+    }
+  }
+
+  private ShuffleServerConf createShuffleServerConf() throws Exception {
+    ShuffleServerConf serverConf = new ShuffleServerConf();
+    serverConf.setInteger(ShuffleServerConf.RPC_SERVER_PORT, 9527);
+    serverConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
+    serverConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
+    serverConf.setInteger(ShuffleServerConf.JETTY_HTTP_PORT, 9528);
+    serverConf.setString(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:0");
+    serverConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("/tmp/null"));

Review Comment:
   Is `"/tmp/null"` OK here?
   It would be better to use a JUnit-managed `@TempDir` here.





[GitHub] [incubator-uniffle] kaijchen commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "kaijchen (via GitHub)" <gi...@apache.org>.
kaijchen commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1109952608


##########
server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java:
##########
@@ -79,4 +80,53 @@ public void startTest() {
     }
 
   }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void decommissionTest(boolean shutdown) throws Exception {
+    ShuffleServerConf serverConf = createShuffleServerConf();
+    serverConf.set(SERVER_DECOMMISSION_CHECK_INTERVAL, 1000L);
+    serverConf.set(SERVER_DECOMMISSION_SHUTDOWN, shutdown);
+    ShuffleServer shuffleServer = new ShuffleServer(serverConf);
+    shuffleServer.start();
+    assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
+    // Shuffle server is not decommissioning, but we can also cancel it.
+    shuffleServer.cancelDecommission();
+    ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
+    String appId = "decommissionTest_appId_" + shutdown;
+    shuffleTaskManager.registerShuffle(appId, 0, Lists.newArrayList(), new RemoteStorageInfo("/tmp"), "");
+    shuffleServer.decommission();
+    assertEquals(ServerStatus.DECOMMISSIONING, shuffleServer.getServerStatus());
+    // Shuffle server is decommissioning, but we can also decommission it again.
+    shuffleServer.decommission();
+    shuffleServer.cancelDecommission();
+    shuffleTaskManager.removeResources(appId);
+    // Wait for 2 seconds, make sure cancel command is work.
+    Thread.sleep(2000);
+    assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
+    shuffleServer.decommission();
+    if (shutdown) {
+      Awaitility.await().timeout(10, TimeUnit.SECONDS).until(
+          () -> !shuffleServer.isRunning());
+    } else {
+      Awaitility.await().timeout(10, TimeUnit.SECONDS).until(
+          () -> ServerStatus.DECOMMISSIONED.equals(shuffleServer.getServerStatus()));
+      assertEquals(true, shuffleServer.isRunning());
+      shuffleServer.stopServer();
+    }
+  }
+
+  private ShuffleServerConf createShuffleServerConf() throws Exception {
+    ShuffleServerConf serverConf = new ShuffleServerConf();
+    serverConf.setInteger(ShuffleServerConf.RPC_SERVER_PORT, 9527);
+    serverConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
+    serverConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
+    serverConf.setInteger(ShuffleServerConf.JETTY_HTTP_PORT, 9528);
+    serverConf.setString(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:0");
+    serverConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("/tmp/null"));

Review Comment:
   ```
   --- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
   +++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
   @@ -17,12 +17,14 @@
   
    package org.apache.uniffle.server;
   
   +import java.io.File;
    import java.util.Arrays;
    import java.util.concurrent.TimeUnit;
   
    import com.google.common.collect.Lists;
    import org.awaitility.Awaitility;
    import org.junit.jupiter.api.Test;
   +import org.junit.jupiter.api.io.TempDir;
    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.ValueSource;
   
   @@ -39,6 +41,9 @@ import static org.junit.jupiter.api.Assertions.fail;
   
    public class ShuffleServerTest {
   
   +  @TempDir
   +  private File tempDir;
   +
      @Test
      public void startTest() {
        try {
   @@ -123,7 +128,7 @@ public class ShuffleServerTest {
        serverConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
        serverConf.setInteger(ShuffleServerConf.JETTY_HTTP_PORT, 9528);
        serverConf.setString(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:0");
   -    serverConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("/tmp/null"));
   +    serverConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(tempDir.getAbsolutePath()));
        serverConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
        serverConf.setLong(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 100);
        serverConf.setLong(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY, 10);
   ```





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1108237246


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -78,6 +82,10 @@ public class ShuffleServer {
   private AtomicBoolean isHealthy = new AtomicBoolean(true);
   private GRPCMetrics grpcMetrics;
   private MetricReporter metricReporter;
+  private Thread decommissionThread;
+  private ServerStatus serverStatus = ServerStatus.NORMAL_STATUS;

Review Comment:
   Right





[GitHub] [incubator-uniffle] xianjingfeng commented on pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#issuecomment-1436210900

   > > What are the benefits?
   > 
   > Keeping a thread pool (singleThreadExecutor) for an uncommon task sounds inefficient.
   > 
   > Edit: it is not good practice to block a thread from `ForkJoinPool.commonPool()`. But you may consider using a `Future` to cancel the task.
   
   Done
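
For reference, cancelling a submitted task through its `Future` can be sketched as below. This is a minimal illustration using only `java.util.concurrent`, not the PR's code: `FutureCancelSketch` and `cancelStopsTask` are hypothetical names, and the sleeping loop merely stands in for the decommission wait loop.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: not the ShuffleServer implementation from the PR.
final class FutureCancelSketch {
  /** Submits a polling task, cancels it via its Future, and reports whether the worker stopped. */
  static boolean cancelStopsTask() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      Future<?> future = executor.submit(() -> {
        // Stand-in for the wait loop: keep sleeping until interrupted.
        while (!Thread.currentThread().isInterrupted()) {
          try {
            Thread.sleep(50);
          } catch (InterruptedException e) {
            // Restore the interrupt flag so the loop condition ends the task.
            Thread.currentThread().interrupt();
          }
        }
      });
      // true: interrupt the worker thread if the task is already running.
      boolean cancelled = future.cancel(true);
      executor.shutdown();
      boolean terminated = executor.awaitTermination(5, TimeUnit.SECONDS);
      return cancelled && future.isCancelled() && terminated;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      executor.shutdownNow();
    }
  }
}
```

The task must cooperate with interruption for `cancel(true)` to stop it. A loop that catches `InterruptedException` and keeps going without checking any other exit condition would continue to run after the cancel.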




[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1109730244


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +272,72 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public synchronized void decommission() {
+    if (isDecommissioning()) {
+      LOG.info("Shuffle Server is decommissioning. Nothing need to do.");
+      return;
+    }
+    if (!ServerStatus.ACTIVE.equals(serverStatus)) {
+      throw new InvalidRequestException(
+          "Shuffle Server is processing other procedures, current status:" + serverStatus);
+    }
+    serverStatus = ServerStatus.DECOMMISSIONING;
+    LOG.info("Shuffle Server is decommissioning.");
+    executorService = Executors.newSingleThreadExecutor(
+        ThreadUtils.getThreadFactory("shuffle-server-decommission-%d"));
+    executorService.submit(this::waitDecommissionFinish);
+  }
+
+  private void waitDecommissionFinish() {
+    long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+    boolean shutdownAfterDecommission = shuffleServerConf.get(SERVER_DECOMMISSION_SHUTDOWN);
+    int remainApplicationNum;
+    while (isDecommissioning()) {
+      remainApplicationNum = shuffleTaskManager.getAppIds().size();
+      if (remainApplicationNum == 0) {
+        serverStatus = ServerStatus.DECOMMISSIONED;
+        LOG.info("All applications finished. Current status is " + serverStatus);
+        if (shutdownAfterDecommission) {
+          LOG.info("Exiting...");
+          try {
+            stopServer();
+          } catch (Exception e) {
+            ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+          }
+        }
+        break;
+      }
+      LOG.info("Shuffle server is decommissioning. remain {} applications not finished.", remainApplicationNum);
+      try {
+        Thread.sleep(checkInterval);
+      } catch (InterruptedException e) {
+        LOG.warn("Ignore the InterruptedException which should be caused by internal killed.");
+      }
+    }
+    remainApplicationNum = shuffleTaskManager.getAppIds().size();
+    if (remainApplicationNum > 0) {
+      LOG.info("Decommission exiting. remain {} applications not finished.",
+          remainApplicationNum);
+    }
+  }
+
+  public synchronized void cancelDecommission() {
+    if (!isDecommissioning()) {
+      LOG.info("Shuffle server is not decommissioning. Nothing need to do.");
+      return;
+    }
+    serverStatus = ServerStatus.ACTIVE;
+    if (executorService != null) {
+      executorService.shutdownNow();
+      executorService = null;

Review Comment:
   I'm not sure.
   
   Normally, we would reuse the executorService throughout the whole lifecycle. Unlike a thread, it does not need to be recreated.
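
Reusing one executor across several decommission/cancel rounds can be sketched as follows. This is an illustration under assumptions, not the PR's code: `ReusedExecutorSketch` and all of its members are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: not the ShuffleServer implementation from the PR.
final class ReusedExecutorSketch {
  private ExecutorService executor; // created lazily, then reused
  private Future<?> current;        // handle of the most recent task

  synchronized ExecutorService executor() {
    if (executor == null) {
      executor = Executors.newSingleThreadExecutor();
    }
    return executor;
  }

  synchronized void start(Runnable task) {
    current = executor().submit(task);
  }

  /** Cancels only the current task; the executor itself stays alive. */
  synchronized boolean cancel() {
    return current != null && current.cancel(true);
  }

  /** Shuts the executor down once, at the end of the owner's lifecycle. */
  synchronized void close() {
    if (executor != null) {
      executor.shutdownNow();
    }
  }

  /** Runs two start/cancel rounds and checks that the same executor served both. */
  static boolean sameExecutorAcrossRounds() {
    ReusedExecutorSketch sketch = new ReusedExecutorSketch();
    try {
      sketch.start(() -> { });
      ExecutorService first = sketch.executor();
      sketch.cancel();
      sketch.start(() -> { });
      return first == sketch.executor() && !first.isShutdown();
    } finally {
      sketch.close();
    }
  }
}
```

Cancelling through the `Future` rather than calling `shutdownNow()` is what makes reuse possible, since an executor that has been shut down rejects new tasks.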



##########
docs/server_guide.md:
##########
@@ -91,6 +91,8 @@ This document will introduce how to deploy Uniffle shuffle servers.
 |Property Name|Default| Description                                                                                                                                                                                 |
 |---|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 |rss.server.storageMediaProvider.from.env.key|-| Sometimes, the local storage type/media info is provided by external system. RSS would read the env key defined by this configuration and get info about the storage media of its basePaths |
+|rss.server.decommission.check.interval|60000| The interval to check if all applications have finish when server is decommissioning                                                                                                        |

Review Comment:
   Nit: please state that the unit is ms in both the doc and the configuration builder. It's always better to make that explicit.



##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +272,72 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public synchronized void decommission() {
+    if (isDecommissioning()) {
+      LOG.info("Shuffle Server is decommissioning. Nothing need to do.");
+      return;
+    }
+    if (!ServerStatus.ACTIVE.equals(serverStatus)) {
+      throw new InvalidRequestException(
+          "Shuffle Server is processing other procedures, current status:" + serverStatus);
+    }
+    serverStatus = ServerStatus.DECOMMISSIONING;
+    LOG.info("Shuffle Server is decommissioning.");
+    executorService = Executors.newSingleThreadExecutor(
+        ThreadUtils.getThreadFactory("shuffle-server-decommission-%d"));
+    executorService.submit(this::waitDecommissionFinish);
+  }
+
+  private void waitDecommissionFinish() {
+    long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+    boolean shutdownAfterDecommission = shuffleServerConf.get(SERVER_DECOMMISSION_SHUTDOWN);
+    int remainApplicationNum;
+    while (isDecommissioning()) {
+      remainApplicationNum = shuffleTaskManager.getAppIds().size();
+      if (remainApplicationNum == 0) {
+        serverStatus = ServerStatus.DECOMMISSIONED;
+        LOG.info("All applications finished. Current status is " + serverStatus);
+        if (shutdownAfterDecommission) {
+          LOG.info("Exiting...");
+          try {
+            stopServer();
+          } catch (Exception e) {
+            ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+          }
+        }
+        break;
+      }
+      LOG.info("Shuffle server is decommissioning. remain {} applications not finished.", remainApplicationNum);

Review Comment:
   Nit: -> `is decommissioning, remaining {} ....`



##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +272,72 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public synchronized void decommission() {
+    if (isDecommissioning()) {
+      LOG.info("Shuffle Server is decommissioning. Nothing need to do.");
+      return;
+    }
+    if (!ServerStatus.ACTIVE.equals(serverStatus)) {
+      throw new InvalidRequestException(
+          "Shuffle Server is processing other procedures, current status:" + serverStatus);
+    }
+    serverStatus = ServerStatus.DECOMMISSIONING;
+    LOG.info("Shuffle Server is decommissioning.");
+    executorService = Executors.newSingleThreadExecutor(
+        ThreadUtils.getThreadFactory("shuffle-server-decommission-%d"));
+    executorService.submit(this::waitDecommissionFinish);
+  }
+
+  private void waitDecommissionFinish() {
+    long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+    boolean shutdownAfterDecommission = shuffleServerConf.get(SERVER_DECOMMISSION_SHUTDOWN);
+    int remainApplicationNum;
+    while (isDecommissioning()) {
+      remainApplicationNum = shuffleTaskManager.getAppIds().size();
+      if (remainApplicationNum == 0) {
+        serverStatus = ServerStatus.DECOMMISSIONED;
+        LOG.info("All applications finished. Current status is " + serverStatus);
+        if (shutdownAfterDecommission) {
+          LOG.info("Exiting...");
+          try {
+            stopServer();
+          } catch (Exception e) {
+            ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+          }
+        }
+        break;
+      }
+      LOG.info("Shuffle server is decommissioning. remain {} applications not finished.", remainApplicationNum);
+      try {
+        Thread.sleep(checkInterval);
+      } catch (InterruptedException e) {
+        LOG.warn("Ignore the InterruptedException which should be caused by internal killed.");
+      }
+    }
+    remainApplicationNum = shuffleTaskManager.getAppIds().size();
+    if (remainApplicationNum > 0) {
+      LOG.info("Decommission exiting. remain {} applications not finished.",
+          remainApplicationNum);
+    }
+  }
+
+  public synchronized void cancelDecommission() {
+    if (!isDecommissioning()) {
+      LOG.info("Shuffle server is not decommissioning. Nothing need to do.");

Review Comment:
   ditto.



##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +272,72 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public synchronized void decommission() {
+    if (isDecommissioning()) {
+      LOG.info("Shuffle Server is decommissioning. Nothing need to do.");

Review Comment:
   Nit: Nothing needs to be done.



##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +272,72 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public synchronized void decommission() {
+    if (isDecommissioning()) {
+      LOG.info("Shuffle Server is decommissioning. Nothing need to do.");
+      return;
+    }
+    if (!ServerStatus.ACTIVE.equals(serverStatus)) {
+      throw new InvalidRequestException(
+          "Shuffle Server is processing other procedures, current status:" + serverStatus);
+    }
+    serverStatus = ServerStatus.DECOMMISSIONING;
+    LOG.info("Shuffle Server is decommissioning.");
+    executorService = Executors.newSingleThreadExecutor(
+        ThreadUtils.getThreadFactory("shuffle-server-decommission-%d"));
+    executorService.submit(this::waitDecommissionFinish);
+  }
+
+  private void waitDecommissionFinish() {
+    long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+    boolean shutdownAfterDecommission = shuffleServerConf.get(SERVER_DECOMMISSION_SHUTDOWN);
+    int remainApplicationNum;
+    while (isDecommissioning()) {
+      remainApplicationNum = shuffleTaskManager.getAppIds().size();
+      if (remainApplicationNum == 0) {
+        serverStatus = ServerStatus.DECOMMISSIONED;
+        LOG.info("All applications finished. Current status is " + serverStatus);
+        if (shutdownAfterDecommission) {
+          LOG.info("Exiting...");
+          try {
+            stopServer();
+          } catch (Exception e) {
+            ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+          }
+        }
+        break;
+      }
+      LOG.info("Shuffle server is decommissioning. remain {} applications not finished.", remainApplicationNum);
+      try {
+        Thread.sleep(checkInterval);
+      } catch (InterruptedException e) {
+        LOG.warn("Ignore the InterruptedException which should be caused by internal killed.");

Review Comment:
   Nit: by internal kill.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1112002249


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +274,79 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public synchronized void decommission() {
+    if (isDecommissioning()) {
+      LOG.info("Shuffle Server is decommissioning. Nothing needs to be done.");
+      return;
+    }
+    if (!ServerStatus.ACTIVE.equals(serverStatus)) {
+      throw new InvalidRequestException(
+          "Shuffle Server is processing other procedures, current status:" + serverStatus);
+    }
+    serverStatus = ServerStatus.DECOMMISSIONING;
+    LOG.info("Shuffle Server is decommissioning.");
+    if (executorService == null) {
+      executorService = Executors.newSingleThreadExecutor(
+          ThreadUtils.getThreadFactory("shuffle-server-decommission-%d"));
+    }
+    decommissionFuture = executorService.submit(this::waitDecommissionFinish);
+  }
+
+  private void waitDecommissionFinish() {
+    long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+    boolean shutdownAfterDecommission = shuffleServerConf.get(SERVER_DECOMMISSION_SHUTDOWN);
+    int remainApplicationNum;
+    while (isDecommissioning()) {
+      remainApplicationNum = shuffleTaskManager.getAppIds().size();
+      if (remainApplicationNum == 0) {
+        serverStatus = ServerStatus.DECOMMISSIONED;
+        LOG.info("All applications finished. Current status is " + serverStatus);
+        if (shutdownAfterDecommission) {
+          LOG.info("Exiting...");
+          try {
+            stopServer();
+          } catch (Exception e) {
+            ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+          }
+        }
+        break;
+      }
+      LOG.info("Shuffle server is decommissioning, remaining {} applications not finished.", remainApplicationNum);
+      try {
+        Thread.sleep(checkInterval);
+      } catch (InterruptedException e) {
+        LOG.warn("Ignore the InterruptedException which should be caused by internal kill.");
+      }
+    }
+    remainApplicationNum = shuffleTaskManager.getAppIds().size();
+    if (remainApplicationNum > 0) {
+      LOG.info("Decommission exiting, remaining {} applications not finished.",
+          remainApplicationNum);
+    }
+  }
+
+  public synchronized void cancelDecommission() {
+    if (!isDecommissioning()) {
+      LOG.info("Shuffle server is not decommissioning. Nothing needs to be done.");
+      return;
+    }
+    if (ServerStatus.DECOMMISSIONED.equals(serverStatus)) {
+      serverStatus = ServerStatus.ACTIVE;
+      return;
+    }
+    serverStatus = ServerStatus.ACTIVE;
+    if (decommissionFuture.cancel(true)) {
+      LOG.info("Decommission canceled.");
+    } else {
+      LOG.warn("Failed to cancel decommission.");
+    }

Review Comment:
   No. Cancel is always needed. And we cannot call `shutdownNow` here; otherwise we cannot decommission again.
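
The point about `shutdownNow` can be checked with the JDK alone. The following is a minimal sketch, not Uniffle code: the class and method names are hypothetical, and `sleepQuietly` only stands in for the decommission polling loop. It compares an executor after `Future.cancel(true)` with one after `shutdownNow()`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;

// Illustrative only; names are hypothetical and not part of the PR.
class CancelVsShutdownNow {

  // Stand-in for the decommission polling loop: sleeps until interrupted.
  static void sleepQuietly(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      // Interrupted by cancel(true) or shutdownNow(); just return.
    }
  }

  // Cancels the first task through its Future, then submits a second task.
  static boolean reusableAfterCancel() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      Future<?> first = executor.submit(() -> sleepQuietly(60_000));
      first.cancel(true);
      Future<String> second = executor.submit(() -> "ok");
      return "ok".equals(second.get());
    } catch (Exception e) {
      return false;
    } finally {
      executor.shutdownNow();
    }
  }

  // Shuts the executor down, then tries to submit a second task.
  static boolean reusableAfterShutdownNow() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.submit(() -> sleepQuietly(60_000));
    executor.shutdownNow();
    try {
      executor.submit(() -> "ok");
      return true;
    } catch (RejectedExecutionException e) {
      // A terminated executor rejects new work.
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("reusable after cancel: " + reusableAfterCancel());
    System.out.println("reusable after shutdownNow: " + reusableAfterShutdownNow());
  }
}
```

Under these assumptions, cancelling through the future leaves the executor usable for a later decommission, while a shut-down executor rejects the next submission unless a new one is created.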





[GitHub] [incubator-uniffle] kaijchen commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "kaijchen (via GitHub)" <gi...@apache.org>.
kaijchen commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1110667775


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +272,72 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public synchronized void decommission() {
+    if (isDecommissioning()) {
+      LOG.info("Shuffle Server is decommissioning. Nothing need to do.");
+      return;
+    }
+    if (!ServerStatus.ACTIVE.equals(serverStatus)) {
+      throw new InvalidRequestException(
+          "Shuffle Server is processing other procedures, current status:" + serverStatus);
+    }
+    serverStatus = ServerStatus.DECOMMISSIONING;
+    LOG.info("Shuffle Server is decommissioning.");
+    executorService = Executors.newSingleThreadExecutor(
+        ThreadUtils.getThreadFactory("shuffle-server-decommission-%d"));
+    executorService.submit(this::waitDecommissionFinish);
+  }
+
+  private void waitDecommissionFinish() {
+    long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+    boolean shutdownAfterDecommission = shuffleServerConf.get(SERVER_DECOMMISSION_SHUTDOWN);
+    int remainApplicationNum;
+    while (isDecommissioning()) {
+      remainApplicationNum = shuffleTaskManager.getAppIds().size();
+      if (remainApplicationNum == 0) {
+        serverStatus = ServerStatus.DECOMMISSIONED;
+        LOG.info("All applications finished. Current status is " + serverStatus);
+        if (shutdownAfterDecommission) {
+          LOG.info("Exiting...");
+          try {
+            stopServer();
+          } catch (Exception e) {
+            ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+          }
+        }
+        break;
+      }
+      LOG.info("Shuffle server is decommissioning. remain {} applications not finished.", remainApplicationNum);
+      try {
+        Thread.sleep(checkInterval);
+      } catch (InterruptedException e) {
+        LOG.warn("Ignore the InterruptedException which should be caused by internal killed.");
+      }
+    }
+    remainApplicationNum = shuffleTaskManager.getAppIds().size();
+    if (remainApplicationNum > 0) {
+      LOG.info("Decommission exiting. remain {} applications not finished.",
+          remainApplicationNum);
+    }
+  }
+
+  public synchronized void cancelDecommission() {
+    if (!isDecommissioning()) {
+      LOG.info("Shuffle server is not decommissioning. Nothing need to do.");
+      return;
+    }
+    serverStatus = ServerStatus.ACTIVE;
+    if (executorService != null) {
+      executorService.shutdownNow();
+      executorService = null;

Review Comment:
   > Normally, we would reuse the executorService through the whole lifecycle. It's not like a thread that we should recreate.
   
   But decommission is an uncommon task. Should we keep the thread pool around just for the next decommission?





[GitHub] [incubator-uniffle] kaijchen commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "kaijchen (via GitHub)" <gi...@apache.org>.
kaijchen commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1112664247


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -149,6 +163,7 @@ public void stopServer() throws Exception {
     }
     SecurityContextFactory.get().getSecurityContext().close();
     server.stop();
+    running = false;

Review Comment:
   Shut down the executorService here?
   ```java
   if (executorService != null) {
     executorService.shutdown();
   }
   ```





[GitHub] [incubator-uniffle] kaijchen commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "kaijchen (via GitHub)" <gi...@apache.org>.
kaijchen commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1107978706


##########
server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java:
##########
@@ -79,4 +77,47 @@ public void startTest() {
     }
 
   }
+
+  @Test
+  public void decommissionTest() throws Exception {
+    ShuffleServer shuffleServer = createShuffleServer();
+    assertEquals(ServerStatus.NORMAL_STATUS, shuffleServer.getServerStatus());
+    try {
+      shuffleServer.cancelDecommission();

Review Comment:
   Use `assertThrows(InvalidRequestException.class, () -> shuffleServer.cancelDecommission());`



##########
server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java:
##########
@@ -79,4 +77,47 @@ public void startTest() {
     }
 
   }
+
+  @Test
+  public void decommissionTest() throws Exception {
+    ShuffleServer shuffleServer = createShuffleServer();
+    assertEquals(ServerStatus.NORMAL_STATUS, shuffleServer.getServerStatus());
+    try {
+      shuffleServer.cancelDecommission();
+      fail(EXPECTED_EXCEPTION_MESSAGE);
+    } catch (Exception e) {
+      assertTrue(e instanceof InvalidRequestException);
+    }
+    ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
+    String appId = "decommissionTest_appId";
+    shuffleTaskManager.registerShuffle(appId, 0, Lists.newArrayList(), new RemoteStorageInfo("/tmp"), "");
+    shuffleServer.decommission();
+    try {
+      assertEquals(ServerStatus.DECOMMISSIONING, shuffleServer.getServerStatus());

Review Comment:
   Shouldn't this assertion be outside the try-catch block?



##########
common/src/main/java/org/apache/uniffle/common/ServerStatus.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+public enum ServerStatus {
+  NORMAL_STATUS,

Review Comment:
   ```suggestion
     NORMAL,
   ```



##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -78,6 +82,10 @@ public class ShuffleServer {
   private AtomicBoolean isHealthy = new AtomicBoolean(true);
   private GRPCMetrics grpcMetrics;
   private MetricReporter metricReporter;
+  private Thread decommissionThread;
+  private ServerStatus serverStatus = ServerStatus.NORMAL_STATUS;
+  private Object statusLock = new Object();
+  private boolean running;

Review Comment:
   Does this need to be `volatile`?
   
   ```suggestion
     private volatile boolean running;
   ```
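
For context on why `volatile` matters here, the following is a minimal sketch using only the JDK. The class and its methods are hypothetical and only mimic a `running` flag that one thread writes and another thread polls; this is not the `ShuffleServer` implementation.

```java
// Illustrative only; a hypothetical stand-in for a flag shared between threads.
class VolatileFlagDemo {

  // volatile guarantees that a write by one thread becomes visible to readers in other threads.
  private volatile boolean running = true;

  void stop() {
    running = false;
  }

  // Starts a worker that spins on the flag, clears the flag, and reports whether the worker exited.
  static boolean workerStopsAfterFlagCleared() {
    VolatileFlagDemo demo = new VolatileFlagDemo();
    Thread worker = new Thread(() -> {
      while (demo.running) {
        Thread.yield();
      }
    });
    worker.setDaemon(true);
    worker.start();
    demo.stop();
    try {
      worker.join(5_000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
    return !worker.isAlive();
  }

  public static void main(String[] args) {
    System.out.println("worker stopped: " + workerStopsAfterFlagCleared());
  }
}
```

Without `volatile` (or another form of synchronization), the Java memory model does not guarantee that the polling thread ever observes the update.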



##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();
+    synchronized (statusLock) {
+      checkStatusForDecommission();
+      long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+      decommissionThread = new Thread(() -> {
+        while (isDecommissioning()) {
+          int remainApplicationNum = shuffleTaskManager.getAppIds().size();
+          if (remainApplicationNum == 0) {
+            LOG.info("all applications finished, exit now");
+            try {
+              stopServer();
+              break;
+            } catch (Exception e) {
+              ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+            }
+          }
+          LOG.info("Shuffle server is decommissioning. remain {} applications not finished.", remainApplicationNum);
+          try {
+            Thread.sleep(checkInterval);
+          } catch (InterruptedException e) {
+            LOG.warn("Ignore the InterruptedException which should be caused by internal killed");
+          }
+        }
+      });
+      decommissionThread.setName("decommission");
+      decommissionThread.start();
+      serverStatus = ServerStatus.DECOMMISSIONING;
+    }
+  }
+
+  private void checkStatusForDecommission() {
+    if (isDecommissioning()) {
+      throw new InvalidRequestException("Shuffle Server is decommissioning. Nothing need to do.");
+    }
+    if (!ServerStatus.NORMAL_STATUS.equals(serverStatus)) {
+      throw new InvalidRequestException(
+          "Shuffle Server is processing other procedures, current status:" + serverStatus);
+    }
+  }
+
+  public void cancelDecommission() {
+    checkStatusForCancelDecommission();
+    synchronized (statusLock) {
+      checkStatusForCancelDecommission();
+      if (decommissionThread != null) {
+        decommissionThread.interrupt();

Review Comment:
   Would it be better to use an `ExecutorService`, with `future.cancel()` and `ExecutorService.shutdownNow()` for termination?
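
A rough sketch of that suggestion, using only `java.util.concurrent`: the polling loop is submitted as a task, and the returned `Future` is the handle used to stop it. The class name, the latches, and the one-second sleep are assumptions made for illustration; the loop here exits on interruption, which is a simplification and not necessarily how the PR's loop behaves.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Illustrative only; names are hypothetical and not part of the PR.
class FutureCancelSketch {

  // Submits a polling loop, cancels it through its Future, and reports whether the loop exited.
  static boolean pollingTaskStopsOnCancel() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch exited = new CountDownLatch(1);
    try {
      Future<?> future = executor.submit(() -> {
        started.countDown();
        try {
          while (true) {
            Thread.sleep(1_000); // stand-in for the periodic "any applications left?" check
          }
        } catch (InterruptedException e) {
          // cancel(true) interrupts the sleep; fall through and leave the loop.
        } finally {
          exited.countDown();
        }
      });
      if (!started.await(5, TimeUnit.SECONDS)) {
        return false;
      }
      boolean cancelled = future.cancel(true);
      boolean loopExited = exited.await(5, TimeUnit.SECONDS);
      return cancelled && loopExited && future.isCancelled();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      // Final termination of the pool, as opposed to cancelling a single task.
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println("polling task stopped: " + pollingTaskStopsOnCancel());
  }
}
```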





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1109943735


##########
server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java:
##########
@@ -79,4 +80,53 @@ public void startTest() {
     }
 
   }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void decommissionTest(boolean shutdown) throws Exception {
+    ShuffleServerConf serverConf = createShuffleServerConf();
+    serverConf.set(SERVER_DECOMMISSION_CHECK_INTERVAL, 1000L);
+    serverConf.set(SERVER_DECOMMISSION_SHUTDOWN, shutdown);
+    ShuffleServer shuffleServer = new ShuffleServer(serverConf);
+    shuffleServer.start();
+    assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
+    // Shuffle server is not decommissioning, but we can also cancel it.
+    shuffleServer.cancelDecommission();
+    ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
+    String appId = "decommissionTest_appId_" + shutdown;
+    shuffleTaskManager.registerShuffle(appId, 0, Lists.newArrayList(), new RemoteStorageInfo("/tmp"), "");
+    shuffleServer.decommission();
+    assertEquals(ServerStatus.DECOMMISSIONING, shuffleServer.getServerStatus());
+    // Shuffle server is decommissioning, but we can also decommission it again.
+    shuffleServer.decommission();
+    shuffleServer.cancelDecommission();
+    shuffleTaskManager.removeResources(appId);
+    // Wait for 2 seconds, make sure cancel command is work.
+    Thread.sleep(2000);
+    assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
+    shuffleServer.decommission();
+    if (shutdown) {
+      Awaitility.await().timeout(10, TimeUnit.SECONDS).until(
+          () -> !shuffleServer.isRunning());
+    } else {
+      Awaitility.await().timeout(10, TimeUnit.SECONDS).until(
+          () -> ServerStatus.DECOMMISSIONED.equals(shuffleServer.getServerStatus()));
+      assertEquals(true, shuffleServer.isRunning());
+      shuffleServer.stopServer();
+    }
+  }
+
+  private ShuffleServerConf createShuffleServerConf() throws Exception {
+    ShuffleServerConf serverConf = new ShuffleServerConf();
+    serverConf.setInteger(ShuffleServerConf.RPC_SERVER_PORT, 9527);
+    serverConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
+    serverConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
+    serverConf.setInteger(ShuffleServerConf.JETTY_HTTP_PORT, 9528);
+    serverConf.setString(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:0");
+    serverConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("/tmp/null"));

Review Comment:
   `createShuffleServerConf` is referenced by multiple unit test cases. Do we need to add `@TempDir` to every case? Is there a better way?
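
One possible shape, sketched with the JDK only and not taken from the PR: a shared helper creates a fresh temporary directory per call, so individual test cases do not each need their own annotation. `TempBasePathSketch`, `freshBasePaths`, and the `rss-test-` prefix are hypothetical names; how the result would be passed to `RSS_STORAGE_BASE_PATH` is left out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;

// Illustrative only; a hypothetical helper, not Uniffle test code.
class TempBasePathSketch {

  // Returns a single-element list holding a newly created temporary directory.
  static List<String> freshBasePaths() {
    try {
      Path dir = Files.createTempDirectory("rss-test-");
      // Best-effort cleanup when the JVM exits (only succeeds if the directory is empty).
      dir.toFile().deleteOnExit();
      return Collections.singletonList(dir.toAbsolutePath().toString());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(freshBasePaths());
  }
}
```

Compared with JUnit's `@TempDir`, this gives up automatic recursive cleanup, so it is a trade-off and not a drop-in replacement.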





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1109952810


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +272,72 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public synchronized void decommission() {
+    if (isDecommissioning()) {
+      LOG.info("Shuffle Server is decommissioning. Nothing need to do.");
+      return;
+    }
+    if (!ServerStatus.ACTIVE.equals(serverStatus)) {
+      throw new InvalidRequestException(
+          "Shuffle Server is processing other procedures, current status:" + serverStatus);
+    }
+    serverStatus = ServerStatus.DECOMMISSIONING;
+    LOG.info("Shuffle Server is decommissioning.");
+    executorService = Executors.newSingleThreadExecutor(
+        ThreadUtils.getThreadFactory("shuffle-server-decommission-%d"));
+    executorService.submit(this::waitDecommissionFinish);
+  }
+
+  private void waitDecommissionFinish() {
+    long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+    boolean shutdownAfterDecommission = shuffleServerConf.get(SERVER_DECOMMISSION_SHUTDOWN);
+    int remainApplicationNum;
+    while (isDecommissioning()) {
+      remainApplicationNum = shuffleTaskManager.getAppIds().size();
+      if (remainApplicationNum == 0) {
+        serverStatus = ServerStatus.DECOMMISSIONED;
+        LOG.info("All applications finished. Current status is " + serverStatus);
+        if (shutdownAfterDecommission) {
+          LOG.info("Exiting...");
+          try {
+            stopServer();
+          } catch (Exception e) {
+            ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+          }
+        }
+        break;
+      }
+      LOG.info("Shuffle server is decommissioning. remain {} applications not finished.", remainApplicationNum);
+      try {
+        Thread.sleep(checkInterval);
+      } catch (InterruptedException e) {
+        LOG.warn("Ignore the InterruptedException which should be caused by internal killed.");
+      }
+    }
+    remainApplicationNum = shuffleTaskManager.getAppIds().size();
+    if (remainApplicationNum > 0) {
+      LOG.info("Decommission exiting. remain {} applications not finished.",
+          remainApplicationNum);
+    }
+  }
+
+  public synchronized void cancelDecommission() {
+    if (!isDecommissioning()) {
+      LOG.info("Shuffle server is not decommissioning. Nothing need to do.");
+      return;
+    }
+    serverStatus = ServerStatus.ACTIVE;
+    if (executorService != null) {
+      executorService.shutdownNow();
+      executorService = null;

Review Comment:
   Updated



##########
docs/server_guide.md:
##########
@@ -91,6 +91,8 @@ This document will introduce how to deploy Uniffle shuffle servers.
 |Property Name|Default| Description                                                                                                                                                                                 |
 |---|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 |rss.server.storageMediaProvider.from.env.key|-| Sometimes, the local storage type/media info is provided by external system. RSS would read the env key defined by this configuration and get info about the storage media of its basePaths |
+|rss.server.decommission.check.interval|60000| The interval to check if all applications have finish when server is decommissioning                                                                                                        |

Review Comment:
   Updated





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1109955237


##########
server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java:
##########
@@ -79,4 +80,53 @@ public void startTest() {
     }
 
   }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void decommissionTest(boolean shutdown) throws Exception {
+    ShuffleServerConf serverConf = createShuffleServerConf();
+    serverConf.set(SERVER_DECOMMISSION_CHECK_INTERVAL, 1000L);
+    serverConf.set(SERVER_DECOMMISSION_SHUTDOWN, shutdown);
+    ShuffleServer shuffleServer = new ShuffleServer(serverConf);
+    shuffleServer.start();
+    assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
+    // Shuffle server is not decommissioning, but we can also cancel it.
+    shuffleServer.cancelDecommission();
+    ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
+    String appId = "decommissionTest_appId_" + shutdown;
+    shuffleTaskManager.registerShuffle(appId, 0, Lists.newArrayList(), new RemoteStorageInfo("/tmp"), "");
+    shuffleServer.decommission();
+    assertEquals(ServerStatus.DECOMMISSIONING, shuffleServer.getServerStatus());
+    // Shuffle server is decommissioning, but we can also decommission it again.
+    shuffleServer.decommission();
+    shuffleServer.cancelDecommission();
+    shuffleTaskManager.removeResources(appId);
+    // Wait for 2 seconds, make sure cancel command is work.
+    Thread.sleep(2000);
+    assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
+    shuffleServer.decommission();
+    if (shutdown) {
+      Awaitility.await().timeout(10, TimeUnit.SECONDS).until(
+          () -> !shuffleServer.isRunning());
+    } else {
+      Awaitility.await().timeout(10, TimeUnit.SECONDS).until(
+          () -> ServerStatus.DECOMMISSIONED.equals(shuffleServer.getServerStatus()));
+      assertEquals(true, shuffleServer.isRunning());
+      shuffleServer.stopServer();
+    }
+  }
+
+  private ShuffleServerConf createShuffleServerConf() throws Exception {
+    ShuffleServerConf serverConf = new ShuffleServerConf();
+    serverConf.setInteger(ShuffleServerConf.RPC_SERVER_PORT, 9527);
+    serverConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
+    serverConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
+    serverConf.setInteger(ShuffleServerConf.JETTY_HTTP_PORT, 9528);
+    serverConf.setString(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:0");
+    serverConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("/tmp/null"));

Review Comment:
   Got it, thanks.





[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1111426937


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +274,75 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public synchronized void decommission() {
+    if (isDecommissioning()) {
+      LOG.info("Shuffle Server is decommissioning. Nothing needs to be done.");
+      return;
+    }
+    if (!ServerStatus.ACTIVE.equals(serverStatus)) {
+      throw new InvalidRequestException(
+          "Shuffle Server is processing other procedures, current status:" + serverStatus);
+    }
+    serverStatus = ServerStatus.DECOMMISSIONING;
+    LOG.info("Shuffle Server is decommissioning.");
+    if (executorService == null) {
+      executorService = Executors.newSingleThreadExecutor(
+          ThreadUtils.getThreadFactory("shuffle-server-decommission-%d"));
+    }
+    decommissionFuture = executorService.submit(this::waitDecommissionFinish);
+  }
+
+  private void waitDecommissionFinish() {
+    long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+    boolean shutdownAfterDecommission = shuffleServerConf.get(SERVER_DECOMMISSION_SHUTDOWN);
+    int remainApplicationNum;
+    while (isDecommissioning()) {
+      remainApplicationNum = shuffleTaskManager.getAppIds().size();
+      if (remainApplicationNum == 0) {
+        serverStatus = ServerStatus.DECOMMISSIONED;
+        LOG.info("All applications finished. Current status is " + serverStatus);
+        if (shutdownAfterDecommission) {
+          LOG.info("Exiting...");
+          try {
+            stopServer();
+          } catch (Exception e) {
+            ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+          }
+        }
+        break;
+      }
+      LOG.info("Shuffle server is decommissioning, remaining {} applications not finished.", remainApplicationNum);
+      try {
+        Thread.sleep(checkInterval);
+      } catch (InterruptedException e) {
+        LOG.warn("Ignore the InterruptedException which should be caused by internal kill.");
+      }
+    }
+    remainApplicationNum = shuffleTaskManager.getAppIds().size();
+    if (remainApplicationNum > 0) {
+      LOG.info("Decommission exiting, remaining {} applications not finished.",
+          remainApplicationNum);
+    }
+  }
+
+  public synchronized void cancelDecommission() {
+    if (!isDecommissioning()) {
+      LOG.info("Shuffle server is not decommissioning. Nothing needs to be done.");
+      return;
+    }
+    if (decommissionFuture.cancel(true)) {
+      serverStatus = ServerStatus.ACTIVE;

Review Comment:
   When the server is already `DECOMMISSIONED`, canceling the decommission should bring the shuffle server back to the active state. The current implementation is wrong in that case.
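
   For illustration, a minimal sketch of the transition asked for above. `CancelSketch` and its methods are hypothetical names rather than the PR's code, and the three states are only assumed to mirror `ServerStatus`:

   ```java
   final class CancelSketch {
     enum Status { ACTIVE, DECOMMISSIONING, DECOMMISSIONED }

     private Status status = Status.ACTIVE;

     synchronized void set(Status next) {
       status = next;
     }

     synchronized Status get() {
       return status;
     }

     // Cancel is accepted from DECOMMISSIONING and from DECOMMISSIONED;
     // from ACTIVE there is nothing to cancel, so it reports false.
     synchronized boolean cancelDecommission() {
       if (status == Status.ACTIVE) {
         return false;
       }
       status = Status.ACTIVE;
       return true;
     }
   }
   ```

   With this shape, a server that has already finished draining (and was configured not to exit) can be put back into service.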



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1108212383


##########
common/src/main/java/org/apache/uniffle/common/ServerStatus.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+public enum ServerStatus {
+  NORMAL_STATUS,

Review Comment:
   `NORMAL` is already in use. Protobuf does not allow the same enum value name in different enumerations of the same package.





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1108216023


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -78,6 +82,10 @@ public class ShuffleServer {
   private AtomicBoolean isHealthy = new AtomicBoolean(true);
   private GRPCMetrics grpcMetrics;
   private MetricReporter metricReporter;
+  private Thread decommissionThread;
+  private ServerStatus serverStatus = ServerStatus.NORMAL_STATUS;
+  private Object statusLock = new Object();
+  private boolean running;

Review Comment:
   This feels unnecessary; it is only used by unit tests for now. But I am OK with it too.





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1113008892


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -149,6 +163,7 @@ public void stopServer() throws Exception {
     }
     SecurityContextFactory.get().getSecurityContext().close();
     server.stop();
+    running = false;

Review Comment:
   Done





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1111508393


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +274,75 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public synchronized void decommission() {
+    if (isDecommissioning()) {
+      LOG.info("Shuffle Server is decommissioning. Nothing needs to be done.");
+      return;
+    }
+    if (!ServerStatus.ACTIVE.equals(serverStatus)) {
+      throw new InvalidRequestException(
+          "Shuffle Server is processing other procedures, current status:" + serverStatus);
+    }
+    serverStatus = ServerStatus.DECOMMISSIONING;
+    LOG.info("Shuffle Server is decommissioning.");
+    if (executorService == null) {
+      executorService = Executors.newSingleThreadExecutor(
+          ThreadUtils.getThreadFactory("shuffle-server-decommission-%d"));
+    }
+    decommissionFuture = executorService.submit(this::waitDecommissionFinish);
+  }
+
+  private void waitDecommissionFinish() {
+    long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+    boolean shutdownAfterDecommission = shuffleServerConf.get(SERVER_DECOMMISSION_SHUTDOWN);
+    int remainApplicationNum;
+    while (isDecommissioning()) {
+      remainApplicationNum = shuffleTaskManager.getAppIds().size();
+      if (remainApplicationNum == 0) {
+        serverStatus = ServerStatus.DECOMMISSIONED;
+        LOG.info("All applications finished. Current status is " + serverStatus);
+        if (shutdownAfterDecommission) {
+          LOG.info("Exiting...");
+          try {
+            stopServer();
+          } catch (Exception e) {
+            ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+          }
+        }
+        break;
+      }
+      LOG.info("Shuffle server is decommissioning, remaining {} applications not finished.", remainApplicationNum);
+      try {
+        Thread.sleep(checkInterval);
+      } catch (InterruptedException e) {
+        LOG.warn("Ignore the InterruptedException which should be caused by internal kill.");
+      }
+    }
+    remainApplicationNum = shuffleTaskManager.getAppIds().size();
+    if (remainApplicationNum > 0) {
+      LOG.info("Decommission exiting, remaining {} applications not finished.",
+          remainApplicationNum);
+    }
+  }
+
+  public synchronized void cancelDecommission() {
+    if (!isDecommissioning()) {
+      LOG.info("Shuffle server is not decommissioning. Nothing needs to be done.");
+      return;
+    }
+    if (decommissionFuture.cancel(true)) {
+      serverStatus = ServerStatus.ACTIVE;

Review Comment:
   > > When the server is already `Decommissioned`, cancel decommission should bring the shuffle server to active state. Current impl is wrong then.
   > 
   > OK, I misread what "cancel" means. @xianjingfeng could you add "cancel decommission" part and "server state transitions" into the design doc please?
   
   Done





[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1112011885


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +274,79 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public synchronized void decommission() {
+    if (isDecommissioning()) {
+      LOG.info("Shuffle Server is decommissioning. Nothing needs to be done.");
+      return;
+    }
+    if (!ServerStatus.ACTIVE.equals(serverStatus)) {
+      throw new InvalidRequestException(
+          "Shuffle Server is processing other procedures, current status:" + serverStatus);
+    }
+    serverStatus = ServerStatus.DECOMMISSIONING;
+    LOG.info("Shuffle Server is decommissioning.");
+    if (executorService == null) {
+      executorService = Executors.newSingleThreadExecutor(
+          ThreadUtils.getThreadFactory("shuffle-server-decommission-%d"));
+    }
+    decommissionFuture = executorService.submit(this::waitDecommissionFinish);
+  }
+
+  private void waitDecommissionFinish() {
+    long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+    boolean shutdownAfterDecommission = shuffleServerConf.get(SERVER_DECOMMISSION_SHUTDOWN);
+    int remainApplicationNum;
+    while (isDecommissioning()) {
+      remainApplicationNum = shuffleTaskManager.getAppIds().size();
+      if (remainApplicationNum == 0) {
+        serverStatus = ServerStatus.DECOMMISSIONED;
+        LOG.info("All applications finished. Current status is " + serverStatus);
+        if (shutdownAfterDecommission) {
+          LOG.info("Exiting...");
+          try {
+            stopServer();
+          } catch (Exception e) {
+            ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+          }
+        }
+        break;
+      }
+      LOG.info("Shuffle server is decommissioning, remaining {} applications not finished.", remainApplicationNum);
+      try {
+        Thread.sleep(checkInterval);
+      } catch (InterruptedException e) {
+        LOG.warn("Ignore the InterruptedException which should be caused by internal kill.");

Review Comment:
   The logging is better. And it's better to break earlier. 
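
   A sketch of what breaking out on interrupt could look like. `InterruptiblePoll` and `waitUntilDrained` are made-up names, and the `IntSupplier` only stands in for `shuffleTaskManager.getAppIds().size()`:

   ```java
   import java.util.function.IntSupplier;

   final class InterruptiblePoll {
     // Polls until no applications remain. Returns true when drained and
     // false when the waiting thread was interrupted first.
     static boolean waitUntilDrained(IntSupplier remainingApps, long intervalMs) {
       while (remainingApps.getAsInt() > 0) {
         try {
           Thread.sleep(intervalMs);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt(); // keep the interrupt visible to callers
           return false; // leave the loop right away instead of polling again
         }
       }
       return true;
     }
   }
   ```

   Returning from inside the `catch` means a cancel does not have to wait for another status check before the loop ends.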





[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "zuston (via GitHub)" <gi...@apache.org>.
zuston commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1108206353


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {

Review Comment:
   Why not use `public synchronized void decommission() {}`?



##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();
+    synchronized (statusLock) {
+      checkStatusForDecommission();
+      long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+      decommissionThread = new Thread(() -> {
+        while (isDecommissioning()) {

Review Comment:
   When the server's decommission is canceled, it would be better to log this.



##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -332,4 +405,13 @@ public boolean isHealthy() {
   public GRPCMetrics getGrpcMetrics() {
     return grpcMetrics;
   }
+
+  public boolean isDecommissioning() {
+    return ServerStatus.DECOMMISSIONING.equals(serverStatus);
+  }
+
+  public boolean isRunning() {

Review Comment:
   If this is only used in test cases, please indicate it with `@VisibleForTesting`.



##########
common/src/main/java/org/apache/uniffle/common/ServerStatus.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+public enum ServerStatus {
+  NORMAL_STATUS,
+  DECOMMISSIONING;

Review Comment:
   How about changing `NORMAL_STATUS` to `RUNNING`?



##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -78,6 +82,10 @@ public class ShuffleServer {
   private AtomicBoolean isHealthy = new AtomicBoolean(true);
   private GRPCMetrics grpcMetrics;
   private MetricReporter metricReporter;
+  private Thread decommissionThread;
+  private ServerStatus serverStatus = ServerStatus.NORMAL_STATUS;

Review Comment:
   This status should be marked as `volatile`, since it is used in different threads. Right?
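
   To make the visibility concern concrete, a small sketch under assumed names (`VolatileStatusSketch`, `watch`, and `cancel` are not from the PR). The watcher thread only sees the change promptly because the field is `volatile`:

   ```java
   final class VolatileStatusSketch {
     enum Status { ACTIVE, DECOMMISSIONING }

     // volatile guarantees that a write from one thread is visible to
     // subsequent reads from any other thread.
     private volatile Status status = Status.DECOMMISSIONING;

     void cancel() {
       status = Status.ACTIVE;
     }

     // Runs on the watcher thread; returns once another thread calls cancel().
     void watch() {
       while (status == Status.DECOMMISSIONING) {
         Thread.yield();
       }
     }
   }
   ```

   Without `volatile` (or some other synchronization), the Java memory model does not guarantee that `watch()` ever observes the write made by `cancel()`.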



##########
common/src/main/java/org/apache/uniffle/common/ServerStatus.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+public enum ServerStatus {
+  NORMAL_STATUS,
+  DECOMMISSIONING;

Review Comment:
   > is it possible to add a DECOMMISSIONED status here?
   > For the state that all the app has already been finished, but the shuffle server is configured to not exit itself?
   
   +1. In the future we may have a dashboard that shows decommissioned servers, and this status could indicate the former state of those servers.





[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1108213031


##########
common/src/main/java/org/apache/uniffle/common/ServerStatus.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+public enum ServerStatus {
+  NORMAL_STATUS,
+  DECOMMISSIONING;

Review Comment:
   Such as:
   1. Debugging: let an abnormal shuffle server stop accepting new shuffle assignments while it keeps running, so ops/rds have time to log in to the server/pod and do some debugging.
   2. Manually making a node inactive if the shuffle assignment strategy cannot distribute evenly in some corner cases.





[GitHub] [incubator-uniffle] advancedxy commented on pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#issuecomment-1436750079

   @xianjingfeng Hi, could you rebase your code onto the latest master?
   The CI breakage has been addressed in #631.




[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1109249825


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java:
##########
@@ -356,6 +356,12 @@ public class ShuffleServerConf extends RssBaseConf {
       .withDescription("The memory usage limit ratio for huge partition, it will only triggered when partition's "
           + "size exceeds the threshold of '" + HUGE_PARTITION_SIZE_THRESHOLD.key() + "'");
 
+  public static final ConfigOption<Long> SERVER_DECOMMISSION_CHECK_INTERVAL = ConfigOptions
+      .key("rss.server.decommission.check.interval")
+      .longType()
+      .defaultValue(60 * 1000L)

Review Comment:
   > Do you mean check if the value is valid?
   
   Yes.
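
   A sketch of the kind of check meant here, assuming the rule is simply that the interval must be strictly positive (the thread does not spell out the exact rule). `IntervalCheckSketch.requirePositive` is a hypothetical helper, not an existing config API:

   ```java
   final class IntervalCheckSketch {
     // Returns the value unchanged when it is a usable polling interval,
     // otherwise fails fast with the offending key in the message.
     static long requirePositive(String key, long value) {
       if (value <= 0) {
         throw new IllegalArgumentException(key + " must be positive, but was " + value);
       }
       return value;
     }
   }
   ```

   Rejecting a bad value when the configuration is read is cheaper to diagnose than a polling loop that calls `Thread.sleep` with a zero or negative interval.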





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1109337810


##########
common/src/main/java/org/apache/uniffle/common/ServerStatus.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+public enum ServerStatus {
+  UNKNOWN(-1),
+  NORMAL_STATUS(0),

Review Comment:
   Done





[GitHub] [incubator-uniffle] kaijchen commented on pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "kaijchen (via GitHub)" <gi...@apache.org>.
kaijchen commented on PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#issuecomment-1434836378

   > Will it better to use a ExecutorService and use future.cancel() and ExecutorService.shutdownNow() for termination?
   
   Hi @xianjingfeng, sorry for not saying it clearly. Maybe `ForkJoinPool.commonPool()` is more appropriate here.
   
   ```diff
   diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
   index ee8f0775..0c5ecd46 100644
   --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
   +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
   @@ -20,8 +20,8 @@ package org.apache.uniffle.server;
    import java.util.Collections;
    import java.util.List;
    import java.util.Set;
   -import java.util.concurrent.ExecutorService;
   -import java.util.concurrent.Executors;
   +import java.util.concurrent.ForkJoinPool;
   +import java.util.concurrent.Future;
    import java.util.concurrent.atomic.AtomicBoolean;
   
    import com.google.common.annotations.VisibleForTesting;
   @@ -46,7 +46,6 @@ import org.apache.uniffle.common.security.SecurityContextFactory;
    import org.apache.uniffle.common.util.Constants;
    import org.apache.uniffle.common.util.ExitUtils;
    import org.apache.uniffle.common.util.RssUtils;
   -import org.apache.uniffle.common.util.ThreadUtils;
    import org.apache.uniffle.common.web.CommonMetricsServlet;
    import org.apache.uniffle.common.web.JettyServer;
    import org.apache.uniffle.server.buffer.ShuffleBufferManager;
   @@ -88,7 +87,7 @@ public class ShuffleServer {
      private MetricReporter metricReporter;
      private volatile ServerStatus serverStatus = ServerStatus.ACTIVE;
      private volatile boolean running;
   -  private ExecutorService executorService;
   +  Future<?> decommissionFuture;
   
      public ShuffleServer(ShuffleServerConf shuffleServerConf) throws Exception {
        this.shuffleServerConf = shuffleServerConf;
   @@ -287,11 +286,7 @@ public class ShuffleServer {
        }
        serverStatus = ServerStatus.DECOMMISSIONING;
        LOG.info("Shuffle Server is decommissioning.");
   -    if (executorService == null) {
   -      executorService = Executors.newSingleThreadExecutor(
   -          ThreadUtils.getThreadFactory("shuffle-server-decommission-%d"));
   -    }
   -    executorService.submit(this::waitDecommissionFinish);
   +    decommissionFuture = ForkJoinPool.commonPool().submit(this::waitDecommissionFinish);
      }
   
      private void waitDecommissionFinish() {
   @@ -332,8 +327,12 @@ public class ShuffleServer {
          LOG.info("Shuffle server is not decommissioning. Nothing needs to be done.");
          return;
        }
   -    serverStatus = ServerStatus.ACTIVE;
   -    LOG.info("Decommission canceled.");
   +    if (decommissionFuture.cancel(true)) {
   +      serverStatus = ServerStatus.ACTIVE;
   +      LOG.info("Decommission canceled.");
   +    } else {
   +      LOG.warn("Failed to cancel decommission.");
   +    }
      }
   
      public String getIp() {
   ```
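
   For reference, the interrupt behaviour that `cancel(true)` relies on can be tried in isolation. This sketch deliberately uses a plain single-thread executor rather than `ForkJoinPool.commonPool()`, and `CancelFutureSketch` is a hypothetical name. Whether a task running in the common pool is interrupted in the same way is worth checking against the `ForkJoinTask.cancel` Javadoc.

   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;

   final class CancelFutureSketch {
     // Returns true if cancel(true) succeeded and the sleeping task
     // actually observed the interrupt.
     static boolean cancelSleepingTask() {
       ExecutorService pool = Executors.newSingleThreadExecutor();
       CountDownLatch started = new CountDownLatch(1);
       CountDownLatch interrupted = new CountDownLatch(1);
       try {
         Future<?> future = pool.submit(() -> {
           started.countDown();
           try {
             Thread.sleep(60_000L); // stands in for the check interval
           } catch (InterruptedException e) {
             interrupted.countDown();
           }
         });
         started.await(); // make sure the task is running before cancelling
         boolean cancelled = future.cancel(true);
         return cancelled && interrupted.await(5, TimeUnit.SECONDS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         return false;
       } finally {
         pool.shutdownNow();
       }
     }
   }
   ```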




[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1108157458


##########
common/src/main/java/org/apache/uniffle/common/ServerStatus.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+public enum ServerStatus {
+  NORMAL_STATUS,
+  DECOMMISSIONING;

Review Comment:
   Is it possible to add a `DECOMMISSIONED` status here?
   That is, for the state where all the apps have already finished but the shuffle server is configured not to exit by itself?
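
   Roughly, the polling step would then look like the sketch below. `DrainStepSketch` and `afterCheck` are hypothetical names used only to show the extra state:

   ```java
   final class DrainStepSketch {
     enum Status { ACTIVE, DECOMMISSIONING, DECOMMISSIONED }

     // One polling step: a draining server with no applications left
     // becomes DECOMMISSIONED; every other combination is left unchanged.
     static Status afterCheck(Status current, int remainingApps) {
       if (current == Status.DECOMMISSIONING && remainingApps == 0) {
         return Status.DECOMMISSIONED;
       }
       return current;
     }
   }
   ```

   Whether the process then exits stays a separate, configurable decision.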



##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();
+    synchronized (statusLock) {
+      checkStatusForDecommission();

Review Comment:
   Why double-check here?



##########
server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java:
##########
@@ -79,4 +77,47 @@ public void startTest() {
     }
 
   }
+
+  @Test
+  public void decommissionTest() throws Exception {
+    ShuffleServer shuffleServer = createShuffleServer();
+    assertEquals(ServerStatus.NORMAL_STATUS, shuffleServer.getServerStatus());
+    try {
+      shuffleServer.cancelDecommission();
+      fail(EXPECTED_EXCEPTION_MESSAGE);
+    } catch (Exception e) {
+      assertTrue(e instanceof InvalidRequestException);
+    }
+    ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
+    String appId = "decommissionTest_appId";
+    shuffleTaskManager.registerShuffle(appId, 0, Lists.newArrayList(), new RemoteStorageInfo("/tmp"), "");
+    shuffleServer.decommission();
+    try {
+      assertEquals(ServerStatus.DECOMMISSIONING, shuffleServer.getServerStatus());
+      shuffleServer.decommission();
+      fail(EXPECTED_EXCEPTION_MESSAGE);
+    } catch (Exception e) {
+      assertTrue(e instanceof InvalidRequestException);
+    }
+    shuffleServer.cancelDecommission();
+    shuffleTaskManager.removeShuffleDataSync(appId, 0);
+    assertEquals(ServerStatus.NORMAL_STATUS, shuffleServer.getServerStatus());

Review Comment:
   You may decrease `rss.server.decommission.check.interval` and make sure the shuffle server is still running after `cancelDecommission` and the check interval.



##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();
+    synchronized (statusLock) {
+      checkStatusForDecommission();
+      long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+      decommissionThread = new Thread(() -> {
+        while (isDecommissioning()) {

Review Comment:
   I don't think this is correct.
   
   This code runs on a different thread, so it is not guarded by `statusLock`. The status that `cancelDecommission` sets may never become visible to this thread.
   
   I believe making `isDecommissioning()` a `public synchronized` method is easy and clear.
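
A minimal sketch of the visibility point, assuming hypothetical names (`StatusHolder`, `Status`, `pollerObservesCancel`) rather than the PR's classes. Both the read and the write go through `synchronized` methods on the same object, so a status change made by one thread is guaranteed to be seen by a polling thread. Declaring the field `volatile` would be an alternative for a single field.

```java
// Hypothetical stand-in for the server's status handling; not the PR's code.
class StatusHolder {
  enum Status { NORMAL, DECOMMISSIONING }

  private Status status = Status.NORMAL;

  // Reader and writer lock the same monitor, which creates a happens-before
  // edge between a write in one thread and a later read in another.
  synchronized boolean isDecommissioning() {
    return status == Status.DECOMMISSIONING;
  }

  synchronized void setStatus(Status newStatus) {
    status = newStatus;
  }

  // Starts a poller that loops while decommissioning, then changes the status
  // from the calling thread. Returns true if the poller saw the change and exited.
  static boolean pollerObservesCancel() {
    StatusHolder holder = new StatusHolder();
    holder.setStatus(Status.DECOMMISSIONING);
    Thread poller = new Thread(() -> {
      while (holder.isDecommissioning()) {
        Thread.yield();
      }
    }, "status-poller");
    poller.setDaemon(true);
    poller.start();
    holder.setStatus(Status.NORMAL); // plays the role of cancelDecommission
    try {
      poller.join(5_000L);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return !poller.isAlive();
  }
}
```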



##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();

Review Comment:
   I believe `decommission` should be idempotent?
   
   Once the shuffle server has entered the `decommissioning` state, subsequent calls should just do nothing rather than throw an exception?



##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();
+    synchronized (statusLock) {
+      checkStatusForDecommission();
+      long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+      decommissionThread = new Thread(() -> {
+        while (isDecommissioning()) {
+          int remainApplicationNum = shuffleTaskManager.getAppIds().size();
+          if (remainApplicationNum == 0) {
+            LOG.info("all applications finished, exit now");
+            try {
+              stopServer();
+              break;
+            } catch (Exception e) {
+              ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+            }
+          }
+          LOG.info("Shuffle server is decommissioning. remain {} applications not finished.", remainApplicationNum);
+          try {
+            Thread.sleep(checkInterval);
+          } catch (InterruptedException e) {
+            LOG.warn("Ignore the InterruptedException which should be caused by internal killed");
+          }
+        }
+      });
+      decommissionThread.setName("decommission");
+      decommissionThread.start();
+      serverStatus = ServerStatus.DECOMMISSIONING;
+    }
+  }
+
+  private void checkStatusForDecommission() {
+    if (isDecommissioning()) {
+      throw new InvalidRequestException("Shuffle Server is decommissioning. Nothing need to do.");
+    }
+    if (!ServerStatus.NORMAL_STATUS.equals(serverStatus)) {
+      throw new InvalidRequestException(
+          "Shuffle Server is processing other procedures, current status:" + serverStatus);
+    }
+  }
+
+  public void cancelDecommission() {
+    checkStatusForCancelDecommission();
+    synchronized (statusLock) {
+      checkStatusForCancelDecommission();
+      if (decommissionThread != null) {
+        decommissionThread.interrupt();
+        decommissionThread = null;
+      }
+      serverStatus = ServerStatus.NORMAL_STATUS;

Review Comment:
   In practice, we update the serverStatus first, then interrupt decommissionThread.
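
A minimal sketch of that ordering, with hypothetical names (`CancelOrderSketch`, `workerStopsPromptly`) and a plain `volatile` flag standing in for `serverStatus`. The worker swallows the interrupt and re-checks the flag, as the loop in the hunk above does, so the flag has to be flipped before the interrupt is sent.

```java
// Hypothetical illustration only; not the PR's ShuffleServer.
class CancelOrderSketch {
  private volatile boolean decommissioning = true;
  private Thread worker;

  void start(long checkIntervalMs) {
    worker = new Thread(() -> {
      while (decommissioning) {
        try {
          Thread.sleep(checkIntervalMs);
        } catch (InterruptedException e) {
          // Woken early: loop around and re-check the flag.
        }
      }
    }, "decommission-sketch");
    worker.setDaemon(true);
    worker.start();
  }

  void cancel() {
    decommissioning = false; // 1. publish the new state
    worker.interrupt();      // 2. wake the worker so it re-checks right away
  }

  // Uses a long check interval; returns true if the worker exits soon after cancel.
  static boolean workerStopsPromptly() {
    CancelOrderSketch sketch = new CancelOrderSketch();
    sketch.start(60_000L);
    sketch.cancel();
    try {
      sketch.worker.join(5_000L);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return !sketch.worker.isAlive();
  }
}
```

With the opposite order, a worker woken by the interrupt could re-check the flag before it is flipped and go back to sleep for another full interval.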



##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();
+    synchronized (statusLock) {
+      checkStatusForDecommission();
+      long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+      decommissionThread = new Thread(() -> {
+        while (isDecommissioning()) {
+          int remainApplicationNum = shuffleTaskManager.getAppIds().size();
+          if (remainApplicationNum == 0) {
+            LOG.info("all applications finished, exit now");
+            try {
+              stopServer();
+              break;
+            } catch (Exception e) {
+              ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+            }
+          }
+          LOG.info("Shuffle server is decommissioning. remain {} applications not finished.", remainApplicationNum);
+          try {
+            Thread.sleep(checkInterval);
+          } catch (InterruptedException e) {
+            LOG.warn("Ignore the InterruptedException which should be caused by internal killed");
+          }
+        }
+      });
+      decommissionThread.setName("decommission");

Review Comment:
   Let's be a bit more specific: "shuffle-server-decommission";



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] codecov-commenter commented on pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#issuecomment-1432444738

   # [Codecov](https://codecov.io/gh/apache/incubator-uniffle/pull/606?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#606](https://codecov.io/gh/apache/incubator-uniffle/pull/606?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (f42c756) into [master](https://codecov.io/gh/apache/incubator-uniffle/commit/7a8cdb044ddf19a5f659bd3879945fb41cd9329c?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (7a8cdb0) will **increase** coverage by `2.32%`.
   > The diff coverage is `65.57%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master     #606      +/-   ##
   ============================================
   + Coverage     60.85%   63.17%   +2.32%     
   - Complexity     1800     1813      +13     
   ============================================
     Files           214      202      -12     
     Lines         12387    10477    -1910     
     Branches       1044     1047       +3     
   ============================================
   - Hits           7538     6619     -919     
   + Misses         4444     3511     -933     
   + Partials        405      347      -58     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-uniffle/pull/606?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...n/java/org/apache/uniffle/common/ServerStatus.java](https://codecov.io/gh/apache/incubator-uniffle/pull/606?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL2NvbW1vbi9TZXJ2ZXJTdGF0dXMuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...ffle/common/exception/InvalidRequestException.java](https://codecov.io/gh/apache/incubator-uniffle/pull/606?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL2NvbW1vbi9leGNlcHRpb24vSW52YWxpZFJlcXVlc3RFeGNlcHRpb24uamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [.../java/org/apache/uniffle/server/ShuffleServer.java](https://codecov.io/gh/apache/incubator-uniffle/pull/606?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL3NlcnZlci9TaHVmZmxlU2VydmVyLmphdmE=) | `66.15% <71.42%> (+1.77%)` | :arrow_up: |
   | [...a/org/apache/uniffle/server/ShuffleServerConf.java](https://codecov.io/gh/apache/incubator-uniffle/pull/606?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL3NlcnZlci9TaHVmZmxlU2VydmVyQ29uZi5qYXZh) | `99.32% <100.00%> (+0.01%)` | :arrow_up: |
   | [deploy/kubernetes/operator/pkg/utils/certs.go](https://codecov.io/gh/apache/incubator-uniffle/pull/606?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGVwbG95L2t1YmVybmV0ZXMvb3BlcmF0b3IvcGtnL3V0aWxzL2NlcnRzLmdv) | | |
   | [...pkg/controller/sync/shuffleserver/shuffleserver.go](https://codecov.io/gh/apache/incubator-uniffle/pull/606?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGVwbG95L2t1YmVybmV0ZXMvb3BlcmF0b3IvcGtnL2NvbnRyb2xsZXIvc3luYy9zaHVmZmxlc2VydmVyL3NodWZmbGVzZXJ2ZXIuZ28=) | | |
   | [deploy/kubernetes/operator/pkg/utils/config.go](https://codecov.io/gh/apache/incubator-uniffle/pull/606?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGVwbG95L2t1YmVybmV0ZXMvb3BlcmF0b3IvcGtnL3V0aWxzL2NvbmZpZy5nbw==) | | |
   | [...tor/pkg/controller/sync/coordinator/coordinator.go](https://codecov.io/gh/apache/incubator-uniffle/pull/606?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGVwbG95L2t1YmVybmV0ZXMvb3BlcmF0b3IvcGtnL2NvbnRyb2xsZXIvc3luYy9jb29yZGluYXRvci9jb29yZGluYXRvci5nbw==) | | |
   | [deploy/kubernetes/operator/pkg/utils/rss.go](https://codecov.io/gh/apache/incubator-uniffle/pull/606?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGVwbG95L2t1YmVybmV0ZXMvb3BlcmF0b3IvcGtnL3V0aWxzL3Jzcy5nbw==) | | |
   | [...rnetes/operator/pkg/webhook/inspector/inspector.go](https://codecov.io/gh/apache/incubator-uniffle/pull/606?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGVwbG95L2t1YmVybmV0ZXMvb3BlcmF0b3IvcGtnL3dlYmhvb2svaW5zcGVjdG9yL2luc3BlY3Rvci5nbw==) | | |
   | ... and [10 more](https://codecov.io/gh/apache/incubator-uniffle/pull/606?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1108428176


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();

Review Comment:
   What do you have in mind for the `Upgrading` status? When would the shuffle server be upgrading?





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1109227368


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();

Review Comment:
   I mean that if the server is neither decommissioning nor in normal status (for example, upgrading), the decommission command should be rejected. How about this: if the server is decommissioning, we do nothing, but if the server is upgrading, we throw an exception?
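
The proposal above could be sketched as follows. All names here (`DecommissionGate`, its `Status` enum, `transitions()`) are hypothetical, `UPGRADING` is the not-yet-existing status discussed in this thread, and `IllegalStateException` stands in for the PR's `InvalidRequestException`.

```java
// Hypothetical illustration only; not the PR's ShuffleServer.
class DecommissionGate {
  enum Status { NORMAL, DECOMMISSIONING, UPGRADING }

  private Status status;
  private int transitions = 0;

  DecommissionGate(Status initial) {
    this.status = initial;
  }

  synchronized void decommission() {
    if (status == Status.DECOMMISSIONING) {
      return; // idempotent: a repeated call is a no-op
    }
    if (status != Status.NORMAL) {
      // Any other procedure (for example, upgrading) rejects the request.
      throw new IllegalStateException("Cannot decommission, current status: " + status);
    }
    status = Status.DECOMMISSIONING;
    transitions++;
  }

  synchronized Status status() {
    return status;
  }

  // Number of times the gate actually moved into DECOMMISSIONING.
  synchronized int transitions() {
    return transitions;
  }
}
```

Calling `decommission()` twice on a gate created with `Status.NORMAL` leaves it in `DECOMMISSIONING` with a single recorded transition, while a gate created with `Status.UPGRADING` throws.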





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1108191126


##########
common/src/main/java/org/apache/uniffle/common/ServerStatus.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+public enum ServerStatus {
+  NORMAL_STATUS,
+  DECOMMISSIONING;

Review Comment:
   > For the state that all the app has already been finished, but the shuffle server is configured to not exit itself?
   
   In what case does it not need to exit?





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1108193380


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();

Review Comment:
   There will be other statuses in the future, such as `Upgrading`.





[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1108219704


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();

Review Comment:
   It doesn't matter that much?
   
   A server that is upgrading could also be decommissioned?





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1108243013


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();

Review Comment:
   If the current status is `Upgrading` and decommission is then invoked, will the status be changed to `decommissioning`?





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1109330861


##########
common/src/main/java/org/apache/uniffle/common/ServerStatus.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+public enum ServerStatus {
+  UNKNOWN(-1),
+  NORMAL_STATUS(0),
+  DECOMMISSIONING(1),
+  DECOMMISSIONED(2);
+
+  private final int code;

Review Comment:
   Yes. We need it to keep `ServerStatus` consistent with the one defined in protobuf in the next PR.
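
A minimal sketch of how an integer code can round-trip through an enum like this one. The constants and codes are taken from the hunk above; the enum name `ServerStatusSketch`, the `code()` accessor, and the `fromCode` lookup are assumptions for illustration, and the protobuf definition itself is not shown in this thread.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical illustration; mirrors the constants in the diff, not the final API.
enum ServerStatusSketch {
  UNKNOWN(-1),
  NORMAL_STATUS(0),
  DECOMMISSIONING(1),
  DECOMMISSIONED(2);

  // Built once after all constants exist, so lookups by code are O(1).
  private static final Map<Integer, ServerStatusSketch> BY_CODE =
      Arrays.stream(values())
          .collect(Collectors.toMap(ServerStatusSketch::code, Function.identity()));

  private final int code;

  ServerStatusSketch(int code) {
    this.code = code;
  }

  int code() {
    return code;
  }

  // Unrecognized codes (for example, from a newer peer) map to UNKNOWN.
  static ServerStatusSketch fromCode(int code) {
    return BY_CODE.getOrDefault(code, UNKNOWN);
  }
}
```

Mapping unknown codes to `UNKNOWN` rather than throwing is one possible choice; it keeps an older server tolerant of statuses added later.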





[GitHub] [incubator-uniffle] kaijchen commented on pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "kaijchen (via GitHub)" <gi...@apache.org>.
kaijchen commented on PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#issuecomment-1435491836

   > What are the benefits?
   
   Keeping a thread pool (singleThreadExecutor) for an uncommon task sounds inefficient.




[GitHub] [incubator-uniffle] kaijchen commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "kaijchen (via GitHub)" <gi...@apache.org>.
kaijchen commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1111419980


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +274,75 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public synchronized void decommission() {
+    if (isDecommissioning()) {
+      LOG.info("Shuffle Server is decommissioning. Nothing needs to be done.");
+      return;
+    }
+    if (!ServerStatus.ACTIVE.equals(serverStatus)) {
+      throw new InvalidRequestException(
+          "Shuffle Server is processing other procedures, current status:" + serverStatus);
+    }
+    serverStatus = ServerStatus.DECOMMISSIONING;
+    LOG.info("Shuffle Server is decommissioning.");
+    if (executorService == null) {
+      executorService = Executors.newSingleThreadExecutor(
+          ThreadUtils.getThreadFactory("shuffle-server-decommission-%d"));
+    }
+    decommissionFuture = executorService.submit(this::waitDecommissionFinish);
+  }
+
+  private void waitDecommissionFinish() {
+    long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+    boolean shutdownAfterDecommission = shuffleServerConf.get(SERVER_DECOMMISSION_SHUTDOWN);
+    int remainApplicationNum;
+    while (isDecommissioning()) {
+      remainApplicationNum = shuffleTaskManager.getAppIds().size();
+      if (remainApplicationNum == 0) {
+        serverStatus = ServerStatus.DECOMMISSIONED;
+        LOG.info("All applications finished. Current status is " + serverStatus);
+        if (shutdownAfterDecommission) {
+          LOG.info("Exiting...");
+          try {
+            stopServer();
+          } catch (Exception e) {
+            ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+          }
+        }
+        break;
+      }
+      LOG.info("Shuffle server is decommissioning, remaining {} applications not finished.", remainApplicationNum);
+      try {
+        Thread.sleep(checkInterval);
+      } catch (InterruptedException e) {
+        LOG.warn("Ignore the InterruptedException which should be caused by internal kill.");
+      }
+    }
+    remainApplicationNum = shuffleTaskManager.getAppIds().size();
+    if (remainApplicationNum > 0) {
+      LOG.info("Decommission exiting, remaining {} applications not finished.",
+          remainApplicationNum);
+    }
+  }
+
+  public synchronized void cancelDecommission() {
+    if (!isDecommissioning()) {
+      LOG.info("Shuffle server is not decommissioning. Nothing needs to be done.");
+      return;
+    }
+    if (decommissionFuture.cancel(true)) {
+      serverStatus = ServerStatus.ACTIVE;

Review Comment:
   What if server status is DECOMMISSIONED?
   `isDecommissioning()` will return true and the cancel decommission logic will be executed.





[GitHub] [incubator-uniffle] xianjingfeng commented on pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#issuecomment-1435483433

   > Maybe `ForkJoinPool.commonPool()` is more appropriate here.
   
   What are the benefits?




[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1111400342


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +274,75 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public synchronized void decommission() {
+    if (isDecommissioning()) {
+      LOG.info("Shuffle Server is decommissioning. Nothing needs to be done.");
+      return;
+    }
+    if (!ServerStatus.ACTIVE.equals(serverStatus)) {
+      throw new InvalidRequestException(
+          "Shuffle Server is processing other procedures, current status:" + serverStatus);
+    }
+    serverStatus = ServerStatus.DECOMMISSIONING;
+    LOG.info("Shuffle Server is decommissioning.");
+    if (executorService == null) {
+      executorService = Executors.newSingleThreadExecutor(
+          ThreadUtils.getThreadFactory("shuffle-server-decommission-%d"));
+    }
+    decommissionFuture = executorService.submit(this::waitDecommissionFinish);
+  }
+
+  private void waitDecommissionFinish() {
+    long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+    boolean shutdownAfterDecommission = shuffleServerConf.get(SERVER_DECOMMISSION_SHUTDOWN);
+    int remainApplicationNum;
+    while (isDecommissioning()) {
+      remainApplicationNum = shuffleTaskManager.getAppIds().size();
+      if (remainApplicationNum == 0) {
+        serverStatus = ServerStatus.DECOMMISSIONED;
+        LOG.info("All applications finished. Current status is " + serverStatus);
+        if (shutdownAfterDecommission) {
+          LOG.info("Exiting...");
+          try {
+            stopServer();
+          } catch (Exception e) {
+            ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+          }
+        }
+        break;
+      }
+      LOG.info("Shuffle server is decommissioning, remaining {} applications not finished.", remainApplicationNum);
+      try {
+        Thread.sleep(checkInterval);
+      } catch (InterruptedException e) {
+        LOG.warn("Ignore the InterruptedException which should be caused by internal kill.");
+      }
+    }
+    remainApplicationNum = shuffleTaskManager.getAppIds().size();
+    if (remainApplicationNum > 0) {
+      LOG.info("Decommission exiting, remaining {} applications not finished.",
+          remainApplicationNum);
+    }
+  }
+
+  public synchronized void cancelDecommission() {
+    if (!isDecommissioning()) {
+      LOG.info("Shuffle server is not decommissioning. Nothing needs to be done.");
+      return;
+    }
+    if (decommissionFuture.cancel(true)) {
+      serverStatus = ServerStatus.ACTIVE;

Review Comment:
   On second thought, `serverStatus = ServerStatus.ACTIVE;` should come before the `if` block. WDYT?





[GitHub] [incubator-uniffle] kaijchen commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "kaijchen (via GitHub)" <gi...@apache.org>.
kaijchen commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1111411169


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +274,75 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public synchronized void decommission() {
+    if (isDecommissioning()) {
+      LOG.info("Shuffle Server is decommissioning. Nothing needs to be done.");
+      return;
+    }
+    if (!ServerStatus.ACTIVE.equals(serverStatus)) {
+      throw new InvalidRequestException(
+          "Shuffle Server is processing other procedures, current status:" + serverStatus);
+    }
+    serverStatus = ServerStatus.DECOMMISSIONING;
+    LOG.info("Shuffle Server is decommissioning.");
+    if (executorService == null) {
+      executorService = Executors.newSingleThreadExecutor(
+          ThreadUtils.getThreadFactory("shuffle-server-decommission-%d"));
+    }
+    decommissionFuture = executorService.submit(this::waitDecommissionFinish);
+  }
+
+  private void waitDecommissionFinish() {
+    long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+    boolean shutdownAfterDecommission = shuffleServerConf.get(SERVER_DECOMMISSION_SHUTDOWN);
+    int remainApplicationNum;
+    while (isDecommissioning()) {
+      remainApplicationNum = shuffleTaskManager.getAppIds().size();
+      if (remainApplicationNum == 0) {
+        serverStatus = ServerStatus.DECOMMISSIONED;
+        LOG.info("All applications finished. Current status is " + serverStatus);
+        if (shutdownAfterDecommission) {
+          LOG.info("Exiting...");
+          try {
+            stopServer();
+          } catch (Exception e) {
+            ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+          }
+        }
+        break;
+      }
+      LOG.info("Shuffle server is decommissioning, remaining {} applications not finished.", remainApplicationNum);
+      try {
+        Thread.sleep(checkInterval);
+      } catch (InterruptedException e) {
+        LOG.warn("Ignore the InterruptedException which should be caused by internal kill.");
+      }
+    }
+    remainApplicationNum = shuffleTaskManager.getAppIds().size();
+    if (remainApplicationNum > 0) {
+      LOG.info("Decommission exiting, remaining {} applications not finished.",
+          remainApplicationNum);
+    }
+  }
+
+  public synchronized void cancelDecommission() {
+    if (!isDecommissioning()) {
+      LOG.info("Shuffle server is not decommissioning. Nothing needs to be done.");
+      return;
+    }
+    if (decommissionFuture.cancel(true)) {
+      serverStatus = ServerStatus.ACTIVE;

Review Comment:
   I'm not sure. "Failed to cancel decommission" usually means the task has already completed.
   
   Also, we should only set `decommissionFuture` to null when `serverStatus` is changed to ACTIVE.
   ```
       serverStatus = ServerStatus.ACTIVE;
       decommissionFuture = null;
   ```
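
The remark that a failed cancel usually means the task has already completed follows from the documented behaviour of `java.util.concurrent.Future#cancel`. The block below is a minimal standalone sketch of that behaviour, not code from this PR; the class `FutureCancelDemo` and its methods are hypothetical names introduced only for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; it is not part of ShuffleServer.
final class FutureCancelDemo {

  // Reports what cancel(true) returns for a task that has already finished.
  static boolean cancelCompletedTask() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      Future<?> future = executor.submit(() -> { });
      future.get(); // block until the task completes normally
      return future.cancel(true);
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdownNow();
    }
  }

  // Reports what cancel(true) returns for a task that is still sleeping.
  static boolean cancelRunningTask() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch started = new CountDownLatch(1);
    try {
      Future<?> future = executor.submit(() -> {
        started.countDown();
        try {
          Thread.sleep(60_000L);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // restore the flag and exit
        }
      });
      started.await(); // make sure the task is running before cancelling
      return future.cancel(true);
    } catch (InterruptedException e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdownNow();
    }
  }
}
```

In this sketch the first method returns `false` and the second returns `true`, which is why the status change needs care when `cancel` reports failure: the task may already have moved the server to another state.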





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1108226871


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();
+    synchronized (statusLock) {
+      checkStatusForDecommission();
+      long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+      decommissionThread = new Thread(() -> {
+        while (isDecommissioning()) {
+          int remainApplicationNum = shuffleTaskManager.getAppIds().size();
+          if (remainApplicationNum == 0) {
+            LOG.info("all applications finished, exit now");
+            try {
+              stopServer();
+              break;
+            } catch (Exception e) {
+              ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+            }
+          }
+          LOG.info("Shuffle server is decommissioning. remain {} applications not finished.", remainApplicationNum);
+          try {
+            Thread.sleep(checkInterval);
+          } catch (InterruptedException e) {
+            LOG.warn("Ignore the InterruptedException which should be caused by internal killed");
+          }
+        }
+      });
+      decommissionThread.setName("decommission");
+      decommissionThread.start();
+      serverStatus = ServerStatus.DECOMMISSIONING;
+    }
+  }
+
+  private void checkStatusForDecommission() {
+    if (isDecommissioning()) {
+      throw new InvalidRequestException("Shuffle Server is decommissioning. Nothing need to do.");
+    }
+    if (!ServerStatus.NORMAL_STATUS.equals(serverStatus)) {
+      throw new InvalidRequestException(
+          "Shuffle Server is processing other procedures, current status:" + serverStatus);
+    }
+  }
+
+  public void cancelDecommission() {
+    checkStatusForCancelDecommission();
+    synchronized (statusLock) {
+      checkStatusForCancelDecommission();
+      if (decommissionThread != null) {
+        decommissionThread.interrupt();

Review Comment:
   It's OK for me.





[GitHub] [incubator-uniffle] advancedxy commented on pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#issuecomment-1435980517

   > Edit: maybe just use a `Future` to cancel the task.
   
   +1 for this.
   




[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "zuston (via GitHub)" <gi...@apache.org>.
zuston commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1108260992


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {

Review Comment:
   But this is not on a critical path, so the cost of `synchronized` is acceptable.





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1109230080


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();
+    synchronized (statusLock) {
+      checkStatusForDecommission();
+      long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+      decommissionThread = new Thread(() -> {
+        while (isDecommissioning()) {

Review Comment:
   > possible and should be. But what about `synchronized public isDecommissioning()`?
   
   It seems unnecessary if `volatile` is enough, but either way is OK with me.
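
The `volatile` option mentioned here relies on the Java memory model: a write to a `volatile` field is visible to later reads of that field from other threads, so a polling loop sees a status change without taking a lock. The block below is a small sketch under that assumption. `VolatileStatusDemo` and its members are hypothetical and only mirror the names used in this thread.

```java
// Hypothetical demo class; the names only mirror the discussion.
final class VolatileStatusDemo {
  enum Status { ACTIVE, DECOMMISSIONING }

  // volatile makes a write by one thread visible to later reads by other threads.
  private volatile Status status = Status.DECOMMISSIONING;

  boolean isDecommissioning() {
    return status == Status.DECOMMISSIONING;
  }

  void cancel() {
    status = Status.ACTIVE;
  }

  // Starts a poller that loops while decommissioning, cancels, and reports
  // whether the poller observed the change and exited within five seconds.
  static boolean pollerStopsAfterCancel() {
    VolatileStatusDemo demo = new VolatileStatusDemo();
    Thread poller = new Thread(() -> {
      while (demo.isDecommissioning()) {
        Thread.yield();
      }
    });
    poller.setDaemon(true);
    poller.start();
    demo.cancel();
    try {
      poller.join(5_000L);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return !poller.isAlive();
  }
}
```

Note that `volatile` only covers visibility. A compound check-then-act step, such as "if ACTIVE, switch to DECOMMISSIONING and start a worker", would still need a lock or an atomic compare-and-set.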





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1108340206


##########
common/src/main/java/org/apache/uniffle/common/ServerStatus.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+public enum ServerStatus {
+  NORMAL_STATUS,

Review Comment:
   https://chys.info/blog/2017-12-05-protobuf-enum-default-value
   There are some cases that we need to be aware of.



##########
server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java:
##########
@@ -356,6 +356,12 @@ public class ShuffleServerConf extends RssBaseConf {
       .withDescription("The memory usage limit ratio for huge partition, it will only triggered when partition's "
           + "size exceeds the threshold of '" + HUGE_PARTITION_SIZE_THRESHOLD.key() + "'");
 
+  public static final ConfigOption<Long> SERVER_DECOMMISSION_CHECK_INTERVAL = ConfigOptions
+      .key("rss.server.decommission.check.interval")
+      .longType()
+      .defaultValue(60 * 1000L)

Review Comment:
   Do we need `checkValue`?



##########
server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java:
##########
@@ -356,6 +356,12 @@ public class ShuffleServerConf extends RssBaseConf {
       .withDescription("The memory usage limit ratio for huge partition, it will only triggered when partition's "
           + "size exceeds the threshold of '" + HUGE_PARTITION_SIZE_THRESHOLD.key() + "'");
 
+  public static final ConfigOption<Long> SERVER_DECOMMISSION_CHECK_INTERVAL = ConfigOptions
+      .key("rss.server.decommission.check.interval")
+      .longType()
+      .defaultValue(60 * 1000L)
+      .withDescription("The interval to check if all applications have finish when server decommissioning");

Review Comment:
   Do we need to add some documentation?





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1109248628


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java:
##########
@@ -356,6 +356,12 @@ public class ShuffleServerConf extends RssBaseConf {
       .withDescription("The memory usage limit ratio for huge partition, it will only triggered when partition's "
           + "size exceeds the threshold of '" + HUGE_PARTITION_SIZE_THRESHOLD.key() + "'");
 
+  public static final ConfigOption<Long> SERVER_DECOMMISSION_CHECK_INTERVAL = ConfigOptions
+      .key("rss.server.decommission.check.interval")
+      .longType()
+      .defaultValue(60 * 1000L)

Review Comment:
   Do you mean checking whether the value is valid?
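
One reading of the `checkValue` question is that the interval should be rejected at configuration time if it is not positive, since it is later passed to `Thread.sleep`. The block below sketches that idea with a self-contained stand-in. `CheckedLongOption` is a hypothetical class and is not Uniffle's `ConfigOption` API; only the key and the default value are taken from the diff above.

```java
import java.util.function.Predicate;

// Hypothetical stand-in for a config option with a validator.
final class CheckedLongOption {
  private final String key;
  private final long defaultValue;
  private final Predicate<Long> validator;
  private final String errorMessage;

  CheckedLongOption(String key, long defaultValue,
      Predicate<Long> validator, String errorMessage) {
    this.key = key;
    this.defaultValue = defaultValue;
    this.validator = validator;
    this.errorMessage = errorMessage;
  }

  // Returns the configured value (or the default when null) if it passes
  // the validator; otherwise throws IllegalArgumentException.
  long resolve(Long configured) {
    long value = configured == null ? defaultValue : configured;
    if (!validator.test(value)) {
      throw new IllegalArgumentException(key + " " + errorMessage + ", got " + value);
    }
    return value;
  }

  // Key and default are copied from the diff; the positivity rule is an assumption.
  static CheckedLongOption decommissionCheckInterval() {
    return new CheckedLongOption(
        "rss.server.decommission.check.interval",
        60 * 1000L,
        v -> v > 0,
        "must be a positive number of milliseconds");
  }
}
```

With this sketch, `resolve(null)` yields the default of 60000 and `resolve(0L)` fails fast instead of producing a busy loop in the polling thread.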





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1108203548


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();
+    synchronized (statusLock) {
+      checkStatusForDecommission();
+      long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+      decommissionThread = new Thread(() -> {
+        while (isDecommissioning()) {

Review Comment:
   At present, the purpose of `statusLock` is to prevent the thread from being created repeatedly.
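
The goal stated here, creating the worker only once even when several requests arrive together, can also be met with a plain `synchronized` method, which is the direction the later revisions of this PR take. The block below is a minimal sketch of that guarantee. `SingleStartDemo` is a hypothetical class, and the counter stands in for creating the decommission thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the counter stands in for creating the worker thread.
final class SingleStartDemo {
  private boolean decommissioning = false;
  private final AtomicInteger workersCreated = new AtomicInteger();

  // The check and the state change happen under one lock, so only the
  // first caller creates a worker; later callers return immediately.
  synchronized void decommission() {
    if (decommissioning) {
      return;
    }
    decommissioning = true;
    workersCreated.incrementAndGet();
  }

  // Releases the given number of callers at the same moment and returns
  // how many workers were created in total.
  static int workersCreatedByConcurrentCallers(int callers) {
    SingleStartDemo demo = new SingleStartDemo();
    CountDownLatch go = new CountDownLatch(1);
    Thread[] threads = new Thread[callers];
    for (int i = 0; i < callers; i++) {
      threads[i] = new Thread(() -> {
        try {
          go.await();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        demo.decommission();
      });
      threads[i].start();
    }
    go.countDown();
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    return demo.workersCreated.get();
  }
}
```

Because decommission requests are rare, the extra unlocked pre-check of the double-checked variant saves very little, which matches the reviewers' point about lock contention.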





[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1108220801


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();
+    synchronized (statusLock) {
+      checkStatusForDecommission();

Review Comment:
   I think decommission is a very low-frequency operation. Do we need to care about lock contention that much?





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1108252249


##########
common/src/main/java/org/apache/uniffle/common/ServerStatus.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+public enum ServerStatus {
+  NORMAL_STATUS,
+  DECOMMISSIONING;

Review Comment:
   
   
   > How about changing `NORMAL_STATUS` to `RUNNING`?
   
   I have considered it before, but I think it is inappropriate.





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1109276611


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();
+    synchronized (statusLock) {
+      checkStatusForDecommission();
+      long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+      decommissionThread = new Thread(() -> {
+        while (isDecommissioning()) {
+          int remainApplicationNum = shuffleTaskManager.getAppIds().size();
+          if (remainApplicationNum == 0) {
+            LOG.info("all applications finished, exit now");
+            try {
+              stopServer();
+              break;
+            } catch (Exception e) {
+              ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+            }
+          }
+          LOG.info("Shuffle server is decommissioning. remain {} applications not finished.", remainApplicationNum);
+          try {
+            Thread.sleep(checkInterval);
+          } catch (InterruptedException e) {
+            LOG.warn("Ignore the InterruptedException which should be caused by internal killed");
+          }
+        }
+      });
+      decommissionThread.setName("decommission");
+      decommissionThread.start();
+      serverStatus = ServerStatus.DECOMMISSIONING;
+    }
+  }
+
+  private void checkStatusForDecommission() {
+    if (isDecommissioning()) {
+      throw new InvalidRequestException("Shuffle Server is decommissioning. Nothing need to do.");
+    }
+    if (!ServerStatus.NORMAL_STATUS.equals(serverStatus)) {
+      throw new InvalidRequestException(
+          "Shuffle Server is processing other procedures, current status:" + serverStatus);
+    }
+  }
+
+  public void cancelDecommission() {
+    checkStatusForCancelDecommission();
+    synchronized (statusLock) {
+      checkStatusForCancelDecommission();
+      if (decommissionThread != null) {
+        decommissionThread.interrupt();

Review Comment:
   Updated





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1109338502


##########
server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java:
##########
@@ -79,4 +75,61 @@ public void startTest() {
     }
 
   }
+
+  public void decommissionTest(boolean shutdown) throws Exception {
+    ShuffleServerConf serverConf = createShuffleServerConf();
+    serverConf.set(SERVER_DECOMMISSION_CHECK_INTERVAL, 1000L);
+    serverConf.set(SERVER_DECOMMISSION_SHUTDOWN, shutdown);
+    ShuffleServer shuffleServer = new ShuffleServer(serverConf);
+    shuffleServer.start();
+    assertEquals(ServerStatus.NORMAL_STATUS, shuffleServer.getServerStatus());
+    // Shuffle server is not decommissioning, but we can also cancel it.
+    shuffleServer.cancelDecommission();
+    ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
+    String appId = "decommissionTest_appId_" + shutdown;
+    shuffleTaskManager.registerShuffle(appId, 0, Lists.newArrayList(), new RemoteStorageInfo("/tmp"), "");
+    shuffleServer.decommission();
+    assertEquals(ServerStatus.DECOMMISSIONING, shuffleServer.getServerStatus());
+    // Shuffle server is decommissioning, but we can also decommission it again.
+    shuffleServer.decommission();
+    shuffleServer.cancelDecommission();
+    shuffleTaskManager.removeResources(appId);
+    // Wait for 2 seconds, make sure cancel command is work.
+    Thread.sleep(2000);
+    assertEquals(ServerStatus.NORMAL_STATUS, shuffleServer.getServerStatus());
+    shuffleServer.decommission();
+    if (shutdown) {
+      Awaitility.await().timeout(10, TimeUnit.SECONDS).until(
+          () -> !shuffleServer.isRunning());
+    } else {
+      Awaitility.await().timeout(10, TimeUnit.SECONDS).until(
+          () -> ServerStatus.DECOMMISSIONED.equals(shuffleServer.getServerStatus()));
+      assertEquals(true, shuffleServer.isRunning());
+      shuffleServer.stopServer();
+    }
+  }
+
+  @Test
+  public void notShutDownAfterDecommissionTest() throws Exception {
+    decommissionTest(false);
+  }
+
+  @Test
+  public void shutDownAfterDecommissionTest() throws Exception {
+    decommissionTest(true);
+  }
+
+  private ShuffleServerConf createShuffleServerConf() throws Exception {
+    ShuffleServerConf serverConf = new ShuffleServerConf();
+    serverConf.setInteger(ShuffleServerConf.RPC_SERVER_PORT, 9527);
+    serverConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
+    serverConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
+    serverConf.setInteger(ShuffleServerConf.JETTY_HTTP_PORT, 9528);
+    serverConf.setString(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:0");
+    serverConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("/home/fengxj/tmp111"));

Review Comment:
   Let's keep the original path.





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1108232070


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {

Review Comment:
   Not every request requires the lock; repeated requests, for example, do not.





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1108269471


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {

Review Comment:
   It's OK for me.





[GitHub] [incubator-uniffle] kaijchen commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "kaijchen (via GitHub)" <gi...@apache.org>.
kaijchen commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1111429234


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +274,75 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public synchronized void decommission() {
+    if (isDecommissioning()) {
+      LOG.info("Shuffle Server is decommissioning. Nothing needs to be done.");
+      return;
+    }
+    if (!ServerStatus.ACTIVE.equals(serverStatus)) {
+      throw new InvalidRequestException(
+          "Shuffle Server is processing other procedures, current status:" + serverStatus);
+    }
+    serverStatus = ServerStatus.DECOMMISSIONING;
+    LOG.info("Shuffle Server is decommissioning.");
+    if (executorService == null) {
+      executorService = Executors.newSingleThreadExecutor(
+          ThreadUtils.getThreadFactory("shuffle-server-decommission-%d"));
+    }
+    decommissionFuture = executorService.submit(this::waitDecommissionFinish);
+  }
+
+  private void waitDecommissionFinish() {
+    long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+    boolean shutdownAfterDecommission = shuffleServerConf.get(SERVER_DECOMMISSION_SHUTDOWN);
+    int remainApplicationNum;
+    while (isDecommissioning()) {
+      remainApplicationNum = shuffleTaskManager.getAppIds().size();
+      if (remainApplicationNum == 0) {
+        serverStatus = ServerStatus.DECOMMISSIONED;
+        LOG.info("All applications finished. Current status is " + serverStatus);
+        if (shutdownAfterDecommission) {
+          LOG.info("Exiting...");
+          try {
+            stopServer();
+          } catch (Exception e) {
+            ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+          }
+        }
+        break;
+      }
+      LOG.info("Shuffle server is decommissioning, remaining {} applications not finished.", remainApplicationNum);
+      try {
+        Thread.sleep(checkInterval);
+      } catch (InterruptedException e) {
+        LOG.warn("Ignore the InterruptedException which should be caused by internal kill.");
+      }
+    }
+    remainApplicationNum = shuffleTaskManager.getAppIds().size();
+    if (remainApplicationNum > 0) {
+      LOG.info("Decommission exiting, remaining {} applications not finished.",
+          remainApplicationNum);
+    }
+  }
+
+  public synchronized void cancelDecommission() {
+    if (!isDecommissioning()) {
+      LOG.info("Shuffle server is not decommissioning. Nothing needs to be done.");
+      return;
+    }
+    if (decommissionFuture.cancel(true)) {
+      serverStatus = ServerStatus.ACTIVE;

Review Comment:
   > When the server is already `Decommissioned`, cancel decommission should bring the shuffle server to active state. Current impl is wrong then.
   
   OK, I misread what "cancel" means. @xianjingfeng could you add the "cancel decommission" part and the "server state transitions" to the design doc, please?
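
   For reference, the transitions under discussion can be sketched roughly as below. This is only an illustration, not the PR's implementation: the three status names come from the diff above, while the class `ServerStateSketch`, the method `markDecommissioned`, and the use of `IllegalStateException` in place of `InvalidRequestException` are assumptions made so the example stands alone. It assumes the semantics from the quoted comment, i.e. that cancelling from either `DECOMMISSIONING` or `DECOMMISSIONED` returns the server to `ACTIVE`.

```java
// Hypothetical sketch: only the three status names are taken from the diff above.
class ServerStateSketch {
  enum ServerStatus { ACTIVE, DECOMMISSIONING, DECOMMISSIONED }

  private ServerStatus status = ServerStatus.ACTIVE;

  synchronized ServerStatus getStatus() {
    return status;
  }

  // ACTIVE -> DECOMMISSIONING; a repeated request while decommissioning is a no-op.
  synchronized void decommission() {
    if (status == ServerStatus.DECOMMISSIONING) {
      return;
    }
    if (status != ServerStatus.ACTIVE) {
      // Stand-in for the InvalidRequestException thrown in the diff.
      throw new IllegalStateException("Cannot decommission, current status: " + status);
    }
    status = ServerStatus.DECOMMISSIONING;
  }

  // DECOMMISSIONING -> DECOMMISSIONED, to be called once no applications remain.
  synchronized void markDecommissioned() {
    if (status == ServerStatus.DECOMMISSIONING) {
      status = ServerStatus.DECOMMISSIONED;
    }
  }

  // Assumed semantics: cancel returns the server to ACTIVE from either
  // DECOMMISSIONING or DECOMMISSIONED; from ACTIVE it does nothing.
  synchronized void cancelDecommission() {
    if (status == ServerStatus.DECOMMISSIONING || status == ServerStatus.DECOMMISSIONED) {
      status = ServerStatus.ACTIVE;
    }
  }

  public static void main(String[] args) {
    ServerStateSketch server = new ServerStateSketch();
    server.decommission();        // ACTIVE -> DECOMMISSIONING
    server.markDecommissioned();  // DECOMMISSIONING -> DECOMMISSIONED
    server.cancelDecommission();  // DECOMMISSIONED -> ACTIVE under the assumed semantics
    System.out.println(server.getStatus());
  }
}
```

   Keeping every transition in one synchronized place like this would make it easier to document the allowed state changes in the design doc, whichever semantics are finally chosen.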





[GitHub] [incubator-uniffle] xianjingfeng commented on pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#issuecomment-1438555800

   Thanks @kaijchen @advancedxy @jerqi for the review

