You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by xi...@apache.org on 2023/02/21 14:06:01 UTC
[incubator-uniffle] branch master updated: [#80][Part-1] feat: Add decommisson logic to shuffle server (#606)
This is an automated email from the ASF dual-hosted git repository.
xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 711f0899 [#80][Part-1] feat: Add decommisson logic to shuffle server (#606)
711f0899 is described below
commit 711f08995fd44a45775912b43616fa50d2738b2d
Author: xianjingfeng <xi...@gmail.com>
AuthorDate: Tue Feb 21 22:05:56 2023 +0800
[#80][Part-1] feat: Add decommisson logic to shuffle server (#606)
### What changes were proposed in this pull request?
Add decommission logic to shuffle server
### Why are the changes needed?
Support shuffle server decommissioning. It is part of #80.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
UT
---
.../org/apache/uniffle/common/ServerStatus.java | 35 +++++++
.../common/exception/InvalidRequestException.java | 29 ++++++
docs/server_guide.md | 2 +
.../org/apache/uniffle/server/ShuffleServer.java | 103 +++++++++++++++++++++
.../apache/uniffle/server/ShuffleServerConf.java | 13 +++
.../apache/uniffle/server/ShuffleServerTest.java | 79 +++++++++++++---
6 files changed, 249 insertions(+), 12 deletions(-)
diff --git a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java b/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
new file mode 100644
index 00000000..2cfdf6ba
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+/**
+ * Lifecycle status of a shuffle server. Every constant carries a fixed integer code that can be
+ * read with {@link #code()}.
+ */
+public enum ServerStatus {
+  UNKNOWN(-1),
+  ACTIVE(0),
+  DECOMMISSIONING(1),
+  DECOMMISSIONED(2);
+
+  /** Integer code bound to the constant at declaration time. */
+  private final int statusCode;
+
+  ServerStatus(int statusCode) {
+    this.statusCode = statusCode;
+  }
+
+  /**
+   * Returns the integer code of this status.
+   *
+   * @return the code given in the constant's declaration
+   */
+  public int code() {
+    return statusCode;
+  }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/exception/InvalidRequestException.java b/common/src/main/java/org/apache/uniffle/common/exception/InvalidRequestException.java
new file mode 100644
index 00000000..ab890c79
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/exception/InvalidRequestException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.exception;
+
+/**
+ * Unchecked exception used to reject a request that is not valid, for example asking a shuffle
+ * server to decommission while it is in a state that does not allow it.
+ */
+public class InvalidRequestException extends RuntimeException {
+
+  /**
+   * Creates the exception with a detail message and no cause.
+   *
+   * @param message detail message
+   */
+  public InvalidRequestException(String message) {
+    super(message);
+  }
+
+  /**
+   * Creates the exception with a detail message and an underlying cause.
+   *
+   * @param message detail message
+   * @param cause the original failure, preserved for the stack trace
+   */
+  public InvalidRequestException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git a/docs/server_guide.md b/docs/server_guide.md
index e3f9a99c..0f2cc017 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -91,6 +91,8 @@ This document will introduce how to deploy Uniffle shuffle servers.
|Property Name|Default| Description |
|---|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|rss.server.storageMediaProvider.from.env.key|-| Sometimes, the local storage type/media info is provided by external system. RSS would read the env key defined by this configuration and get info about the storage media of its basePaths |
+|rss.server.decommission.check.interval|60000| The interval (ms) at which to check whether all applications have finished while the server is decommissioning |
+|rss.server.decommission.shutdown|true| Whether to shut down the server after it is decommissioned |
### PrometheusPushGatewayMetricReporter settings
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 796ba305..6cabf9e9 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -20,6 +20,9 @@ package org.apache.uniffle.server;
import java.util.Collections;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
@@ -32,6 +35,8 @@ import org.slf4j.LoggerFactory;
import picocli.CommandLine;
import org.apache.uniffle.common.Arguments;
+import org.apache.uniffle.common.ServerStatus;
+import org.apache.uniffle.common.exception.InvalidRequestException;
import org.apache.uniffle.common.metrics.GRPCMetrics;
import org.apache.uniffle.common.metrics.JvmMetrics;
import org.apache.uniffle.common.metrics.MetricReporter;
@@ -40,7 +45,9 @@ import org.apache.uniffle.common.rpc.ServerInterface;
import org.apache.uniffle.common.security.SecurityConfig;
import org.apache.uniffle.common.security.SecurityContextFactory;
import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.ExitUtils;
import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.common.web.CommonMetricsServlet;
import org.apache.uniffle.common.web.JettyServer;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
@@ -55,6 +62,8 @@ import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_K
import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KRB5_CONF_FILE;
import static org.apache.uniffle.common.config.RssBaseConf.RSS_STORAGE_TYPE;
import static org.apache.uniffle.common.config.RssBaseConf.RSS_TEST_MODE_ENABLE;
+import static org.apache.uniffle.server.ShuffleServerConf.SERVER_DECOMMISSION_CHECK_INTERVAL;
+import static org.apache.uniffle.server.ShuffleServerConf.SERVER_DECOMMISSION_SHUTDOWN;
/**
* Server that manages startup/shutdown of a {@code Greeter} server.
@@ -78,6 +87,10 @@ public class ShuffleServer {
private AtomicBoolean isHealthy = new AtomicBoolean(true);
private GRPCMetrics grpcMetrics;
private MetricReporter metricReporter;
+ // Current lifecycle status, starting at ACTIVE. Written by decommission() and
+ // cancelDecommission() (both synchronized) and by the decommission worker thread.
+ private volatile ServerStatus serverStatus = ServerStatus.ACTIVE;
+ // Set to true at the end of start() and back to false at the end of stopServer().
+ private volatile boolean running;
+ // Single-thread executor for the decommission worker. It is created lazily by decommission() and
+ // shut down by stopServer().
+ // NOTE(review): not volatile, and stopServer() reads it, possibly without holding the
+ // monitor; confirm visibility.
+ private ExecutorService executorService;
+ // Handle to the submitted decommission task. Only touched inside the synchronized
+ // decommission()/cancelDecommission(); cancelDecommission() cancels it and clears it.
+ private Future<?> decommissionFuture;
public ShuffleServer(ShuffleServerConf shuffleServerConf) throws Exception {
this.shuffleServerConf = shuffleServerConf;
@@ -123,6 +136,7 @@ public class ShuffleServer {
LOG.info("*** server shut down");
}
});
+ running = true;
LOG.info("Shuffle server start successfully!");
}
@@ -149,6 +163,10 @@ public class ShuffleServer {
}
SecurityContextFactory.get().getSecurityContext().close();
server.stop();
+ if (executorService != null) {
+ executorService.shutdownNow();
+ }
+ running = false;
LOG.info("RPC Server Stopped!");
}
@@ -259,6 +277,80 @@ public class ShuffleServer {
server.blockUntilShutdown();
}
+  /**
+   * Returns the current lifecycle status of this server.
+   *
+   * @return the status field; it is volatile, so this is the latest value written by any thread
+   */
+  public ServerStatus getServerStatus() {
+    return this.serverStatus;
+  }
+
+ public synchronized void decommission() {
+ if (isDecommissioning()) {
+ LOG.info("Shuffle Server is decommissioning. Nothing needs to be done.");
+ return;
+ }
+ if (!ServerStatus.ACTIVE.equals(serverStatus)) {
+ throw new InvalidRequestException(
+ "Shuffle Server is processing other procedures, current status:" + serverStatus);
+ }
+ serverStatus = ServerStatus.DECOMMISSIONING;
+ LOG.info("Shuffle Server is decommissioning.");
+ if (executorService == null) {
+ executorService = Executors.newSingleThreadExecutor(
+ ThreadUtils.getThreadFactory("shuffle-server-decommission-%d"));
+ }
+ decommissionFuture = executorService.submit(this::waitDecommissionFinish);
+ }
+
+  /**
+   * Decommission worker submitted by {@code decommission()}.
+   *
+   * <p>Every {@code rss.server.decommission.check.interval} milliseconds it checks the shuffle task
+   * manager. Once no applications remain, it moves the status to DECOMMISSIONED and, if
+   * {@code rss.server.decommission.shutdown} is true, stops the server. It exits early when the
+   * status leaves the decommission states or when the thread is interrupted, for example by the
+   * {@code cancel(true)} issued from {@code cancelDecommission()}.
+   */
+  private void waitDecommissionFinish() {
+    long checkInterval = shuffleServerConf.get(SERVER_DECOMMISSION_CHECK_INTERVAL);
+    boolean shutdownAfterDecommission = shuffleServerConf.get(SERVER_DECOMMISSION_SHUTDOWN);
+    int remainApplicationNum;
+    while (isDecommissioning()) {
+      remainApplicationNum = shuffleTaskManager.getAppIds().size();
+      if (remainApplicationNum == 0) {
+        // The loop check above is not atomic with this transition. cancelDecommission() may have
+        // reset the status to ACTIVE in between. Re-check under the same monitor it holds so a
+        // cancel is never overwritten; otherwise the server could still be shut down below.
+        if (!markDecommissioned()) {
+          LOG.info("Decommission was canceled concurrently. Current status is {}", serverStatus);
+          break;
+        }
+        LOG.info("All applications finished. Current status is {}", serverStatus);
+        if (shutdownAfterDecommission) {
+          LOG.info("Exiting...");
+          try {
+            stopServer();
+          } catch (Exception e) {
+            ExitUtils.terminate(1, "Stop server failed!", e, LOG);
+          }
+        }
+        break;
+      }
+      LOG.info("Shuffle server is decommissioning, remaining {} applications not finished.",
+          remainApplicationNum);
+      try {
+        Thread.sleep(checkInterval);
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting for decommission to finish");
+        // Restore the interrupt flag instead of swallowing it, so the executor can observe it.
+        Thread.currentThread().interrupt();
+        break;
+      }
+    }
+    remainApplicationNum = shuffleTaskManager.getAppIds().size();
+    if (remainApplicationNum > 0) {
+      LOG.info("Decommission exiting, remaining {} applications not finished.",
+          remainApplicationNum);
+    }
+  }
+
+  /**
+   * Moves the status from DECOMMISSIONING to DECOMMISSIONED while holding the same monitor as the
+   * synchronized {@code decommission()} and {@code cancelDecommission()}.
+   *
+   * @return true if the transition happened; false if the status was changed concurrently
+   */
+  private synchronized boolean markDecommissioned() {
+    if (!ServerStatus.DECOMMISSIONING.equals(serverStatus)) {
+      return false;
+    }
+    serverStatus = ServerStatus.DECOMMISSIONED;
+    return true;
+  }
+
+ public synchronized void cancelDecommission() {
+ if (!isDecommissioning()) {
+ LOG.info("Shuffle server is not decommissioning. Nothing needs to be done.");
+ return;
+ }
+ if (ServerStatus.DECOMMISSIONED.equals(serverStatus)) {
+ serverStatus = ServerStatus.ACTIVE;
+ return;
+ }
+ serverStatus = ServerStatus.ACTIVE;
+ if (decommissionFuture.cancel(true)) {
+ LOG.info("Decommission canceled.");
+ } else {
+ LOG.warn("Failed to cancel decommission.");
+ }
+ decommissionFuture = null;
+ }
+
+ /**
+  * Returns this server's {@code ip} field.
+  * NOTE(review): presumably the address the server advertises; confirm where it is assigned.
+  */
public String getIp() {
return this.ip;
}
@@ -332,4 +424,15 @@ public class ShuffleServer {
+ /**
+  * Returns the {@code grpcMetrics} field.
+  * NOTE(review): the field has no initializer; confirm it is set before callers use it.
+  */
public GRPCMetrics getGrpcMetrics() {
return grpcMetrics;
}
+
+  /**
+   * Returns whether the server is DECOMMISSIONING or already DECOMMISSIONED.
+   *
+   * <p>Both states count, so {@code decommission()} treats a finished decommission the same way
+   * as one still in progress.
+   */
+  public boolean isDecommissioning() {
+    return serverStatus == ServerStatus.DECOMMISSIONING
+        || serverStatus == ServerStatus.DECOMMISSIONED;
+  }
+
+  /**
+   * Reports whether {@code start()} has completed and {@code stopServer()} has not yet run to the
+   * end.
+   */
+  @VisibleForTesting
+  public boolean isRunning() {
+    return this.running;
+  }
+
}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 5b181075..6d501f18 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -356,6 +356,19 @@ public class ShuffleServerConf extends RssBaseConf {
.withDescription("The memory usage limit ratio for huge partition, it will only triggered when partition's "
+ "size exceeds the threshold of '" + HUGE_PARTITION_SIZE_THRESHOLD.key() + "'");
+ /**
+  * Interval in milliseconds between checks of whether all applications have finished while the
+  * server is decommissioning. Must be positive; the default is 60 000 ms (one minute).
+  */
+ public static final ConfigOption<Long> SERVER_DECOMMISSION_CHECK_INTERVAL = ConfigOptions
+ .key("rss.server.decommission.check.interval")
+ .longType()
+ .checkValue(ConfigUtils.POSITIVE_LONG_VALIDATOR, "check interval times must be positive")
+ .defaultValue(60 * 1000L)
+ .withDescription("The interval(ms) to check if all applications have finish when server is decommissioning");
+
+ /**
+  * Whether the server stops itself once decommissioning completes (all applications finished).
+  * Defaults to true.
+  */
+ public static final ConfigOption<Boolean> SERVER_DECOMMISSION_SHUTDOWN = ConfigOptions
+ .key("rss.server.decommission.shutdown")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Whether shutdown the server after server is decommissioned");
+
+ /**
+  * No-argument constructor; its own body is empty.
+  * NOTE(review): the RssBaseConf superclass may pre-populate values; confirm.
+  */
public ShuffleServerConf() {
}
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
index 9a9e7c19..19c9513e 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
@@ -17,36 +17,40 @@
package org.apache.uniffle.server;
+import java.io.File;
import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Lists;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.util.ExitUtils;
import org.apache.uniffle.common.util.ExitUtils.ExitException;
import org.apache.uniffle.storage.util.StorageType;
+import static org.apache.uniffle.server.ShuffleServerConf.SERVER_DECOMMISSION_CHECK_INTERVAL;
+import static org.apache.uniffle.server.ShuffleServerConf.SERVER_DECOMMISSION_SHUTDOWN;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
public class ShuffleServerTest {
+ // Temporary directory injected by JUnit; createShuffleServerConf() uses it as the storage base
+ // path, so tests no longer write into a fixed /tmp location.
+ @TempDir
+ private File tempDir;
+
@Test
public void startTest() {
try {
- ShuffleServerConf serverConf = new ShuffleServerConf();
- serverConf.setInteger(ShuffleServerConf.RPC_SERVER_PORT, 9527);
- serverConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
- serverConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
- serverConf.setInteger(ShuffleServerConf.JETTY_HTTP_PORT, 9528);
- serverConf.setString(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:0");
- serverConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("/tmp/null"));
- serverConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
- serverConf.setLong(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 100);
- serverConf.setLong(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY, 10);
-
+ ShuffleServerConf serverConf = createShuffleServerConf();
ShuffleServer ss1 = new ShuffleServer(serverConf);
ss1.start();
-
+ ss1.stopServer();
ExitUtils.disableSystemExit();
ShuffleServer ss2 = new ShuffleServer(serverConf);
String expectMessage = "Fail to start jetty http server";
@@ -57,6 +61,7 @@ public class ShuffleServerTest {
assertEquals(expectMessage, e.getMessage());
assertEquals(expectStatus, ((ExitException) e).getStatus());
}
+ ss2.stopServer();
serverConf.setInteger("rss.jetty.http.port", 9529);
ss2 = new ShuffleServer(serverConf);
@@ -67,6 +72,7 @@ public class ShuffleServerTest {
assertEquals(expectMessage, e.getMessage());
assertEquals(expectStatus, ((ExitException) e).getStatus());
}
+ ss2.stopServer();
final Thread t = new Thread(null, () -> {
throw new AssertionError("TestUncaughtException");
@@ -79,4 +85,53 @@ public class ShuffleServerTest {
}
}
+
+  /**
+   * Exercises the decommission state machine:
+   * <ul>
+   *   <li>cancelling while ACTIVE and decommissioning twice are both harmless;</li>
+   *   <li>a cancel returns the server to ACTIVE and keeps it there;</li>
+   *   <li>a decommission with no remaining applications completes.</li>
+   * </ul>
+   * With {@code shutdown=true} the server must stop itself. With {@code shutdown=false} it must
+   * reach DECOMMISSIONED and keep running.
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void decommissionTest(boolean shutdown) throws Exception {
+    ShuffleServerConf serverConf = createShuffleServerConf();
+    serverConf.set(SERVER_DECOMMISSION_CHECK_INTERVAL, 1000L);
+    serverConf.set(SERVER_DECOMMISSION_SHUTDOWN, shutdown);
+    ShuffleServer shuffleServer = new ShuffleServer(serverConf);
+    shuffleServer.start();
+    // Every parameterized run binds the same fixed ports, so always release them, even when an
+    // assertion fails. Otherwise the next run fails on a port conflict instead of the real cause.
+    try {
+      assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
+      // The shuffle server is not decommissioning, but cancelling must still be harmless.
+      shuffleServer.cancelDecommission();
+      ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
+      String appId = "decommissionTest_appId_" + shutdown;
+      shuffleTaskManager.registerShuffle(appId, 0, Lists.newArrayList(), new RemoteStorageInfo("/tmp"), "");
+      shuffleServer.decommission();
+      assertEquals(ServerStatus.DECOMMISSIONING, shuffleServer.getServerStatus());
+      // The shuffle server is already decommissioning, but decommissioning again must be harmless.
+      shuffleServer.decommission();
+      shuffleServer.cancelDecommission();
+      shuffleTaskManager.removeResources(appId);
+      // Wait for 2 seconds to make sure the cancel command took effect.
+      Thread.sleep(2000);
+      assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
+      shuffleServer.decommission();
+      if (shutdown) {
+        Awaitility.await().timeout(10, TimeUnit.SECONDS).until(
+            () -> !shuffleServer.isRunning());
+      } else {
+        Awaitility.await().timeout(10, TimeUnit.SECONDS).until(
+            () -> ServerStatus.DECOMMISSIONED.equals(shuffleServer.getServerStatus()));
+        assertEquals(true, shuffleServer.isRunning());
+      }
+    } finally {
+      if (shuffleServer.isRunning()) {
+        shuffleServer.stopServer();
+      }
+    }
+  }
+
+  /**
+   * Builds the configuration shared by the tests in this class: local-file storage rooted in the
+   * JUnit temp directory, fixed RPC/HTTP ports, small buffers and test mode enabled.
+   */
+  private ShuffleServerConf createShuffleServerConf() throws Exception {
+    ShuffleServerConf conf = new ShuffleServerConf();
+    // Network endpoints.
+    conf.setInteger(ShuffleServerConf.RPC_SERVER_PORT, 9527);
+    conf.setInteger(ShuffleServerConf.JETTY_HTTP_PORT, 9528);
+    conf.setString(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:0");
+    // Storage: local files under the per-test temp directory.
+    conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
+    conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(tempDir.getAbsolutePath()));
+    conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
+    // Buffers and test mode.
+    conf.setLong(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 100);
+    conf.setLong(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY, 10);
+    conf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
+    return conf;
+  }
}