You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by xi...@apache.org on 2023/03/30 04:17:23 UTC
[incubator-uniffle] branch master updated: [#477][part-0] feat: add ShuffleManagerServer impl (#777)
This is an automated email from the ASF dual-hosted git repository.
xianjin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new ea5a3ba4 [#477][part-0] feat: add ShuffleManagerServer impl (#777)
ea5a3ba4 is described below
commit ea5a3ba45b923b1afc8fda0fdb0546daf34274fb
Author: advancedxy <xi...@apache.org>
AuthorDate: Thu Mar 30 12:17:16 2023 +0800
[#477][part-0] feat: add ShuffleManagerServer impl (#777)
### What changes were proposed in this pull request?
1. add shuffle manager proto
2. the corresponding client and server impls
3. add the RssShuffleManagerInterface
### Why are the changes needed?
This is the first part of #477.
To support re-submitting a Spark stage, the ShuffleManagerServer must be introduced first.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added UTs and one integration test.
---
.../manager/RssShuffleManagerInterface.java | 58 +++++++
.../shuffle/manager/ShuffleManagerGrpcService.java | 182 +++++++++++++++++++++
.../manager/ShuffleManagerServerFactory.java | 56 +++++++
.../shuffle/manager/DummyRssShuffleManager.java | 50 ++++++
.../manager/ShuffleManagerGrpcServiceTest.java | 107 ++++++++++++
.../manager/ShuffleManagerServerFactoryTest.java | 41 +++++
.../uniffle/common/metrics/EmptyGRPCMetrics.java | 24 +++
.../apache/uniffle/common/metrics/GRPCMetrics.java | 4 +
.../org/apache/uniffle/common/rpc/GrpcServer.java | 8 +-
.../apache/uniffle/common/rpc/GrpcServerTest.java | 29 +++-
integration-test/spark-common/pom.xml | 6 +
.../uniffle/test/ShuffleServerManagerTestBase.java | 73 +++++++++
.../test/SimpleShuffleServerManagerTest.java | 45 +++++
.../uniffle/client/api/ShuffleManagerClient.java | 27 +++
.../factory/ShuffleManagerClientFactory.java | 43 +++++
.../client/impl/grpc/ShuffleManagerGrpcClient.java | 73 +++++++++
.../RssReportShuffleFetchFailureRequest.java | 54 ++++++
.../RssReportShuffleFetchFailureResponse.java | 53 ++++++
.../factory/ShuffleManagerClientFactoryTest.java | 40 +++++
proto/src/main/proto/Rss.proto | 25 ++-
20 files changed, 988 insertions(+), 10 deletions(-)
diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
new file mode 100644
index 00000000..5725a203
--- /dev/null
+++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.shuffle.manager;
+
+import org.apache.spark.SparkException;
+
+
+
+/**
+ * This is a proxy interface that mainly delegates the un-registration of shuffles to the MapOutputTrackerMaster on
+ * the driver. It provides a unified interface that hides implementation details for different versions of Spark.
+ */
+public interface RssShuffleManagerInterface {
+
+ /**
+ * @return the unique Spark application id used for rss shuffle
+ */
+ String getAppId();
+
+ /**
+ * @return the maximum number of fetch failures per shuffle partition before that shuffle stage should be re-submitted
+ */
+ int getMaxFetchFailures();
+
+ /**
+ * @param shuffleId the shuffle id to query
+ * @return the number of partitions (a.k.a. reduce tasks) for the shuffle with the given shuffle id.
+ */
+ int getPartitionNum(int shuffleId);
+
+ /**
+ * @param shuffleId the shuffle id to query
+ * @return the number of map tasks for the shuffle with the given shuffle id.
+ */
+ int getNumMaps(int shuffleId);
+
+ /**
+ * Unregister all the map output on the driver side, so the whole stage could be re-computed.
+ * @param shuffleId the shuffle id to unregister
+ * @throws SparkException if the implementation fails to unregister the map output
+ */
+ void unregisterAllMapOutput(int shuffleId) throws SparkException;
+}
diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
new file mode 100644
index 00000000..7eb0e8a5
--- /dev/null
+++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.shuffle.manager;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+import io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.util.JavaUtils;
+import org.apache.uniffle.proto.RssProtos;
+import org.apache.uniffle.proto.ShuffleManagerGrpc.ShuffleManagerImplBase;
+
+public class ShuffleManagerGrpcService extends ShuffleManagerImplBase {
+ private static final Logger LOG = LoggerFactory.getLogger(ShuffleManagerGrpcService.class);
+ private final Map<Integer, RssShuffleStatus> shuffleStatus = JavaUtils.newConcurrentMap();
+ private final RssShuffleManagerInterface shuffleManager;
+
+ public ShuffleManagerGrpcService(RssShuffleManagerInterface shuffleManager) {
+ this.shuffleManager = shuffleManager;
+ }
+
+ @Override
+ public void reportShuffleFetchFailure(RssProtos.ReportShuffleFetchFailureRequest request,
+ StreamObserver<RssProtos.ReportShuffleFetchFailureResponse> responseObserver) {
+ String appId = request.getAppId();
+ int stageAttempt = request.getStageAttemptId();
+ int partitionId = request.getPartitionId();
+ RssProtos.StatusCode code;
+ boolean reSubmitWholeStage;
+ String msg;
+ if (!appId.equals(shuffleManager.getAppId())) {
+ msg = String.format("got a wrong shuffle fetch failure report from appId: %s, expected appId: %s",
+ appId, shuffleManager.getAppId());
+ LOG.warn(msg);
+ code = RssProtos.StatusCode.INVALID_REQUEST;
+ reSubmitWholeStage = false;
+ } else {
+ RssShuffleStatus status = shuffleStatus.computeIfAbsent(request.getShuffleId(), key -> {
+ int partitionNum = shuffleManager.getPartitionNum(key);
+ return new RssShuffleStatus(partitionNum, stageAttempt);
+ }
+ );
+ int c = status.resetStageAttemptIfNecessary(stageAttempt);
+ if (c < 0) {
+ msg = String.format("got an old stage(%d vs %d) shuffle fetch failure report, which should be impossible.",
+ status.getStageAttempt(), stageAttempt);
+ LOG.warn(msg);
+ code = RssProtos.StatusCode.INVALID_REQUEST;
+ reSubmitWholeStage = false;
+ } else { // update the stage partition fetch failure count
+ code = RssProtos.StatusCode.SUCCESS;
+ status.incPartitionFetchFailure(stageAttempt, partitionId);
+ int fetchFailureNum = status.getPartitionFetchFailureNum(stageAttempt, partitionId);
+ if (fetchFailureNum >= shuffleManager.getMaxFetchFailures()) {
+ reSubmitWholeStage = true;
+ msg = String.format("report shuffle fetch failure as maximum number(%d) of shuffle fetch is occurred",
+ shuffleManager.getMaxFetchFailures());
+ } else {
+ reSubmitWholeStage = false;
+ msg = "don't report shuffle fetch failure";
+ }
+ }
+ }
+
+ RssProtos.ReportShuffleFetchFailureResponse reply = RssProtos.ReportShuffleFetchFailureResponse
+ .newBuilder()
+ .setStatus(code)
+ .setReSubmitWholeStage(reSubmitWholeStage)
+ .setMsg(msg)
+ .build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ }
+
+ /**
+ * Remove the no longer used shuffle id's rss shuffle status. This is called when ShuffleManager unregisters the
+ * corresponding shuffle id.
+ * @param shuffleId the shuffle id to unregister.
+ */
+ public void unregisterShuffle(int shuffleId) {
+ shuffleStatus.remove(shuffleId);
+ }
+
+ private static class RssShuffleStatus {
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+ private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
+ private final int[] partitions;
+ private int stageAttempt;
+
+ private RssShuffleStatus(int partitionNum, int stageAttempt) {
+ this.stageAttempt = stageAttempt;
+ this.partitions = new int[partitionNum];
+ }
+
+ private <T> T withReadLock(Supplier<T> fn) {
+ readLock.lock();
+ try {
+ return fn.get();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ private <T> T withWriteLock(Supplier<T> fn) {
+ writeLock.lock();
+ try {
+ return fn.get();
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ // todo: maybe it's more performant to just use synchronized method here.
+ public int getStageAttempt() {
+ return withReadLock(() -> this.stageAttempt);
+ }
+
+ /**
+ * Check whether the input stage attempt is a new stage or not. If a new stage attempt is requested, reset
+ * partitions.
+ * @param stageAttempt the incoming stage attempt number
+ * @return 0 if stageAttempt == this.stageAttempt
+ * 1 if stageAttempt > this.stageAttempt
+ * -1 if stateAttempt < this.stageAttempt which means nothing happens
+ */
+ public int resetStageAttemptIfNecessary(int stageAttempt) {
+ return withWriteLock(() -> {
+ if (this.stageAttempt < stageAttempt) {
+ // a new stage attempt is issued. the partitions array should be clear and reset.
+ Arrays.fill(this.partitions, 0);
+ this.stageAttempt = stageAttempt;
+ return 1;
+ } else if (this.stageAttempt > stageAttempt) {
+ return -1;
+ }
+ return 0;
+ });
+ }
+
+ public void incPartitionFetchFailure(int stageAttempt, int partition) {
+ withWriteLock(() -> {
+ if (this.stageAttempt != stageAttempt) {
+ // do nothing here
+ } else {
+ this.partitions[partition] = this.partitions[partition] + 1;
+ }
+ return null;
+ });
+ }
+
+ public int getPartitionFetchFailureNum(int stageAttempt, int partition) {
+ return withReadLock(() -> {
+ if (this.stageAttempt != stageAttempt) {
+ return 0;
+ } else {
+ return this.partitions[partition];
+ }
+ });
+ }
+ }
+}
diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java
new file mode 100644
index 00000000..5c395e52
--- /dev/null
+++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.shuffle.manager;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.metrics.GRPCMetrics;
+import org.apache.uniffle.common.rpc.GrpcServer;
+
+public class ShuffleManagerServerFactory {
+ private final RssShuffleManagerInterface shuffleManager;
+ private final RssBaseConf conf;
+
+ public ShuffleManagerServerFactory(RssShuffleManagerInterface shuffleManager, RssConf conf) {
+ this.shuffleManager = shuffleManager;
+ this.conf = new RssBaseConf();
+ this.conf.addAll(conf);
+ }
+
+ public GrpcServer getServer() {
+ String type = conf.getString(RssBaseConf.RPC_SERVER_TYPE);
+ if (ServerType.GRPC.name().equals(type)) {
+ return GrpcServer.Builder.newBuilder()
+ .conf(conf)
+ .grpcMetrics(GRPCMetrics.getEmptyGRPCMetrics())
+ .addService(new ShuffleManagerGrpcService(shuffleManager))
+ .build();
+ } else {
+ throw new UnsupportedOperationException("Unsupported server type " + type);
+ }
+ }
+
+
+ public RssBaseConf getConf() {
+ return conf;
+ }
+
+ enum ServerType {
+ GRPC
+ }
+}
diff --git a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
new file mode 100644
index 00000000..9e06da4e
--- /dev/null
+++ b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.shuffle.manager;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * Test stub of {@link RssShuffleManagerInterface} that returns fixed values and records the shuffle ids passed to
+ * {@link #unregisterAllMapOutput(int)}.
+ */
+public class DummyRssShuffleManager implements RssShuffleManagerInterface {
+ // shuffle ids passed to unregisterAllMapOutput, kept in call order
+ public Set<Integer> unregisteredShuffleIds = new LinkedHashSet<>();
+
+ @Override
+ public String getAppId() {
+ return "testAppId";
+ }
+
+ @Override
+ public int getMaxFetchFailures() {
+ return 2;
+ }
+
+ @Override
+ public int getPartitionNum(int shuffleId) {
+ return 16;
+ }
+
+ @Override
+ public int getNumMaps(int shuffleId) {
+ return 8;
+ }
+
+ // only records the shuffle id; nothing is actually unregistered
+ @Override
+ public void unregisterAllMapOutput(int shuffleId) {
+ unregisteredShuffleIds.add(shuffleId);
+ }
+}
diff --git a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcServiceTest.java b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcServiceTest.java
new file mode 100644
index 00000000..2160b03e
--- /dev/null
+++ b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcServiceTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.shuffle.manager;
+
+import io.grpc.stub.StreamObserver;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureRequest;
+import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureResponse;
+import org.apache.uniffle.proto.RssProtos.StatusCode;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit test for {@link ShuffleManagerGrpcService#reportShuffleFetchFailure}, driven through a mocked
+ * {@link RssShuffleManagerInterface} and an in-memory {@link StreamObserver}.
+ */
+public class ShuffleManagerGrpcServiceTest {
+ // create mock of RssShuffleManagerInterface.
+ private static RssShuffleManagerInterface mockShuffleManager;
+ private static final String appId = "app-123";
+ private static final int maxFetchFailures = 2;
+ private static final int shuffleId = 0;
+ private static final int numMaps = 100;
+ private static final int numReduces = 10;
+
+ // StreamObserver that only remembers the last value, the last error and whether onCompleted was called.
+ private static class MockedStreamObserver<T> implements StreamObserver<T> {
+ T value;
+ Throwable error;
+ boolean completed;
+
+ @Override
+ public void onNext(T value) {
+ this.value = value;
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ this.error = t;
+ }
+
+ @Override
+ public void onCompleted() {
+ this.completed = true;
+ }
+ }
+
+ @BeforeAll
+ public static void setup() {
+ mockShuffleManager = mock(RssShuffleManagerInterface.class);
+ Mockito.when(mockShuffleManager.getAppId()).thenReturn(appId);
+ Mockito.when(mockShuffleManager.getNumMaps(shuffleId)).thenReturn(numMaps);
+ Mockito.when(mockShuffleManager.getPartitionNum(shuffleId)).thenReturn(numReduces);
+ Mockito.when(mockShuffleManager.getMaxFetchFailures()).thenReturn(maxFetchFailures);
+ }
+
+ @Test
+ public void testShuffleManagerGrpcService() {
+ ShuffleManagerGrpcService service = new ShuffleManagerGrpcService(mockShuffleManager);
+ // the same observer is reused for every call below, so value always holds the latest reply
+ MockedStreamObserver<ReportShuffleFetchFailureResponse> appIdResponseObserver =
+ new MockedStreamObserver<>();
+ ReportShuffleFetchFailureRequest req =
+ ReportShuffleFetchFailureRequest.newBuilder()
+ .setAppId(appId)
+ .setShuffleId(shuffleId)
+ .setStageAttemptId(1)
+ .setPartitionId(1)
+ .buildPartial();
+
+ service.reportShuffleFetchFailure(req, appIdResponseObserver);
+ assertTrue(appIdResponseObserver.completed);
+ // the first call of ReportShuffleFetchFailureRequest should be successful.
+ assertEquals(StatusCode.SUCCESS, appIdResponseObserver.value.getStatus());
+ assertFalse(appIdResponseObserver.value.getReSubmitWholeStage());
+
+ // req with wrong appId should fail.
+ req = ReportShuffleFetchFailureRequest.newBuilder().mergeFrom(req)
+ .setAppId("wrong-app-id").build();
+ service.reportShuffleFetchFailure(req, appIdResponseObserver);
+ assertEquals(StatusCode.INVALID_REQUEST, appIdResponseObserver.value.getStatus());
+ // rewind the stageAttemptId to 0 (attempt 1 was already reported above) to mock a request from an old stage
+ req = ReportShuffleFetchFailureRequest.newBuilder()
+ .mergeFrom(req)
+ .setAppId(appId)
+ .setStageAttemptId(0)
+ .build();
+ service.reportShuffleFetchFailure(req, appIdResponseObserver);
+ assertEquals(StatusCode.INVALID_REQUEST, appIdResponseObserver.value.getStatus());
+ assertTrue(appIdResponseObserver.value.getMsg().contains("old stage"));
+ }
+}
diff --git a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactoryTest.java b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactoryTest.java
new file mode 100644
index 00000000..5e287356
--- /dev/null
+++ b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactoryTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.shuffle.manager;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Checks which RPC server types {@link ShuffleManagerServerFactory#getServer()} accepts.
+ */
+public class ShuffleManagerServerFactoryTest {
+ @Test
+ public void testShuffleManagerServerType() {
+ // the GRPC server type is supported; the shuffle manager itself is not needed to build the server
+ RssBaseConf conf = new RssBaseConf();
+ conf.set(RssBaseConf.RPC_SERVER_TYPE, "GRPC");
+ ShuffleManagerServerFactory factory = new ShuffleManagerServerFactory(null, conf);
+ // this should execute normally;
+ factory.getServer();
+
+ // other types should raise an exception
+ conf.set(RssBaseConf.RPC_SERVER_TYPE, "Netty");
+ factory = new ShuffleManagerServerFactory(null, conf);
+ assertThrows(UnsupportedOperationException.class, factory::getServer);
+ }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/metrics/EmptyGRPCMetrics.java b/common/src/main/java/org/apache/uniffle/common/metrics/EmptyGRPCMetrics.java
new file mode 100644
index 00000000..40750ecc
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/EmptyGRPCMetrics.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.metrics;
+
+/**
+ * A {@link GRPCMetrics} whose {@link #registerMetrics()} does nothing.
+ */
+public class EmptyGRPCMetrics extends GRPCMetrics {
+ @Override
+ public void registerMetrics() {
+ // intentionally empty
+ }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
index e6103d21..de3faad1 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
@@ -176,4 +176,8 @@ public abstract class GRPCMetrics {
public Map<String, Summary> getProcessTimeSummaryMap() {
return processTimeSummaryMap;
}
+
+ /**
+ * @return a new {@link EmptyGRPCMetrics} instance on every call
+ */
+ public static GRPCMetrics getEmptyGRPCMetrics() {
+ return new EmptyGRPCMetrics();
+ }
}
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
index c9f2a092..59ade455 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
@@ -49,6 +49,7 @@ public class GrpcServer implements ServerInterface {
private final Server server;
private final int port;
+ private int listenPort;
private final ExecutorService pool;
protected GrpcServer(
@@ -157,10 +158,11 @@ public class GrpcServer implements ServerInterface {
public void start() throws IOException {
try {
server.start();
+ listenPort = server.getPort();
} catch (IOException e) {
ExitUtils.terminate(1, "Fail to start grpc server", e, LOG);
}
- LOG.info("Grpc server started, listening on {}.", port);
+ LOG.info("Grpc server started, configured port: {}, listening on {}.", port, listenPort);
}
public void stop() throws InterruptedException {
@@ -179,4 +181,8 @@ public class GrpcServer implements ServerInterface {
}
}
+ /**
+ * @return the configured port when it is positive; otherwise the port read from the underlying server in
+ * {@code start()}, which is still 0 if {@code start()} has not been called
+ */
+ public int getPort() {
+ return port <= 0 ? listenPort : port;
+ }
+
}
diff --git a/common/src/test/java/org/apache/uniffle/common/rpc/GrpcServerTest.java b/common/src/test/java/org/apache/uniffle/common/rpc/GrpcServerTest.java
index 54dc5ae7..ebcb6c23 100644
--- a/common/src/test/java/org/apache/uniffle/common/rpc/GrpcServerTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/rpc/GrpcServerTest.java
@@ -26,26 +26,23 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.metrics.GRPCMetrics;
import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.proto.ShuffleManagerGrpc;
+import static org.apache.uniffle.common.config.RssBaseConf.RPC_SERVER_PORT;
import static org.apache.uniffle.common.metrics.GRPCMetrics.GRPC_SERVER_EXECUTOR_ACTIVE_THREADS_KEY;
import static org.apache.uniffle.common.metrics.GRPCMetrics.GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class GrpcServerTest {
private static final Logger LOGGER = LoggerFactory.getLogger(GrpcServerTest.class);
- static class MockedGRPCMetrics extends GRPCMetrics {
- @Override
- public void registerMetrics() {
- // ignore
- }
- }
-
@Test
public void testGrpcExecutorPool() throws Exception {
- GRPCMetrics grpcMetrics = new MockedGRPCMetrics();
+ GRPCMetrics grpcMetrics = GRPCMetrics.getEmptyGRPCMetrics();
grpcMetrics.register(new CollectorRegistry(true));
GrpcServer.GrpcThreadPoolExecutor executor = new GrpcServer.GrpcThreadPoolExecutor(
2,
@@ -87,4 +84,20 @@ public class GrpcServerTest {
executor.shutdown();
}
+
+ /**
+  * A server configured with port 0 must report the actually bound (positive) port after it is started.
+  */
+ @Test
+ public void testRandomPort() throws Exception {
+   GRPCMetrics grpcMetrics = GRPCMetrics.getEmptyGRPCMetrics();
+   grpcMetrics.register(new CollectorRegistry(true));
+   RssBaseConf conf = new RssBaseConf();
+   conf.set(RPC_SERVER_PORT, 0);
+   GrpcServer server = GrpcServer.Builder.newBuilder()
+       .conf(conf)
+       .grpcMetrics(grpcMetrics)
+       .addService(new ShuffleManagerGrpc.ShuffleManagerImplBase(){})
+       .build();
+   server.start();
+   try {
+     assertTrue(server.getPort() > 0);
+   } finally {
+     // always stop the started server so the test does not leave a listening server behind
+     server.stop();
+   }
+ }
+
}
diff --git a/integration-test/spark-common/pom.xml b/integration-test/spark-common/pom.xml
index 78a5e888..ee8bb0ad 100644
--- a/integration-test/spark-common/pom.xml
+++ b/integration-test/spark-common/pom.xml
@@ -77,6 +77,12 @@
<artifactId>rss-client-spark-common</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.uniffle</groupId>
+ <artifactId>rss-client-spark-common</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>rss-client-spark${client.type}</artifactId>
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleServerManagerTestBase.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleServerManagerTestBase.java
new file mode 100644
index 00000000..8a528303
--- /dev/null
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleServerManagerTestBase.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+import org.apache.uniffle.client.factory.ShuffleManagerClientFactory;
+import org.apache.uniffle.client.impl.grpc.ShuffleManagerGrpcClient;
+import org.apache.uniffle.common.ClientType;
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.rpc.GrpcServer;
+import org.apache.uniffle.shuffle.manager.DummyRssShuffleManager;
+import org.apache.uniffle.shuffle.manager.RssShuffleManagerInterface;
+import org.apache.uniffle.shuffle.manager.ShuffleManagerServerFactory;
+
+import static org.apache.uniffle.common.config.RssBaseConf.RPC_SERVER_PORT;
+
+/**
+ * Base class for tests that need a started shuffle manager server and a gRPC client connected to it.
+ *
+ * <p>A fresh server and client are created before each test and released after it. Subclasses can override
+ * {@link #getShuffleManager()} and {@link #getConf()} to customize the server.
+ */
+public class ShuffleServerManagerTestBase {
+  protected ShuffleManagerClientFactory factory = ShuffleManagerClientFactory.getInstance();
+  protected ShuffleManagerGrpcClient client;
+  protected static final String LOCALHOST = "localhost";
+  protected GrpcServer shuffleManagerServer;
+
+  /**
+   * @return the shuffle manager served by the test server; a {@link DummyRssShuffleManager} by default
+   */
+  protected RssShuffleManagerInterface getShuffleManager() {
+    return new DummyRssShuffleManager();
+  }
+
+  /**
+   * @return the server configuration; by default only the RPC port is set, to 0
+   */
+  protected RssConf getConf() {
+    RssConf conf = new RssConf();
+    // use a random port
+    conf.set(RPC_SERVER_PORT, 0);
+    return conf;
+  }
+
+  /**
+   * @return a new, not yet started, server built from {@link #getShuffleManager()} and {@link #getConf()}
+   */
+  protected GrpcServer createShuffleManagerServer() {
+    return new ShuffleManagerServerFactory(getShuffleManager(), getConf()).getServer();
+  }
+
+  /**
+   * Starts the server and connects a client to the port the server reports after start.
+   */
+  @BeforeEach
+  public void createServerAndClient() throws Exception {
+    shuffleManagerServer = createShuffleManagerServer();
+    shuffleManagerServer.start();
+    int port = shuffleManagerServer.getPort();
+    client = factory.createShuffleManagerClient(ClientType.GRPC, LOCALHOST, port);
+  }
+
+  /**
+   * Closes the client and stops the server; the server is stopped even if closing the client throws.
+   */
+  @AfterEach
+  public void close() throws Exception {
+    try {
+      if (client != null) {
+        client.close();
+      }
+    } finally {
+      // a failing client.close() must not leave the server running for the following tests
+      if (shuffleManagerServer != null) {
+        shuffleManagerServer.stop();
+      }
+    }
+  }
+}
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SimpleShuffleServerManagerTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SimpleShuffleServerManagerTest.java
new file mode 100644
index 00000000..5b052eb3
--- /dev/null
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SimpleShuffleServerManagerTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
+import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.shuffle.manager.DummyRssShuffleManager;
+import org.apache.uniffle.shuffle.manager.RssShuffleManagerInterface;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class SimpleShuffleServerManagerTest extends ShuffleServerManagerTestBase {
+
+  @Test
+  public void testClientAndServerConnections() {
+    RssShuffleManagerInterface shuffleManager = new DummyRssShuffleManager();
+    RssReportShuffleFetchFailureRequest matchingRequest =
+        new RssReportShuffleFetchFailureRequest(shuffleManager.getAppId(), 0, 0, 0, null);
+    RssReportShuffleFetchFailureResponse accepted = client.reportShuffleFetchFailure(matchingRequest);
+    assertEquals(StatusCode.SUCCESS, accepted.getStatusCode());
+
+    // wrong appId
+    RssReportShuffleFetchFailureResponse rejected = client.reportShuffleFetchFailure(
+        new RssReportShuffleFetchFailureRequest("wrongAppId", 0, 0, 0, null));
+    assertEquals(StatusCode.INVALID_REQUEST, rejected.getStatusCode());
+  }
+}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
new file mode 100644
index 00000000..91e53b46
--- /dev/null
+++ b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.api;
+
+import java.io.Closeable;
+
+import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
+import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
+
+public interface ShuffleManagerClient extends Closeable { // client-side view of the ShuffleManager rpc service
+ RssReportShuffleFetchFailureResponse reportShuffleFetchFailure(RssReportShuffleFetchFailureRequest request);
+}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleManagerClientFactory.java b/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleManagerClientFactory.java
new file mode 100644
index 00000000..9199c43a
--- /dev/null
+++ b/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleManagerClientFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.factory;
+
+import org.apache.uniffle.client.impl.grpc.ShuffleManagerGrpcClient;
+import org.apache.uniffle.common.ClientType;
+
+public class ShuffleManagerClientFactory {
+  // the singleton lives in a nested holder class, so it is only built once that class is initialized
+  private static class LazyHolder {
+    private static final ShuffleManagerClientFactory INSTANCE = new ShuffleManagerClientFactory();
+  }
+
+  private ShuffleManagerClientFactory() {
+  }
+
+  public static ShuffleManagerClientFactory getInstance() {
+    return LazyHolder.INSTANCE;
+  }
+
+  public ShuffleManagerGrpcClient createShuffleManagerClient(ClientType clientType, String host, int port) {
+    // GRPC is the only client type handled here; everything else (including null) is rejected
+    if (!ClientType.GRPC.equals(clientType)) {
+      throw new UnsupportedOperationException("Unsupported client type " + clientType);
+    }
+    return new ShuffleManagerGrpcClient(host, port);
+  }
+}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
new file mode 100644
index 00000000..fe4b6235
--- /dev/null
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.impl.grpc;
+
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.client.api.ShuffleManagerClient;
+import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
+import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureRequest;
+import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureResponse;
+import org.apache.uniffle.proto.ShuffleManagerGrpc;
+
+public class ShuffleManagerGrpcClient extends GrpcClient implements ShuffleManagerClient {
+  // ShuffleManagerClient implementation that issues blocking gRPC calls through a ShuffleManager stub
+  private static final Logger LOG = LoggerFactory.getLogger(ShuffleManagerGrpcClient.class);
+  private static final long RPC_TIMEOUT_DEFAULT_MS = 60_000L;
+  private long rpcTimeout = RPC_TIMEOUT_DEFAULT_MS;
+  private ShuffleManagerGrpc.ShuffleManagerBlockingStub blockingStub;
+
+  public ShuffleManagerGrpcClient(String host, int port) {
+    this(host, port, 3);
+  }
+
+  public ShuffleManagerGrpcClient(String host, int port, int maxRetryAttempts) {
+    this(host, port, maxRetryAttempts, true);
+  }
+
+  public ShuffleManagerGrpcClient(String host, int port, int maxRetryAttempts, boolean usePlaintext) {
+    super(host, port, maxRetryAttempts, usePlaintext);
+    this.blockingStub = ShuffleManagerGrpc.newBlockingStub(channel);
+  }
+
+  public ShuffleManagerGrpc.ShuffleManagerBlockingStub getBlockingStub() {
+    return blockingStub.withDeadlineAfter(rpcTimeout, TimeUnit.MILLISECONDS);
+  }
+
+  public String getDesc() {
+    return "Shuffle manager grpc client ref " + host + ":" + port;
+  }
+
+  @Override
+  public RssReportShuffleFetchFailureResponse reportShuffleFetchFailure(RssReportShuffleFetchFailureRequest request) {
+    // converted outside the try block: a failing toProto() propagates as is, without the RssException wrapper
+    ReportShuffleFetchFailureRequest proto = request.toProto();
+    try {
+      return RssReportShuffleFetchFailureResponse.fromProto(getBlockingStub().reportShuffleFetchFailure(proto));
+    } catch (Exception cause) {
+      String failure = "Report shuffle fetch failure to host:port[" + host + ":" + port + "] failed";
+      LOG.warn(failure, cause);
+      throw new RssException(failure, cause);
+    }
+  }
+}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleFetchFailureRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleFetchFailureRequest.java
new file mode 100644
index 00000000..37239d75
--- /dev/null
+++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleFetchFailureRequest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.request;
+
+import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureRequest;
+
+public class RssReportShuffleFetchFailureRequest {
+  private final String appId;
+  private final int shuffleId;
+  private final int stageAttemptId;
+  private final int partitionId;
+  private final String exception;
+
+  /**
+   * Captures the fields of one shuffle fetch failure report; the values are stored as given.
+   * @param exception optional exception text, may be null ({@link #toProto()} then leaves it unset)
+   */
+  public RssReportShuffleFetchFailureRequest(
+      String appId, int shuffleId, int stageAttemptId, int partitionId, String exception) {
+    this.appId = appId;
+    this.shuffleId = shuffleId;
+    this.stageAttemptId = stageAttemptId;
+    this.partitionId = partitionId;
+    this.exception = exception;
+  }
+
+  /** Converts this request to its protobuf message; {@code exception} is only set when non-null. */
+  public ReportShuffleFetchFailureRequest toProto() {
+    ReportShuffleFetchFailureRequest.Builder builder = ReportShuffleFetchFailureRequest.newBuilder()
+        .setAppId(appId)
+        .setShuffleId(shuffleId)
+        .setStageAttemptId(stageAttemptId)
+        .setPartitionId(partitionId);
+    if (exception != null) {
+      builder.setException(exception);
+    }
+    return builder.build();
+  }
+}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleFetchFailureResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleFetchFailureResponse.java
new file mode 100644
index 00000000..57f799cd
--- /dev/null
+++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleFetchFailureResponse.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.response;
+
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureResponse;
+
+public class RssReportShuffleFetchFailureResponse extends ClientResponse {
+  private final boolean reSubmitWholeStage;
+
+  public RssReportShuffleFetchFailureResponse(StatusCode code, String msg, boolean reSubmitWholeStage) {
+    super(code, msg);
+    this.reSubmitWholeStage = reSubmitWholeStage;
+  }
+
+  public boolean getReSubmitWholeStage() {
+    return this.reSubmitWholeStage;
+  }
+
+  // A null message is left unset rather than passed to the generated setter, mirroring how the
+  // request class treats its optional exception text.
+  public ReportShuffleFetchFailureResponse toProto() {
+    ReportShuffleFetchFailureResponse.Builder builder = ReportShuffleFetchFailureResponse.newBuilder()
+        .setStatus(getStatusCode().toProto())
+        .setReSubmitWholeStage(reSubmitWholeStage);
+    if (getMessage() != null) {
+      builder.setMsg(getMessage());
+    }
+    return builder.build();
+  }
+
+  public static RssReportShuffleFetchFailureResponse fromProto(ReportShuffleFetchFailureResponse response) {
+    // todo: [issue#780] add fromProto for StatusCode issue
+    // until then the proto status is mapped to StatusCode through its constant name
+    StatusCode code = StatusCode.valueOf(response.getStatus().name());
+    return new RssReportShuffleFetchFailureResponse(code, response.getMsg(), response.getReSubmitWholeStage());
+  }
+}
diff --git a/internal-client/src/test/java/org/apache/uniffle/client/factory/ShuffleManagerClientFactoryTest.java b/internal-client/src/test/java/org/apache/uniffle/client/factory/ShuffleManagerClientFactoryTest.java
new file mode 100644
index 00000000..17e42ba4
--- /dev/null
+++ b/internal-client/src/test/java/org/apache/uniffle/client/factory/ShuffleManagerClientFactoryTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.factory;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.client.api.ShuffleManagerClient;
+import org.apache.uniffle.common.ClientType;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class ShuffleManagerClientFactoryTest {
+  @Test
+  void createShuffleManagerClient() throws Exception {
+    ShuffleManagerClientFactory factory = ShuffleManagerClientFactory.getInstance();
+    assertNotNull(factory);
+    // only grpc type is supported currently; the client is closed afterwards instead of being leaked
+    try (ShuffleManagerClient c = factory.createShuffleManagerClient(ClientType.GRPC, "localhost", 1234)) {
+      assertNotNull(c);
+    }
+    assertThrows(UnsupportedOperationException.class,
+        () -> factory.createShuffleManagerClient(ClientType.GRPC_NETTY, "localhost", 1234));
+  }
+}
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 84736982..ac7fb936 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -488,4 +488,27 @@ message CancelDecommissionRequest {
message CancelDecommissionResponse {
StatusCode status = 1;
string retMsg = 2;
-}
\ No newline at end of file
+}
+// The ShuffleManager service lives inside the compute engine's application master, which handles rss shuffle specific
+// logic per application.
+service ShuffleManager {
+ rpc reportShuffleFetchFailure (ReportShuffleFetchFailureRequest) returns (ReportShuffleFetchFailureResponse);
+}
+
+message ReportShuffleFetchFailureRequest {
+ // appId could normally be omitted; it is used to guard against requests wrongly issued by leftover executors of
+ // another app that accidentally uses the same shuffle manager port as this app.
+ string appId = 1;
+ int32 shuffleId = 2;
+ int32 stageAttemptId = 3;
+ int32 partitionId = 4;
+ string exception = 5;
+ // todo: report ShuffleServerId if needed
+ // ShuffleServerId serverId = 6;
+}
+
+message ReportShuffleFetchFailureResponse {
+ StatusCode status = 1;
+ bool reSubmitWholeStage = 2; // presumably asks the reporter to re-submit the whole stage - TODO confirm
+ string msg = 3;
+}