You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by xi...@apache.org on 2023/03/30 04:17:23 UTC

[incubator-uniffle] branch master updated: [#477][part-0] feat: add ShuffleManagerServer impl (#777)

This is an automated email from the ASF dual-hosted git repository.

xianjin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new ea5a3ba4 [#477][part-0] feat: add ShuffleManagerServer impl (#777)
ea5a3ba4 is described below

commit ea5a3ba45b923b1afc8fda0fdb0546daf34274fb
Author: advancedxy <xi...@apache.org>
AuthorDate: Thu Mar 30 12:17:16 2023 +0800

    [#477][part-0] feat: add ShuffleManagerServer impl (#777)
    
    ### What changes were proposed in this pull request?
    1. add shuffle manager proto
    2. add the corresponding client and server implementations
    3. add the RssShuffleManagerInterface
    
    ### Why are the changes needed?
    This is the first part of #477.
    To support re-submitting Spark stages, the ShuffleManagerServer has to be introduced first.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Added UTs and one integration test.
---
 .../manager/RssShuffleManagerInterface.java        |  58 +++++++
 .../shuffle/manager/ShuffleManagerGrpcService.java | 182 +++++++++++++++++++++
 .../manager/ShuffleManagerServerFactory.java       |  56 +++++++
 .../shuffle/manager/DummyRssShuffleManager.java    |  50 ++++++
 .../manager/ShuffleManagerGrpcServiceTest.java     | 107 ++++++++++++
 .../manager/ShuffleManagerServerFactoryTest.java   |  41 +++++
 .../uniffle/common/metrics/EmptyGRPCMetrics.java   |  24 +++
 .../apache/uniffle/common/metrics/GRPCMetrics.java |   4 +
 .../org/apache/uniffle/common/rpc/GrpcServer.java  |   8 +-
 .../apache/uniffle/common/rpc/GrpcServerTest.java  |  29 +++-
 integration-test/spark-common/pom.xml              |   6 +
 .../uniffle/test/ShuffleServerManagerTestBase.java |  73 +++++++++
 .../test/SimpleShuffleServerManagerTest.java       |  45 +++++
 .../uniffle/client/api/ShuffleManagerClient.java   |  27 +++
 .../factory/ShuffleManagerClientFactory.java       |  43 +++++
 .../client/impl/grpc/ShuffleManagerGrpcClient.java |  73 +++++++++
 .../RssReportShuffleFetchFailureRequest.java       |  54 ++++++
 .../RssReportShuffleFetchFailureResponse.java      |  53 ++++++
 .../factory/ShuffleManagerClientFactoryTest.java   |  40 +++++
 proto/src/main/proto/Rss.proto                     |  25 ++-
 20 files changed, 988 insertions(+), 10 deletions(-)

diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
new file mode 100644
index 00000000..5725a203
--- /dev/null
+++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.shuffle.manager;
+
+import org.apache.spark.SparkException;
+
+/**
+ * This is a proxy interface that mainly delegates the un-registration of shuffles to the MapOutputTrackerMaster on
+ * the driver. It provides a unified interface that hides implementation details for different versions of Spark.
+ */
+public interface RssShuffleManagerInterface {
+
+  /**
+   * @return the unique id of the Spark application served by this shuffle manager, used to validate the app id of
+   *     incoming shuffle fetch failure reports
+   */
+  String getAppId();
+
+  /**
+   * @return the number of fetch failures of a single shuffle partition at which the whole shuffle stage should be
+   *     re-submitted
+   */
+  int getMaxFetchFailures();
+
+  /**
+   * @param shuffleId the shuffle id to query
+   * @return the number of partitions (a.k.a. reduce tasks) of the shuffle with the given id
+   */
+  int getPartitionNum(int shuffleId);
+
+  /**
+   * @param shuffleId the shuffle id to query
+   * @return the number of map tasks of the shuffle with the given id
+   */
+  int getNumMaps(int shuffleId);
+
+  /**
+   * Unregisters all map outputs of the shuffle on the driver side, so the whole stage can be re-computed.
+   *
+   * @param shuffleId the shuffle id to unregister
+   * @throws SparkException if unregistering the map outputs fails
+   */
+  void unregisterAllMapOutput(int shuffleId) throws SparkException;
+}
diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
new file mode 100644
index 00000000..7eb0e8a5
--- /dev/null
+++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.shuffle.manager;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+import io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.util.JavaUtils;
+import org.apache.uniffle.proto.RssProtos;
+import org.apache.uniffle.proto.ShuffleManagerGrpc.ShuffleManagerImplBase;
+
+/**
+ * Serves the shuffle manager RPCs. It counts the reported shuffle fetch failures per shuffle partition and tells
+ * the reporter when the whole shuffle stage should be re-submitted.
+ */
+public class ShuffleManagerGrpcService extends ShuffleManagerImplBase {
+  private static final Logger LOG = LoggerFactory.getLogger(ShuffleManagerGrpcService.class);
+  private final Map<Integer, RssShuffleStatus> shuffleStatus = JavaUtils.newConcurrentMap();
+  private final RssShuffleManagerInterface shuffleManager;
+
+  public ShuffleManagerGrpcService(RssShuffleManagerInterface shuffleManager) {
+    this.shuffleManager = shuffleManager;
+  }
+
+  /**
+   * Records a shuffle fetch failure and replies whether the reported partition has reached
+   * {@link RssShuffleManagerInterface#getMaxFetchFailures()} failures in the reported stage attempt, i.e. whether
+   * the whole stage should be re-submitted. Reports from another app, with an out-of-range partition id or from
+   * an older stage attempt get {@code INVALID_REQUEST}. Exactly one response is sent per call.
+   */
+  @Override
+  public void reportShuffleFetchFailure(RssProtos.ReportShuffleFetchFailureRequest request,
+      StreamObserver<RssProtos.ReportShuffleFetchFailureResponse> responseObserver) {
+    responseObserver.onNext(handleFetchFailure(request));
+    responseObserver.onCompleted();
+  }
+
+  /**
+   * Remove the no longer used shuffle id's rss shuffle status. This is called when ShuffleManager unregisters the
+   * corresponding shuffle id.
+   * @param shuffleId the shuffle id to unregister.
+   */
+  public void unregisterShuffle(int shuffleId) {
+    shuffleStatus.remove(shuffleId);
+  }
+
+  /** Validates a fetch failure report, updates the failure counters and builds the reply. */
+  private RssProtos.ReportShuffleFetchFailureResponse handleFetchFailure(
+      RssProtos.ReportShuffleFetchFailureRequest request) {
+    String appId = request.getAppId();
+    int stageAttempt = request.getStageAttemptId();
+    int partitionId = request.getPartitionId();
+    String expectedAppId = shuffleManager.getAppId();
+    if (!appId.equals(expectedAppId)) {
+      return invalidRequest(String.format(
+          "got a wrong shuffle fetch failure report from appId: %s, expected appId: %s", appId, expectedAppId));
+    }
+    RssShuffleStatus status = shuffleStatus.computeIfAbsent(request.getShuffleId(),
+        key -> new RssShuffleStatus(shuffleManager.getPartitionNum(key), stageAttempt));
+    // Reject out-of-range ids before updating any counter: indexing with them would throw out of the RPC
+    // handler, so the caller would get an error instead of a response.
+    if (partitionId < 0 || partitionId >= status.getPartitionNum()) {
+      return invalidRequest(String.format(
+          "got a shuffle fetch failure report with invalid partition id %d, expected range [0, %d)",
+          partitionId, status.getPartitionNum()));
+    }
+    if (status.resetStageAttemptIfNecessary(stageAttempt) < 0) {
+      return invalidRequest(String.format(
+          "got an old stage(%d vs %d) shuffle fetch failure report, which should be impossible.",
+          status.getStageAttempt(), stageAttempt));
+    }
+    // increment and read under one lock, so a concurrent reset by a newer stage attempt cannot slip in between
+    int fetchFailureNum = status.incAndGetPartitionFetchFailure(stageAttempt, partitionId);
+    int maxFetchFailures = shuffleManager.getMaxFetchFailures();
+    if (fetchFailureNum >= maxFetchFailures) {
+      return buildResponse(RssProtos.StatusCode.SUCCESS, true, String.format(
+          "report shuffle fetch failure as maximum number(%d) of shuffle fetch is occurred", maxFetchFailures));
+    }
+    return buildResponse(RssProtos.StatusCode.SUCCESS, false, "don't report shuffle fetch failure");
+  }
+
+  /** Logs {@code msg} as a warning and wraps it in an {@code INVALID_REQUEST} response. */
+  private static RssProtos.ReportShuffleFetchFailureResponse invalidRequest(String msg) {
+    LOG.warn(msg);
+    return buildResponse(RssProtos.StatusCode.INVALID_REQUEST, false, msg);
+  }
+
+  /** Builds a fetch failure response from its status code, re-submit decision and message. */
+  private static RssProtos.ReportShuffleFetchFailureResponse buildResponse(
+      RssProtos.StatusCode code, boolean reSubmitWholeStage, String msg) {
+    return RssProtos.ReportShuffleFetchFailureResponse
+        .newBuilder()
+        .setStatus(code)
+        .setReSubmitWholeStage(reSubmitWholeStage)
+        .setMsg(msg)
+        .build();
+  }
+
+  /**
+   * Fetch failure bookkeeping of one shuffle: the latest stage attempt seen and, for that attempt, the number of
+   * fetch failures reported for each partition. Mutable state is accessed under a read-write lock.
+   */
+  private static class RssShuffleStatus {
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+    private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
+    // fetch failure count per partition; only meaningful for the current stageAttempt
+    private final int[] partitions;
+    private int stageAttempt;
+
+    private RssShuffleStatus(int partitionNum, int stageAttempt) {
+      this.stageAttempt = stageAttempt;
+      this.partitions = new int[partitionNum];
+    }
+
+    private <T> T withReadLock(Supplier<T> fn) {
+      readLock.lock();
+      try {
+        return fn.get();
+      } finally {
+        readLock.unlock();
+      }
+    }
+
+    private <T> T withWriteLock(Supplier<T> fn) {
+      writeLock.lock();
+      try {
+        return fn.get();
+      } finally {
+        writeLock.unlock();
+      }
+    }
+
+    /** @return the number of tracked partitions; the array is final and never resized, so no lock is needed. */
+    public int getPartitionNum() {
+      return partitions.length;
+    }
+
+    // todo: maybe it's more performant to just use synchronized method here.
+    public int getStageAttempt() {
+      return withReadLock(() -> this.stageAttempt);
+    }
+
+    /**
+     * Check whether the input stage attempt is a new stage or not. If a new stage attempt is requested, reset
+     * partitions.
+     * @param stageAttempt the incoming stage attempt number
+     * @return 0 if stageAttempt == this.stageAttempt
+     *         1 if stageAttempt > this.stageAttempt, in which case the partition counters are reset
+     *         -1 if stageAttempt < this.stageAttempt, in which case nothing changes
+     */
+    public int resetStageAttemptIfNecessary(int stageAttempt) {
+      return withWriteLock(() -> {
+        if (this.stageAttempt < stageAttempt) {
+          // a new stage attempt is issued. the partitions array should be clear and reset.
+          Arrays.fill(this.partitions, 0);
+          this.stageAttempt = stageAttempt;
+          return 1;
+        } else if (this.stageAttempt > stageAttempt) {
+          return -1;
+        }
+        return 0;
+      });
+    }
+
+    /**
+     * Atomically increments and returns the fetch failure count of a partition for the given stage attempt.
+     * @param stageAttempt the stage attempt the failure was reported for
+     * @param partition the partition index, expected to be in [0, getPartitionNum())
+     * @return the updated count, or 0 without counting anything if stageAttempt is not the current attempt
+     */
+    public int incAndGetPartitionFetchFailure(int stageAttempt, int partition) {
+      return withWriteLock(() -> {
+        if (this.stageAttempt != stageAttempt) {
+          return 0;
+        }
+        return ++this.partitions[partition];
+      });
+    }
+  }
+}
diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java
new file mode 100644
index 00000000..5c395e52
--- /dev/null
+++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.shuffle.manager;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.metrics.GRPCMetrics;
+import org.apache.uniffle.common.rpc.GrpcServer;
+
+/**
+ * Builds the RPC server that exposes a {@link ShuffleManagerGrpcService} backed by a given shuffle manager.
+ */
+public class ShuffleManagerServerFactory {
+  private final RssShuffleManagerInterface shuffleManager;
+  private final RssBaseConf conf;
+
+  /**
+   * @param shuffleManager the shuffle manager consulted by the served RPCs
+   * @param conf settings copied into a new {@link RssBaseConf} owned by this factory
+   */
+  public ShuffleManagerServerFactory(RssShuffleManagerInterface shuffleManager, RssConf conf) {
+    RssBaseConf baseConf = new RssBaseConf();
+    baseConf.addAll(conf);
+    this.shuffleManager = shuffleManager;
+    this.conf = baseConf;
+  }
+
+  /**
+   * Builds the server selected by {@link RssBaseConf#RPC_SERVER_TYPE}; starting it is left to the caller.
+   *
+   * @return a gRPC server hosting a {@link ShuffleManagerGrpcService}
+   * @throws UnsupportedOperationException if the configured server type is anything but {@code GRPC}
+   */
+  public GrpcServer getServer() {
+    String type = conf.getString(RssBaseConf.RPC_SERVER_TYPE);
+    if (!ServerType.GRPC.name().equals(type)) {
+      throw new UnsupportedOperationException("Unsupported server type " + type);
+    }
+    return GrpcServer.Builder.newBuilder()
+        .conf(conf)
+        .grpcMetrics(GRPCMetrics.getEmptyGRPCMetrics())
+        .addService(new ShuffleManagerGrpcService(shuffleManager))
+        .build();
+  }
+
+  /**
+   * @return the configuration the server is built from
+   */
+  public RssBaseConf getConf() {
+    return conf;
+  }
+
+  /** Server implementations this factory knows how to build. */
+  enum ServerType {
+    GRPC
+  }
+}
diff --git a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
new file mode 100644
index 00000000..9e06da4e
--- /dev/null
+++ b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.shuffle.manager;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * Test stub of {@link RssShuffleManagerInterface} that answers every query with a fixed value and records the
+ * shuffle ids passed to {@link #unregisterAllMapOutput(int)}.
+ */
+public class DummyRssShuffleManager implements RssShuffleManagerInterface {
+  private static final String APP_ID = "testAppId";
+  private static final int MAX_FETCH_FAILURES = 2;
+  private static final int PARTITION_NUM = 16;
+  private static final int NUM_MAPS = 8;
+
+  /** Shuffle ids passed to {@link #unregisterAllMapOutput(int)}, in first-call order without duplicates. */
+  public Set<Integer> unregisteredShuffleIds = new LinkedHashSet<>();
+
+  @Override
+  public String getAppId() {
+    return APP_ID;
+  }
+
+  @Override
+  public int getMaxFetchFailures() {
+    return MAX_FETCH_FAILURES;
+  }
+
+  @Override
+  public int getPartitionNum(int shuffleId) {
+    return PARTITION_NUM;
+  }
+
+  @Override
+  public int getNumMaps(int shuffleId) {
+    return NUM_MAPS;
+  }
+
+  @Override
+  public void unregisterAllMapOutput(int shuffleId) {
+    unregisteredShuffleIds.add(shuffleId);
+  }
+}
diff --git a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcServiceTest.java b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcServiceTest.java
new file mode 100644
index 00000000..2160b03e
--- /dev/null
+++ b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcServiceTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.shuffle.manager;
+
+import io.grpc.stub.StreamObserver;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureRequest;
+import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureResponse;
+import org.apache.uniffle.proto.RssProtos.StatusCode;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class ShuffleManagerGrpcServiceTest {
+  // create mock of RssShuffleManagerInterface.
+  private static RssShuffleManagerInterface mockShuffleManager;
+  private static final String appId = "app-123";
+  private static final int maxFetchFailures = 2;
+  private static final int shuffleId = 0;
+  private static final int numMaps = 100;
+  private static final int numReduces = 10;
+
+  private static class MockedStreamObserver<T> implements StreamObserver<T> {
+    T value;
+    Throwable error;
+    boolean completed;
+
+    @Override
+    public void onNext(T value) {
+      this.value = value;
+    }
+
+    @Override
+    public void onError(Throwable t) {
+      this.error = t;
+    }
+
+    @Override
+    public void onCompleted() {
+      this.completed = true;
+    }
+  }
+
+  @BeforeAll
+  public static void setup() {
+    mockShuffleManager = mock(RssShuffleManagerInterface.class);
+    Mockito.when(mockShuffleManager.getAppId()).thenReturn(appId);
+    Mockito.when(mockShuffleManager.getNumMaps(shuffleId)).thenReturn(numMaps);
+    Mockito.when(mockShuffleManager.getPartitionNum(shuffleId)).thenReturn(numReduces);
+    Mockito.when(mockShuffleManager.getMaxFetchFailures()).thenReturn(maxFetchFailures);
+  }
+
+  /**
+   * Sends {@code req} to {@code service} through a fresh observer and returns the single response, so a missing
+   * response can never be masked by one left over from an earlier call.
+   */
+  private static ReportShuffleFetchFailureResponse report(
+      ShuffleManagerGrpcService service, ReportShuffleFetchFailureRequest req) {
+    MockedStreamObserver<ReportShuffleFetchFailureResponse> observer = new MockedStreamObserver<>();
+    service.reportShuffleFetchFailure(req, observer);
+    assertNull(observer.error);
+    assertTrue(observer.completed);
+    assertNotNull(observer.value);
+    return observer.value;
+  }
+
+  @Test
+  public void testShuffleManagerGrpcService() {
+    ShuffleManagerGrpcService service = new ShuffleManagerGrpcService(mockShuffleManager);
+    ReportShuffleFetchFailureRequest req =
+        ReportShuffleFetchFailureRequest.newBuilder()
+            .setAppId(appId)
+            .setShuffleId(shuffleId)
+            .setStageAttemptId(1)
+            .setPartitionId(1)
+            .buildPartial();
+
+    // the first fetch failure of a partition is accepted but stays below maxFetchFailures.
+    ReportShuffleFetchFailureResponse resp = report(service, req);
+    assertEquals(StatusCode.SUCCESS, resp.getStatus());
+    assertFalse(resp.getReSubmitWholeStage());
+
+    // req with wrong appId should fail.
+    ReportShuffleFetchFailureRequest wrongAppIdReq = ReportShuffleFetchFailureRequest.newBuilder()
+        .mergeFrom(req)
+        .setAppId("wrong-app-id")
+        .build();
+    resp = report(service, wrongAppIdReq);
+    assertEquals(StatusCode.INVALID_REQUEST, resp.getStatus());
+    assertFalse(resp.getReSubmitWholeStage());
+
+    // stage attempt 0 is older than the attempt 1 already recorded for this shuffle, so it is rejected.
+    ReportShuffleFetchFailureRequest oldStageReq = ReportShuffleFetchFailureRequest.newBuilder()
+        .mergeFrom(req)
+        .setStageAttemptId(0)
+        .build();
+    resp = report(service, oldStageReq);
+    assertEquals(StatusCode.INVALID_REQUEST, resp.getStatus());
+    assertTrue(resp.getMsg().contains("old stage"));
+
+    // rejected reports are not counted, so the second failure of partition 1 reaches maxFetchFailures.
+    resp = report(service, req);
+    assertEquals(StatusCode.SUCCESS, resp.getStatus());
+    assertTrue(resp.getReSubmitWholeStage());
+  }
+}
diff --git a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactoryTest.java b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactoryTest.java
new file mode 100644
index 00000000..5e287356
--- /dev/null
+++ b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactoryTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.shuffle.manager;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class ShuffleManagerServerFactoryTest {
+  /** Creates a factory, without a shuffle manager, configured for the given server type. */
+  private static ShuffleManagerServerFactory factoryFor(String serverType) {
+    RssBaseConf conf = new RssBaseConf();
+    conf.set(RssBaseConf.RPC_SERVER_TYPE, serverType);
+    return new ShuffleManagerServerFactory(null, conf);
+  }
+
+  @Test
+  public void testShuffleManagerServerType() {
+    // GRPC is supported, so building the server must succeed.
+    factoryFor("GRPC").getServer();
+
+    // any other server type is rejected.
+    assertThrows(UnsupportedOperationException.class, factoryFor("Netty")::getServer);
+  }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/metrics/EmptyGRPCMetrics.java b/common/src/main/java/org/apache/uniffle/common/metrics/EmptyGRPCMetrics.java
new file mode 100644
index 00000000..40750ecc
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/EmptyGRPCMetrics.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.metrics;
+
+/**
+ * A {@link GRPCMetrics} whose {@link #registerMetrics()} is a no-op, for servers that do not define metrics of
+ * their own.
+ */
+public class EmptyGRPCMetrics extends GRPCMetrics {
+  @Override
+  public void registerMetrics() {
+    // intentionally empty: there are no server-specific metrics to register
+  }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
index e6103d21..de3faad1 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
@@ -176,4 +176,14 @@ public abstract class GRPCMetrics {
   public Map<String, Summary> getProcessTimeSummaryMap() {
     return processTimeSummaryMap;
   }
+
+  /**
+   * Creates a {@link GRPCMetrics} whose {@code registerMetrics()} is a no-op, for servers that need no
+   * dedicated gRPC metrics.
+   *
+   * @return a new {@link EmptyGRPCMetrics} instance on every call
+   */
+  public static GRPCMetrics getEmptyGRPCMetrics() {
+    return new EmptyGRPCMetrics();
+  }
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
index c9f2a092..59ade455 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
@@ -49,6 +49,7 @@ public class GrpcServer implements ServerInterface {
 
   private final Server server;
   private final int port;
+  private int listenPort; // port actually bound, set by start(); reported by getPort() when port <= 0
   private final ExecutorService pool;
 
   protected GrpcServer(
@@ -157,10 +158,11 @@ public class GrpcServer implements ServerInterface {
   public void start() throws IOException {
     try {
       server.start();
+      listenPort = server.getPort();
     } catch (IOException e) {
       ExitUtils.terminate(1, "Fail to start grpc server", e, LOG);
     }
-    LOG.info("Grpc server started, listening on {}.", port);
+    LOG.info("Grpc server started, configured port: {}, listening on {}.", port, listenPort);
   }
 
   public void stop() throws InterruptedException {
@@ -179,4 +181,12 @@ public class GrpcServer implements ServerInterface {
     }
   }
 
+  /**
+   * @return the configured port when it is positive; otherwise the port the server actually bound to in
+   *     {@link #start()}, which stays 0 until the server has been started
+   */
+  public int getPort() {
+    return port <= 0 ? listenPort : port;
+  }
+
 }
diff --git a/common/src/test/java/org/apache/uniffle/common/rpc/GrpcServerTest.java b/common/src/test/java/org/apache/uniffle/common/rpc/GrpcServerTest.java
index 54dc5ae7..ebcb6c23 100644
--- a/common/src/test/java/org/apache/uniffle/common/rpc/GrpcServerTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/rpc/GrpcServerTest.java
@@ -26,26 +26,23 @@ import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.metrics.GRPCMetrics;
 import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.proto.ShuffleManagerGrpc;
 
+import static org.apache.uniffle.common.config.RssBaseConf.RPC_SERVER_PORT;
 import static org.apache.uniffle.common.metrics.GRPCMetrics.GRPC_SERVER_EXECUTOR_ACTIVE_THREADS_KEY;
 import static org.apache.uniffle.common.metrics.GRPCMetrics.GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE_KEY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class GrpcServerTest {
   private static final Logger LOGGER = LoggerFactory.getLogger(GrpcServerTest.class);
 
-  static class MockedGRPCMetrics extends GRPCMetrics {
-    @Override
-    public void registerMetrics() {
-      // ignore
-    }
-  }
-
   @Test
   public void testGrpcExecutorPool() throws Exception {
-    GRPCMetrics grpcMetrics = new MockedGRPCMetrics();
+    GRPCMetrics grpcMetrics = GRPCMetrics.getEmptyGRPCMetrics();
     grpcMetrics.register(new CollectorRegistry(true));
     GrpcServer.GrpcThreadPoolExecutor executor = new GrpcServer.GrpcThreadPoolExecutor(
         2,
@@ -87,4 +84,26 @@ public class GrpcServerTest {
 
     executor.shutdown();
   }
+
+  /** With port 0 configured, the server picks a port in start() and getPort() must report it. */
+  @Test
+  public void testRandomPort() throws Exception {
+    GRPCMetrics grpcMetrics = GRPCMetrics.getEmptyGRPCMetrics();
+    grpcMetrics.register(new CollectorRegistry(true));
+    RssBaseConf conf = new RssBaseConf();
+    conf.set(RPC_SERVER_PORT, 0);
+    GrpcServer server = GrpcServer.Builder.newBuilder()
+        .conf(conf)
+        .grpcMetrics(grpcMetrics)
+        .addService(new ShuffleManagerGrpc.ShuffleManagerImplBase(){})
+        .build();
+    server.start();
+    try {
+      assertTrue(server.getPort() > 0);
+    } finally {
+      // release the bound port and server resources even if the assertion fails
+      server.stop();
+    }
+  }
+
 }
diff --git a/integration-test/spark-common/pom.xml b/integration-test/spark-common/pom.xml
index 78a5e888..ee8bb0ad 100644
--- a/integration-test/spark-common/pom.xml
+++ b/integration-test/spark-common/pom.xml
@@ -77,6 +77,13 @@
       <artifactId>rss-client-spark-common</artifactId>
       <scope>test</scope>
     </dependency>
+    <!-- test classes of rss-client-spark-common, e.g. DummyRssShuffleManager used by the integration tests -->
+    <dependency>
+      <groupId>org.apache.uniffle</groupId>
+      <artifactId>rss-client-spark-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.uniffle</groupId>
       <artifactId>rss-client-spark${client.type}</artifactId>
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleServerManagerTestBase.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleServerManagerTestBase.java
new file mode 100644
index 00000000..8a528303
--- /dev/null
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleServerManagerTestBase.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+import org.apache.uniffle.client.factory.ShuffleManagerClientFactory;
+import org.apache.uniffle.client.impl.grpc.ShuffleManagerGrpcClient;
+import org.apache.uniffle.common.ClientType;
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.rpc.GrpcServer;
+import org.apache.uniffle.shuffle.manager.DummyRssShuffleManager;
+import org.apache.uniffle.shuffle.manager.RssShuffleManagerInterface;
+import org.apache.uniffle.shuffle.manager.ShuffleManagerServerFactory;
+
+import static org.apache.uniffle.common.config.RssBaseConf.RPC_SERVER_PORT;
+
+/**
+ * Base class for tests that need a running shuffle manager gRPC server plus a client connected to it.
+ *
+ * <p>Before each test a server is built via {@link #createShuffleManagerServer()}, started, and a gRPC client
+ * is created for the port the server actually bound. Both are released after each test. Subclasses may
+ * override {@link #getShuffleManager()} and {@link #getConf()} to customize the server.
+ */
+public class ShuffleServerManagerTestBase {
+  protected ShuffleManagerClientFactory factory = ShuffleManagerClientFactory.getInstance();
+  protected ShuffleManagerGrpcClient client;
+  protected static final String LOCALHOST = "localhost";
+  protected GrpcServer shuffleManagerServer;
+
+  /** Returns the shuffle manager backing the server; defaults to a {@link DummyRssShuffleManager}. */
+  protected RssShuffleManagerInterface getShuffleManager() {
+    return new DummyRssShuffleManager();
+  }
+
+  /** Returns the server configuration; by default only the RPC port is set, to 0 (a random free port). */
+  protected RssConf getConf() {
+    RssConf conf = new RssConf();
+    // use a random port
+    conf.set(RPC_SERVER_PORT, 0);
+    return conf;
+  }
+
+  /** Builds the shuffle manager server; {@link #createServerAndClient()} is the one that starts it. */
+  protected GrpcServer createShuffleManagerServer() {
+    return new ShuffleManagerServerFactory(getShuffleManager(), getConf()).getServer();
+  }
+
+  @BeforeEach
+  public void createServerAndClient() throws Exception {
+    shuffleManagerServer = createShuffleManagerServer();
+    shuffleManagerServer.start();
+    // the configured port may be 0, so ask the server which port it actually bound
+    int port = shuffleManagerServer.getPort();
+    client = factory.createShuffleManagerClient(ClientType.GRPC, LOCALHOST, port);
+  }
+
+  @AfterEach
+  public void close() throws Exception {
+    try {
+      if (client != null) {
+        client.close();
+      }
+    } finally {
+      // stop the server even if closing the client failed, so its port is not leaked into later tests
+      if (shuffleManagerServer != null) {
+        shuffleManagerServer.stop();
+      }
+    }
+  }
+}
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SimpleShuffleServerManagerTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SimpleShuffleServerManagerTest.java
new file mode 100644
index 00000000..5b052eb3
--- /dev/null
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SimpleShuffleServerManagerTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
+import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.shuffle.manager.DummyRssShuffleManager;
+import org.apache.uniffle.shuffle.manager.RssShuffleManagerInterface;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Exercises a fetch-failure report round trip against the server started by the test base. */
+public class SimpleShuffleServerManagerTest extends ShuffleServerManagerTestBase {
+
+  @Test
+  public void testClientAndServerConnections() {
+    RssShuffleManagerInterface shuffleManager = new DummyRssShuffleManager();
+    String validAppId = shuffleManager.getAppId();
+
+    // a report carrying the dummy shuffle manager's appId is expected to succeed
+    RssReportShuffleFetchFailureResponse acceptedResponse = client.reportShuffleFetchFailure(
+        new RssReportShuffleFetchFailureRequest(validAppId, 0, 0, 0, null));
+    assertEquals(StatusCode.SUCCESS, acceptedResponse.getStatusCode());
+
+    // a report carrying a mismatching appId is expected to be rejected
+    RssReportShuffleFetchFailureResponse rejectedResponse = client.reportShuffleFetchFailure(
+        new RssReportShuffleFetchFailureRequest("wrongAppId", 0, 0, 0, null));
+    assertEquals(StatusCode.INVALID_REQUEST, rejectedResponse.getStatusCode());
+  }
+}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
new file mode 100644
index 00000000..91e53b46
--- /dev/null
+++ b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.api;
+
+import java.io.Closeable;
+
+import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
+import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
+
+/**
+ * Client of the per-application shuffle manager service.
+ *
+ * <p>Extends {@link Closeable}: call {@link #close()} once the client is no longer needed.
+ */
+public interface ShuffleManagerClient extends Closeable {
+  /**
+   * Reports a shuffle fetch failure to the shuffle manager.
+   *
+   * @param request the application, shuffle, stage attempt and partition of the failed fetch
+   * @return the shuffle manager's response, including its status code
+   */
+  RssReportShuffleFetchFailureResponse reportShuffleFetchFailure(RssReportShuffleFetchFailureRequest request);
+}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleManagerClientFactory.java b/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleManagerClientFactory.java
new file mode 100644
index 00000000..9199c43a
--- /dev/null
+++ b/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleManagerClientFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.factory;
+
+import org.apache.uniffle.client.impl.grpc.ShuffleManagerGrpcClient;
+import org.apache.uniffle.common.ClientType;
+
+/** Singleton factory that creates shuffle manager clients. */
+public class ShuffleManagerClientFactory {
+
+  /** Holds the singleton so it is only created on the first {@link #getInstance()} call. */
+  private static class LazyHolder {
+    private static final ShuffleManagerClientFactory INSTANCE = new ShuffleManagerClientFactory();
+  }
+
+  /** Returns the shared factory instance. */
+  public static ShuffleManagerClientFactory getInstance() {
+    return LazyHolder.INSTANCE;
+  }
+
+  private ShuffleManagerClientFactory() {
+  }
+
+  /**
+   * Creates a client targeting the shuffle manager at {@code host:port}.
+   *
+   * @throws UnsupportedOperationException if {@code clientType} is not {@link ClientType#GRPC}
+   */
+  public ShuffleManagerGrpcClient createShuffleManagerClient(ClientType clientType, String host, int port) {
+    if (!ClientType.GRPC.equals(clientType)) {
+      throw new UnsupportedOperationException("Unsupported client type " + clientType);
+    }
+    return new ShuffleManagerGrpcClient(host, port);
+  }
+}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
new file mode 100644
index 00000000..fe4b6235
--- /dev/null
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.impl.grpc;
+
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.client.api.ShuffleManagerClient;
+import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
+import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureRequest;
+import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureResponse;
+import org.apache.uniffle.proto.ShuffleManagerGrpc;
+
+/** gRPC implementation of {@link ShuffleManagerClient}. */
+public class ShuffleManagerGrpcClient extends GrpcClient implements ShuffleManagerClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ShuffleManagerGrpcClient.class);
+  private static final long RPC_TIMEOUT_DEFAULT_MS = 60000;
+  private long rpcTimeout = RPC_TIMEOUT_DEFAULT_MS;
+  // stub without a deadline; callers go through getBlockingStub() so every call gets a fresh one
+  private final ShuffleManagerGrpc.ShuffleManagerBlockingStub blockingStub;
+
+  /** Creates a plaintext client for {@code host:port} with 3 max retry attempts. */
+  public ShuffleManagerGrpcClient(String host, int port) {
+    this(host, port, 3);
+  }
+
+  /** Creates a plaintext client for {@code host:port} with the given max retry attempts. */
+  public ShuffleManagerGrpcClient(String host, int port, int maxRetryAttempts) {
+    this(host, port, maxRetryAttempts, true);
+  }
+
+  /**
+   * Creates a client for {@code host:port}.
+   *
+   * @param maxRetryAttempts passed through to {@link GrpcClient}
+   * @param usePlaintext passed through to {@link GrpcClient}
+   */
+  public ShuffleManagerGrpcClient(String host, int port, int maxRetryAttempts, boolean usePlaintext) {
+    super(host, port, maxRetryAttempts, usePlaintext);
+    blockingStub = ShuffleManagerGrpc.newBlockingStub(channel);
+  }
+
+  /** Returns the blocking stub with a deadline of {@code rpcTimeout} milliseconds from now. */
+  public ShuffleManagerGrpc.ShuffleManagerBlockingStub getBlockingStub() {
+    return blockingStub.withDeadlineAfter(rpcTimeout, TimeUnit.MILLISECONDS);
+  }
+
+  /** Returns a human-readable description naming the target host and port. */
+  public String getDesc() {
+    return "Shuffle manager grpc client ref " + host + ":" + port;
+  }
+
+  /**
+   * Sends the fetch failure report over gRPC.
+   *
+   * @throws RssException if the RPC or the response conversion fails; the original exception is kept as cause
+   */
+  @Override
+  public RssReportShuffleFetchFailureResponse reportShuffleFetchFailure(RssReportShuffleFetchFailureRequest request) {
+    ReportShuffleFetchFailureRequest protoRequest = request.toProto();
+    try {
+      ReportShuffleFetchFailureResponse response = getBlockingStub().reportShuffleFetchFailure(protoRequest);
+      return RssReportShuffleFetchFailureResponse.fromProto(response);
+    } catch (Exception e) {
+      String msg = "Report shuffle fetch failure to host:port[" + host + ":" + port + "] failed";
+      LOG.warn(msg, e);
+      throw new RssException(msg, e);
+    }
+  }
+}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleFetchFailureRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleFetchFailureRequest.java
new file mode 100644
index 00000000..37239d75
--- /dev/null
+++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleFetchFailureRequest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.request;
+
+import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureRequest;
+
+/**
+ * Client-side form of a shuffle fetch failure report, converted to the {@code ReportShuffleFetchFailureRequest}
+ * proto message by {@link #toProto()}.
+ */
+public class RssReportShuffleFetchFailureRequest {
+  private final String appId;
+  private final int shuffleId;
+  private final int stageAttemptId;
+  private final int partitionId;
+  private final String exception;
+
+  /**
+   * Creates a fetch failure report.
+   *
+   * @param appId id of the reporting application; {@code null} leaves the proto field unset
+   * @param shuffleId id of the shuffle whose fetch failed
+   * @param stageAttemptId stage attempt id in which the failure was observed
+   * @param partitionId id of the partition whose fetch failed
+   * @param exception description of the failure; {@code null} leaves the proto field unset
+   */
+  public RssReportShuffleFetchFailureRequest(
+      String appId,
+      int shuffleId,
+      int stageAttemptId,
+      int partitionId,
+      String exception) {
+    this.appId = appId;
+    this.shuffleId = shuffleId;
+    this.stageAttemptId = stageAttemptId;
+    this.partitionId = partitionId;
+    this.exception = exception;
+  }
+
+  /** Converts this request into its proto form; null string fields are left unset. */
+  public ReportShuffleFetchFailureRequest toProto() {
+    ReportShuffleFetchFailureRequest.Builder builder = ReportShuffleFetchFailureRequest.newBuilder()
+        .setShuffleId(shuffleId)
+        .setStageAttemptId(stageAttemptId)
+        .setPartitionId(partitionId);
+    // protobuf string setters throw NullPointerException on null, so optional strings are only set when present
+    if (appId != null) {
+      builder.setAppId(appId);
+    }
+    if (exception != null) {
+      builder.setException(exception);
+    }
+    return builder.build();
+  }
+
+}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleFetchFailureResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleFetchFailureResponse.java
new file mode 100644
index 00000000..57f799cd
--- /dev/null
+++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleFetchFailureResponse.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.response;
+
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureResponse;
+
+/**
+ * Client-side form of the shuffle manager's answer to a fetch failure report, convertible to and from the
+ * {@code ReportShuffleFetchFailureResponse} proto message.
+ */
+public class RssReportShuffleFetchFailureResponse extends ClientResponse {
+  private final boolean reSubmitWholeStage;
+
+  /**
+   * Creates a response.
+   *
+   * @param code status of the report
+   * @param msg message accompanying the status; may be {@code null}
+   * @param reSubmitWholeStage value of the proto's {@code reSubmitWholeStage} flag
+   */
+  public RssReportShuffleFetchFailureResponse(StatusCode code, String msg, boolean reSubmitWholeStage) {
+    super(code, msg);
+    this.reSubmitWholeStage = reSubmitWholeStage;
+  }
+
+  public boolean getReSubmitWholeStage() {
+    return this.reSubmitWholeStage;
+  }
+
+  /** Converts this response into its proto form; a {@code null} message leaves the proto field unset. */
+  public ReportShuffleFetchFailureResponse toProto() {
+    ReportShuffleFetchFailureResponse.Builder builder = ReportShuffleFetchFailureResponse.newBuilder()
+        .setStatus(getStatusCode().toProto())
+        .setReSubmitWholeStage(reSubmitWholeStage);
+    // protobuf string setters throw NullPointerException on null
+    String msg = getMessage();
+    if (msg != null) {
+      builder.setMsg(msg);
+    }
+    return builder.build();
+  }
+
+  /** Builds a response from its proto form. */
+  public static RssReportShuffleFetchFailureResponse fromProto(ReportShuffleFetchFailureResponse response) {
+    return new RssReportShuffleFetchFailureResponse(
+        // todo: [issue#780] add fromProto for StatusCode issue
+        StatusCode.valueOf(response.getStatus().name()),
+        response.getMsg(),
+        response.getReSubmitWholeStage()
+    );
+  }
+
+}
diff --git a/internal-client/src/test/java/org/apache/uniffle/client/factory/ShuffleManagerClientFactoryTest.java b/internal-client/src/test/java/org/apache/uniffle/client/factory/ShuffleManagerClientFactoryTest.java
new file mode 100644
index 00000000..17e42ba4
--- /dev/null
+++ b/internal-client/src/test/java/org/apache/uniffle/client/factory/ShuffleManagerClientFactoryTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.factory;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.client.api.ShuffleManagerClient;
+import org.apache.uniffle.common.ClientType;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class ShuffleManagerClientFactoryTest {
+
+  /** Checks that GRPC clients can be created and that any other client type is rejected. */
+  @Test
+  void createShuffleManagerClient() throws Exception {
+    ShuffleManagerClientFactory factory = ShuffleManagerClientFactory.getInstance();
+    assertNotNull(factory);
+    // only grpc type is supported currently; close the client so its channel is not leaked
+    try (ShuffleManagerClient c = factory.createShuffleManagerClient(ClientType.GRPC, "localhost", 1234)) {
+      assertNotNull(c);
+    }
+    assertThrows(UnsupportedOperationException.class,
+        () -> factory.createShuffleManagerClient(ClientType.GRPC_NETTY, "localhost", 1234));
+  }
+}
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 84736982..ac7fb936 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -488,4 +488,27 @@ message CancelDecommissionRequest {
 message CancelDecommissionResponse {
   StatusCode status = 1;
   string retMsg = 2;
-}
\ No newline at end of file
+}
+
+// The ShuffleManager service runs inside the compute engine's application master and handles RSS-shuffle-specific
+// logic for that single application.
+service ShuffleManager {
+  // Reports a failed shuffle fetch; the response carries a status and the reSubmitWholeStage flag.
+  rpc reportShuffleFetchFailure (ReportShuffleFetchFailureRequest) returns (ReportShuffleFetchFailureResponse);
+}
+
+// Report of a failed shuffle fetch, sent to the ShuffleManager service.
+message ReportShuffleFetchFailureRequest {
+  // appId is not strictly needed, since each application runs its own shuffle manager. It guards against stray
+  // requests from leftover executors of another application that happen to reach a shuffle manager listening on
+  // the same port as theirs did; requests whose appId does not match are rejected.
+  string appId = 1;
+  int32 shuffleId = 2;
+  int32 stageAttemptId = 3;
+  int32 partitionId = 4;
+  // description of the fetch failure; may be left unset
+  string exception = 5;
+  // todo: report ShuffleServerId if needed
+  // ShuffleServerId serverId = 6;
+}
+
+// Answer to reportShuffleFetchFailure.
+message ReportShuffleFetchFailureResponse {
+  // SUCCESS when the report was accepted; INVALID_REQUEST e.g. when the appId does not match
+  StatusCode status = 1;
+  // presumably tells the reporter whether the whole stage is to be re-submitted — TODO confirm against server impl
+  bool reSubmitWholeStage = 2;
+  // free-form message accompanying the status
+  string msg = 3;
+}