You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ck...@apache.org on 2023/02/03 12:14:22 UTC
[incubator-uniffle] branch master updated: [ISSUE-546] Replace ResponseStatusCode with StatusCode (#547)
This is an automated email from the ASF dual-hosted git repository.
ckj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new cdce4199 [ISSUE-546] Replace ResponseStatusCode with StatusCode (#547)
cdce4199 is described below
commit cdce4199d34c0c87406d4121f14f89e2dcdd497f
Author: xianjingfeng <58...@qq.com>
AuthorDate: Fri Feb 3 20:14:16 2023 +0800
[ISSUE-546] Replace ResponseStatusCode with StatusCode (#547)
### What changes were proposed in this pull request?
1. Replace `ResponseStatusCode` with `StatusCode`.
2. Move `StatusCode` to `rss-common`.
Fix #546
### Why are the changes needed?
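Code quality: the client and server now share a single `StatusCode` type in `rss-common` instead of the client keeping its own separate `ResponseStatusCode`.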
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
---
.../spark/shuffle/DelegationRssShuffleManager.java | 6 +--
.../shuffle/DelegationRssShuffleManagerTest.java | 4 +-
.../spark/shuffle/DelegationRssShuffleManager.java | 6 +--
.../shuffle/DelegationRssShuffleManagerTest.java | 4 +-
.../client/impl/ShuffleWriteClientImpl.java | 34 ++++++------
.../client/impl/ShuffleWriteClientImplTest.java | 20 +++----
.../org/apache/uniffle/common/rpc}/StatusCode.java | 2 +-
.../apache/uniffle/common/rpc}/StatusCodeTest.java | 2 +-
.../org/apache/uniffle/test/AccessClusterTest.java | 16 +++---
.../apache/uniffle/test/CoordinatorGrpcTest.java | 4 +-
.../apache/uniffle/test/FetchClientConfTest.java | 8 +--
.../test/HealthCheckCoordinatorGrpcTest.java | 4 +-
.../apache/uniffle/test/ShuffleServerGrpcTest.java | 4 +-
.../client/impl/grpc/CoordinatorGrpcClient.java | 55 ++++++++++---------
.../client/impl/grpc/ShuffleServerGrpcClient.java | 61 +++++++++++-----------
.../uniffle/client/response/ClientResponse.java | 10 ++--
.../client/response/ResponseStatusCode.java | 30 -----------
.../client/response/RssAccessClusterResponse.java | 6 ++-
.../client/response/RssAppHeartBeatResponse.java | 4 +-
.../response/RssApplicationInfoResponse.java | 4 +-
.../response/RssFetchClientConfResponse.java | 6 ++-
.../response/RssFetchRemoteStorageResponse.java | 3 +-
.../client/response/RssFinishShuffleResponse.java | 4 +-
.../RssGetInMemoryShuffleDataResponse.java | 3 +-
.../response/RssGetShuffleAssignmentsResponse.java | 5 +-
.../client/response/RssGetShuffleDataResponse.java | 4 +-
.../response/RssGetShuffleIndexResponse.java | 3 +-
.../response/RssGetShuffleResultResponse.java | 3 +-
.../response/RssRegisterShuffleResponse.java | 4 +-
.../response/RssReportShuffleResultResponse.java | 4 +-
.../client/response/RssSendCommitResponse.java | 4 +-
.../client/response/RssSendHeartBeatResponse.java | 4 +-
.../response/RssSendShuffleDataResponse.java | 4 +-
.../response/RssUnregisterShuffleResponse.java | 4 +-
.../apache/uniffle/server/RegisterHeartBeat.java | 4 +-
.../uniffle/server/ShuffleServerGrpcService.java | 1 +
.../apache/uniffle/server/ShuffleTaskManager.java | 1 +
.../server/buffer/ShuffleBufferManager.java | 2 +-
.../uniffle/server/ShuffleTaskManagerTest.java | 1 +
.../server/buffer/ShuffleBufferManagerTest.java | 2 +-
.../impl/LocalFileServerReadHandlerTest.java | 8 +--
41 files changed, 180 insertions(+), 178 deletions(-)
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index cf9eb8af..579c5299 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -33,9 +33,9 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.CoordinatorClient;
import org.apache.uniffle.client.request.RssAccessClusterRequest;
-import org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.RetryUtils;
@@ -127,11 +127,11 @@ public class DelegationRssShuffleManager implements ShuffleManager {
canAccess = RetryUtils.retry(() -> {
RssAccessClusterResponse response = coordinatorClient.accessCluster(new RssAccessClusterRequest(
accessId, assignmentTags, accessTimeoutMs, extraProperties, user));
- if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+ if (response.getStatusCode() == StatusCode.SUCCESS) {
LOG.warn("Success to access cluster {} using {}", coordinatorClient.getDesc(), accessId);
uuid = response.getUuid();
return true;
- } else if (response.getStatusCode() == ResponseStatusCode.ACCESS_DENIED) {
+ } else if (response.getStatusCode() == StatusCode.ACCESS_DENIED) {
throw new RssException("Request to access cluster " + coordinatorClient.getDesc() + " is denied using "
+ accessId + " for " + response.getMessage());
} else {
diff --git a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
index e7270bae..e88ab915 100644
--- a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
+++ b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
@@ -33,8 +33,8 @@ import org.apache.uniffle.client.api.CoordinatorClient;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
import org.apache.uniffle.storage.util.StorageType;
-import static org.apache.uniffle.client.response.ResponseStatusCode.ACCESS_DENIED;
-import static org.apache.uniffle.client.response.ResponseStatusCode.SUCCESS;
+import static org.apache.uniffle.common.rpc.StatusCode.ACCESS_DENIED;
+import static org.apache.uniffle.common.rpc.StatusCode.SUCCESS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index 592c28a9..0056ce88 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -33,9 +33,9 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.CoordinatorClient;
import org.apache.uniffle.client.request.RssAccessClusterRequest;
-import org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.RetryUtils;
@@ -126,11 +126,11 @@ public class DelegationRssShuffleManager implements ShuffleManager {
canAccess = RetryUtils.retry(() -> {
RssAccessClusterResponse response = coordinatorClient.accessCluster(new RssAccessClusterRequest(
accessId, assignmentTags, accessTimeoutMs, extraProperties, user));
- if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+ if (response.getStatusCode() == StatusCode.SUCCESS) {
LOG.warn("Success to access cluster {} using {}", coordinatorClient.getDesc(), accessId);
uuid = response.getUuid();
return true;
- } else if (response.getStatusCode() == ResponseStatusCode.ACCESS_DENIED) {
+ } else if (response.getStatusCode() == StatusCode.ACCESS_DENIED) {
throw new RssException("Request to access cluster " + coordinatorClient.getDesc() + " is denied using "
+ accessId + " for " + response.getMessage());
} else {
diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
index d533be84..9e50db55 100644
--- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
+++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
@@ -33,8 +33,8 @@ import org.apache.uniffle.client.api.CoordinatorClient;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
import org.apache.uniffle.storage.util.StorageType;
-import static org.apache.uniffle.client.response.ResponseStatusCode.ACCESS_DENIED;
-import static org.apache.uniffle.client.response.ResponseStatusCode.SUCCESS;
+import static org.apache.uniffle.common.rpc.StatusCode.ACCESS_DENIED;
+import static org.apache.uniffle.common.rpc.StatusCode.SUCCESS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 53877088..8b408ad4 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -61,7 +61,6 @@ import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.request.RssUnregisterShuffleRequest;
import org.apache.uniffle.client.response.ClientResponse;
-import org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
import org.apache.uniffle.client.response.RssApplicationInfoResponse;
import org.apache.uniffle.client.response.RssFetchClientConfResponse;
@@ -84,6 +83,7 @@ import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.ThreadUtils;
public class ShuffleWriteClientImpl implements ShuffleWriteClient {
@@ -172,7 +172,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
String logMsg = String.format("ShuffleWriteClientImpl sendShuffleData with %s blocks to %s cost: %s(ms)",
serverToBlockIds.get(ssi).size(), ssi.getId(), System.currentTimeMillis() - s);
- if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+ if (response.getStatusCode() == StatusCode.SUCCESS) {
// mark a replica of block that has been sent
serverToBlockIds.get(ssi).forEach(block -> blockIdsTracker.get(block).incrementAndGet());
if (defectiveServers != null) {
@@ -378,7 +378,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
long startTime = System.currentTimeMillis();
try {
RssSendCommitResponse response = getShuffleServerClient(ssi).sendCommit(request);
- if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+ if (response.getStatusCode() == StatusCode.SUCCESS) {
int commitCount = response.getCommitCount();
LOG.info("Successfully sendCommit for appId[" + appId + "], shuffleId[" + shuffleId
+ "] to ShuffleServer[" + ssi.getId() + "], cost "
@@ -387,7 +387,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
if (commitCount >= numMaps) {
RssFinishShuffleResponse rfsResponse =
getShuffleServerClient(ssi).finishShuffle(new RssFinishShuffleRequest(appId, shuffleId));
- if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
+ if (rfsResponse.getStatusCode() != StatusCode.SUCCESS) {
String msg = "Failed to finish shuffle to " + ssi + " for shuffleId[" + shuffleId
+ "] with statusCode " + rfsResponse.getStatusCode();
LOG.error(msg);
@@ -450,10 +450,10 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
@Override
public Map<String, String> fetchClientConf(int timeoutMs) {
RssFetchClientConfResponse response =
- new RssFetchClientConfResponse(ResponseStatusCode.INTERNAL_ERROR, "Empty coordinator clients");
+ new RssFetchClientConfResponse(StatusCode.INTERNAL_ERROR, "Empty coordinator clients");
for (CoordinatorClient coordinatorClient : coordinatorClients) {
response = coordinatorClient.fetchClientConf(new RssFetchClientConfRequest(timeoutMs));
- if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+ if (response.getStatusCode() == StatusCode.SUCCESS) {
LOG.info("Success to get conf from {}", coordinatorClient.getDesc());
break;
} else {
@@ -469,7 +469,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
for (CoordinatorClient coordinatorClient : coordinatorClients) {
RssFetchRemoteStorageResponse response =
coordinatorClient.fetchRemoteStorage(new RssFetchRemoteStorageRequest(appId));
- if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+ if (response.getStatusCode() == StatusCode.SUCCESS) {
remoteStorage = response.getRemoteStorageInfo();
LOG.info("Success to get storage {} from {}", remoteStorage, coordinatorClient.getDesc());
break;
@@ -488,7 +488,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
appId, shuffleId, partitionNum, partitionNumPerRange, replica, requiredTags,
assignmentShuffleServerNumber, estimateTaskConcurrency);
- RssGetShuffleAssignmentsResponse response = new RssGetShuffleAssignmentsResponse(ResponseStatusCode.INTERNAL_ERROR);
+ RssGetShuffleAssignmentsResponse response = new RssGetShuffleAssignmentsResponse(StatusCode.INTERNAL_ERROR);
for (CoordinatorClient coordinatorClient : coordinatorClients) {
try {
response = coordinatorClient.getShuffleAssignments(request);
@@ -496,7 +496,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
LOG.error(e.getMessage());
}
- if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+ if (response.getStatusCode() == StatusCode.SUCCESS) {
LOG.info("Success to get shuffle server assignment from {}", coordinatorClient.getDesc());
break;
}
@@ -548,7 +548,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
ShuffleServerInfo ssi = entry.getKey();
try {
RssReportShuffleResultResponse response = getShuffleServerClient(ssi).reportShuffleResult(request);
- if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+ if (response.getStatusCode() == StatusCode.SUCCESS) {
LOG.info("Report shuffle result to " + ssi + " for appId[" + appId
+ "], shuffleId[" + shuffleId + "] successfully");
for (Integer partitionId : requestBlockIds.keySet()) {
@@ -583,7 +583,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
for (ShuffleServerInfo ssi : shuffleServerInfoSet) {
try {
RssGetShuffleResultResponse response = getShuffleServerClient(ssi).getShuffleResult(request);
- if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+ if (response.getStatusCode() == StatusCode.SUCCESS) {
// merge into blockIds from multiple servers.
Roaring64NavigableMap blockIdBitmapOfServer = response.getBlockIdBitmap();
blockIdBitmap.or(blockIdBitmapOfServer);
@@ -624,7 +624,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
try {
RssGetShuffleResultResponse response =
getShuffleServerClient(shuffleServerInfo).getShuffleResultForMultiPart(request);
- if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+ if (response.getStatusCode() == StatusCode.SUCCESS) {
// merge into blockIds from multiple servers.
Roaring64NavigableMap blockIdBitmapOfServer = response.getBlockIdBitmap();
blockIdBitmap.or(blockIdBitmapOfServer);
@@ -653,7 +653,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
callableList.add(() -> {
try {
RssApplicationInfoResponse response = coordinatorClient.registerApplicationInfo(request);
- if (response.getStatusCode() != ResponseStatusCode.SUCCESS) {
+ if (response.getStatusCode() != StatusCode.SUCCESS) {
LOG.error("Failed to send applicationInfo to " + coordinatorClient.getDesc());
} else {
LOG.info("Successfully send applicationInfo to " + coordinatorClient.getDesc());
@@ -687,7 +687,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
ShuffleServerClient client =
ShuffleServerClientFactory.getInstance().getShuffleServerClient(clientType, shuffleServerInfo);
RssAppHeartBeatResponse response = client.sendHeartBeat(request);
- if (response.getStatusCode() != ResponseStatusCode.SUCCESS) {
+ if (response.getStatusCode() != StatusCode.SUCCESS) {
LOG.warn("Failed to send heartbeat to " + shuffleServerInfo);
}
} catch (Exception e) {
@@ -702,7 +702,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
callableList.add(() -> {
try {
RssAppHeartBeatResponse response = coordinatorClient.sendAppHeartBeat(request);
- if (response.getStatusCode() != ResponseStatusCode.SUCCESS) {
+ if (response.getStatusCode() != StatusCode.SUCCESS) {
LOG.warn("Failed to send heartbeat to " + coordinatorClient.getDesc());
} else {
LOG.info("Successfully send heartbeat to " + coordinatorClient.getDesc());
@@ -752,7 +752,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
ShuffleServerClient client =
ShuffleServerClientFactory.getInstance().getShuffleServerClient(clientType, shuffleServerInfo);
RssUnregisterShuffleResponse response = client.unregisterShuffle(request);
- if (response.getStatusCode() != ResponseStatusCode.SUCCESS) {
+ if (response.getStatusCode() != StatusCode.SUCCESS) {
LOG.warn("Failed to unregister shuffle to " + shuffleServerInfo);
}
} catch (Exception e) {
@@ -787,7 +787,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
}
private void throwExceptionIfNecessary(ClientResponse response, String errorMsg) {
- if (response != null && response.getStatusCode() != ResponseStatusCode.SUCCESS) {
+ if (response != null && response.getStatusCode() != StatusCode.SUCCESS) {
LOG.error(errorMsg);
throw new RssException(errorMsg);
}
diff --git a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
index 3693a941..2e6fac7a 100644
--- a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
+++ b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
@@ -29,11 +29,11 @@ import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.apache.uniffle.client.api.ShuffleServerClient;
-import org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.rpc.StatusCode;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -77,7 +77,7 @@ public class ShuffleWriteClientImplTest {
ShuffleWriteClientImpl spyClient = Mockito.spy(shuffleWriteClient);
doReturn(mockShuffleServerClient).when(spyClient).getShuffleServerClient(any());
when(mockShuffleServerClient.sendShuffleData(any())).thenReturn(
- new RssSendShuffleDataResponse(ResponseStatusCode.NO_BUFFER));
+ new RssSendShuffleDataResponse(StatusCode.NO_BUFFER));
List<ShuffleServerInfo> shuffleServerInfoList =
Lists.newArrayList(new ShuffleServerInfo("id", "host", 0));
@@ -115,9 +115,9 @@ public class ShuffleWriteClientImplTest {
ShuffleWriteClientImpl spyClient = Mockito.spy(shuffleWriteClient);
doReturn(mockShuffleServerClient).when(spyClient).getShuffleServerClient(any());
when(mockShuffleServerClient.sendShuffleData(any())).thenReturn(
- new RssSendShuffleDataResponse(ResponseStatusCode.NO_BUFFER),
- new RssSendShuffleDataResponse(ResponseStatusCode.SUCCESS),
- new RssSendShuffleDataResponse(ResponseStatusCode.SUCCESS));
+ new RssSendShuffleDataResponse(StatusCode.NO_BUFFER),
+ new RssSendShuffleDataResponse(StatusCode.SUCCESS),
+ new RssSendShuffleDataResponse(StatusCode.SUCCESS));
String appId = "testSendDataWithDefectiveServers_appId";
ShuffleServerInfo ssi1 = new ShuffleServerInfo("127.0.0.1", 0);
@@ -132,8 +132,8 @@ public class ShuffleWriteClientImplTest {
// Send data for the second time, the first shuffle server will be moved to the last.
when(mockShuffleServerClient.sendShuffleData(any())).thenReturn(
- new RssSendShuffleDataResponse(ResponseStatusCode.SUCCESS),
- new RssSendShuffleDataResponse(ResponseStatusCode.SUCCESS));
+ new RssSendShuffleDataResponse(StatusCode.SUCCESS),
+ new RssSendShuffleDataResponse(StatusCode.SUCCESS));
List<ShuffleServerInfo> excludeServers = new ArrayList<>();
spyClient.genServerToBlocks(shuffleBlockInfoList.get(0), shuffleServerInfoList,
2, excludeServers, Maps.newHashMap(), Maps.newHashMap(), true);
@@ -150,9 +150,9 @@ public class ShuffleWriteClientImplTest {
// Send data for the third time, the first server will be removed from the defectiveServers
// and the second server will be added to the defectiveServers.
when(mockShuffleServerClient.sendShuffleData(any())).thenReturn(
- new RssSendShuffleDataResponse(ResponseStatusCode.NO_BUFFER),
- new RssSendShuffleDataResponse(ResponseStatusCode.SUCCESS),
- new RssSendShuffleDataResponse(ResponseStatusCode.SUCCESS));
+ new RssSendShuffleDataResponse(StatusCode.NO_BUFFER),
+ new RssSendShuffleDataResponse(StatusCode.SUCCESS),
+ new RssSendShuffleDataResponse(StatusCode.SUCCESS));
List<ShuffleServerInfo> shuffleServerInfoList2 = Lists.newArrayList(ssi2, ssi1, ssi3);
List<ShuffleBlockInfo> shuffleBlockInfoList2 = Lists.newArrayList(new ShuffleBlockInfo(0, 0, 10, 10, 10,
new byte[]{1}, shuffleServerInfoList2, 10, 100, 0));
diff --git a/server/src/main/java/org/apache/uniffle/server/StatusCode.java b/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java
similarity index 97%
rename from server/src/main/java/org/apache/uniffle/server/StatusCode.java
rename to common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java
index 98fe3973..7157c351 100644
--- a/server/src/main/java/org/apache/uniffle/server/StatusCode.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.server;
+package org.apache.uniffle.common.rpc;
import org.apache.uniffle.proto.RssProtos;
diff --git a/server/src/test/java/org/apache/uniffle/server/StatusCodeTest.java b/common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java
similarity index 97%
rename from server/src/test/java/org/apache/uniffle/server/StatusCodeTest.java
rename to common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java
index 8975393f..3c93d14b 100644
--- a/server/src/test/java/org/apache/uniffle/server/StatusCodeTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.server;
+package org.apache.uniffle.common.rpc;
import org.junit.jupiter.api.Test;
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
index 7bf1b71e..4e167e04 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
@@ -35,9 +35,9 @@ import org.junit.jupiter.api.io.TempDir;
import org.apache.uniffle.client.api.CoordinatorClient;
import org.apache.uniffle.client.factory.CoordinatorClientFactory;
import org.apache.uniffle.client.request.RssAccessClusterRequest;
-import org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
import org.apache.uniffle.common.ClientType;
+import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.AccessManager;
import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -89,7 +89,7 @@ public class AccessClusterTest extends CoordinatorTestBase {
RssAccessClusterRequest request = new RssAccessClusterRequest(accessID,
Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000, "user");
RssAccessClusterResponse response = coordinatorClient.accessCluster(request);
- assertEquals(ResponseStatusCode.ACCESS_DENIED, response.getStatusCode());
+ assertEquals(StatusCode.ACCESS_DENIED, response.getStatusCode());
// case2: illegal names
Map<String, String> extraProperties = new HashMap<>();
@@ -97,7 +97,7 @@ public class AccessClusterTest extends CoordinatorTestBase {
request = new RssAccessClusterRequest(accessID, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION),
2000, extraProperties, "user");
response = coordinatorClient.accessCluster(request);
- assertEquals(ResponseStatusCode.ACCESS_DENIED, response.getStatusCode());
+ assertEquals(StatusCode.ACCESS_DENIED, response.getStatusCode());
// case3: legal names
extraProperties.clear();
@@ -105,7 +105,7 @@ public class AccessClusterTest extends CoordinatorTestBase {
request = new RssAccessClusterRequest(accessID, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION),
2000, extraProperties, "user");
response = coordinatorClient.accessCluster(request);
- assertEquals(ResponseStatusCode.SUCCESS, response.getStatusCode());
+ assertEquals(StatusCode.SUCCESS, response.getStatusCode());
shutdownServers();
}
@@ -138,14 +138,14 @@ public class AccessClusterTest extends CoordinatorTestBase {
RssAccessClusterRequest request = new RssAccessClusterRequest(accessId,
Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000, "user");
RssAccessClusterResponse response = coordinatorClient.accessCluster(request);
- assertEquals(ResponseStatusCode.ACCESS_DENIED, response.getStatusCode());
+ assertEquals(StatusCode.ACCESS_DENIED, response.getStatusCode());
assertTrue(response.getMessage().startsWith("Denied by AccessCandidatesChecker"));
accessId = "135";
request = new RssAccessClusterRequest(accessId,
Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000, "user");
response = coordinatorClient.accessCluster(request);
- assertEquals(ResponseStatusCode.ACCESS_DENIED, response.getStatusCode());
+ assertEquals(StatusCode.ACCESS_DENIED, response.getStatusCode());
assertTrue(response.getMessage().startsWith("Denied by AccessClusterLoadChecker"));
shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + 2);
@@ -159,13 +159,13 @@ public class AccessClusterTest extends CoordinatorTestBase {
request = new RssAccessClusterRequest(accessId,
Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000, "user");
response = client.accessCluster(request);
- assertEquals(ResponseStatusCode.INTERNAL_ERROR, response.getStatusCode());
+ assertEquals(StatusCode.INTERNAL_ERROR, response.getStatusCode());
assertTrue(response.getMessage().startsWith("UNAVAILABLE: io exception"));
request = new RssAccessClusterRequest(accessId,
Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000, "user");
response = coordinatorClient.accessCluster(request);
- assertEquals(ResponseStatusCode.SUCCESS, response.getStatusCode());
+ assertEquals(StatusCode.SUCCESS, response.getStatusCode());
assertTrue(response.getMessage().startsWith("SUCCESS"));
shuffleServer.stopServer();
shutdownServers();
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
index 3bae1739..ce170a2b 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
@@ -29,13 +29,13 @@ import org.junit.jupiter.api.Test;
import org.apache.uniffle.client.request.RssApplicationInfoRequest;
import org.apache.uniffle.client.request.RssGetShuffleAssignmentsRequest;
-import org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.client.response.RssApplicationInfoResponse;
import org.apache.uniffle.client.response.RssGetShuffleAssignmentsResponse;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleRegisterInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.common.storage.StorageMedia;
import org.apache.uniffle.common.storage.StorageStatus;
@@ -207,7 +207,7 @@ public class CoordinatorGrpcTest extends CoordinatorTestBase {
RssApplicationInfoResponse response =
coordinatorClient.registerApplicationInfo(
new RssApplicationInfoRequest("application_appHeartbeatTest1", 1000, "user"));
- assertEquals(ResponseStatusCode.SUCCESS, response.getStatusCode());
+ assertEquals(StatusCode.SUCCESS, response.getStatusCode());
assertEquals(Sets.newHashSet("application_appHeartbeatTest1"),
coordinators.get(0).getApplicationManager().getAppIds());
coordinatorClient.registerApplicationInfo(
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java
index 76949fd4..59cba9d9 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java
@@ -32,10 +32,10 @@ import org.junit.jupiter.api.io.TempDir;
import org.apache.uniffle.client.request.RssFetchClientConfRequest;
import org.apache.uniffle.client.request.RssFetchRemoteStorageRequest;
-import org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.client.response.RssFetchClientConfResponse;
import org.apache.uniffle.client.response.RssFetchRemoteStorageResponse;
import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.coordinator.ApplicationManager;
import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -67,7 +67,7 @@ public class FetchClientConfTest extends CoordinatorTestBase {
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
RssFetchClientConfRequest request = new RssFetchClientConfRequest(2000);
RssFetchClientConfResponse response = coordinatorClient.fetchClientConf(request);
- assertEquals(ResponseStatusCode.SUCCESS, response.getStatusCode());
+ assertEquals(StatusCode.SUCCESS, response.getStatusCode());
assertEquals(2, response.getClientConf().size());
assertEquals("1234", response.getClientConf().get("spark.mock.1"));
assertEquals("true", response.getClientConf().get("spark.mock.2"));
@@ -83,7 +83,7 @@ public class FetchClientConfTest extends CoordinatorTestBase {
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
request = new RssFetchClientConfRequest(2000);
response = coordinatorClient.fetchClientConf(request);
- assertEquals(ResponseStatusCode.SUCCESS, response.getStatusCode());
+ assertEquals(StatusCode.SUCCESS, response.getStatusCode());
assertEquals(0, response.getClientConf().size());
shutdownServers();
@@ -91,7 +91,7 @@ public class FetchClientConfTest extends CoordinatorTestBase {
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
request = new RssFetchClientConfRequest(10);
response = coordinatorClient.fetchClientConf(request);
- assertEquals(ResponseStatusCode.INTERNAL_ERROR, response.getStatusCode());
+ assertEquals(StatusCode.INTERNAL_ERROR, response.getStatusCode());
assertEquals(0, response.getClientConf().size());
shutdownServers();
}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckCoordinatorGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckCoordinatorGrpcTest.java
index a0c9f670..96832b9b 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckCoordinatorGrpcTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckCoordinatorGrpcTest.java
@@ -31,8 +31,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.apache.uniffle.client.request.RssGetShuffleAssignmentsRequest;
-import org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.client.response.RssGetShuffleAssignmentsResponse;
+import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.ServerNode;
@@ -130,7 +130,7 @@ public class HealthCheckCoordinatorGrpcTest extends CoordinatorTestBase {
nodes = coordinators.get(0).getClusterManager().getServerList(Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION));
assertEquals(0, nodes.size());
response = coordinatorClient.getShuffleAssignments(request);
- assertEquals(ResponseStatusCode.INTERNAL_ERROR, response.getStatusCode());
+ assertEquals(StatusCode.INTERNAL_ERROR, response.getStatusCode());
tempDataFile.delete();
int i = 0;
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index 23763ad9..2c9a5df9 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -49,7 +49,6 @@ import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
-import org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.client.response.RssGetShuffleResultResponse;
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.util.ClientUtils;
@@ -59,6 +58,7 @@ import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.proto.RssProtos;
@@ -205,7 +205,7 @@ public class ShuffleServerGrpcTest extends IntegrationTestBase {
request =
new RssReportShuffleResultRequest("shuffleResultTest", 0, 0L, partitionToBlockIds, 1);
RssReportShuffleResultResponse response = shuffleServerClient.reportShuffleResult(request);
- assertEquals(ResponseStatusCode.SUCCESS, response.getStatusCode());
+ assertEquals(StatusCode.SUCCESS, response.getStatusCode());
req = new RssGetShuffleResultRequest("shuffleResultTest", 0, 1);
result = shuffleServerClient.getShuffleResult(req);
blockIdBitmap = result.getBlockIdBitmap();
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
index 38f90dc1..28412912 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
@@ -41,7 +41,6 @@ import org.apache.uniffle.client.request.RssFetchClientConfRequest;
import org.apache.uniffle.client.request.RssFetchRemoteStorageRequest;
import org.apache.uniffle.client.request.RssGetShuffleAssignmentsRequest;
import org.apache.uniffle.client.request.RssSendHeartBeatRequest;
-import org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
import org.apache.uniffle.client.response.RssApplicationInfoResponse;
@@ -53,6 +52,7 @@ import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.common.storage.StorageInfoUtils;
import org.apache.uniffle.proto.CoordinatorServerGrpc;
@@ -73,7 +73,6 @@ import org.apache.uniffle.proto.RssProtos.RemoteStorageConfItem;
import org.apache.uniffle.proto.RssProtos.ShuffleServerHeartBeatRequest;
import org.apache.uniffle.proto.RssProtos.ShuffleServerHeartBeatResponse;
import org.apache.uniffle.proto.RssProtos.ShuffleServerId;
-import org.apache.uniffle.proto.RssProtos.StatusCode;
public class CoordinatorGrpcClient extends GrpcClient implements CoordinatorClient {
@@ -133,7 +132,7 @@ public class CoordinatorGrpcClient extends GrpcClient implements CoordinatorClie
.putAllStorageInfo(StorageInfoUtils.toProto(storageInfo))
.build();
- StatusCode status;
+ RssProtos.StatusCode status;
ShuffleServerHeartBeatResponse response = null;
try {
@@ -141,17 +140,17 @@ public class CoordinatorGrpcClient extends GrpcClient implements CoordinatorClie
status = response.getStatus();
} catch (StatusRuntimeException e) {
LOG.error(e.getMessage());
- status = StatusCode.TIMEOUT;
+ status = RssProtos.StatusCode.TIMEOUT;
} catch (Exception e) {
LOG.error(e.getMessage());
- status = StatusCode.INTERNAL_ERROR;
+ status = RssProtos.StatusCode.INTERNAL_ERROR;
}
if (response == null) {
response = ShuffleServerHeartBeatResponse.newBuilder().setStatus(status).build();
}
- if (status != StatusCode.SUCCESS) {
+ if (status != RssProtos.StatusCode.SUCCESS) {
LOG.error("Fail to send heartbeat to {}:{} {}", this.host, this.port, status);
}
@@ -198,16 +197,16 @@ public class CoordinatorGrpcClient extends GrpcClient implements CoordinatorClie
request.getStorageInfo());
RssSendHeartBeatResponse response;
- StatusCode statusCode = rpcResponse.getStatus();
+ RssProtos.StatusCode statusCode = rpcResponse.getStatus();
switch (statusCode) {
case SUCCESS:
- response = new RssSendHeartBeatResponse(ResponseStatusCode.SUCCESS);
+ response = new RssSendHeartBeatResponse(StatusCode.SUCCESS);
break;
case TIMEOUT:
- response = new RssSendHeartBeatResponse(ResponseStatusCode.TIMEOUT);
+ response = new RssSendHeartBeatResponse(StatusCode.TIMEOUT);
break;
default:
- response = new RssSendHeartBeatResponse(ResponseStatusCode.INTERNAL_ERROR);
+ response = new RssSendHeartBeatResponse(StatusCode.INTERNAL_ERROR);
}
return response;
}
@@ -219,13 +218,13 @@ public class CoordinatorGrpcClient extends GrpcClient implements CoordinatorClie
RssProtos.AppHeartBeatResponse rpcResponse = blockingStub
.withDeadlineAfter(request.getTimeoutMs(), TimeUnit.MILLISECONDS).appHeartbeat(rpcRequest);
RssAppHeartBeatResponse response;
- StatusCode statusCode = rpcResponse.getStatus();
+ RssProtos.StatusCode statusCode = rpcResponse.getStatus();
switch (statusCode) {
case SUCCESS:
- response = new RssAppHeartBeatResponse(ResponseStatusCode.SUCCESS);
+ response = new RssAppHeartBeatResponse(StatusCode.SUCCESS);
break;
default:
- response = new RssAppHeartBeatResponse(ResponseStatusCode.INTERNAL_ERROR);
+ response = new RssAppHeartBeatResponse(StatusCode.INTERNAL_ERROR);
}
return response;
}
@@ -237,13 +236,13 @@ public class CoordinatorGrpcClient extends GrpcClient implements CoordinatorClie
ApplicationInfoResponse rpcResponse = blockingStub
.withDeadlineAfter(request.getTimeoutMs(), TimeUnit.MILLISECONDS).registerApplicationInfo(rpcRequest);
RssApplicationInfoResponse response;
- StatusCode statusCode = rpcResponse.getStatus();
+ RssProtos.StatusCode statusCode = rpcResponse.getStatus();
switch (statusCode) {
case SUCCESS:
- response = new RssApplicationInfoResponse(ResponseStatusCode.SUCCESS);
+ response = new RssApplicationInfoResponse(StatusCode.SUCCESS);
break;
default:
- response = new RssApplicationInfoResponse(ResponseStatusCode.INTERNAL_ERROR);
+ response = new RssApplicationInfoResponse(StatusCode.INTERNAL_ERROR);
}
return response;
}
@@ -261,10 +260,10 @@ public class CoordinatorGrpcClient extends GrpcClient implements CoordinatorClie
request.getEstimateTaskConcurrency());
RssGetShuffleAssignmentsResponse response;
- StatusCode statusCode = rpcResponse.getStatus();
+ RssProtos.StatusCode statusCode = rpcResponse.getStatus();
switch (statusCode) {
case SUCCESS:
- response = new RssGetShuffleAssignmentsResponse(ResponseStatusCode.SUCCESS);
+ response = new RssGetShuffleAssignmentsResponse(StatusCode.SUCCESS);
// get all register info according to coordinator's response
Map<ShuffleServerInfo, List<PartitionRange>> serverToPartitionRanges = getServerToPartitionRanges(rpcResponse);
Map<Integer, List<ShuffleServerInfo>> partitionToServers = getPartitionToServers(rpcResponse);
@@ -272,10 +271,10 @@ public class CoordinatorGrpcClient extends GrpcClient implements CoordinatorClie
response.setPartitionToServers(partitionToServers);
break;
case TIMEOUT:
- response = new RssGetShuffleAssignmentsResponse(ResponseStatusCode.TIMEOUT);
+ response = new RssGetShuffleAssignmentsResponse(StatusCode.TIMEOUT);
break;
default:
- response = new RssGetShuffleAssignmentsResponse(ResponseStatusCode.INTERNAL_ERROR, rpcResponse.getRetMsg());
+ response = new RssGetShuffleAssignmentsResponse(StatusCode.INTERNAL_ERROR, rpcResponse.getRetMsg());
}
return response;
@@ -295,21 +294,21 @@ public class CoordinatorGrpcClient extends GrpcClient implements CoordinatorClie
rpcResponse = blockingStub
.withDeadlineAfter(request.getTimeoutMs(), TimeUnit.MILLISECONDS).accessCluster(rpcRequest);
} catch (Exception e) {
- return new RssAccessClusterResponse(ResponseStatusCode.INTERNAL_ERROR, e.getMessage());
+ return new RssAccessClusterResponse(StatusCode.INTERNAL_ERROR, e.getMessage());
}
RssAccessClusterResponse response;
- StatusCode statusCode = rpcResponse.getStatus();
+ RssProtos.StatusCode statusCode = rpcResponse.getStatus();
switch (statusCode) {
case SUCCESS:
response = new RssAccessClusterResponse(
- ResponseStatusCode.SUCCESS,
+ StatusCode.SUCCESS,
rpcResponse.getRetMsg(),
rpcResponse.getUuid()
);
break;
default:
- response = new RssAccessClusterResponse(ResponseStatusCode.ACCESS_DENIED, rpcResponse.getRetMsg());
+ response = new RssAccessClusterResponse(StatusCode.ACCESS_DENIED, rpcResponse.getRetMsg());
}
return response;
@@ -325,12 +324,12 @@ public class CoordinatorGrpcClient extends GrpcClient implements CoordinatorClie
Map<String, String> clientConf = rpcResponse
.getClientConfList().stream().collect(Collectors.toMap(ClientConfItem::getKey, ClientConfItem::getValue));
return new RssFetchClientConfResponse(
- ResponseStatusCode.SUCCESS,
+ StatusCode.SUCCESS,
rpcResponse.getRetMsg(),
clientConf);
} catch (Exception e) {
LOG.info(e.getMessage(), e);
- return new RssFetchClientConfResponse(ResponseStatusCode.INTERNAL_ERROR, e.getMessage());
+ return new RssFetchClientConfResponse(StatusCode.INTERNAL_ERROR, e.getMessage());
}
}
@@ -347,12 +346,12 @@ public class CoordinatorGrpcClient extends GrpcClient implements CoordinatorClie
.stream()
.collect(Collectors.toMap(RemoteStorageConfItem::getKey, RemoteStorageConfItem::getValue));
RssFetchRemoteStorageResponse tt = new RssFetchRemoteStorageResponse(
- ResponseStatusCode.SUCCESS,
+ StatusCode.SUCCESS,
new RemoteStorageInfo(rpcResponse.getRemoteStorage().getPath(), remoteStorageConf));
return tt;
} catch (Exception e) {
LOG.info("Failed to fetch remote storage from coordinator, " + e.getMessage(), e);
- return new RssFetchRemoteStorageResponse(ResponseStatusCode.INTERNAL_ERROR, null);
+ return new RssFetchRemoteStorageResponse(StatusCode.INTERNAL_ERROR, null);
}
}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 57dc8ed5..2149cb6c 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -44,7 +44,6 @@ import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.request.RssUnregisterShuffleRequest;
-import org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
import org.apache.uniffle.client.response.RssFinishShuffleResponse;
import org.apache.uniffle.client.response.RssGetInMemoryShuffleDataResponse;
@@ -63,6 +62,7 @@ import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.exception.NotRetryException;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.proto.RssProtos;
@@ -97,7 +97,6 @@ import org.apache.uniffle.proto.RssProtos.ShuffleDataBlockSegment;
import org.apache.uniffle.proto.RssProtos.ShufflePartitionRange;
import org.apache.uniffle.proto.RssProtos.ShuffleRegisterRequest;
import org.apache.uniffle.proto.RssProtos.ShuffleRegisterResponse;
-import org.apache.uniffle.proto.RssProtos.StatusCode;
import org.apache.uniffle.proto.ShuffleServerGrpc;
import org.apache.uniffle.proto.ShuffleServerGrpc.ShuffleServerBlockingStub;
@@ -214,7 +213,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
long result = FAILED_REQUIRE_ID;
Random random = new Random();
final int backOffBase = 2000;
- while (rpcResponse.getStatus() == StatusCode.NO_BUFFER) {
+ while (rpcResponse.getStatus() == RssProtos.StatusCode.NO_BUFFER) {
LOG.info("Can't require " + requireSize + " bytes from " + host + ":" + port + ", sleep and try["
+ retry + "] again");
if (retry >= retryMax) {
@@ -233,7 +232,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
rpcResponse = getBlockingStub().requireBuffer(rpcRequest);
retry++;
}
- if (rpcResponse.getStatus() == StatusCode.SUCCESS) {
+ if (rpcResponse.getStatus() == RssProtos.StatusCode.SUCCESS) {
LOG.info("Require preAllocated size of {} from {}:{}, cost: {}(ms)",
requireSize, host, port, System.currentTimeMillis() - start);
result = rpcResponse.getRequireBufferId();
@@ -254,11 +253,11 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
RssProtos.ShuffleUnregisterResponse rpcResponse = doUnregisterShuffle(request.getAppId(), request.getShuffleId());
RssUnregisterShuffleResponse response;
- StatusCode statusCode = rpcResponse.getStatus();
+ RssProtos.StatusCode statusCode = rpcResponse.getStatus();
switch (statusCode) {
case SUCCESS:
- response = new RssUnregisterShuffleResponse(ResponseStatusCode.SUCCESS);
+ response = new RssUnregisterShuffleResponse(StatusCode.SUCCESS);
break;
default:
String msg = String.format("Errors on unregister shuffle to %s:%s for appId[%s].shuffleId[%], error: %s",
@@ -282,10 +281,10 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
);
RssRegisterShuffleResponse response;
- StatusCode statusCode = rpcResponse.getStatus();
+ RssProtos.StatusCode statusCode = rpcResponse.getStatus();
switch (statusCode) {
case SUCCESS:
- response = new RssRegisterShuffleResponse(ResponseStatusCode.SUCCESS);
+ response = new RssRegisterShuffleResponse(StatusCode.SUCCESS);
break;
default:
String msg = "Can't register shuffle to " + host + ":" + port
@@ -358,12 +357,12 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
SendShuffleDataResponse response = getBlockingStub().sendShuffleData(rpcRequest);
LOG.info("Do sendShuffleData to {}:{} rpc cost:" + (System.currentTimeMillis() - start)
+ " ms for " + allocateSize + " bytes with " + finalBlockNum + " blocks", host, port);
- if (response.getStatus() != StatusCode.SUCCESS) {
+ if (response.getStatus() != RssProtos.StatusCode.SUCCESS) {
String msg = "Can't send shuffle data with " + finalBlockNum
+ " blocks to " + host + ":" + port
+ ", statusCode=" + response.getStatus()
+ ", errorMsg:" + response.getRetMsg();
- if (response.getStatus() == StatusCode.NO_REGISTER) {
+ if (response.getStatus() == RssProtos.StatusCode.NO_REGISTER) {
throw new NotRetryException(msg);
} else {
throw new RssException(msg);
@@ -380,9 +379,9 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
RssSendShuffleDataResponse response;
if (isSuccessful) {
- response = new RssSendShuffleDataResponse(ResponseStatusCode.SUCCESS);
+ response = new RssSendShuffleDataResponse(StatusCode.SUCCESS);
} else {
- response = new RssSendShuffleDataResponse(ResponseStatusCode.INTERNAL_ERROR);
+ response = new RssSendShuffleDataResponse(StatusCode.INTERNAL_ERROR);
}
return response;
}
@@ -392,14 +391,14 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
ShuffleCommitResponse rpcResponse = doSendCommit(request.getAppId(), request.getShuffleId());
RssSendCommitResponse response;
- if (rpcResponse.getStatus() != StatusCode.SUCCESS) {
+ if (rpcResponse.getStatus() != RssProtos.StatusCode.SUCCESS) {
String msg = "Can't commit shuffle data to " + host + ":" + port
+ " for [appId=" + request.getAppId() + ", shuffleId=" + request.getShuffleId() + "], "
+ "errorMsg:" + rpcResponse.getRetMsg();
LOG.error(msg);
throw new RssException(msg);
} else {
- response = new RssSendCommitResponse(ResponseStatusCode.SUCCESS);
+ response = new RssSendCommitResponse(StatusCode.SUCCESS);
response.setCommitCount(rpcResponse.getCommitCount());
}
return response;
@@ -408,14 +407,14 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
@Override
public RssAppHeartBeatResponse sendHeartBeat(RssAppHeartBeatRequest request) {
AppHeartBeatResponse appHeartBeatResponse = doSendHeartBeat(request.getAppId(), request.getTimeoutMs());
- if (appHeartBeatResponse.getStatus() != StatusCode.SUCCESS) {
+ if (appHeartBeatResponse.getStatus() != RssProtos.StatusCode.SUCCESS) {
String msg = "Can't send heartbeat to " + host + ":" + port
+ " for [appId=" + request.getAppId() + ", timeout=" + request.getTimeoutMs() + "ms], "
+ "errorMsg:" + appHeartBeatResponse.getRetMsg();
LOG.error(msg);
- return new RssAppHeartBeatResponse(ResponseStatusCode.INTERNAL_ERROR);
+ return new RssAppHeartBeatResponse(StatusCode.INTERNAL_ERROR);
} else {
- return new RssAppHeartBeatResponse(ResponseStatusCode.SUCCESS);
+ return new RssAppHeartBeatResponse(StatusCode.SUCCESS);
}
}
@@ -427,7 +426,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
FinishShuffleResponse rpcResponse = getBlockingStub().finishShuffle(rpcRequest);
RssFinishShuffleResponse response;
- if (rpcResponse.getStatus() != StatusCode.SUCCESS) {
+ if (rpcResponse.getStatus() != RssProtos.StatusCode.SUCCESS) {
String msg = "Can't finish shuffle process to " + host + ":" + port
+ " for [appId=" + request.getAppId() + ", shuffleId=" + request.getShuffleId() + "], "
+ "errorMsg:" + rpcResponse.getRetMsg();
@@ -438,7 +437,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
+ request.getShuffleId() + "]";
LOG.info("FinishShuffle to {}:{} for {} cost {} ms", host, port, requestInfo,
System.currentTimeMillis() - start);
- response = new RssFinishShuffleResponse(ResponseStatusCode.SUCCESS);
+ response = new RssFinishShuffleResponse(StatusCode.SUCCESS);
}
return response;
}
@@ -465,11 +464,11 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
.build();
ReportShuffleResultResponse rpcResponse = doReportShuffleResult(recRequest);
- StatusCode statusCode = rpcResponse.getStatus();
+ RssProtos.StatusCode statusCode = rpcResponse.getStatus();
RssReportShuffleResultResponse response;
switch (statusCode) {
case SUCCESS:
- response = new RssReportShuffleResultResponse(ResponseStatusCode.SUCCESS);
+ response = new RssReportShuffleResultResponse(StatusCode.SUCCESS);
break;
default:
String msg = "Can't report shuffle result to " + host + ":" + port
@@ -506,13 +505,13 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
.setPartitionId(request.getPartitionId())
.build();
GetShuffleResultResponse rpcResponse = getBlockingStub().getShuffleResult(rpcRequest);
- StatusCode statusCode = rpcResponse.getStatus();
+ RssProtos.StatusCode statusCode = rpcResponse.getStatus();
RssGetShuffleResultResponse response;
switch (statusCode) {
case SUCCESS:
try {
- response = new RssGetShuffleResultResponse(ResponseStatusCode.SUCCESS,
+ response = new RssGetShuffleResultResponse(StatusCode.SUCCESS,
rpcResponse.getSerializedBitmap().toByteArray());
} catch (Exception e) {
throw new RuntimeException(e);
@@ -538,13 +537,13 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
.addAllPartitions(request.getPartitions())
.build();
GetShuffleResultForMultiPartResponse rpcResponse = getBlockingStub().getShuffleResultForMultiPart(rpcRequest);
- StatusCode statusCode = rpcResponse.getStatus();
+ RssProtos.StatusCode statusCode = rpcResponse.getStatus();
RssGetShuffleResultResponse response;
switch (statusCode) {
case SUCCESS:
try {
- response = new RssGetShuffleResultResponse(ResponseStatusCode.SUCCESS,
+ response = new RssGetShuffleResultResponse(StatusCode.SUCCESS,
rpcResponse.getSerializedBitmap().toByteArray());
} catch (Exception e) {
throw new RuntimeException(e);
@@ -581,13 +580,13 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
LOG.info("GetShuffleData from {}:{} for {} cost {} ms", host, port, requestInfo,
System.currentTimeMillis() - start);
- StatusCode statusCode = rpcResponse.getStatus();
+ RssProtos.StatusCode statusCode = rpcResponse.getStatus();
RssGetShuffleDataResponse response;
switch (statusCode) {
case SUCCESS:
response = new RssGetShuffleDataResponse(
- ResponseStatusCode.SUCCESS, rpcResponse.getData().toByteArray());
+ StatusCode.SUCCESS, rpcResponse.getData().toByteArray());
break;
default:
@@ -616,13 +615,13 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
LOG.info("GetShuffleIndex from {}:{} for {} cost {} ms", host, port,
requestInfo, System.currentTimeMillis() - start);
- StatusCode statusCode = rpcResponse.getStatus();
+ RssProtos.StatusCode statusCode = rpcResponse.getStatus();
RssGetShuffleIndexResponse response;
switch (statusCode) {
case SUCCESS:
response = new RssGetShuffleIndexResponse(
- ResponseStatusCode.SUCCESS, rpcResponse.getIndexData().toByteArray(), rpcResponse.getDataFileLen());
+ StatusCode.SUCCESS, rpcResponse.getIndexData().toByteArray(), rpcResponse.getDataFileLen());
break;
default:
@@ -665,13 +664,13 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
LOG.info("GetInMemoryShuffleData from {}:{} for " + requestInfo + " cost "
+ (System.currentTimeMillis() - start) + " ms", host, port);
- StatusCode statusCode = rpcResponse.getStatus();
+ RssProtos.StatusCode statusCode = rpcResponse.getStatus();
RssGetInMemoryShuffleDataResponse response;
switch (statusCode) {
case SUCCESS:
response = new RssGetInMemoryShuffleDataResponse(
- ResponseStatusCode.SUCCESS, rpcResponse.getData().toByteArray(),
+ StatusCode.SUCCESS, rpcResponse.getData().toByteArray(),
toBufferSegments(rpcResponse.getShuffleDataBlockSegmentsList()));
break;
default:
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/ClientResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/ClientResponse.java
index 91d47360..8382dfa9 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/response/ClientResponse.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/response/ClientResponse.java
@@ -17,22 +17,24 @@
package org.apache.uniffle.client.response;
+import org.apache.uniffle.common.rpc.StatusCode;
+
public class ClientResponse {
- private final ResponseStatusCode statusCode;
+ private final StatusCode statusCode;
private final String message;
- public ClientResponse(ResponseStatusCode statusCode) {
+ public ClientResponse(StatusCode statusCode) {
this.statusCode = statusCode;
this.message = "";
}
- public ClientResponse(ResponseStatusCode statusCode, String message) {
+ public ClientResponse(StatusCode statusCode, String message) {
this.statusCode = statusCode;
this.message = message;
}
- public ResponseStatusCode getStatusCode() {
+ public StatusCode getStatusCode() {
return statusCode;
}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/ResponseStatusCode.java b/internal-client/src/main/java/org/apache/uniffle/client/response/ResponseStatusCode.java
deleted file mode 100644
index 0e857e7e..00000000
--- a/internal-client/src/main/java/org/apache/uniffle/client/response/ResponseStatusCode.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.uniffle.client.response;
-
-public enum ResponseStatusCode {
- SUCCESS,
- DOUBLE_REGISTER,
- NO_BUFFER,
- INVALID_STORAGE,
- NO_REGISTER,
- NO_PARTITION,
- INTERNAL_ERROR,
- TIMEOUT,
- ACCESS_DENIED
-}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssAccessClusterResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssAccessClusterResponse.java
index 583e7daf..6ea8790b 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/response/RssAccessClusterResponse.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssAccessClusterResponse.java
@@ -17,15 +17,17 @@
package org.apache.uniffle.client.response;
+import org.apache.uniffle.common.rpc.StatusCode;
+
public class RssAccessClusterResponse extends ClientResponse {
private String uuid;
- public RssAccessClusterResponse(ResponseStatusCode statusCode, String messge) {
+ public RssAccessClusterResponse(StatusCode statusCode, String messge) {
super(statusCode, messge);
}
- public RssAccessClusterResponse(ResponseStatusCode statusCode, String messge, String uuid) {
+ public RssAccessClusterResponse(StatusCode statusCode, String messge, String uuid) {
super(statusCode, messge);
this.uuid = uuid;
}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssAppHeartBeatResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssAppHeartBeatResponse.java
index 6148a3e9..681da68c 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/response/RssAppHeartBeatResponse.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssAppHeartBeatResponse.java
@@ -17,9 +17,11 @@
package org.apache.uniffle.client.response;
+import org.apache.uniffle.common.rpc.StatusCode;
+
public class RssAppHeartBeatResponse extends ClientResponse {
- public RssAppHeartBeatResponse(ResponseStatusCode statusCode) {
+ public RssAppHeartBeatResponse(StatusCode statusCode) {
super(statusCode);
}
}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssApplicationInfoResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssApplicationInfoResponse.java
index 210041d7..de8690ce 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/response/RssApplicationInfoResponse.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssApplicationInfoResponse.java
@@ -17,9 +17,11 @@
package org.apache.uniffle.client.response;
+import org.apache.uniffle.common.rpc.StatusCode;
+
public class RssApplicationInfoResponse extends ClientResponse {
- public RssApplicationInfoResponse(ResponseStatusCode statusCode) {
+ public RssApplicationInfoResponse(StatusCode statusCode) {
super(statusCode);
}
}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssFetchClientConfResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssFetchClientConfResponse.java
index 8fb15b28..64835819 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/response/RssFetchClientConfResponse.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssFetchClientConfResponse.java
@@ -21,15 +21,17 @@ import java.util.Map;
import com.google.common.collect.Maps;
+import org.apache.uniffle.common.rpc.StatusCode;
+
public class RssFetchClientConfResponse extends ClientResponse {
private final Map<String, String> clientConf;
- public RssFetchClientConfResponse(ResponseStatusCode statusCode, String message) {
+ public RssFetchClientConfResponse(StatusCode statusCode, String message) {
this(statusCode, message, Maps.newHashMap());
}
public RssFetchClientConfResponse(
- ResponseStatusCode statusCode,
+ StatusCode statusCode,
String message,
Map<String, String> clientConf) {
super(statusCode, message);
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssFetchRemoteStorageResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssFetchRemoteStorageResponse.java
index 53bc6941..b3948cff 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/response/RssFetchRemoteStorageResponse.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssFetchRemoteStorageResponse.java
@@ -18,12 +18,13 @@
package org.apache.uniffle.client.response;
import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.rpc.StatusCode;
public class RssFetchRemoteStorageResponse extends ClientResponse {
private RemoteStorageInfo remoteStorageInfo;
public RssFetchRemoteStorageResponse(
- ResponseStatusCode statusCode,
+ StatusCode statusCode,
RemoteStorageInfo remoteStorageInfo) {
super(statusCode);
this.remoteStorageInfo = remoteStorageInfo;
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssFinishShuffleResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssFinishShuffleResponse.java
index dfb0d7c2..d5ea46b9 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/response/RssFinishShuffleResponse.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssFinishShuffleResponse.java
@@ -17,9 +17,11 @@
package org.apache.uniffle.client.response;
+import org.apache.uniffle.common.rpc.StatusCode;
+
public class RssFinishShuffleResponse extends ClientResponse {
- public RssFinishShuffleResponse(ResponseStatusCode statusCode) {
+ public RssFinishShuffleResponse(StatusCode statusCode) {
super(statusCode);
}
}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetInMemoryShuffleDataResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetInMemoryShuffleDataResponse.java
index 546a41d1..70f218d5 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetInMemoryShuffleDataResponse.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetInMemoryShuffleDataResponse.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.client.response;
import java.util.List;
import org.apache.uniffle.common.BufferSegment;
+import org.apache.uniffle.common.rpc.StatusCode;
public class RssGetInMemoryShuffleDataResponse extends ClientResponse {
@@ -27,7 +28,7 @@ public class RssGetInMemoryShuffleDataResponse extends ClientResponse {
private final List<BufferSegment> bufferSegments;
public RssGetInMemoryShuffleDataResponse(
- ResponseStatusCode statusCode, byte[] data, List<BufferSegment> bufferSegments) {
+ StatusCode statusCode, byte[] data, List<BufferSegment> bufferSegments) {
super(statusCode);
this.bufferSegments = bufferSegments;
this.data = data;
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleAssignmentsResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleAssignmentsResponse.java
index f916a85b..fa0e5e34 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleAssignmentsResponse.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleAssignmentsResponse.java
@@ -22,17 +22,18 @@ import java.util.Map;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.rpc.StatusCode;
public class RssGetShuffleAssignmentsResponse extends ClientResponse {
private Map<Integer, List<ShuffleServerInfo>> partitionToServers;
private Map<ShuffleServerInfo, List<PartitionRange>> serverToPartitionRanges;
- public RssGetShuffleAssignmentsResponse(ResponseStatusCode statusCode) {
+ public RssGetShuffleAssignmentsResponse(StatusCode statusCode) {
super(statusCode);
}
- public RssGetShuffleAssignmentsResponse(ResponseStatusCode statusCode, String message) {
+ public RssGetShuffleAssignmentsResponse(StatusCode statusCode, String message) {
super(statusCode, message);
}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleDataResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleDataResponse.java
index 5e73aa52..86c54f45 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleDataResponse.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleDataResponse.java
@@ -17,11 +17,13 @@
package org.apache.uniffle.client.response;
+import org.apache.uniffle.common.rpc.StatusCode;
+
public class RssGetShuffleDataResponse extends ClientResponse {
private final byte[] shuffleData;
- public RssGetShuffleDataResponse(ResponseStatusCode statusCode, byte[] data) {
+ public RssGetShuffleDataResponse(StatusCode statusCode, byte[] data) {
super(statusCode);
this.shuffleData = data;
}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java
index 8884271c..f1c3171f 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java
@@ -18,11 +18,12 @@
package org.apache.uniffle.client.response;
import org.apache.uniffle.common.ShuffleIndexResult;
+import org.apache.uniffle.common.rpc.StatusCode;
public class RssGetShuffleIndexResponse extends ClientResponse {
private final ShuffleIndexResult shuffleIndexResult;
- public RssGetShuffleIndexResponse(ResponseStatusCode statusCode, byte[] data, long dataFileLen) {
+ public RssGetShuffleIndexResponse(StatusCode statusCode, byte[] data, long dataFileLen) {
super(statusCode);
this.shuffleIndexResult = new ShuffleIndexResult(data, dataFileLen);
}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleResultResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleResultResponse.java
index a9bffc44..c8e429e7 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleResultResponse.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleResultResponse.java
@@ -21,13 +21,14 @@ import java.io.IOException;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.RssUtils;
public class RssGetShuffleResultResponse extends ClientResponse {
private Roaring64NavigableMap blockIdBitmap;
- public RssGetShuffleResultResponse(ResponseStatusCode statusCode, byte[] serializedBitmap) throws IOException {
+ public RssGetShuffleResultResponse(StatusCode statusCode, byte[] serializedBitmap) throws IOException {
super(statusCode);
blockIdBitmap = RssUtils.deserializeBitMap(serializedBitmap);
}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssRegisterShuffleResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssRegisterShuffleResponse.java
index 59078bd7..2d610558 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/response/RssRegisterShuffleResponse.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssRegisterShuffleResponse.java
@@ -17,9 +17,11 @@
package org.apache.uniffle.client.response;
+import org.apache.uniffle.common.rpc.StatusCode;
+
public class RssRegisterShuffleResponse extends ClientResponse {
- public RssRegisterShuffleResponse(ResponseStatusCode statusCode) {
+ public RssRegisterShuffleResponse(StatusCode statusCode) {
super(statusCode);
}
}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleResultResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleResultResponse.java
index f39a1e42..ab87ee01 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleResultResponse.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleResultResponse.java
@@ -17,9 +17,11 @@
package org.apache.uniffle.client.response;
+import org.apache.uniffle.common.rpc.StatusCode;
+
public class RssReportShuffleResultResponse extends ClientResponse {
- public RssReportShuffleResultResponse(ResponseStatusCode statusCode) {
+ public RssReportShuffleResultResponse(StatusCode statusCode) {
super(statusCode);
}
}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssSendCommitResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssSendCommitResponse.java
index 8d945eb3..bb686e2a 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/response/RssSendCommitResponse.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssSendCommitResponse.java
@@ -17,11 +17,13 @@
package org.apache.uniffle.client.response;
+import org.apache.uniffle.common.rpc.StatusCode;
+
public class RssSendCommitResponse extends ClientResponse {
private int commitCount;
- public RssSendCommitResponse(ResponseStatusCode statusCode) {
+ public RssSendCommitResponse(StatusCode statusCode) {
super(statusCode);
}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssSendHeartBeatResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssSendHeartBeatResponse.java
index 6bb1aaf8..cc47e436 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/response/RssSendHeartBeatResponse.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssSendHeartBeatResponse.java
@@ -19,11 +19,13 @@ package org.apache.uniffle.client.response;
import java.util.Set;
+import org.apache.uniffle.common.rpc.StatusCode;
+
public class RssSendHeartBeatResponse extends ClientResponse {
private Set<String> appIds;
- public RssSendHeartBeatResponse(ResponseStatusCode statusCode) {
+ public RssSendHeartBeatResponse(StatusCode statusCode) {
super(statusCode);
}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssSendShuffleDataResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssSendShuffleDataResponse.java
index 51cdbe07..eb20dd8a 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/response/RssSendShuffleDataResponse.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssSendShuffleDataResponse.java
@@ -19,12 +19,14 @@ package org.apache.uniffle.client.response;
import java.util.List;
+import org.apache.uniffle.common.rpc.StatusCode;
+
public class RssSendShuffleDataResponse extends ClientResponse {
private List<Long> successBlockIds;
private List<Long> failedBlockIds;
- public RssSendShuffleDataResponse(ResponseStatusCode statusCode) {
+ public RssSendShuffleDataResponse(StatusCode statusCode) {
super(statusCode);
}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterShuffleResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterShuffleResponse.java
index b872522e..c6bc0527 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterShuffleResponse.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterShuffleResponse.java
@@ -17,9 +17,11 @@
package org.apache.uniffle.client.response;
+import org.apache.uniffle.common.rpc.StatusCode;
+
public class RssUnregisterShuffleResponse extends ClientResponse {
- public RssUnregisterShuffleResponse(ResponseStatusCode statusCode) {
+ public RssUnregisterShuffleResponse(StatusCode statusCode) {
super(statusCode);
}
diff --git a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
index 42352560..4e8b93ec 100644
--- a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
+++ b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
@@ -34,8 +34,8 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.CoordinatorClient;
import org.apache.uniffle.client.factory.CoordinatorClientFactory;
import org.apache.uniffle.client.request.RssSendHeartBeatRequest;
-import org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.client.response.RssSendHeartBeatResponse;
+import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.common.util.ThreadUtils;
@@ -125,7 +125,7 @@ public class RegisterHeartBeat {
for (Future<RssSendHeartBeatResponse> rf : respFutures) {
try {
if (rf.get(request.getTimeout() * 2, TimeUnit.MILLISECONDS).getStatusCode()
- == ResponseStatusCode.SUCCESS) {
+ == StatusCode.SUCCESS) {
sendSuccessfully = true;
}
} catch (Exception e) {
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 57e3d12e..6abaf33b 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -45,6 +45,7 @@ import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.exception.FileNotFoundException;
+import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.proto.RssProtos.AppHeartBeatRequest;
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 213c66dc..3654b05f 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -53,6 +53,7 @@ import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.exception.FileNotFoundException;
+import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 259e07b2..8d966d3e 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedData;
+import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.TripleFunction;
@@ -46,7 +47,6 @@ import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.server.ShuffleFlushManager;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.ShuffleServerMetrics;
-import org.apache.uniffle.server.StatusCode;
public class ShuffleBufferManager {
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 2762ee99..ea36c21b 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -48,6 +48,7 @@ import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShufflePartitionedData;
+import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.RssUtils;
diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index e6952e7d..00d8467e 100644
--- a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -30,13 +30,13 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedData;
+import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.server.ShuffleFlushManager;
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.ShuffleServerMetrics;
import org.apache.uniffle.server.ShuffleTaskManager;
-import org.apache.uniffle.server.StatusCode;
import org.apache.uniffle.server.TestShuffleFlushManager;
import org.apache.uniffle.server.storage.StorageManager;
import org.apache.uniffle.server.storage.StorageManagerFactory;
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java
index a8d4ec25..5f588154 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java
@@ -31,12 +31,12 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.api.ShuffleServerClient;
import org.apache.uniffle.client.request.RssGetShuffleDataRequest;
-import org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.client.response.RssGetShuffleDataResponse;
import org.apache.uniffle.client.response.RssGetShuffleIndexResponse;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -73,7 +73,7 @@ public class LocalFileServerReadHandlerTest {
int actualWriteDataBlock = expectTotalBlockNum - 1;
int actualFileLen = blockSize * actualWriteDataBlock;
- RssGetShuffleIndexResponse response = new RssGetShuffleIndexResponse(ResponseStatusCode.SUCCESS,
+ RssGetShuffleIndexResponse response = new RssGetShuffleIndexResponse(StatusCode.SUCCESS,
byteBuffer.array(), actualFileLen);
Mockito.doReturn(response).when(mockShuffleServerClient).getShuffleIndex(Mockito.any());
@@ -91,9 +91,9 @@ public class LocalFileServerReadHandlerTest {
ArgumentMatcher<RssGetShuffleDataRequest> segment2Match =
(request) -> request.getOffset() == bytesPerSegment && request.getLength() == blockSize;
RssGetShuffleDataResponse segment1Response =
- new RssGetShuffleDataResponse(ResponseStatusCode.SUCCESS, segments.get(0));
+ new RssGetShuffleDataResponse(StatusCode.SUCCESS, segments.get(0));
RssGetShuffleDataResponse segment2Response =
- new RssGetShuffleDataResponse(ResponseStatusCode.SUCCESS, segments.get(1));
+ new RssGetShuffleDataResponse(StatusCode.SUCCESS, segments.get(1));
Mockito.doReturn(segment1Response).when(mockShuffleServerClient).getShuffleData(Mockito.argThat(segment1Match));
Mockito.doReturn(segment2Response).when(mockShuffleServerClient).getShuffleData(Mockito.argThat(segment2Match));