You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by xi...@apache.org on 2023/02/21 14:06:01 UTC

[incubator-uniffle] branch master updated: [#80][Part-1] feat: Add decommission logic to shuffle server (#606)

This is an automated email from the ASF dual-hosted git repository.

xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 711f0899 [#80][Part-1] feat: Add decommission logic to shuffle server (#606)
711f0899 is described below

commit 711f08995fd44a45775912b43616fa50d2738b2d
Author: xianjingfeng <xi...@gmail.com>
AuthorDate: Tue Feb 21 22:05:56 2023 +0800

    [#80][Part-1] feat: Add decommission logic to shuffle server (#606)
    
    ### What changes were proposed in this pull request?
    Add decommission logic to shuffle server
    
    ### Why are the changes needed?
    Support shuffle server decommission. It is a part of #80
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    UT
---
 .../org/apache/uniffle/common/ServerStatus.java    |  35 +++++++
 .../common/exception/InvalidRequestException.java  |  29 ++++++
 docs/server_guide.md                               |   2 +
 .../org/apache/uniffle/server/ShuffleServer.java   | 103 +++++++++++++++++++++
 .../apache/uniffle/server/ShuffleServerConf.java   |  13 +++
 .../apache/uniffle/server/ShuffleServerTest.java   |  79 +++++++++++++---
 6 files changed, 249 insertions(+), 12 deletions(-)

diff --git a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java b/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
new file mode 100644
index 00000000..2cfdf6ba
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+/**
+ * Status values of a shuffle server, each paired with a fixed integer code.
+ */
+public enum ServerStatus {
+  UNKNOWN(-1),
+  ACTIVE(0),
+  DECOMMISSIONING(1),
+  DECOMMISSIONED(2);
+
+  /** The integer code paired with this status. */
+  private final int statusCode;
+
+  ServerStatus(int statusCode) {
+    this.statusCode = statusCode;
+  }
+
+  /**
+   * Returns the integer code paired with this status.
+   *
+   * @return the code, e.g. {@code 0} for {@link #ACTIVE}
+   */
+  public int code() {
+    return this.statusCode;
+  }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/exception/InvalidRequestException.java b/common/src/main/java/org/apache/uniffle/common/exception/InvalidRequestException.java
new file mode 100644
index 00000000..ab890c79
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/exception/InvalidRequestException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.exception;
+
+/**
+ * Unchecked exception signalling that a request is not valid, for example asking a shuffle
+ * server to decommission while it is in a status that does not allow it.
+ */
+public class InvalidRequestException extends RuntimeException {
+
+  /**
+   * Creates the exception with a detail message.
+   *
+   * @param message description of why the request is invalid
+   */
+  public InvalidRequestException(String message) {
+    super(message);
+  }
+
+  /**
+   * Creates the exception with a detail message and the underlying cause.
+   *
+   * @param message description of why the request is invalid
+   * @param cause the throwable that triggered this exception
+   */
+  public InvalidRequestException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git a/docs/server_guide.md b/docs/server_guide.md
index e3f9a99c..0f2cc017 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -91,6 +91,8 @@ This document will introduce how to deploy Uniffle shuffle servers.
 |Property Name|Default| Description                                                                                                                                                                                 |
 |---|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 |rss.server.storageMediaProvider.from.env.key|-| Sometimes, the local storage type/media info is provided by external system. RSS would read the env key defined by this configuration and get info about the storage media of its basePaths |
+|rss.server.decommission.check.interval|60000| The interval (ms) at which to check whether all applications have finished while the server is decommissioning                                                                           |
+|rss.server.decommission.shutdown|true| Whether to shut down the server after it is decommissioned                                                                                                                                  |
 
 
 ### PrometheusPushGatewayMetricReporter settings
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 796ba305..6cabf9e9 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -20,6 +20,9 @@ package org.apache.uniffle.server;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -32,6 +35,8 @@ import org.slf4j.LoggerFactory;
 import picocli.CommandLine;
 
 import org.apache.uniffle.common.Arguments;
+import org.apache.uniffle.common.ServerStatus;
+import org.apache.uniffle.common.exception.InvalidRequestException;
 import org.apache.uniffle.common.metrics.GRPCMetrics;
 import org.apache.uniffle.common.metrics.JvmMetrics;
 import org.apache.uniffle.common.metrics.MetricReporter;
@@ -40,7 +45,9 @@ import org.apache.uniffle.common.rpc.ServerInterface;
 import org.apache.uniffle.common.security.SecurityConfig;
 import org.apache.uniffle.common.security.SecurityContextFactory;
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.ExitUtils;
 import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.common.util.ThreadUtils;
 import org.apache.uniffle.common.web.CommonMetricsServlet;
 import org.apache.uniffle.common.web.JettyServer;
 import org.apache.uniffle.server.buffer.ShuffleBufferManager;
@@ -55,6 +62,8 @@ import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_K
 import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KRB5_CONF_FILE;
 import static org.apache.uniffle.common.config.RssBaseConf.RSS_STORAGE_TYPE;
 import static org.apache.uniffle.common.config.RssBaseConf.RSS_TEST_MODE_ENABLE;
+import static org.apache.uniffle.server.ShuffleServerConf.SERVER_DECOMMISSION_CHECK_INTERVAL;
+import static org.apache.uniffle.server.ShuffleServerConf.SERVER_DECOMMISSION_SHUTDOWN;
 
 /**
  * Server that manages startup/shutdown of a {@code Greeter} server.
@@ -78,6 +87,10 @@ public class ShuffleServer {
   private AtomicBoolean isHealthy = new AtomicBoolean(true);
   private GRPCMetrics grpcMetrics;
   private MetricReporter metricReporter;
+  // Current server status, starting at ACTIVE. Changed by decommission()/cancelDecommission()
+  // and read/written by the background decommission task, hence volatile.
+  private volatile ServerStatus serverStatus = ServerStatus.ACTIVE;
+  // true once startup has completed; reset to false when the RPC server is stopped.
+  private volatile boolean running;
+  // Single-thread executor running the decommission task. Created lazily by decommission()
+  // and shut down (shutdownNow) when the RPC server is stopped.
+  private ExecutorService executorService;
+  // Handle of the submitted decommission task; cancelDecommission() uses it to interrupt the task.
+  private Future<?> decommissionFuture;
 
   public ShuffleServer(ShuffleServerConf shuffleServerConf) throws Exception {
     this.shuffleServerConf = shuffleServerConf;
@@ -123,6 +136,7 @@ public class ShuffleServer {
         LOG.info("*** server shut down");
       }
     });
+    running = true;
     LOG.info("Shuffle server start successfully!");
   }
 
@@ -149,6 +163,10 @@ public class ShuffleServer {
     }
     SecurityContextFactory.get().getSecurityContext().close();
     server.stop();
+    if (executorService != null) {
+      executorService.shutdownNow();
+    }
+    running = false;
     LOG.info("RPC Server Stopped!");
   }
 
@@ -259,6 +277,80 @@ public class ShuffleServer {
     server.blockUntilShutdown();
   }
 
+  /**
+   * Returns the server's current status.
+   *
+   * @return the current {@link ServerStatus}
+   */
+  public ServerStatus getServerStatus() {
+    return this.serverStatus;
+  }
+
+  /**
+   * Starts decommissioning the server.
+   *
+   * <p>Moves the status from ACTIVE to DECOMMISSIONING and submits {@link #waitDecommissionFinish()}
+   * to a lazily created single-thread executor. If the server is already DECOMMISSIONING or
+   * DECOMMISSIONED this only logs and returns.
+   *
+   * @throws InvalidRequestException if the server is in any other status (e.g. UNKNOWN)
+   */
+  public synchronized void decommission() {
+    if (isDecommissioning()) {
+      LOG.info("Shuffle Server is decommissioning. Nothing needs to be done.");
+      return;
+    }
+    if (serverStatus != ServerStatus.ACTIVE) {
+      throw new InvalidRequestException(
+          "Shuffle Server is processing other procedures, current status:" + serverStatus);
+    }
+    serverStatus = ServerStatus.DECOMMISSIONING;
+    LOG.info("Shuffle Server is decommissioning.");
+    decommissionFuture = decommissionExecutor().submit(this::waitDecommissionFinish);
+  }
+
+  // Returns the executor for the decommission task, creating it on first use.
+  // Only called from decommission(), i.e. while holding this object's monitor.
+  private ExecutorService decommissionExecutor() {
+    if (executorService == null) {
+      executorService = Executors.newSingleThreadExecutor(
+          ThreadUtils.getThreadFactory("shuffle-server-decommission-%d"));
+    }
+    return executorService;
+  }
+
+  /**
+   * Background task submitted by {@link #decommission()}.
+   *
+   * <p>Every {@code rss.server.decommission.check.interval} ms it counts the applications still
+   * known to the shuffle task manager. Once none remain it moves the status to DECOMMISSIONED and,
+   * if {@code rss.server.decommission.shutdown} is enabled, stops the server (a failure to stop is
+   * passed to {@code ExitUtils.terminate} with status 1). The loop also ends when the status is no
+   * longer DECOMMISSIONING/DECOMMISSIONED (for example after {@link #cancelDecommission()}) or
+   * when the thread is interrupted.
+   */
+  private void waitDecommissionFinish() {
+    long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+    boolean shutdownAfterDecommission = shuffleServerConf.get(SERVER_DECOMMISSION_SHUTDOWN);
+    int remainApplicationNum;
+    while (isDecommissioning()) {
+      remainApplicationNum = shuffleTaskManager.getAppIds().size();
+      if (remainApplicationNum == 0) {
+        // Re-check and transition under the same monitor that decommission() and
+        // cancelDecommission() hold. Otherwise a cancel landing after the loop condition was
+        // evaluated would be overwritten with DECOMMISSIONED and the server shut down anyway.
+        synchronized (this) {
+          if (!ServerStatus.DECOMMISSIONING.equals(serverStatus)) {
+            break;
+          }
+          serverStatus = ServerStatus.DECOMMISSIONED;
+        }
+        LOG.info("All applications finished. Current status is {}", ServerStatus.DECOMMISSIONED);
+        if (shutdownAfterDecommission) {
+          LOG.info("Exiting...");
+          try {
+            stopServer();
+          } catch (Exception e) {
+            ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+          }
+        }
+        break;
+      }
+      LOG.info("Shuffle server is decommissioning, remaining {} applications not finished.",
+          remainApplicationNum);
+      try {
+        Thread.sleep(checkInterval);
+      } catch (InterruptedException e) {
+        // cancelDecommission() cancels this task with interruption, and stopping the server calls
+        // shutdownNow() on the executor; restore the interrupt status before leaving.
+        Thread.currentThread().interrupt();
+        LOG.warn("Interrupted while waiting for decommission to finish");
+        break;
+      }
+    }
+    remainApplicationNum = shuffleTaskManager.getAppIds().size();
+    if (remainApplicationNum > 0) {
+      LOG.info("Decommission exiting, remaining {} applications not finished.",
+          remainApplicationNum);
+    }
+  }
+
+  public synchronized void cancelDecommission() {
+    if (!isDecommissioning()) {
+      LOG.info("Shuffle server is not decommissioning. Nothing needs to be done.");
+      return;
+    }
+    if (ServerStatus.DECOMMISSIONED.equals(serverStatus)) {
+      serverStatus = ServerStatus.ACTIVE;
+      return;
+    }
+    serverStatus = ServerStatus.ACTIVE;
+    if (decommissionFuture.cancel(true)) {
+      LOG.info("Decommission canceled.");
+    } else {
+      LOG.warn("Failed to cancel decommission.");
+    }
+    decommissionFuture = null;
+  }
+
   public String getIp() {
     return this.ip;
   }
@@ -332,4 +424,15 @@ public class ShuffleServer {
   public GRPCMetrics getGrpcMetrics() {
     return grpcMetrics;
   }
+
+  public boolean isDecommissioning() {
+    return ServerStatus.DECOMMISSIONING.equals(serverStatus)
+        || ServerStatus.DECOMMISSIONED.equals(serverStatus);
+  }
+
+  /**
+   * Returns whether startup has completed and the RPC server has not been stopped since.
+   *
+   * @return the current value of the running flag
+   */
+  @VisibleForTesting
+  public boolean isRunning() {
+    return this.running;
+  }
+
 }
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 5b181075..6d501f18 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -356,6 +356,19 @@ public class ShuffleServerConf extends RssBaseConf {
       .withDescription("The memory usage limit ratio for huge partition, it will only triggered when partition's "
           + "size exceeds the threshold of '" + HUGE_PARTITION_SIZE_THRESHOLD.key() + "'");
 
+  /** Polling interval, in milliseconds, of the decommission task; must be positive. */
+  public static final ConfigOption<Long> SERVER_DECOMMISSION_CHECK_INTERVAL = ConfigOptions
+      .key("rss.server.decommission.check.interval")
+      .longType()
+      .checkValue(ConfigUtils.POSITIVE_LONG_VALIDATOR, "check interval must be positive")
+      .defaultValue(60 * 1000L)
+      .withDescription("The interval (ms) at which to check whether all applications have finished "
+          + "while the server is decommissioning");
+
+  /** Whether the server stops itself once it reaches the decommissioned state. */
+  public static final ConfigOption<Boolean> SERVER_DECOMMISSION_SHUTDOWN = ConfigOptions
+      .key("rss.server.decommission.shutdown")
+      .booleanType()
+      .defaultValue(true)
+      .withDescription("Whether to shut down the server after it is decommissioned");
+
   public ShuffleServerConf() {
   }
 
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
index 9a9e7c19..19c9513e 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
@@ -17,36 +17,40 @@
 
 package org.apache.uniffle.server;
 
+import java.io.File;
 import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Lists;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.ServerStatus;
 import org.apache.uniffle.common.util.ExitUtils;
 import org.apache.uniffle.common.util.ExitUtils.ExitException;
 import org.apache.uniffle.storage.util.StorageType;
 
+import static org.apache.uniffle.server.ShuffleServerConf.SERVER_DECOMMISSION_CHECK_INTERVAL;
+import static org.apache.uniffle.server.ShuffleServerConf.SERVER_DECOMMISSION_SHUTDOWN;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
 public class ShuffleServerTest {
 
+  @TempDir
+  private File tempDir;
+
   @Test
   public void startTest() {
     try {
-      ShuffleServerConf serverConf = new ShuffleServerConf();
-      serverConf.setInteger(ShuffleServerConf.RPC_SERVER_PORT, 9527);
-      serverConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
-      serverConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
-      serverConf.setInteger(ShuffleServerConf.JETTY_HTTP_PORT, 9528);
-      serverConf.setString(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:0");
-      serverConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("/tmp/null"));
-      serverConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
-      serverConf.setLong(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 100);
-      serverConf.setLong(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY, 10);
-
+      ShuffleServerConf serverConf = createShuffleServerConf();
       ShuffleServer ss1 = new ShuffleServer(serverConf);
       ss1.start();
-
+      ss1.stopServer();
       ExitUtils.disableSystemExit();
       ShuffleServer ss2 = new ShuffleServer(serverConf);
       String expectMessage = "Fail to start jetty http server";
@@ -57,6 +61,7 @@ public class ShuffleServerTest {
         assertEquals(expectMessage, e.getMessage());
         assertEquals(expectStatus, ((ExitException) e).getStatus());
       }
+      ss2.stopServer();
 
       serverConf.setInteger("rss.jetty.http.port", 9529);
       ss2 = new ShuffleServer(serverConf);
@@ -67,6 +72,7 @@ public class ShuffleServerTest {
         assertEquals(expectMessage, e.getMessage());
         assertEquals(expectStatus, ((ExitException) e).getStatus());
       }
+      ss2.stopServer();
 
       final Thread t = new Thread(null, () -> {
         throw new AssertionError("TestUncaughtException");
@@ -79,4 +85,53 @@ public class ShuffleServerTest {
     }
 
   }
+
+  /**
+   * Exercises decommission / cancel / re-decommission. With {@code shutdown} enabled the server
+   * must stop itself; otherwise it must reach DECOMMISSIONED and keep running.
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void decommissionTest(boolean shutdown) throws Exception {
+    ShuffleServerConf serverConf = createShuffleServerConf();
+    serverConf.set(SERVER_DECOMMISSION_CHECK_INTERVAL, 1000L);
+    serverConf.set(SERVER_DECOMMISSION_SHUTDOWN, shutdown);
+    ShuffleServer shuffleServer = new ShuffleServer(serverConf);
+    shuffleServer.start();
+    try {
+      assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
+      // Shuffle server is not decommissioning, but we can also cancel it.
+      shuffleServer.cancelDecommission();
+      ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
+      String appId = "decommissionTest_appId_" + shutdown;
+      shuffleTaskManager.registerShuffle(
+          appId, 0, Lists.newArrayList(), new RemoteStorageInfo("/tmp"), "");
+      shuffleServer.decommission();
+      assertEquals(ServerStatus.DECOMMISSIONING, shuffleServer.getServerStatus());
+      // Shuffle server is decommissioning, but we can also decommission it again.
+      shuffleServer.decommission();
+      shuffleServer.cancelDecommission();
+      shuffleTaskManager.removeResources(appId);
+      // Wait for two check intervals to make sure the cancel command worked: a task that was not
+      // really cancelled would have flipped the status by now.
+      Thread.sleep(2000);
+      assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
+      shuffleServer.decommission();
+      if (shutdown) {
+        Awaitility.await().timeout(10, TimeUnit.SECONDS).until(
+            () -> !shuffleServer.isRunning());
+      } else {
+        Awaitility.await().timeout(10, TimeUnit.SECONDS).until(
+            () -> ServerStatus.DECOMMISSIONED.equals(shuffleServer.getServerStatus()));
+        assertEquals(true, shuffleServer.isRunning());
+      }
+    } finally {
+      // Always release the fixed RPC/HTTP ports from createShuffleServerConf(), so a failed
+      // assertion here does not make other tests fail to bind them.
+      if (shuffleServer.isRunning()) {
+        shuffleServer.stopServer();
+      }
+    }
+  }
+
+  private ShuffleServerConf createShuffleServerConf() throws Exception {
+    ShuffleServerConf serverConf = new ShuffleServerConf();
+    serverConf.setInteger(ShuffleServerConf.RPC_SERVER_PORT, 9527);
+    serverConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
+    serverConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
+    serverConf.setInteger(ShuffleServerConf.JETTY_HTTP_PORT, 9528);
+    serverConf.setString(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:0");
+    serverConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(tempDir.getAbsolutePath()));
+    serverConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
+    serverConf.setLong(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 100);
+    serverConf.setLong(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY, 10);
+    return serverConf;
+  }
 }