Posted to issues@uniffle.apache.org by "advancedxy (via GitHub)" <gi...@apache.org> on 2023/03/31 09:22:03 UTC

[GitHub] [incubator-uniffle] advancedxy opened a new pull request, #787: [#477][part-1]feat: support stage resubmit in spark clients

advancedxy opened a new pull request, #787:
URL: https://github.com/apache/incubator-uniffle/pull/787

   ### What changes were proposed in this pull request?
   1. Utility methods to create FetchFailedException and invoke unregisterAllMapOutput dynamically
   2. RssShuffleManagerBase, a shareable base for both Spark 2 and Spark 3
   3. A wrapper around the shuffle data reader iterator that throws FetchFailedException if necessary
   
   ### Why are the changes needed?
   This is the second part of #477 
   After this commit, Spark with RSS can resubmit stages when an RSS fetch failure occurs.
   
   ### Does this PR introduce _any_ user-facing change?
   Yes.
   A new configuration would be introduced, which would be covered in another PR.
   
   ### How was this patch tested?
   Added unit tests and integration tests.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #787: [#477][part-1] feat: support stage resubmit in spark clients

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787#discussion_r1156928982


##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -70,12 +72,18 @@
 import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.rpc.GrpcServer;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.RetryUtils;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.shuffle.manager.RssShuffleManagerBase;
+import org.apache.uniffle.shuffle.manager.ShuffleManagerGrpcService;
+import org.apache.uniffle.shuffle.manager.ShuffleManagerServerFactory;
 
-public class RssShuffleManager implements ShuffleManager {
+import static org.apache.uniffle.common.config.RssBaseConf.RPC_SERVER_PORT;
+
+public class RssShuffleManager extends RssShuffleManagerBase implements ShuffleManager {

Review Comment:
   OK.





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #787: [#477][part-1] feat: support stage resubmit in spark clients

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787#discussion_r1156646606


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/ShuffleFetchFailedWrapper.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.reader;
+
+import java.io.IOException;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.spark.shuffle.FetchFailedException;
+import org.apache.spark.shuffle.RssSparkShuffleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Product2;
+import scala.collection.AbstractIterator;
+import scala.collection.Iterator;
+
+import org.apache.uniffle.client.api.ShuffleManagerClient;
+import org.apache.uniffle.client.factory.ShuffleManagerClientFactory;
+import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
+import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
+import org.apache.uniffle.common.ClientType;
+import org.apache.uniffle.common.exception.RssFetchFailedException;
+
+public class ShuffleFetchFailedWrapper {

Review Comment:
   The wrapper is too general. I don't know why we need a wrapper and what the function of this wrapper is.





[GitHub] [incubator-uniffle] jerqi commented on pull request #787: [#477][part-1] feat: support stage resubmit in spark clients

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787#issuecomment-1501298704

   Merged to master. Thanks, all.




[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #787: [#477][part-1] feat: support stage resubmit in spark clients

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787#discussion_r1156645546


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/ShuffleFetchFailedWrapper.java:
##########
@@ -0,0 +1,146 @@
+public class ShuffleFetchFailedWrapper {

Review Comment:
   > Could we make it an iterator?
   
   Maybe. Let me reconsider this part.
   
   > The wrapper confuses me.
   
   Why would this confuse you?



##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -70,12 +72,18 @@
 import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.rpc.GrpcServer;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.RetryUtils;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.shuffle.manager.RssShuffleManagerBase;
+import org.apache.uniffle.shuffle.manager.ShuffleManagerGrpcService;
+import org.apache.uniffle.shuffle.manager.ShuffleManagerServerFactory;
 
-public class RssShuffleManager implements ShuffleManager {
+import static org.apache.uniffle.common.config.RssBaseConf.RPC_SERVER_PORT;
+
+public class RssShuffleManager extends RssShuffleManagerBase implements ShuffleManager {

Review Comment:
   Can't you make one interface extend another?





[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #787: [#477][part-1] feat: support stage resubmit in spark clients

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787#discussion_r1156645968


##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -317,6 +350,8 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, ShuffleDependency<
     }
     startHeartbeat();
 
+    shuffleIdToPartitionNum.putIfAbsent(shuffleId, dependency.partitioner().numPartitions());
+    shuffleIdToNumMapTasks.putIfAbsent(shuffleId, dependency.rdd().partitions().length);

Review Comment:
   Yes. You could refer to how Spark generates its numMaps in the scheduler.





[GitHub] [incubator-uniffle] advancedxy commented on pull request #787: [#477][part-1] feat: support stage resubmit in spark clients

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787#issuecomment-1498508771

   @zuston @smallzhongfeng do you have any more comments?




[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #787: [#477][part-1] feat: support stage resubmit in spark clients

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787#discussion_r1156652963


##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -317,6 +350,8 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, ShuffleDependency<
     }
     startHeartbeat();
 
+    shuffleIdToPartitionNum.putIfAbsent(shuffleId, dependency.partitioner().numPartitions());
+    shuffleIdToNumMapTasks.putIfAbsent(shuffleId, dependency.rdd().partitions().length);

Review Comment:
   > Yes. You could refer to how Spark generates its numMaps in the scheduler.
   
   If we use a limit operation, only some of the RDD's partitions will be executed. Is that right?





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #787: [#477][part-1] feat: support stage resubmit in spark clients

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787#discussion_r1156846963


##########
integration-test/spark2/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java:
##########
@@ -0,0 +1 @@
+../../../../../../../../spark3/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java

Review Comment:
   What's this?





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #787: [#477][part-1] feat: support stage resubmit in spark clients

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787#discussion_r1157007393


##########
integration-test/spark2/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java:
##########
@@ -0,0 +1 @@
+../../../../../../../../spark3/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java

Review Comment:
   > This is a soft link to the Spark 3 file, so we only need to update one file instead of two.
   > 
   > It should compile for both Spark 2 and Spark 3.
   
   It's tricky. Could we add this test case to spark-common?





[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #787: [#477][part-1] feat: support stage resubmit in spark clients

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787#discussion_r1156781964


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/ShuffleFetchFailedWrapper.java:
##########
@@ -0,0 +1,146 @@
+public class ShuffleFetchFailedWrapper {

Review Comment:
   There might be other cases where we need to wrap the shuffle data reader iterator. That's why it's called a wrapper in the first place; other cases would simply follow this pattern.
   
   Otherwise, we would end up with chain after chain of iterator creations.
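
To make the "wrapper that creates an iterator" idea concrete, here is a minimal, self-contained sketch in plain Java. It is not the code from this PR: it uses `java.util.Iterator` instead of the Scala iterator, and `ReadFailure`, `StageFetchFailure`, `Wrapper`, and `failingAfter` are hypothetical names standing in for the RSS-level error, the engine-level fetch failure, and the wrapper class under review.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Function;

public class FetchFailureWrapperSketch {

  /** Hypothetical stand-in for a low-level shuffle read error. */
  static class ReadFailure extends RuntimeException {
    ReadFailure(String message) {
      super(message);
    }
  }

  /** Hypothetical stand-in for the engine-level exception that would trigger a stage retry. */
  static class StageFetchFailure extends RuntimeException {
    StageFetchFailure(Throwable cause) {
      super("fetch failed: " + cause.getMessage(), cause);
    }
  }

  /** Not an iterator itself: it holds the translation rule and creates wrapped iterators. */
  static class Wrapper {
    private final Function<ReadFailure, RuntimeException> translate;

    Wrapper(Function<ReadFailure, RuntimeException> translate) {
      this.translate = translate;
    }

    <T> Iterator<T> wrap(Iterator<T> delegate) {
      return new Iterator<T>() {
        @Override
        public boolean hasNext() {
          try {
            return delegate.hasNext();
          } catch (ReadFailure e) {
            throw translate.apply(e);
          }
        }

        @Override
        public T next() {
          try {
            return delegate.next();
          } catch (ReadFailure e) {
            throw translate.apply(e);
          }
        }
      };
    }
  }

  /** Test helper: yields the given values, then fails on the following read. */
  static Iterator<Integer> failingAfter(Integer... values) {
    Iterator<Integer> ok = Arrays.asList(values).iterator();
    return new Iterator<Integer>() {
      @Override
      public boolean hasNext() {
        return true;
      }

      @Override
      public Integer next() {
        if (ok.hasNext()) {
          return ok.next();
        }
        throw new ReadFailure("shuffle server unreachable");
      }
    };
  }

  public static void main(String[] args) {
    Wrapper wrapper = new Wrapper(StageFetchFailure::new);
    Iterator<Integer> it = wrapper.wrap(failingAfter(1, 2));
    System.out.println(it.next());
    System.out.println(it.next());
    try {
      it.next();
    } catch (StageFetchFailure e) {
      System.out.println("translated: " + e.getCause().getMessage());
    }
  }
}
```

In this sketch the translation rule lives in one place, so a second concern could be added to the same wrapper rather than stacking another iterator layer on top.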





[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #787: [#477][part-1] feat: support stage resubmit in spark clients

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787#discussion_r1156789841


##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -317,6 +350,8 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, ShuffleDependency<
     }
     startHeartbeat();
 
+    shuffleIdToPartitionNum.putIfAbsent(shuffleId, dependency.partitioner().numPartitions());
+    shuffleIdToNumMapTasks.putIfAbsent(shuffleId, dependency.rdd().partitions().length);

Review Comment:
   > > Yes. You could refer to how Spark generates its numMaps in the scheduler.
   > 
   > If we use a limit operation, only some of the RDD's partitions will be executed. Is that right?
   
   If you are referring to `CollectLimitExec`, then that kind of limit wouldn't create a shuffle dependency. 
   Otherwise, limit would just create a normal shuffle dependency like any other operation.
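
For readers following the diff, the two `putIfAbsent` calls can be illustrated with a small standalone sketch. The map names are taken from the diff; the class `ShuffleRegistrySketch`, its `register` method, and the `-1` default are assumptions made for illustration, and the counts are passed in directly rather than read from a Spark `ShuffleDependency`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ShuffleRegistrySketch {
  // Map names follow the diff; everything else here is hypothetical.
  private final Map<Integer, Integer> shuffleIdToPartitionNum = new ConcurrentHashMap<>();
  private final Map<Integer, Integer> shuffleIdToNumMapTasks = new ConcurrentHashMap<>();

  /** Records both counts once; a repeated registration keeps the first values. */
  void register(int shuffleId, int numReducePartitions, int numMapTasks) {
    shuffleIdToPartitionNum.putIfAbsent(shuffleId, numReducePartitions);
    shuffleIdToNumMapTasks.putIfAbsent(shuffleId, numMapTasks);
  }

  /** Returns -1 when the shuffle was never registered. */
  int partitionNum(int shuffleId) {
    return shuffleIdToPartitionNum.getOrDefault(shuffleId, -1);
  }

  /** Returns -1 when the shuffle was never registered. */
  int numMapTasks(int shuffleId) {
    return shuffleIdToNumMapTasks.getOrDefault(shuffleId, -1);
  }

  public static void main(String[] args) {
    ShuffleRegistrySketch registry = new ShuffleRegistrySketch();
    registry.register(0, 200, 50);
    registry.register(0, 10, 5); // ignored: shuffle 0 is already registered
    System.out.println(registry.partitionNum(0) + " reduce partitions, "
        + registry.numMapTasks(0) + " map tasks");
  }
}
```

`putIfAbsent` on a `ConcurrentHashMap` is atomic, so concurrent registrations of the same shuffle id cannot overwrite each other.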





[GitHub] [incubator-uniffle] codecov-commenter commented on pull request #787: [#477][part-1]feat: support stage resubmit in spark clients

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787#issuecomment-1491641340

   ## [Codecov](https://codecov.io/gh/apache/incubator-uniffle/pull/787?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#787](https://codecov.io/gh/apache/incubator-uniffle/pull/787?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (af125e4) into [master](https://codecov.io/gh/apache/incubator-uniffle/commit/ea5a3ba45b923b1afc8fda0fdb0546daf34274fb?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (ea5a3ba) will **decrease** coverage by `0.09%`.
   > The diff coverage is `100.00%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master     #787      +/-   ##
   ============================================
   - Coverage     57.86%   57.78%   -0.09%     
   + Complexity     2028     1887     -141     
   ============================================
     Files           298      270      -28     
     Lines         14520    11744    -2776     
     Branches       1185     1124      -61     
   ============================================
   - Hits           8402     6786    -1616     
   + Misses         5643     4571    -1072     
   + Partials        475      387      -88     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-uniffle/pull/787?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...rg/apache/uniffle/client/util/RssClientConfig.java](https://codecov.io/gh/apache/incubator-uniffle/pull/787?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL2NsaWVudC91dGlsL1Jzc0NsaWVudENvbmZpZy5qYXZh) | `0.00% <ø> (ø)` | |
   | [...rg/apache/uniffle/common/config/RssClientConf.java](https://codecov.io/gh/apache/incubator-uniffle/pull/787?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL2NvbW1vbi9jb25maWcvUnNzQ2xpZW50Q29uZi5qYXZh) | `95.23% <100.00%> (+1.48%)` | :arrow_up: |
   
   ... and [30 files with indirect coverage changes](https://codecov.io/gh/apache/incubator-uniffle/pull/787/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   




[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #787: [#477][part-1] feat: support stage resubmit in spark clients

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787#discussion_r1159247232


##########
integration-test/spark2/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java:
##########
@@ -0,0 +1 @@
+../../../../../../../../spark3/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java

Review Comment:
   done.





[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #787: [#477][part-1] feat: support stage resubmit in spark clients

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787#discussion_r1156638494


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java:
##########
@@ -226,4 +230,55 @@ public static Broadcast<ShuffleHandleInfo> broadcastShuffleHdlInfo(SparkContext
     ShuffleHandleInfo handleInfo = new ShuffleHandleInfo(shuffleId, partitionToServers, storageInfo);
     return sc.broadcast(handleInfo, SHUFFLE_HANDLER_INFO_CLASS_TAG);
   }
+
+  @SuppressFBWarnings("THROWS_METHOD_THROWS_RUNTIMEEXCEPTION")
+  private static <T> T instantiateFetchFailedException(
+      BlockManagerId dummy, int shuffleId, int mapIndex, int reduceId, Throwable cause) {
+    String className = FetchFailedException.class.getName();
+    T instance;
+    Class<?> klass;
+    try {
+      klass = Class.forName(className);
+    } catch (ClassNotFoundException e) {
+      // never happens;
+      throw new RuntimeException(e);

Review Comment:
   I think a generic `RuntimeException` is preferred here. It's indeed not tightly related to RSS. 
   
   Other systems also throw `RuntimeException` where appropriate.
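
The pattern under discussion (look a class up by name, instantiate it reflectively, and rethrow the checked reflection errors as a plain `RuntimeException`) can be sketched as follows. This is not the PR's helper: `DemoFetchFailure`, `create`, and the `(String, Throwable)` constructor signature are hypothetical stand-ins for the real exception and its arguments.

```java
import java.lang.reflect.Constructor;

public class ReflectiveCreateSketch {

  /** Hypothetical stand-in for an exception class that is instantiated by name. */
  public static class DemoFetchFailure extends RuntimeException {
    public DemoFetchFailure(String message, Throwable cause) {
      super(message, cause);
    }
  }

  /** Looks the class up by name and calls its public (String, Throwable) constructor. */
  static RuntimeException create(String className, String message, Throwable cause) {
    Class<?> klass;
    try {
      klass = Class.forName(className);
    } catch (ClassNotFoundException e) {
      // Not expected when the name comes from a class literal; rethrow unchecked.
      throw new RuntimeException(e);
    }
    try {
      Constructor<?> ctor = klass.getConstructor(String.class, Throwable.class);
      return (RuntimeException) ctor.newInstance(message, cause);
    } catch (ReflectiveOperationException e) {
      // Covers a missing constructor, access problems, and constructor failures.
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    RuntimeException ex =
        create(DemoFetchFailure.class.getName(), "block missing", new IllegalStateException("io"));
    System.out.println(ex.getClass().getSimpleName() + ": " + ex.getMessage());
  }
}
```

Because the checked exceptions signal a broken classpath rather than a shuffle problem, wrapping them in a generic unchecked exception keeps the caller's signature clean, which is the point made in the comment above.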





[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #787: [#477][part-1] feat: support stage resubmit in spark clients

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787#discussion_r1155908966


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/ShuffleFetchFailedWrapper.java:
##########
@@ -0,0 +1,146 @@
+public class ShuffleFetchFailedWrapper {

Review Comment:
   Do you have any suggestions? 
   
   It's not an iterator. It's an iterator wrapper, which creates an iterator. 





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #787: [#477][part-1] feat: support stage resubmit in spark clients

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787#discussion_r1156128763


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/ShuffleFetchFailedWrapper.java:
##########
@@ -0,0 +1,146 @@
+public class ShuffleFetchFailedWrapper {

Review Comment:
   Could we make it an iterator?





[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #787: [#477][part-1] feat: support stage resubmit in spark clients

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787#discussion_r1156997682


##########
integration-test/spark2/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java:
##########
@@ -0,0 +1 @@
+../../../../../../../../spark3/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java

Review Comment:
   This is a soft link to the Spark3 file, so we only need to update one file instead of two.
   
   It should compile for both Spark2 and Spark3.





[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #787: [#477][part-1] feat: support stage resubmit in spark clients

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787#discussion_r1156821428


##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -70,12 +72,18 @@
 import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.rpc.GrpcServer;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.RetryUtils;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.shuffle.manager.RssShuffleManagerBase;
+import org.apache.uniffle.shuffle.manager.ShuffleManagerGrpcService;
+import org.apache.uniffle.shuffle.manager.ShuffleManagerServerFactory;
 
-public class RssShuffleManager implements ShuffleManager {
+import static org.apache.uniffle.common.config.RssBaseConf.RPC_SERVER_PORT;
+
+public class RssShuffleManager extends RssShuffleManagerBase implements ShuffleManager {

Review Comment:
   Tried. `RssShuffleManagerInterface` cannot extend Spark's `ShuffleManager`, which has incompatible methods between Spark2 and Spark3.
   
   `RssShuffleManagerBase` is abstract and might implement `ShuffleManager`.
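
The pattern discussed here, shared logic in an abstract base with each version-specific class implementing its own flavour of the interface, might look roughly like the sketch below. All names are hypothetical stand-ins: `ManagerV2` and `ManagerV3` are not Spark's interfaces, and their method signatures are invented only to show two shapes that differ.

```java
class SharedBaseSketch {
  /** Hypothetical stand-in for one version of the framework interface. */
  interface ManagerV2 {
    String getReader(int handle, int startPartition, int endPartition);
  }

  /** Hypothetical stand-in for a later version: same method name, different parameters. */
  interface ManagerV3 {
    String getReader(int handle, int startMapIndex, int endMapIndex,
        int startPartition, int endPartition);
  }

  /** Shared logic lives here; in this sketch the base implements neither interface. */
  abstract static class ManagerBase {
    protected String describe(int handle, int startPartition, int endPartition) {
      return "handle=" + handle + " partitions=[" + startPartition + "," + endPartition + ")";
    }
  }

  /** Version-specific class: inherits shared logic, implements the older interface. */
  static class V2Manager extends ManagerBase implements ManagerV2 {
    @Override
    public String getReader(int handle, int startPartition, int endPartition) {
      return describe(handle, startPartition, endPartition);
    }
  }

  /** Version-specific class: inherits shared logic, implements the newer interface. */
  static class V3Manager extends ManagerBase implements ManagerV3 {
    @Override
    public String getReader(int handle, int startMapIndex, int endMapIndex,
        int startPartition, int endPartition) {
      return describe(handle, startPartition, endPartition)
          + " maps=[" + startMapIndex + "," + endMapIndex + ")";
    }
  }

  public static void main(String[] args) {
    System.out.println(new V2Manager().getReader(1, 0, 4));
    System.out.println(new V3Manager().getReader(1, 0, 2, 0, 4));
  }
}
```

The base class can then sit in a common module that compiles against neither version, which appears to be the motivation for `RssShuffleManagerBase` in the PR description.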





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #787: [#477][part-1] feat: support stage resubmit in spark clients

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787#discussion_r1155969978


##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -70,12 +72,18 @@
 import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.rpc.GrpcServer;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.RetryUtils;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.shuffle.manager.RssShuffleManagerBase;
+import org.apache.uniffle.shuffle.manager.ShuffleManagerGrpcService;
+import org.apache.uniffle.shuffle.manager.ShuffleManagerServerFactory;
 
-public class RssShuffleManager implements ShuffleManager {
+import static org.apache.uniffle.common.config.RssBaseConf.RPC_SERVER_PORT;
+
+public class RssShuffleManager extends RssShuffleManagerBase implements ShuffleManager {

Review Comment:
   Would it be better if `RssShuffleManagerInterface` extended `ShuffleManager`?



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java:
##########
@@ -226,4 +230,55 @@ public static Broadcast<ShuffleHandleInfo> broadcastShuffleHdlInfo(SparkContext
     ShuffleHandleInfo handleInfo = new ShuffleHandleInfo(shuffleId, partitionToServers, storageInfo);
     return sc.broadcast(handleInfo, SHUFFLE_HANDLER_INFO_CLASS_TAG);
   }
+
+  @SuppressFBWarnings("THROWS_METHOD_THROWS_RUNTIMEEXCEPTION")
+  private static <T> T instantiateFetchFailedException(
+      BlockManagerId dummy, int shuffleId, int mapIndex, int reduceId, Throwable cause) {
+    String className = FetchFailedException.class.getName();
+    T instance;
+    Class<?> klass;
+    try {
+      klass = Class.forName(className);
+    } catch (ClassNotFoundException e) {
+      // never happens;
+      throw new RuntimeException(e);
+    }
+    try {
+      instance = (T) klass
+          .getConstructor(dummy.getClass(), Integer.TYPE, Long.TYPE, Integer.TYPE, Integer.TYPE, Throwable.class)
+          .newInstance(dummy, shuffleId, (long) mapIndex, mapIndex, reduceId, cause);
+    } catch (NoSuchMethodException | IllegalAccessException
+        | IllegalArgumentException | InstantiationException
+        | InvocationTargetException e) { // if anything goes wrong, fall back to the other constructor.
+      try {
+        instance = (T) klass
+            .getConstructor(dummy.getClass(), Integer.TYPE, Integer.TYPE, Integer.TYPE, Throwable.class)
+            .newInstance(dummy, shuffleId, mapIndex, reduceId, cause);
+      } catch (Exception ae) {
+        LOG.error("Fail to new instance.", ae);
+        throw new RuntimeException(ae);

Review Comment:
   ditto.



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java:
##########
@@ -226,4 +230,55 @@ public static Broadcast<ShuffleHandleInfo> broadcastShuffleHdlInfo(SparkContext
     ShuffleHandleInfo handleInfo = new ShuffleHandleInfo(shuffleId, partitionToServers, storageInfo);
     return sc.broadcast(handleInfo, SHUFFLE_HANDLER_INFO_CLASS_TAG);
   }
+
+  @SuppressFBWarnings("THROWS_METHOD_THROWS_RUNTIMEEXCEPTION")
+  private static <T> T instantiateFetchFailedException(
+      BlockManagerId dummy, int shuffleId, int mapIndex, int reduceId, Throwable cause) {
+    String className = FetchFailedException.class.getName();
+    T instance;
+    Class<?> klass;
+    try {
+      klass = Class.forName(className);
+    } catch (ClassNotFoundException e) {
+      // never happens;
+      throw new RuntimeException(e);

Review Comment:
   `RuntimeException -> RssException`?
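
As a rough illustration of the suggestion above, the sketch below tries one constructor signature via reflection, falls back to a second, and reports failure with a project-specific exception instead of a bare `RuntimeException`. It is a sketch under assumptions, not the PR's code: `DomainException` stands in for `RssException`, and `Failure` is a hypothetical target class that only offers the shorter constructor.

```java
class ConstructorFallbackSketch {
  /** Hypothetical domain exception, standing in for a project-specific type. */
  static class DomainException extends RuntimeException {
    DomainException(String message, Throwable cause) { super(message, cause); }
  }

  /** Hypothetical target class exposing only the (int, int, Throwable) constructor. */
  public static class Failure extends Exception {
    public Failure(int shuffleId, int mapIndex, Throwable cause) {
      super("shuffle " + shuffleId + ", map " + mapIndex, cause);
    }
  }

  /** Tries the longer constructor first, then the shorter one. */
  static <T> T newInstance(Class<T> klass, int shuffleId, int mapIndex, Throwable cause) {
    try {
      // Preferred signature: (int, long, int, Throwable).
      return klass
          .getConstructor(Integer.TYPE, Long.TYPE, Integer.TYPE, Throwable.class)
          .newInstance(shuffleId, (long) mapIndex, mapIndex, cause);
    } catch (ReflectiveOperationException | IllegalArgumentException first) {
      try {
        // Fallback signature: (int, int, Throwable).
        return klass
            .getConstructor(Integer.TYPE, Integer.TYPE, Throwable.class)
            .newInstance(shuffleId, mapIndex, cause);
      } catch (ReflectiveOperationException | IllegalArgumentException second) {
        // Surface the failure with a type that callers can attribute to this project.
        throw new DomainException("cannot instantiate " + klass.getName(), second);
      }
    }
  }

  public static void main(String[] args) {
    Failure f = newInstance(Failure.class, 3, 5, new RuntimeException("io"));
    System.out.println(f.getMessage()); // prints "shuffle 3, map 5"
  }
}
```

`ReflectiveOperationException` is the common supertype of `NoSuchMethodException`, `IllegalAccessException`, `InstantiationException` and `InvocationTargetException`, so the multi-catch in the quoted diff could be shortened this way without changing which failures trigger the fallback.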



##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -317,6 +350,8 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, ShuffleDependency<
     }
     startHeartbeat();
 
+    shuffleIdToPartitionNum.putIfAbsent(shuffleId, dependency.partitioner().numPartitions());
+    shuffleIdToNumMapTasks.putIfAbsent(shuffleId, dependency.rdd().partitions().length);

Review Comment:
   `rdd.partitions.length = NumMapTasks`?



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/ShuffleFetchFailedWrapper.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.reader;
+
+import java.io.IOException;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.spark.shuffle.FetchFailedException;
+import org.apache.spark.shuffle.RssSparkShuffleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Product2;
+import scala.collection.AbstractIterator;
+import scala.collection.Iterator;
+
+import org.apache.uniffle.client.api.ShuffleManagerClient;
+import org.apache.uniffle.client.factory.ShuffleManagerClientFactory;
+import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
+import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
+import org.apache.uniffle.common.ClientType;
+import org.apache.uniffle.common.exception.RssFetchFailedException;
+
+public class ShuffleFetchFailedWrapper {

Review Comment:
   The name `Wrapper` confuses me, although I don't have a better suggestion.





[GitHub] [incubator-uniffle] jerqi merged pull request #787: [#477][part-1] feat: support stage resubmit in spark clients

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi merged PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787




[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #787: [#477][part-1] feat: support stage resubmit in spark clients

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787#discussion_r1156840532


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/ShuffleFetchFailedWrapper.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.reader;
+
+import java.io.IOException;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.spark.shuffle.FetchFailedException;
+import org.apache.spark.shuffle.RssSparkShuffleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Product2;
+import scala.collection.AbstractIterator;
+import scala.collection.Iterator;
+
+import org.apache.uniffle.client.api.ShuffleManagerClient;
+import org.apache.uniffle.client.factory.ShuffleManagerClientFactory;
+import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
+import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
+import org.apache.uniffle.common.ClientType;
+import org.apache.uniffle.common.exception.RssFetchFailedException;
+
+public class ShuffleFetchFailedWrapper {

Review Comment:
   OK, I got your point.





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #787: [#477][part-1] feat: support stage resubmit in spark clients

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787#discussion_r1155884955


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/ShuffleFetchFailedWrapper.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.reader;
+
+import java.io.IOException;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.spark.shuffle.FetchFailedException;
+import org.apache.spark.shuffle.RssSparkShuffleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Product2;
+import scala.collection.AbstractIterator;
+import scala.collection.Iterator;
+
+import org.apache.uniffle.client.api.ShuffleManagerClient;
+import org.apache.uniffle.client.factory.ShuffleManagerClientFactory;
+import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
+import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
+import org.apache.uniffle.common.ClientType;
+import org.apache.uniffle.common.exception.RssFetchFailedException;
+
+public class ShuffleFetchFailedWrapper {

Review Comment:
   Could we give it a better name? Is it an `iterator`?





[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #787: [#477][part-1] feat: support stage resubmit in spark clients

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787#discussion_r1156639360


##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -70,12 +72,18 @@
 import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.rpc.GrpcServer;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.RetryUtils;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.shuffle.manager.RssShuffleManagerBase;
+import org.apache.uniffle.shuffle.manager.ShuffleManagerGrpcService;
+import org.apache.uniffle.shuffle.manager.ShuffleManagerServerFactory;
 
-public class RssShuffleManager implements ShuffleManager {
+import static org.apache.uniffle.common.config.RssBaseConf.RPC_SERVER_PORT;
+
+public class RssShuffleManager extends RssShuffleManagerBase implements ShuffleManager {

Review Comment:
   You cannot make an interface extend another interface?





[GitHub] [incubator-uniffle] advancedxy commented on pull request #787: [#477][part-1] feat: support stage resubmit in spark clients

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787#issuecomment-1494204085

   @zuston @jerqi please take a look when you have time.




[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #787: [#477][part-1] feat: support stage resubmit in spark clients

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787#discussion_r1156639315


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java:
##########
@@ -226,4 +230,55 @@ public static Broadcast<ShuffleHandleInfo> broadcastShuffleHdlInfo(SparkContext
     ShuffleHandleInfo handleInfo = new ShuffleHandleInfo(shuffleId, partitionToServers, storageInfo);
     return sc.broadcast(handleInfo, SHUFFLE_HANDLER_INFO_CLASS_TAG);
   }
+
+  @SuppressFBWarnings("THROWS_METHOD_THROWS_RUNTIMEEXCEPTION")
+  private static <T> T instantiateFetchFailedException(
+      BlockManagerId dummy, int shuffleId, int mapIndex, int reduceId, Throwable cause) {
+    String className = FetchFailedException.class.getName();
+    T instance;
+    Class<?> klass;
+    try {
+      klass = Class.forName(className);
+    } catch (ClassNotFoundException e) {
+      // never happens;
+      throw new RuntimeException(e);

Review Comment:
   We can collect `RssException`s to identify job failures caused by RSS.
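
One way such attribution could work is to walk the cause chain of a job failure and look for the project-specific exception type. The sketch below is an assumption about how that might be done, not something shown in the PR; `DomainException` is a hypothetical stand-in for `RssException`, and the depth bound is an arbitrary guard against cyclic cause chains.

```java
class RootCauseSketch {
  /** Hypothetical domain exception, standing in for a project-specific type. */
  static class DomainException extends RuntimeException {
    DomainException(String message) { super(message); }
  }

  /** Returns true if any throwable in the cause chain is an instance of the given type. */
  static boolean causedBy(Throwable failure, Class<? extends Throwable> type) {
    Throwable current = failure;
    // Bound the walk so a cyclic cause chain cannot loop forever.
    for (int depth = 0; current != null && depth < 64; depth++) {
      if (type.isInstance(current)) {
        return true;
      }
      current = current.getCause();
    }
    return false;
  }

  public static void main(String[] args) {
    Throwable jobFailure =
        new RuntimeException("task failed", new DomainException("fetch failed"));
    System.out.println(causedBy(jobFailure, DomainException.class)); // prints "true"
    System.out.println(causedBy(new RuntimeException("oom"), DomainException.class)); // prints "false"
  }
}
```

This only works if the code throws the dedicated type in the first place, which is the argument for replacing `RuntimeException` in the quoted hunk.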





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #787: [#477][part-1] feat: support stage resubmit in spark clients

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787#discussion_r1156643677


##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -70,12 +72,18 @@
 import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.rpc.GrpcServer;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.RetryUtils;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.shuffle.manager.RssShuffleManagerBase;
+import org.apache.uniffle.shuffle.manager.ShuffleManagerGrpcService;
+import org.apache.uniffle.shuffle.manager.ShuffleManagerServerFactory;
 
-public class RssShuffleManager implements ShuffleManager {
+import static org.apache.uniffle.common.config.RssBaseConf.RPC_SERVER_PORT;
+
+public class RssShuffleManager extends RssShuffleManagerBase implements ShuffleManager {

Review Comment:
   > You cannot make an interface extend another interface?
   
   Why?





[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #787: [#477][part-1] feat: support stage resubmit in spark clients

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787#discussion_r1156782393


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java:
##########
@@ -226,4 +230,55 @@ public static Broadcast<ShuffleHandleInfo> broadcastShuffleHdlInfo(SparkContext
     ShuffleHandleInfo handleInfo = new ShuffleHandleInfo(shuffleId, partitionToServers, storageInfo);
     return sc.broadcast(handleInfo, SHUFFLE_HANDLER_INFO_CLASS_TAG);
   }
+
+  @SuppressFBWarnings("THROWS_METHOD_THROWS_RUNTIMEEXCEPTION")
+  private static <T> T instantiateFetchFailedException(
+      BlockManagerId dummy, int shuffleId, int mapIndex, int reduceId, Throwable cause) {
+    String className = FetchFailedException.class.getName();
+    T instance;
+    Class<?> klass;
+    try {
+      klass = Class.forName(className);
+    } catch (ClassNotFoundException e) {
+      // never happens;
+      throw new RuntimeException(e);

Review Comment:
   Fair point. Let's reconsider this part.





[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #787: [#477][part-1] feat: support stage resubmit in spark clients

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #787:
URL: https://github.com/apache/incubator-uniffle/pull/787#discussion_r1156782951


##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -70,12 +72,18 @@
 import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.rpc.GrpcServer;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.RetryUtils;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.shuffle.manager.RssShuffleManagerBase;
+import org.apache.uniffle.shuffle.manager.ShuffleManagerGrpcService;
+import org.apache.uniffle.shuffle.manager.ShuffleManagerServerFactory;
 
-public class RssShuffleManager implements ShuffleManager {
+import static org.apache.uniffle.common.config.RssBaseConf.RPC_SERVER_PORT;
+
+public class RssShuffleManager extends RssShuffleManagerBase implements ShuffleManager {

Review Comment:
   Never mind. I used to think that an interface could not extend another interface.


