You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@uniffle.apache.org by "advancedxy (via GitHub)" <gi...@apache.org> on 2023/03/29 06:40:58 UTC

[GitHub] [incubator-uniffle] advancedxy opened a new pull request, #777: [#477][part-0] feat: add ShuffleManagerServer impl

advancedxy opened a new pull request, #777:
URL: https://github.com/apache/incubator-uniffle/pull/777

   ### What changes were proposed in this pull request?
   1. add shuffle manager proto
   2. the corresponding client and server impls
   3. add the RssShuffleManagerInterface 
   
   ### Why are the changes needed?
   This is the first part of #477. 
   To support re-submit spark stage, the ShuffleManagerServer should be introduced first.
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   Added UTs and one integration test.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] advancedxy commented on pull request #777: [#477][part-0] feat: add ShuffleManagerServer impl

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on PR #777:
URL: https://github.com/apache/incubator-uniffle/pull/777#issuecomment-1489566157

   @zuston do you some any more comments?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #777: [#477][part-0] feat: add ShuffleManagerServer impl

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #777:
URL: https://github.com/apache/incubator-uniffle/pull/777#discussion_r1151597783


##########
internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleFetchFailureResponse.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.response;
+
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureResponse;
+
+public class RssReportShuffleFetchFailureResponse extends ClientResponse {
+  private boolean reSubmitWholeStage;
+
+  public RssReportShuffleFetchFailureResponse(StatusCode code, String msg, boolean recomputeStage) {
+    super(code, msg);
+    this.reSubmitWholeStage = recomputeStage;
+  }
+
+  public boolean getReSubmitWholeStage() {
+    return this.reSubmitWholeStage;
+  }
+
+  public ReportShuffleFetchFailureResponse toProto() {
+    ReportShuffleFetchFailureResponse.Builder builder = ReportShuffleFetchFailureResponse.newBuilder();
+    return builder
+        .setStatus(getStatusCode().toProto())
+        .setMsg(getMessage())
+        .setReSubmitWholeStage(reSubmitWholeStage)
+        .build();
+  }
+
+  public static RssReportShuffleFetchFailureResponse fromProto(ReportShuffleFetchFailureResponse response) {
+    return new RssReportShuffleFetchFailureResponse(
+        // todo: add fromProto for StatusCode

Review Comment:
   created.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] codecov-commenter commented on pull request #777: [#477][part-0] feat: add ShuffleManagerServer impl

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #777:
URL: https://github.com/apache/incubator-uniffle/pull/777#issuecomment-1488035353

   ## [Codecov](https://codecov.io/gh/apache/incubator-uniffle/pull/777?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#777](https://codecov.io/gh/apache/incubator-uniffle/pull/777?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (eb8d61f) into [master](https://codecov.io/gh/apache/incubator-uniffle/commit/1b48c12026ad7be42368fb6eee2cd9a1dff2bbb5?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (1b48c12) will **decrease** coverage by `3.74%`.
   > The diff coverage is `32.75%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master     #777      +/-   ##
   ============================================
   - Coverage     61.16%   57.42%   -3.74%     
   + Complexity     1985     1871     -114     
   ============================================
     Files           245      270      +25     
     Lines         13422    11739    -1683     
     Branches       1125     1124       -1     
   ============================================
   - Hits           8209     6741    -1468     
   + Misses         4751     4612     -139     
   + Partials        462      386      -76     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-uniffle/pull/777?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...t/request/RssReportShuffleFetchFailureRequest.java](https://codecov.io/gh/apache/incubator-uniffle/pull/777?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW50ZXJuYWwtY2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL2NsaWVudC9yZXF1ZXN0L1Jzc1JlcG9ydFNodWZmbGVGZXRjaEZhaWx1cmVSZXF1ZXN0LmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...response/RssReportShuffleFetchFailureResponse.java](https://codecov.io/gh/apache/incubator-uniffle/pull/777?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW50ZXJuYWwtY2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL2NsaWVudC9yZXNwb25zZS9Sc3NSZXBvcnRTaHVmZmxlRmV0Y2hGYWlsdXJlUmVzcG9uc2UuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...fle/client/impl/grpc/ShuffleManagerGrpcClient.java](https://codecov.io/gh/apache/incubator-uniffle/pull/777?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW50ZXJuYWwtY2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL2NsaWVudC9pbXBsL2dycGMvU2h1ZmZsZU1hbmFnZXJHcnBjQ2xpZW50LmphdmE=) | `50.00% <50.00%> (ø)` | |
   | [...java/org/apache/uniffle/common/rpc/GrpcServer.java](https://codecov.io/gh/apache/incubator-uniffle/pull/777?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL2NvbW1vbi9ycGMvR3JwY1NlcnZlci5qYXZh) | `79.10% <66.66%> (+57.56%)` | :arrow_up: |
   | [...pache/uniffle/common/metrics/DummyGRPCMetrics.java](https://codecov.io/gh/apache/incubator-uniffle/pull/777?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL2NvbW1vbi9tZXRyaWNzL0R1bW15R1JQQ01ldHJpY3MuamF2YQ==) | `100.00% <100.00%> (ø)` | |
   | [...org/apache/uniffle/common/metrics/GRPCMetrics.java](https://codecov.io/gh/apache/incubator-uniffle/pull/777?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL2NvbW1vbi9tZXRyaWNzL0dSUENNZXRyaWNzLmphdmE=) | `48.61% <100.00%> (+0.72%)` | :arrow_up: |
   | [...le/client/factory/ShuffleManagerClientFactory.java](https://codecov.io/gh/apache/incubator-uniffle/pull/777?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW50ZXJuYWwtY2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL2NsaWVudC9mYWN0b3J5L1NodWZmbGVNYW5hZ2VyQ2xpZW50RmFjdG9yeS5qYXZh) | `100.00% <100.00%> (ø)` | |
   
   ... and [79 files with indirect coverage changes](https://codecov.io/gh/apache/incubator-uniffle/pull/777/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   :mega: We’re building smart automated test selection to slash your CI/CD build times. [Learn more](https://about.codecov.io/iterative-testing/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #777: [#477][part-0] feat: add ShuffleManagerServer impl

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #777:
URL: https://github.com/apache/incubator-uniffle/pull/777#discussion_r1152691455


##########
internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleManagerClientFactory.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.factory;
+
+import org.apache.uniffle.client.impl.grpc.ShuffleManagerGrpcClient;
+import org.apache.uniffle.common.ClientType;
+
+public class ShuffleManagerClientFactory {
+
+  private static class LazyHolder {
+    private static final ShuffleManagerClientFactory INSTANCE = new ShuffleManagerClientFactory();
+  }
+
+  public static ShuffleManagerClientFactory getInstance() {
+    return LazyHolder.INSTANCE;
+  }
+
+  private ShuffleManagerClientFactory() {
+  }
+
+  public ShuffleManagerGrpcClient createShuffleManagerClient(ClientType clientType, String host, int port) {
+    if (clientType.equals(ClientType.GRPC)) {

Review Comment:
   fixed



##########
client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.shuffle.manager;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.metrics.GRPCMetrics;
+import org.apache.uniffle.common.rpc.GrpcServer;
+
+public class ShuffleManagerServerFactory {
+  private final RssShuffleManagerInterface shuffleManager;
+  private final RssBaseConf conf;
+
+  public ShuffleManagerServerFactory(RssShuffleManagerInterface shuffleManager, RssConf conf) {
+    this.shuffleManager = shuffleManager;
+    this.conf = new RssBaseConf();
+    this.conf.addAll(conf);
+  }
+
+  public GrpcServer getServer() {
+    String type = conf.getString(RssBaseConf.RPC_SERVER_TYPE);
+    if (type.equals(ServerType.GRPC.name())) {

Review Comment:
   addressed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #777: [#477][part-0] feat: add ShuffleManagerServer impl

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #777:
URL: https://github.com/apache/incubator-uniffle/pull/777#discussion_r1151549782


##########
integration-test/spark-common/src/test/java/org/apache/uniffle/test/SimpleShuffleServerManagerE2ETest.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
+import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.shuffle.manager.DummyRssShuffleManager;
+import org.apache.uniffle.shuffle.manager.RssShuffleManagerInterface;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class SimpleShuffleServerManagerE2ETest extends ShuffleServerManagerTestBase {

Review Comment:
   In a broader sense, the e2e may also refer to the actual workload in prod env.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #777: [#477][part-0] feat: add ShuffleManagerServer impl

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #777:
URL: https://github.com/apache/incubator-uniffle/pull/777#discussion_r1151565570


##########
integration-test/spark-common/src/test/java/org/apache/uniffle/test/SimpleShuffleServerManagerE2ETest.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
+import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.shuffle.manager.DummyRssShuffleManager;
+import org.apache.uniffle.shuffle.manager.RssShuffleManagerInterface;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class SimpleShuffleServerManagerE2ETest extends ShuffleServerManagerTestBase {

Review Comment:
   If they have no difference, I prefer using `IntegrationTest` here to unify our style. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #777: [#477][part-0] feat: add ShuffleManagerServer impl

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #777:
URL: https://github.com/apache/incubator-uniffle/pull/777#discussion_r1151535638


##########
internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleFetchFailureResponse.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.response;
+
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureResponse;
+
+public class RssReportShuffleFetchFailureResponse extends ClientResponse {
+  private boolean reSubmitWholeStage;
+
+  public RssReportShuffleFetchFailureResponse(StatusCode code, String msg, boolean recomputeStage) {
+    super(code, msg);
+    this.reSubmitWholeStage = recomputeStage;
+  }
+
+  public boolean getReSubmitWholeStage() {
+    return this.reSubmitWholeStage;
+  }
+
+  public ReportShuffleFetchFailureResponse toProto() {
+    ReportShuffleFetchFailureResponse.Builder builder = ReportShuffleFetchFailureResponse.newBuilder();
+    return builder
+        .setStatus(getStatusCode().toProto())
+        .setMsg(getMessage())
+        .setReSubmitWholeStage(reSubmitWholeStage)
+        .build();
+  }
+
+  public static RssReportShuffleFetchFailureResponse fromProto(ReportShuffleFetchFailureResponse response) {
+    return new RssReportShuffleFetchFailureResponse(
+        // todo: add fromProto for StatusCode

Review Comment:
   If we add `todo`, we would better add an issue to track it.



##########
common/src/main/java/org/apache/uniffle/common/metrics/DummyGRPCMetrics.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.metrics;
+
+public class DummyGRPCMetrics extends GRPCMetrics {

Review Comment:
   Em.. I  feel that this class is used for test when I see this class. What is this class used for? Could we give it a better name or add some comments? I guess that `EmptyGRPCMetrics` may be fit.



##########
integration-test/spark-common/src/test/java/org/apache/uniffle/test/SimpleShuffleServerManagerE2ETest.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
+import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.shuffle.manager.DummyRssShuffleManager;
+import org.apache.uniffle.shuffle.manager.RssShuffleManagerInterface;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class SimpleShuffleServerManagerE2ETest extends ShuffleServerManagerTestBase {

Review Comment:
   Why is test is E2E test? What's the difference from integration tests? Will we introduce the concept of `E2E` tests?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #777: [#477][part-0] feat: add ShuffleManagerServer impl

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #777:
URL: https://github.com/apache/incubator-uniffle/pull/777#discussion_r1151545276


##########
integration-test/spark-common/src/test/java/org/apache/uniffle/test/SimpleShuffleServerManagerE2ETest.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
+import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.shuffle.manager.DummyRssShuffleManager;
+import org.apache.uniffle.shuffle.manager.RssShuffleManagerInterface;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class SimpleShuffleServerManagerE2ETest extends ShuffleServerManagerTestBase {

Review Comment:
   Integration test is effectively the same as end to end test.



##########
internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleFetchFailureResponse.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.response;
+
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureResponse;
+
+public class RssReportShuffleFetchFailureResponse extends ClientResponse {
+  private boolean reSubmitWholeStage;
+
+  public RssReportShuffleFetchFailureResponse(StatusCode code, String msg, boolean recomputeStage) {
+    super(code, msg);
+    this.reSubmitWholeStage = recomputeStage;
+  }
+
+  public boolean getReSubmitWholeStage() {
+    return this.reSubmitWholeStage;
+  }
+
+  public ReportShuffleFetchFailureResponse toProto() {
+    ReportShuffleFetchFailureResponse.Builder builder = ReportShuffleFetchFailureResponse.newBuilder();
+    return builder
+        .setStatus(getStatusCode().toProto())
+        .setMsg(getMessage())
+        .setReSubmitWholeStage(reSubmitWholeStage)
+        .build();
+  }
+
+  public static RssReportShuffleFetchFailureResponse fromProto(ReportShuffleFetchFailureResponse response) {
+    return new RssReportShuffleFetchFailureResponse(
+        // todo: add fromProto for StatusCode

Review Comment:
   Let me create one.



##########
common/src/main/java/org/apache/uniffle/common/metrics/DummyGRPCMetrics.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.metrics;
+
+public class DummyGRPCMetrics extends GRPCMetrics {

Review Comment:
   `EmptyGRPCMetrics` sounds good to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #777: [#477][part-0] feat: add ShuffleManagerServer impl

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #777:
URL: https://github.com/apache/incubator-uniffle/pull/777#discussion_r1152677339


##########
client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.shuffle.manager;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+import io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.util.JavaUtils;
+import org.apache.uniffle.proto.RssProtos;
+import org.apache.uniffle.proto.ShuffleManagerGrpc.ShuffleManagerImplBase;
+
+public class ShuffleManagerGrpcService extends ShuffleManagerImplBase {
+  private static final Logger LOG = LoggerFactory.getLogger(ShuffleManagerGrpcService.class);
+  private final Map<Integer, RssShuffleStatus> shuffleStatus = JavaUtils.newConcurrentMap();
+  private final RssShuffleManagerInterface shuffleManager;
+
+  public ShuffleManagerGrpcService(RssShuffleManagerInterface shuffleManager) {
+    this.shuffleManager = shuffleManager;
+  }
+
+  @Override
+  public void reportShuffleFetchFailure(RssProtos.ReportShuffleFetchFailureRequest request,
+      StreamObserver<RssProtos.ReportShuffleFetchFailureResponse> responseObserver) {
+    String appId = request.getAppId();
+    int stageAttempt = request.getStageAttemptId();
+    int partitionId = request.getPartitionId();
+    RssProtos.StatusCode code;
+    boolean reSubmitWholeStage;
+    String msg;
+    if (!appId.equals(shuffleManager.getAppId())) {
+      msg = String.format("got a wrong shuffle fetch failure report from appId: %s, expected appId: %s",
+          appId, shuffleManager.getAppId());
+      LOG.warn(msg);
+      code = RssProtos.StatusCode.INVALID_REQUEST;
+      reSubmitWholeStage = false;
+    } else {
+      RssShuffleStatus status = shuffleStatus.computeIfAbsent(request.getShuffleId(), key -> {
+            int partitionNum = shuffleManager.getPartitionNum(key);
+            return new RssShuffleStatus(partitionNum, stageAttempt);
+          }
+      );
+      int c = status.resetStageAttemptIfNecessary(stageAttempt);

Review Comment:
   It's just a short variable name, means compareResult. Just to keep it concise.



##########
client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.shuffle.manager;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.metrics.GRPCMetrics;
+import org.apache.uniffle.common.rpc.GrpcServer;
+
+public class ShuffleManagerServerFactory {
+  private final RssShuffleManagerInterface shuffleManager;
+  private final RssBaseConf conf;
+
+  public ShuffleManagerServerFactory(RssShuffleManagerInterface shuffleManager, RssConf conf) {
+    this.shuffleManager = shuffleManager;
+    this.conf = new RssBaseConf();
+    this.conf.addAll(conf);
+  }
+
+  public GrpcServer getServer() {
+    String type = conf.getString(RssBaseConf.RPC_SERVER_TYPE);

Review Comment:
   There are multiple definitions of ServerType enums, let's refactor that in a new pr. 



##########
client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.shuffle.manager;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.metrics.GRPCMetrics;
+import org.apache.uniffle.common.rpc.GrpcServer;
+
+public class ShuffleManagerServerFactory {
+  private final RssShuffleManagerInterface shuffleManager;
+  private final RssBaseConf conf;
+
+  public ShuffleManagerServerFactory(RssShuffleManagerInterface shuffleManager, RssConf conf) {
+    this.shuffleManager = shuffleManager;
+    this.conf = new RssBaseConf();
+    this.conf.addAll(conf);
+  }
+
+  public GrpcServer getServer() {
+    String type = conf.getString(RssBaseConf.RPC_SERVER_TYPE);
+    if (type.equals(ServerType.GRPC.name())) {

Review Comment:
   address it in a following commit.



##########
internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.impl.grpc;
+
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.client.api.ShuffleManagerClient;
+import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
+import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureRequest;
+import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureResponse;
+import org.apache.uniffle.proto.ShuffleManagerGrpc;
+
+public class ShuffleManagerGrpcClient extends GrpcClient implements ShuffleManagerClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ShuffleManagerGrpcClient.class);
+  private static final long RPC_TIMEOUT_DEFAULT_MS = 60000;

Review Comment:
   Should be. Let's do this in another pr. This code is aligned with other Client impls, such as ShuffleServerGrpcClient.
   We should refactor these together.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #777: [#477][part-0] feat: add ShuffleManagerServer impl

Posted by "zuston (via GitHub)" <gi...@apache.org>.
zuston commented on code in PR #777:
URL: https://github.com/apache/incubator-uniffle/pull/777#discussion_r1152664916


##########
client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.shuffle.manager;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.metrics.GRPCMetrics;
+import org.apache.uniffle.common.rpc.GrpcServer;
+
+public class ShuffleManagerServerFactory {
+  private final RssShuffleManagerInterface shuffleManager;
+  private final RssBaseConf conf;
+
+  public ShuffleManagerServerFactory(RssShuffleManagerInterface shuffleManager, RssConf conf) {
+    this.shuffleManager = shuffleManager;
+    this.conf = new RssBaseConf();
+    this.conf.addAll(conf);
+  }
+
+  public GrpcServer getServer() {
+    String type = conf.getString(RssBaseConf.RPC_SERVER_TYPE);
+    if (type.equals(ServerType.GRPC.name())) {

Review Comment:
   I prefer `ServerType.GRPC.name().equals(type)`



##########
client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.shuffle.manager;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+import io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.util.JavaUtils;
+import org.apache.uniffle.proto.RssProtos;
+import org.apache.uniffle.proto.ShuffleManagerGrpc.ShuffleManagerImplBase;
+
+public class ShuffleManagerGrpcService extends ShuffleManagerImplBase {
+  private static final Logger LOG = LoggerFactory.getLogger(ShuffleManagerGrpcService.class);
+  private final Map<Integer, RssShuffleStatus> shuffleStatus = JavaUtils.newConcurrentMap();
+  private final RssShuffleManagerInterface shuffleManager;
+
+  public ShuffleManagerGrpcService(RssShuffleManagerInterface shuffleManager) {
+    this.shuffleManager = shuffleManager;
+  }
+
+  @Override
+  public void reportShuffleFetchFailure(RssProtos.ReportShuffleFetchFailureRequest request,
+      StreamObserver<RssProtos.ReportShuffleFetchFailureResponse> responseObserver) {
+    String appId = request.getAppId();
+    int stageAttempt = request.getStageAttemptId();
+    int partitionId = request.getPartitionId();
+    RssProtos.StatusCode code;
+    boolean reSubmitWholeStage;
+    String msg;
+    if (!appId.equals(shuffleManager.getAppId())) {
+      msg = String.format("got a wrong shuffle fetch failure report from appId: %s, expected appId: %s",
+          appId, shuffleManager.getAppId());
+      LOG.warn(msg);
+      code = RssProtos.StatusCode.INVALID_REQUEST;
+      reSubmitWholeStage = false;
+    } else {
+      RssShuffleStatus status = shuffleStatus.computeIfAbsent(request.getShuffleId(), key -> {
+            int partitionNum = shuffleManager.getPartitionNum(key);
+            return new RssShuffleStatus(partitionNum, stageAttempt);
+          }
+      );
+      int c = status.resetStageAttemptIfNecessary(stageAttempt);

Review Comment:
   what's meaning of `c` ?



##########
internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.impl.grpc;
+
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.client.api.ShuffleManagerClient;
+import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
+import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureRequest;
+import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureResponse;
+import org.apache.uniffle.proto.ShuffleManagerGrpc;
+
+public class ShuffleManagerGrpcClient extends GrpcClient implements ShuffleManagerClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ShuffleManagerGrpcClient.class);
+  private static final long RPC_TIMEOUT_DEFAULT_MS = 60000;

Review Comment:
   Should be configurable?



##########
client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.shuffle.manager;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.metrics.GRPCMetrics;
+import org.apache.uniffle.common.rpc.GrpcServer;
+
+public class ShuffleManagerServerFactory {
+  private final RssShuffleManagerInterface shuffleManager;
+  private final RssBaseConf conf;
+
+  public ShuffleManagerServerFactory(RssShuffleManagerInterface shuffleManager, RssConf conf) {
+    this.shuffleManager = shuffleManager;
+    this.conf = new RssBaseConf();
+    this.conf.addAll(conf);
+  }
+
+  public GrpcServer getServer() {
+    String type = conf.getString(RssBaseConf.RPC_SERVER_TYPE);

Review Comment:
   It looks `RPC_SERVER_TYPE` should be refactored as `ENUM` type



##########
internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleManagerClientFactory.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.factory;
+
+import org.apache.uniffle.client.impl.grpc.ShuffleManagerGrpcClient;
+import org.apache.uniffle.common.ClientType;
+
+public class ShuffleManagerClientFactory {
+
+  private static class LazyHolder {
+    private static final ShuffleManagerClientFactory INSTANCE = new ShuffleManagerClientFactory();
+  }
+
+  public static ShuffleManagerClientFactory getInstance() {
+    return LazyHolder.INSTANCE;
+  }
+
+  private ShuffleManagerClientFactory() {
+  }
+
+  public ShuffleManagerGrpcClient createShuffleManagerClient(ClientType clientType, String host, int port) {
+    if (clientType.equals(ClientType.GRPC)) {

Review Comment:
   ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] advancedxy merged pull request #777: [#477][part-0] feat: add ShuffleManagerServer impl

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy merged PR #777:
URL: https://github.com/apache/incubator-uniffle/pull/777


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #777: [#477][part-0] feat: add ShuffleManagerServer impl

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #777:
URL: https://github.com/apache/incubator-uniffle/pull/777#discussion_r1151613491


##########
internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleFetchFailureResponse.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.response;
+
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureResponse;
+
+public class RssReportShuffleFetchFailureResponse extends ClientResponse {
+  private boolean reSubmitWholeStage;
+
+  public RssReportShuffleFetchFailureResponse(StatusCode code, String msg, boolean recomputeStage) {
+    super(code, msg);
+    this.reSubmitWholeStage = recomputeStage;
+  }
+
+  public boolean getReSubmitWholeStage() {
+    return this.reSubmitWholeStage;
+  }
+
+  public ReportShuffleFetchFailureResponse toProto() {
+    ReportShuffleFetchFailureResponse.Builder builder = ReportShuffleFetchFailureResponse.newBuilder();
+    return builder
+        .setStatus(getStatusCode().toProto())
+        .setMsg(getMessage())
+        .setReSubmitWholeStage(reSubmitWholeStage)
+        .build();
+  }
+
+  public static RssReportShuffleFetchFailureResponse fromProto(ReportShuffleFetchFailureResponse response) {
+    return new RssReportShuffleFetchFailureResponse(
+        // todo: add fromProto for StatusCode

Review Comment:
   Could we add ISSUE ID in the comment?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org