Posted to issues@uniffle.apache.org by "advancedxy (via GitHub)" <gi...@apache.org> on 2023/02/16 08:46:08 UTC

[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #606: [#80][Part-1] feat: Add decommisson logic to shuffle server

advancedxy commented on code in PR #606:
URL: https://github.com/apache/incubator-uniffle/pull/606#discussion_r1108157458


##########
common/src/main/java/org/apache/uniffle/common/ServerStatus.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+public enum ServerStatus {
+  NORMAL_STATUS,
+  DECOMMISSIONING;

Review Comment:
   Would it be possible to add a `DECOMMISSIONED` status here?
   It would cover the state where all applications have already finished but the shuffle server is configured not to exit by itself.
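   A minimal sketch of how the extra state could be used. It is not the PR's code: `exitAfterDecommission` is a hypothetical setting, `onAllApplicationsFinished()` is a hypothetical hook that the decommission loop would call once it sees zero remaining applications, and `stopServer()` is represented by a flag.

```java
// Hypothetical sketch, not the PR's code: a DECOMMISSIONED state for a server that has
// drained all applications but is configured not to exit by itself.
enum ServerStatus {
  NORMAL_STATUS,
  DECOMMISSIONING,
  DECOMMISSIONED
}

class DecommissionedStatusSketch {
  // Hypothetical flag; a real implementation would presumably read it from ShuffleServerConf.
  private final boolean exitAfterDecommission;
  private ServerStatus status = ServerStatus.DECOMMISSIONING;
  private boolean stopped = false;

  DecommissionedStatusSketch(boolean exitAfterDecommission) {
    this.exitAfterDecommission = exitAfterDecommission;
  }

  // Called by the decommission loop once it sees zero remaining applications.
  synchronized void onAllApplicationsFinished() {
    if (status != ServerStatus.DECOMMISSIONING) {
      return;
    }
    if (exitAfterDecommission) {
      stopped = true;  // stand-in for stopServer()
    } else {
      status = ServerStatus.DECOMMISSIONED;  // stay up, but report that draining is done
    }
  }

  synchronized ServerStatus getStatus() {
    return status;
  }

  synchronized boolean isStopped() {
    return stopped;
  }
}
```

   With the flag off, the server stays up and reports `DECOMMISSIONED`, which lets an operator (or the coordinator) tell "still draining" apart from "drained, safe to remove".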



##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();
+    synchronized (statusLock) {
+      checkStatusForDecommission();

Review Comment:
   Why the double check here?
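   For reference: the unsynchronized outer check can only act as a fast path, because two callers can both pass it before either acquires the lock; correctness comes entirely from the check inside `synchronized (statusLock)`. Since decommission is a rare admin call, a single check under the lock may be enough. A minimal sketch, assuming one `statusLock` guards every status read and write; `transitions` is a hypothetical counter standing in for starting the decommission thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a single status check performed while holding statusLock.
class SingleCheckDecommission {
  enum Status { NORMAL_STATUS, DECOMMISSIONING }

  private final Object statusLock = new Object();
  private Status status = Status.NORMAL_STATUS;  // guarded by statusLock
  // Counts callers that actually performed the transition; should never exceed 1.
  final AtomicInteger transitions = new AtomicInteger();

  void decommission() {
    synchronized (statusLock) {
      // Check-then-act is atomic under the lock, so no unsynchronized pre-check is needed.
      if (status != Status.NORMAL_STATUS) {
        throw new IllegalStateException("current status: " + status);
      }
      status = Status.DECOMMISSIONING;
      transitions.incrementAndGet();  // stand-in for starting the decommission thread
    }
  }

  // Demo: let several threads call decommission() at once and report how many succeeded.
  static int race(int callers) {
    SingleCheckDecommission server = new SingleCheckDecommission();
    CountDownLatch go = new CountDownLatch(1);
    Thread[] threads = new Thread[callers];
    for (int i = 0; i < callers; i++) {
      threads[i] = new Thread(() -> {
        try {
          go.await();
          server.decommission();
        } catch (InterruptedException | IllegalStateException e) {
          // Callers that lose the race are rejected by the check inside the lock.
        }
      });
      threads[i].start();
    }
    go.countDown();
    try {
      for (Thread t : threads) {
        t.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return server.transitions.get();
  }
}
```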



##########
server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java:
##########
@@ -79,4 +77,47 @@ public void startTest() {
     }
 
   }
+
+  @Test
+  public void decommissionTest() throws Exception {
+    ShuffleServer shuffleServer = createShuffleServer();
+    assertEquals(ServerStatus.NORMAL_STATUS, shuffleServer.getServerStatus());
+    try {
+      shuffleServer.cancelDecommission();
+      fail(EXPECTED_EXCEPTION_MESSAGE);
+    } catch (Exception e) {
+      assertTrue(e instanceof InvalidRequestException);
+    }
+    ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
+    String appId = "decommissionTest_appId";
+    shuffleTaskManager.registerShuffle(appId, 0, Lists.newArrayList(), new RemoteStorageInfo("/tmp"), "");
+    shuffleServer.decommission();
+    try {
+      assertEquals(ServerStatus.DECOMMISSIONING, shuffleServer.getServerStatus());
+      shuffleServer.decommission();
+      fail(EXPECTED_EXCEPTION_MESSAGE);
+    } catch (Exception e) {
+      assertTrue(e instanceof InvalidRequestException);
+    }
+    shuffleServer.cancelDecommission();
+    shuffleTaskManager.removeShuffleDataSync(appId, 0);
+    assertEquals(ServerStatus.NORMAL_STATUS, shuffleServer.getServerStatus());

Review Comment:
   You could decrease `rss.server.decommission.check.interval` and verify that the shuffle server is still running after `cancelDecommission` returns and at least one check interval has passed.
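   A self-contained sketch of the shape of that test. `MiniDecommissioner` is a hypothetical stand-in for `ShuffleServer`; the real test would presumably set the check interval through `ShuffleServerConf` instead. The idea: cancel, let the last app finish, wait longer than one check interval, and assert the server is still up, plus a control case showing the same setup does stop when nothing is cancelled.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for ShuffleServer's decommission loop, used only to show the
// shape of the suggested test; it is not the PR's implementation.
class MiniDecommissioner {
  private final long checkIntervalMs;  // plays the role of rss.server.decommission.check.interval
  final AtomicInteger remainingApps = new AtomicInteger();
  final AtomicBoolean stopped = new AtomicBoolean(false);
  private volatile boolean decommissioning = false;
  private Thread loop;

  MiniDecommissioner(long checkIntervalMs) {
    this.checkIntervalMs = checkIntervalMs;
  }

  synchronized void decommission() {
    decommissioning = true;
    loop = new Thread(() -> {
      while (decommissioning) {
        if (remainingApps.get() == 0) {
          stopped.set(true);  // stand-in for stopServer()
          return;
        }
        try {
          Thread.sleep(checkIntervalMs);
        } catch (InterruptedException e) {
          // Woken by cancelDecommission(); the loop condition decides what happens next.
        }
      }
    });
    loop.start();
  }

  synchronized void cancelDecommission() throws InterruptedException {
    decommissioning = false;
    if (loop != null) {
      loop.interrupt();
      loop.join();
      loop = null;
    }
  }

  // Mirrors the suggested test: cancel, let the only app finish, wait several check
  // intervals, and verify that the server was not stopped.
  static boolean stillRunningAfterCancel() {
    long interval = 50;  // a deliberately short check interval for the test
    MiniDecommissioner server = new MiniDecommissioner(interval);
    server.remainingApps.set(1);  // one registered app keeps the server draining
    try {
      server.decommission();
      server.cancelDecommission();
      server.remainingApps.set(0);  // e.g. after removeShuffleDataSync(appId, 0)
      Thread.sleep(interval * 3);   // longer than one check interval
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return !server.stopped.get();
  }

  // Control case: without a cancel, the same setup stops once the app finishes.
  static boolean stopsWithoutCancel() {
    MiniDecommissioner server = new MiniDecommissioner(50);
    server.remainingApps.set(1);
    server.decommission();
    server.remainingApps.set(0);
    long deadline = System.currentTimeMillis() + 5_000;
    try {
      while (!server.stopped.get() && System.currentTimeMillis() < deadline) {
        Thread.sleep(10);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return server.stopped.get();
  }
}
```

   The control case matters: without it, a test that only checks "still running" would also pass if the decommission loop never stopped the server at all.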



##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();
+    synchronized (statusLock) {
+      checkStatusForDecommission();
+      long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+      decommissionThread = new Thread(() -> {
+        while (isDecommissioning()) {

Review Comment:
   I don't think this is correct.
   
   This code runs in a different thread, so it is not guarded by `statusLock`. The status set by `cancelDecommission` may never become visible to this thread.
   
   I believe making `isDecommissioning()` a `public synchronized` method is simple and clear.
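   A minimal sketch of the suggestion, with one caveat: `public synchronized boolean isDecommissioning()` locks `this`, so it only gives the visibility guarantee if every write of the status also synchronizes on `this`. Because `decommission()` and `cancelDecommission()` lock `statusLock`, the sketch synchronizes on `statusLock` everywhere instead. (Declaring the field `volatile` would also fix visibility, but would not make check-then-act atomic.) `StatusGuard` and `setStatus` are hypothetical names.

```java
// Hypothetical sketch: all reads and writes of the status synchronize on the same
// statusLock, so a change made by cancelDecommission() is visible to the loop thread.
class StatusGuard {
  enum Status { NORMAL_STATUS, DECOMMISSIONING }

  private final Object statusLock = new Object();
  private Status status = Status.NORMAL_STATUS;  // guarded by statusLock

  boolean isDecommissioning() {
    synchronized (statusLock) {  // same monitor as the writers, not `this`
      return status == Status.DECOMMISSIONING;
    }
  }

  void setStatus(Status newStatus) {
    synchronized (statusLock) {
      status = newStatus;
    }
  }

  // Demo: a loop polling isDecommissioning() from another thread observes the cancel.
  static boolean loopObservesCancel() {
    StatusGuard guard = new StatusGuard();
    guard.setStatus(Status.DECOMMISSIONING);
    Thread loop = new Thread(() -> {
      while (guard.isDecommissioning()) {
        Thread.yield();
      }
    });
    loop.start();
    guard.setStatus(Status.NORMAL_STATUS);  // what cancelDecommission() would do
    try {
      loop.join(5_000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return !loop.isAlive();
  }
}
```

   The demo only shows usage; a passing run cannot prove the absence of a visibility bug, which is why the guarantee has to come from using the same monitor on both sides.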



##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();

Review Comment:
   Shouldn't `decommission` be idempotent?
   
   Once the shuffle server has entered the `DECOMMISSIONING` state, subsequent calls should just do nothing rather than throw an exception.
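   A minimal sketch of an idempotent `decommission()`, assuming a single `statusLock`. `OTHER_PROCEDURE` is a hypothetical placeholder for any other non-normal state, which is still rejected; `threadsStarted` is a hypothetical counter standing in for starting the decommission thread.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: repeated decommission() calls while already decommissioning
// are no-ops instead of throwing.
class IdempotentDecommission {
  enum Status { NORMAL_STATUS, DECOMMISSIONING, OTHER_PROCEDURE }

  private final Object statusLock = new Object();
  private Status status;  // guarded by statusLock
  final AtomicInteger threadsStarted = new AtomicInteger();

  IdempotentDecommission(Status initial) {
    this.status = initial;
  }

  void decommission() {
    synchronized (statusLock) {
      if (status == Status.DECOMMISSIONING) {
        return;  // already draining: nothing to do
      }
      if (status != Status.NORMAL_STATUS) {
        // Genuinely conflicting procedures are still rejected.
        throw new IllegalStateException("current status: " + status);
      }
      status = Status.DECOMMISSIONING;
      threadsStarted.incrementAndGet();  // stand-in for starting the decommission thread
    }
  }

  Status getStatus() {
    synchronized (statusLock) {
      return status;
    }
  }
}
```

   Calling `decommission()` twice then starts only one background thread, which also makes retries from an admin client or the coordinator safe.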



##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();
+    synchronized (statusLock) {
+      checkStatusForDecommission();
+      long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+      decommissionThread = new Thread(() -> {
+        while (isDecommissioning()) {
+          int remainApplicationNum = shuffleTaskManager.getAppIds().size();
+          if (remainApplicationNum == 0) {
+            LOG.info("all applications finished, exit now");
+            try {
+              stopServer();
+              break;
+            } catch (Exception e) {
+              ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+            }
+          }
+          LOG.info("Shuffle server is decommissioning. remain {} applications not finished.", remainApplicationNum);
+          try {
+            Thread.sleep(checkInterval);
+          } catch (InterruptedException e) {
+            LOG.warn("Ignore the InterruptedException which should be caused by internal killed");
+          }
+        }
+      });
+      decommissionThread.setName("decommission");
+      decommissionThread.start();
+      serverStatus = ServerStatus.DECOMMISSIONING;
+    }
+  }
+
+  private void checkStatusForDecommission() {
+    if (isDecommissioning()) {
+      throw new InvalidRequestException("Shuffle Server is decommissioning. Nothing need to do.");
+    }
+    if (!ServerStatus.NORMAL_STATUS.equals(serverStatus)) {
+      throw new InvalidRequestException(
+          "Shuffle Server is processing other procedures, current status:" + serverStatus);
+    }
+  }
+
+  public void cancelDecommission() {
+    checkStatusForCancelDecommission();
+    synchronized (statusLock) {
+      checkStatusForCancelDecommission();
+      if (decommissionThread != null) {
+        decommissionThread.interrupt();
+        decommissionThread = null;
+      }
+      serverStatus = ServerStatus.NORMAL_STATUS;

Review Comment:
   In practice, we usually update `serverStatus` first and then interrupt `decommissionThread`.
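   A minimal sketch of that ordering, assuming a `volatile` flag in place of `serverStatus`. Because the status is published before the interrupt, the woken loop's very next check sees the cancel and exits. With the opposite order, the interrupted loop can re-read the old `DECOMMISSIONING` value and go back to sleep for another full interval (or run one more check) before the status change lands. `CancelOrderSketch` is a hypothetical name.

```java
// Hypothetical sketch: cancelDecommission() publishes the new status before interrupting,
// so the woken loop re-reads it and exits on its first re-check.
class CancelOrderSketch {
  private volatile boolean decommissioning;  // stand-in for serverStatus == DECOMMISSIONING
  private Thread decommissionThread;
  private final long checkIntervalMs;

  CancelOrderSketch(long checkIntervalMs) {
    this.checkIntervalMs = checkIntervalMs;
  }

  synchronized void decommission() {
    decommissioning = true;
    decommissionThread = new Thread(() -> {
      while (decommissioning) {
        try {
          Thread.sleep(checkIntervalMs);
        } catch (InterruptedException e) {
          // Fall through to the loop condition, which already reflects the cancel.
        }
      }
    }, "shuffle-server-decommission");
    decommissionThread.start();
  }

  synchronized Thread cancelDecommission() {
    decommissioning = false;  // 1. update the status
    Thread t = decommissionThread;
    if (t != null) {
      t.interrupt();          // 2. then wake the thread
      decommissionThread = null;
    }
    return t;
  }

  // Demo: even with a one-minute check interval, the loop exits promptly after a cancel.
  static boolean exitsPromptlyAfterCancel() {
    CancelOrderSketch server = new CancelOrderSketch(60_000);
    server.decommission();
    Thread loop = server.cancelDecommission();
    try {
      loop.join(5_000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return !loop.isAlive();
  }
}
```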



##########
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java:
##########
@@ -259,6 +269,69 @@ private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
+  public void decommission() {
+    checkStatusForDecommission();
+    synchronized (statusLock) {
+      checkStatusForDecommission();
+      long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+      decommissionThread = new Thread(() -> {
+        while (isDecommissioning()) {
+          int remainApplicationNum = shuffleTaskManager.getAppIds().size();
+          if (remainApplicationNum == 0) {
+            LOG.info("all applications finished, exit now");
+            try {
+              stopServer();
+              break;
+            } catch (Exception e) {
+              ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+            }
+          }
+          LOG.info("Shuffle server is decommissioning. remain {} applications not finished.", remainApplicationNum);
+          try {
+            Thread.sleep(checkInterval);
+          } catch (InterruptedException e) {
+            LOG.warn("Ignore the InterruptedException which should be caused by internal killed");
+          }
+        }
+      });
+      decommissionThread.setName("decommission");

Review Comment:
   Let's be a bit more specific and name the thread "shuffle-server-decommission".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: issues-help@uniffle.apache.org