Posted to dev@celeborn.apache.org by GitBox <gi...@apache.org> on 2022/11/30 10:37:45 UTC

[GitHub] [incubator-celeborn] nafiyAix opened a new pull request, #1031: [CELEBORN-86][REFACTOR] Register shuffle should have separated timeout configuration

nafiyAix opened a new pull request, #1031:
URL: https://github.com/apache/incubator-celeborn/pull/1031

   ### What changes were proposed in this pull request?
   Register shuffle should have a separate timeout configuration.
   
   ### Why are the changes needed?
   So that the register shuffle timeout can be customised independently and set greater than the reserve slot timeout.
   
   ### What are the items that need reviewer attention?
   
   
   ### Related issues.
   [CELEBORN-86](https://issues.apache.org/jira/browse/CELEBORN-86)
   
   ### Related pull requests.
   
   
   ### How was this patch tested?
   
   
   /cc @AngersZhuuuu 
   
   /assign @main-reviewer
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] AngersZhuuuu commented on a diff in pull request #1031: [CELEBORN-86][REFACTOR] Register shuffle should have separated timeout configuration

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on code in PR #1031:
URL: https://github.com/apache/incubator-celeborn/pull/1031#discussion_r1036929355


##########
client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java:
##########
@@ -182,13 +179,13 @@ private CelebornConf setupEnv(CompressionCodec codec) throws IOException, Interr
     shuffleClient = new ShuffleClientImpl(conf, new UserIdentifier("mock", "mock"));
 
     masterLocation.setPeer(slaveLocation);
-    when(endpointRef.askSync(
-            RegisterShuffle$.MODULE$.apply(TEST_APPLICATION_ID, TEST_SHUFFLE_ID, 1, 1),
-            ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)))
+    when(endpointRef.askSync(any(), any(), any()))
         .thenAnswer(
-            t ->
-                RegisterShuffleResponse$.MODULE$.apply(
-                    StatusCode.SUCCESS, new PartitionLocation[] {masterLocation}));
+            t -> {
+              System.out.println("Return");

Review Comment:
   Remove this debug print.





[GitHub] [incubator-celeborn] AngersZhuuuu commented on a diff in pull request #1031: [CELEBORN-86][REFACTOR] Register shuffle should have separated timeout configuration

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on code in PR #1031:
URL: https://github.com/apache/incubator-celeborn/pull/1031#discussion_r1036638021


##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -376,10 +376,16 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def rpcConnectThreads: Int = get(RPC_CONNECT_THREADS)
   def rpcLookupTimeout: RpcTimeout =
     new RpcTimeout(get(RPC_LOOKUP_TIMEOUT).milli, RPC_LOOKUP_TIMEOUT.key)
-  def rpcAskTimeout: RpcTimeout =
-    new RpcTimeout(get(RPC_ASK_TIMEOUT).milli, RPC_ASK_TIMEOUT.key)
-  def clientRpcAskTimeout: RpcTimeout =
-    new RpcTimeout(get(CLIENT_RPC_ASK_TIMEOUT).milli, CLIENT_RPC_ASK_TIMEOUT.key)
+  def rpcAskTimeout(func: String = "default"): RpcTimeout = {
+    if (func == "client") {
+      HARpcAskTimeout
+    } else {
+      val key = RPC_ASK_TIMEOUT.key.replace("<func>", func)
+      new RpcTimeout(getTimeAsMs(key, NETWORK_TIMEOUT.defaultValueString).milli, key)
+    }
+  }
+  def HARpcAskTimeout: RpcTimeout =

Review Comment:
   Rename to `haRpcTimeout`.





[GitHub] [incubator-celeborn] AngersZhuuuu commented on a diff in pull request #1031: [CELEBORN-86][REFACTOR] Register shuffle should have separated timeout configuration

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on code in PR #1031:
URL: https://github.com/apache/incubator-celeborn/pull/1031#discussion_r1036635382


##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -376,10 +376,16 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def rpcConnectThreads: Int = get(RPC_CONNECT_THREADS)
   def rpcLookupTimeout: RpcTimeout =
     new RpcTimeout(get(RPC_LOOKUP_TIMEOUT).milli, RPC_LOOKUP_TIMEOUT.key)
-  def rpcAskTimeout: RpcTimeout =
-    new RpcTimeout(get(RPC_ASK_TIMEOUT).milli, RPC_ASK_TIMEOUT.key)
-  def clientRpcAskTimeout: RpcTimeout =
-    new RpcTimeout(get(CLIENT_RPC_ASK_TIMEOUT).milli, CLIENT_RPC_ASK_TIMEOUT.key)
+  def rpcAskTimeout(func: String = "default"): RpcTimeout = {
+    if (func == "client") {
+      HARpcAskTimeout
+    } else {
+      val key = RPC_ASK_TIMEOUT.key.replace("<func>", func)
+      new RpcTimeout(getTimeAsMs(key, NETWORK_TIMEOUT.defaultValueString).milli, key)
+    }
+  }

Review Comment:
   We can implement separate methods instead:
   ```
   def registerShuffleTimeout: RpcTimeout
   def rpcAskTimeout...
   ```
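A minimal Java sketch of the trade-off behind this suggestion, assuming settings are already parsed into milliseconds in a plain `Map`. The class and method names (`DedicatedTimeoutAccessors`, `askTimeoutByNameMs`, `registerShuffleRpcAskTimeoutMs`, `rpcAskTimeoutMs`) are hypothetical and this is not Celeborn's `CelebornConf`; only the key strings come from the diffs in this thread. With a string-keyed accessor like `rpcAskTimeout(func)`, a misspelled operation name builds a key nobody sets and silently falls back, whereas a dedicated accessor spells each key in exactly one place.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Celeborn's CelebornConf: settings are assumed to be
// pre-parsed milliseconds, and all class and method names are illustrative.
final class DedicatedTimeoutAccessors {
  private final Map<String, Long> settingsMs;
  private final long networkTimeoutMs;

  DedicatedTimeoutAccessors(Map<String, Long> settingsMs, long networkTimeoutMs) {
    this.settingsMs = settingsMs;
    this.networkTimeoutMs = networkTimeoutMs;
  }

  // String-keyed style, mirroring rpcAskTimeout(func): any name builds a key,
  // so a misspelled name silently falls back to the network timeout.
  long askTimeoutByNameMs(String func) {
    String key = "celeborn.<func>.rpc.askTimeout".replace("<func>", func);
    return settingsMs.getOrDefault(key, networkTimeoutMs);
  }

  // Dedicated style: one accessor per operation, each key spelled in one place.
  long registerShuffleRpcAskTimeoutMs() {
    return settingsMs.getOrDefault("celeborn.rpc.registerShuffle.askTimeout", networkTimeoutMs);
  }

  long rpcAskTimeoutMs() {
    return settingsMs.getOrDefault("celeborn.rpc.askTimeout", networkTimeoutMs);
  }

  public static void main(String[] args) {
    Map<String, Long> settings = new HashMap<>();
    settings.put("celeborn.registerShuffle.rpc.askTimeout", 300_000L);
    settings.put("celeborn.rpc.registerShuffle.askTimeout", 300_000L);
    DedicatedTimeoutAccessors conf = new DedicatedTimeoutAccessors(settings, 240_000L);
    System.out.println(conf.askTimeoutByNameMs("registerShuffle")); // 300000
    System.out.println(conf.askTimeoutByNameMs("registerShufle"));  // 240000 (typo, silent fallback)
    System.out.println(conf.registerShuffleRpcAskTimeoutMs());      // 300000
  }
}
```

The typo case is the main argument for dedicated methods: the compiler rejects a misspelled method name, but not a misspelled string.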



##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -1002,17 +1008,17 @@ object CelebornConf extends Logging {
       .createWithDefaultString("30s")
 
   val RPC_ASK_TIMEOUT: ConfigEntry[Long] =
-    buildConf("celeborn.rpc.askTimeout")
+    buildConf("celeborn.<func>.rpc.askTimeout")
       .withAlternative("rss.rpc.askTimeout")
       .categories("network")
       .version("0.2.0")
       .doc("Timeout for RPC ask operations.")
       .fallbackConf(NETWORK_TIMEOUT)
 
-  val CLIENT_RPC_ASK_TIMEOUT: ConfigEntry[Long] =
-    buildConf("celeborn.client.rpc.askTimeout")
+  val HA_RPC_ASK_TIMEOUT: ConfigEntry[Long] =
+    buildConf("celeborn.ha.rpc.askTimeout")

Review Comment:
   Do this refactor in a separate PR.





[GitHub] [incubator-celeborn] AngersZhuuuu commented on a diff in pull request #1031: [CELEBORN-86][REFACTOR] Register shuffle should have separated timeout configuration

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on code in PR #1031:
URL: https://github.com/apache/incubator-celeborn/pull/1031#discussion_r1036769678


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -273,6 +273,7 @@ private ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(
         () ->
             driverRssMetaService.askSync(
                 RegisterShuffle$.MODULE$.apply(appId, shuffleId, numMappers, numPartitions),
+                conf.registerShuffleRpcAskTimeout(),

Review Comment:
   `registerMapPartitionTask` also needs to be changed.





[GitHub] [incubator-celeborn] AngersZhuuuu commented on a diff in pull request #1031: [CELEBORN-86][REFACTOR] Register shuffle should have separated timeout configuration

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on code in PR #1031:
URL: https://github.com/apache/incubator-celeborn/pull/1031#discussion_r1036734907


##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -1017,6 +1019,13 @@ object CelebornConf extends Logging {
       .doc("Timeout for client RPC ask operations.")
       .fallbackConf(NETWORK_TIMEOUT)
 
+  val REGISTER_SHUFFLE_RPC_ASK_TIMEOUT: ConfigEntry[Long] =
+    buildConf("celeborn.rpc.registerShuffle.askTimeout")
+      .categories("network")
+      .version("0.2.0")
+      .doc("Timeout for ask operations during register shuffle.")
+      .fallbackConf(NETWORK_TIMEOUT)

Review Comment:
   Should this fall back to `RPC_ASK_TIMEOUT`?
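One way to read this question: chain the lookups so the register shuffle ask timeout falls back to the general RPC ask timeout (including its legacy `rss.rpc.askTimeout` alternative), which in turn falls back to the network timeout. The Java sketch assumes pre-parsed millisecond values in a `Map`; `TimeoutFallbackChain` and its methods are hypothetical and do not reproduce Celeborn's `ConfigEntry` / `fallbackConf` machinery. Only the key names are taken from the diffs in this thread.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical resolver: key names come from the diffs in this thread, but the
// class, its methods, and the millisecond Map representation are assumptions.
final class TimeoutFallbackChain {
  static final String REGISTER_SHUFFLE_KEY = "celeborn.rpc.registerShuffle.askTimeout";
  static final String RPC_ASK_KEY = "celeborn.rpc.askTimeout";
  static final String LEGACY_RPC_ASK_KEY = "rss.rpc.askTimeout";

  // Returns the value of the first key in `chain` that is set, else `defaultMs`.
  static long resolveMs(Map<String, Long> settingsMs, List<String> chain, long defaultMs) {
    for (String key : chain) {
      Long value = settingsMs.get(key);
      if (value != null) {
        return value;
      }
    }
    return defaultMs;
  }

  // Register shuffle -> general RPC ask timeout (new key, then legacy key) -> network timeout.
  static long registerShuffleAskTimeoutMs(Map<String, Long> settingsMs, long networkTimeoutMs) {
    List<String> chain = Arrays.asList(REGISTER_SHUFFLE_KEY, RPC_ASK_KEY, LEGACY_RPC_ASK_KEY);
    return resolveMs(settingsMs, chain, networkTimeoutMs);
  }

  public static void main(String[] args) {
    Map<String, Long> settings = new HashMap<>();
    long networkTimeoutMs = 240_000L; // example value, not necessarily Celeborn's default
    settings.put(RPC_ASK_KEY, 60_000L);
    System.out.println(registerShuffleAskTimeoutMs(settings, networkTimeoutMs)); // 60000
    settings.put(REGISTER_SHUFFLE_KEY, 300_000L);
    System.out.println(registerShuffleAskTimeoutMs(settings, networkTimeoutMs)); // 300000
  }
}
```

With this ordering, setting only `celeborn.rpc.askTimeout` also changes the register shuffle timeout, while setting `celeborn.rpc.registerShuffle.askTimeout` lets it exceed the general ask timeout.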





[GitHub] [incubator-celeborn] AngersZhuuuu merged pull request #1031: [CELEBORN-86][REFACTOR] Register shuffle should have separated timeout configuration

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu merged PR #1031:
URL: https://github.com/apache/incubator-celeborn/pull/1031

