Posted to issues@uniffle.apache.org by "zjf2012 (via GitHub)" <gi...@apache.org> on 2023/02/21 09:31:45 UTC

[GitHub] [incubator-uniffle] zjf2012 opened a new pull request, #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' fro…

zjf2012 opened a new pull request, #637:
URL: https://github.com/apache/incubator-uniffle/pull/637

   …m RssShuffleHandle
   
   ### What changes were proposed in this pull request?
   
   Move the partition -> shuffle servers mapping from a direct field of RssShuffleHandle into a broadcast variable to reduce the task binary size.
   
   ### Why are the changes needed?
   
   To reduce task delay and task serialization/deserialization time by reducing the task binary size.
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   Tested with a 10000-partition shuffle.
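   To illustrate why moving the mapping out of the handle shrinks the task binary, here is a minimal, self-contained sketch using plain Java serialization. `FatHandle`, `SlimHandle`, the made-up server address strings, and the `broadcastId` field are hypothetical stand-ins, not Uniffle or Spark classes; the sketch only compares how many bytes each kind of handle adds when it is serialized into a task.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TaskBinarySizeDemo {
  public static void main(String[] args) {
    FatHandle fat = new FatHandle(0, buildMapping(10_000, 2));
    SlimHandle slim = new SlimHandle(0, 42L);
    System.out.println("fat handle:  " + serializedSize(fat) + " bytes");
    System.out.println("slim handle: " + serializedSize(slim) + " bytes");
  }

  // Hypothetical mapping: each partition is assigned one of `servers` made-up addresses.
  static Map<Integer, List<String>> buildMapping(int partitions, int servers) {
    Map<Integer, List<String>> mapping = new HashMap<>();
    for (int p = 0; p < partitions; p++) {
      List<String> assigned = new ArrayList<>();
      assigned.add("shuffle-server-" + (p % servers));
      mapping.put(p, assigned);
    }
    return mapping;
  }

  // Java-serializes an object and returns the number of bytes produced.
  static int serializedSize(Serializable obj) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(obj);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bos.size();
  }
}

// Hypothetical handle that embeds the whole mapping: every serialized task pays for it.
class FatHandle implements Serializable {
  private static final long serialVersionUID = 1L;
  final int shuffleId;
  final Map<Integer, List<String>> partitionToServers;

  FatHandle(int shuffleId, Map<Integer, List<String>> partitionToServers) {
    this.shuffleId = shuffleId;
    this.partitionToServers = partitionToServers;
  }
}

// Hypothetical handle that only carries an ID pointing at a separately broadcast mapping.
class SlimHandle implements Serializable {
  private static final long serialVersionUID = 1L;
  final int shuffleId;
  final long broadcastId;

  SlimHandle(int shuffleId, long broadcastId) {
    this.shuffleId = shuffleId;
    this.broadcastId = broadcastId;
  }
}
```

   Real sizes depend on the serializer and on the actual ShuffleServerInfo layout, so this shows only the shape of the effect, not the sizes measured in this PR.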
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zjf2012 commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1119518435


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssShuffleHandle.java:
##########
@@ -69,19 +69,56 @@ public ShuffleDependency<K, V, C> getDependency() {
     return dependency;
   }
 
-  public Map<Integer, List<ShuffleServerInfo>> getPartitionToServers() {
-    return partitionToServers;
-  }
-
   public int getShuffleId() {
     return shuffleId();
   }
 
-  public Set<ShuffleServerInfo> getShuffleServersForData() {
-    return shuffleServersForData;
+  /**
+   * Tried to get cached {@link ShuffleHandleInfo} from local thread first and then memory if not existing in local.
+   * If not cached, one of competing threads gets chance to deserialize it and caches it for other threads.
+   *
+   * @return
+   */
+  private ShuffleHandleInfo getCurrentHandleInfo() {
+    // local first
+    ShuffleHandleInfo info = _localHandleInfo.get().get();

Review Comment:
   Yes, I deserialize here because I follow the same approach Spark uses to broadcast a task: it is first serialized into a byte array by the Java serializer, and that byte array is then serialized by KryoSerializer in TorrentBroadcast.
   
   Spark already registers byte arrays with KryoSerializer.
   
   I think it's not user-friendly to ask users to register the RssShuffleHandle class.
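   A minimal sketch of the byte-array round trip described above, assuming a simplified, hypothetical `HandleInfo` class in place of the real `ShuffleHandleInfo`: the driver Java-serializes the info into a `byte[]` (the value that would be broadcast), and each executor rebuilds the object from those bytes. The Spark broadcast call itself is left out, since it needs a running `SparkContext`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class HandleInfoBytesDemo {
  public static void main(String[] args) {
    Map<Integer, List<String>> mapping = new HashMap<>();
    mapping.put(0, Arrays.asList("server-a"));
    mapping.put(1, Arrays.asList("server-b"));

    byte[] bytes = toBytes(new HandleInfo(7, mapping));  // driver side: value to broadcast
    HandleInfo copy = fromBytes(bytes);                  // executor side: rebuilt from bytes
    System.out.println(copy.shuffleId + " -> " + copy.partitionToServers);
  }

  // Java serialization into a plain byte array.
  static byte[] toBytes(HandleInfo info) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(info);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bos.toByteArray();
  }

  // Java deserialization back into the object.
  static HandleInfo fromBytes(byte[] bytes) {
    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return (HandleInfo) ois.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}

// Hypothetical, simplified stand-in for ShuffleHandleInfo.
class HandleInfo implements Serializable {
  private static final long serialVersionUID = 1L;
  final int shuffleId;
  final Map<Integer, List<String>> partitionToServers;

  HandleInfo(int shuffleId, Map<Integer, List<String>> partitionToServers) {
    this.shuffleId = shuffleId;
    this.partitionToServers = partitionToServers;
  }
}
```

   Because the broadcast value is just a `byte[]`, the broadcast layer never has to know the handle class, which is the point being made about Kryo registration.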
   





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' fro…

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1113076403


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssShuffleHandle.java:
##########
@@ -69,19 +58,15 @@ public ShuffleDependency<K, V, C> getDependency() {
     return dependency;
   }
 
-  public Map<Integer, List<ShuffleServerInfo>> getPartitionToServers() {
-    return partitionToServers;
-  }
-
   public int getShuffleId() {
     return shuffleId();
   }
 
-  public Set<ShuffleServerInfo> getShuffleServersForData() {

Review Comment:
   ditto





[GitHub] [incubator-uniffle] zjf2012 commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' fro…

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1443198393

   > > Please see vanished task deserialization time with patch from https://docs.google.com/document/d/1TZ-3Mgwj9j7n1mMyCrS3sskFv_uOtUtXl1oF9Y_oMOw/edit?usp=sharing.
   > 
   > It seems that we don't have access.
   
   I just changed the permission. Please try again.




[GitHub] [incubator-uniffle] zjf2012 commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1118202874


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssShuffleHandle.java:
##########
@@ -69,19 +69,56 @@ public ShuffleDependency<K, V, C> getDependency() {
     return dependency;
   }
 
-  public Map<Integer, List<ShuffleServerInfo>> getPartitionToServers() {
-    return partitionToServers;
-  }
-
   public int getShuffleId() {
     return shuffleId();
   }
 
-  public Set<ShuffleServerInfo> getShuffleServersForData() {
-    return shuffleServersForData;
+  /**
+   * Tried to get cached {@link ShuffleHandleInfo} from local thread first and then memory if not existing in local.
+   * If not cached, one of competing threads gets chance to deserialize it and caches it for other threads.
+   *
+   * @return
+   */
+  private ShuffleHandleInfo getCurrentHandleInfo() {
+    // local first
+    ShuffleHandleInfo info = _localHandleInfo.get().get();

Review Comment:
   You are correct. Thanks for pointing it out. Let me fix it.





[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1121931888


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssShuffleHandle.java:
##########
@@ -69,19 +69,56 @@ public ShuffleDependency<K, V, C> getDependency() {
     return dependency;
   }
 
-  public Map<Integer, List<ShuffleServerInfo>> getPartitionToServers() {
-    return partitionToServers;
-  }
-
   public int getShuffleId() {
     return shuffleId();
   }
 
-  public Set<ShuffleServerInfo> getShuffleServersForData() {
-    return shuffleServersForData;
+  /**
+   * Tried to get cached {@link ShuffleHandleInfo} from local thread first and then memory if not existing in local.
+   * If not cached, one of competing threads gets chance to deserialize it and caches it for other threads.
+   *
+   * @return
+   */
+  private ShuffleHandleInfo getCurrentHandleInfo() {
+    // local first
+    ShuffleHandleInfo info = _localHandleInfo.get().get();

Review Comment:
   Sorry for the late reply.
   
   In my experience, `spark.kryo.registerRequired` is disabled by default and is normally not set to true.
   If it is explicitly set to true, I think it wouldn't be too much trouble to ask users to register `RssShuffleHandle`.
   
   That's just my two cents. 





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1121241900


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java:
##########
@@ -190,4 +200,33 @@ public static int getRequiredShuffleServerNumber(SparkConf sparkConf) {
     int taskConcurrencyPerServer = sparkConf.get(RssSparkConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER);
     return (int) Math.ceil(estimateTaskConcurrency * 1.0 / taskConcurrencyPerServer);
   }
+
+  /**
+   * Get current active {@link SparkContext}. It should be called inside Driver since we don't mean to create any
+   * new {@link SparkContext} here.
+   *
+   * Note: We could use "SparkContext.getActive()" instead of "SparkContext.getOrCreate()" if the "getActive" method
+   * is not declared as package private in Scala.
+   * @return Active SparkContext created by Driver.
+   */
+  public static SparkContext getActiveSparkContext() {
+    return SparkContext.getOrCreate();
+  }
+
+  /**
+   * create broadcast variable of {@link ShuffleHandleInfo}
+   *
+   * @param sc                    expose for easy unit-test
+   * @param shuffleId
+   * @param partitionToServers
+   * @param storageInfo
+   * @return Broadcast variable registered for auto cleanup
+   */
+  public static Broadcast<byte[]> createPartShuffleServerMap(SparkContext sc, int shuffleId,
+      Map<Integer, List<ShuffleServerInfo>> partitionToServers, RemoteStorageInfo storageInfo) {
+    ShuffleHandleInfo partServerMap = new ShuffleHandleInfo(shuffleId, partitionToServers, storageInfo);

Review Comment:
   `partServerMap` should be renamed to something more descriptive.





[GitHub] [incubator-uniffle] zjf2012 commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' fro…

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1439313903

   > The code LGTM overall. Some minor questions:
   > 
   > > **How was this patch tested?**
   > > tested with 10000 partitions shuffle
   > 
   > Do you have some data about the serialized task size before and after? Is there any slowdown when using broadcast, and how many shuffle servers are in your environment?
   > 
   > And just to be safe, how large is the broadcast in your 10000-partition test? I just want to make sure it doesn't put too much memory pressure on the driver.
   
   For now I don't have more servers, so I only used two shuffle servers. Before my optimization, both map tasks and reduce tasks had a binary size of more than 670KB. After the optimization, it dropped to less than 6KB, a dramatic reduction.
   
   Broadcast uses a BitTorrent-like protocol to distribute the variable to each executor only once. Executors can fetch chunks of the broadcast variable from other executors instead of fetching everything from the driver. Task serialization/deserialization time also drops a lot. So, in theory, it should not slow down the job.
   
   Based on the above, the size of the broadcast should be less than 670KB. I'll try to measure it today.
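   A back-of-the-envelope sketch of the traffic argument, using the 670KB and 6KB task sizes reported above. The task count and executor count are hypothetical, and the broadcast is charged its full 670KB per executor as a pessimistic upper bound (the comment expects it to be smaller, and peers can serve chunks instead of the driver).

```java
class BroadcastTrafficEstimate {
  static final long KB = 1024;

  // Every task binary embeds the full mapping.
  static long withoutBroadcast(long tasks, long fatTaskBytes) {
    return tasks * fatTaskBytes;
  }

  // Tasks carry only the slim binary; each executor typically fetches the broadcast once
  // (possibly from other executors rather than from the driver).
  static long withBroadcast(long tasks, long slimTaskBytes, long executors, long broadcastBytes) {
    return tasks * slimTaskBytes + executors * broadcastBytes;
  }

  public static void main(String[] args) {
    long tasks = 10_000;   // hypothetical: one task per partition
    long executors = 100;  // hypothetical cluster size
    long before = withoutBroadcast(tasks, 670 * KB);
    // Pessimistic: charge the broadcast the full 670KB per executor.
    long after = withBroadcast(tasks, 6 * KB, executors, 670 * KB);
    System.out.printf("without broadcast: %d MB, with broadcast: at most %d MB%n",
        before / (KB * KB), after / (KB * KB));
  }
}
```

   The saving scales with the ratio of tasks to executors: the mapping is paid once per executor instead of once per task.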
   




[GitHub] [incubator-uniffle] zjf2012 commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1130286792


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssShuffleHandle.java:
##########
@@ -32,29 +37,27 @@
   private String appId;
   private int numMaps;
   private ShuffleDependency<K, V, C> dependency;
-  private Map<Integer, List<ShuffleServerInfo>> partitionToServers;
-  // shuffle servers which is for store shuffle data
-  private Set<ShuffleServerInfo> shuffleServersForData;
-  // remoteStorage used for this job
-  private RemoteStorageInfo remoteStorage;
+  private Broadcast<byte[]> handlerInfoBytesBd;
+
+  // shuffle ID to ShuffleIdRef
+  // ShuffleIdRef acts as strong reference to prevent cached ShuffleHandleInfo being GCed during shuffle
+  // ShuffleIdRef will be removed when unregisterShuffle()
+  private static Map<Integer, ShuffleIdRef> _globalShuffleIdRefMap = new ConcurrentHashMap<>();

Review Comment:
   Yes, it's just a convention to denote a special variable. If it violates the Uniffle code style, I can change it.
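   The diff comment says `ShuffleIdRef` acts as a strong reference that keeps the cached `ShuffleHandleInfo` from being GCed until `unregisterShuffle()`. One way such pinning can work, sketched here under assumptions, is a `WeakHashMap` keyed by the ref object: while the global map holds the ref, the cached value stays; once the ref is removed, the entry becomes eligible for collection. `HandleInfoCache`, its methods, and this simplified `ShuffleIdRef` are hypothetical, and this is not necessarily how the PR implements it.

```java
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class ShuffleIdRefDemo {
  public static void main(String[] args) {
    HandleInfoCache<String> cache = new HandleInfoCache<>();
    System.out.println(cache.getOrLoad(1, () -> "handle-info-1"));
    System.out.println(cache.getOrLoad(1, () -> "not loaded again while pinned"));
    cache.unregister(1);  // the cached value may now be collected at a later GC
    System.out.println("still pinned: " + cache.isPinned(1));
  }
}

// Only its identity matters: it is the key of the weak cache below.
final class ShuffleIdRef {
  final int shuffleId;

  ShuffleIdRef(int shuffleId) {
    this.shuffleId = shuffleId;
  }
}

// Hypothetical cache in which a strong map of ShuffleIdRef objects pins weakly cached values.
class HandleInfoCache<V> {
  // Strong references, kept until unregister(): they keep the weak-cache entries alive.
  private final Map<Integer, ShuffleIdRef> pinnedRefs = new ConcurrentHashMap<>();
  // An entry becomes collectable once its ShuffleIdRef key is no longer strongly reachable.
  private final Map<ShuffleIdRef, V> weakCache = new WeakHashMap<>();

  V getOrLoad(int shuffleId, Supplier<V> loader) {
    ShuffleIdRef ref = pinnedRefs.computeIfAbsent(shuffleId, ShuffleIdRef::new);
    synchronized (weakCache) {  // WeakHashMap itself is not thread-safe
      return weakCache.computeIfAbsent(ref, r -> loader.get());
    }
  }

  void unregister(int shuffleId) {
    pinnedRefs.remove(shuffleId);
  }

  boolean isPinned(int shuffleId) {
    return pinnedRefs.containsKey(shuffleId);
  }
}
```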





[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1133491980


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java:
##########
@@ -27,22 +27,31 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.deploy.SparkHadoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.reflect.ClassTag;

Review Comment:
   Do you think it's better to make scala imports a separated group?





[GitHub] [incubator-uniffle] xianjingfeng merged pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng merged PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637




[GitHub] [incubator-uniffle] zjf2012 commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1443202413

   > > …m RssShuffleHandle
   > > ### What changes were proposed in this pull request?
   > > move partition -> shuffle servers mapping from direct field of RssShuffleHandle to a broadcast variable to reduce task binary size.
   > > ### Why are the changes needed?
   > > to reduce task delay and task serialize/deserialize time by reduce task binary size
   > > ### Does this PR introduce _any_ user-facing change?
   > > No.
   > > ### How was this patch tested?
   > > 
   > > 1. tested with 10000 partitions shuffle. Task binary size reduced from more than 670KB to less than 6KB.
   > > 2. tested with multiple shuffle stages in same job to verify ShuffleHandleInfo cache logic
   > 
   > Could you modify the title and description? It seems that the title is too long, so some words of the title spilled into the description.
   
   done




[GitHub] [incubator-uniffle] jerqi commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' fro…

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1443192560

   > Please see vanished task deserialization time with patch from https://docs.google.com/document/d/1TZ-3Mgwj9j7n1mMyCrS3sskFv_uOtUtXl1oF9Y_oMOw/edit?usp=sharing.
   
   It seems that we don't have access.




[GitHub] [incubator-uniffle] xianjingfeng commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' fro…

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1438519543

   The basic logic looks OK to me. You can deal with the code style and CI first. @zjf2012 




[GitHub] [incubator-uniffle] jerqi commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' fro…

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1439371181

   > > Will the broadcast be cleaned when the shuffle is removed?
   > 
   > Yes. It's registered for cleanup when SparkContext creates a new broadcast variable. However, the cleaner may not clean it up immediately, just like the unregisterShuffle method below. The cleaner only gets a chance to capture them (shuffles, broadcasts, ...) after a GC has happened. This behavior is fine for this broadcast variable since it's small anyway. Could it cause a problem in the code snippet below? The shuffle servers may not get cleaned up quickly if no GC happens in the driver at the right time.
   > 
   > ```java
   > @Override
   > public boolean unregisterShuffle(int shuffleId) {
   >   try {
   >     if (SparkEnv.get().executorId().equals("driver")) {
   >       shuffleWriteClient.unregisterShuffle(appId, shuffleId);
   >     }
   >   } catch (Exception e) {
   >     LOG.warn("Errors on unregister to remote shuffle-servers", e);
   >   }
   >   return true;
   > }
   > ```
   
   Ok
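   The delayed cleanup discussed above follows the usual weak-reference pattern, which Spark's `ContextCleaner` works along the lines of: a `WeakReference` is registered with a `ReferenceQueue`, and it is only enqueued after the GC finds the referent unreachable, so nothing is cleaned until a GC actually runs. The sketch below shows that pattern with plain JDK classes; `GcDrivenCleaner` is hypothetical and only counts cleanups where a real cleaner would unregister the shuffle or remove the broadcast.

```java
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class GcDrivenCleanerDemo {
  public static void main(String[] args) throws InterruptedException {
    GcDrivenCleaner cleaner = new GcDrivenCleaner();
    Object handle = new Object();  // stands in for a shuffle handle / broadcast owner
    cleaner.register(handle, 1);
    System.out.println("cleaned while reachable: " + cleaner.drain());

    handle = null;   // drop the last strong reference
    System.gc();     // only a hint; the JVM decides when (or whether) to collect
    Thread.sleep(100);
    System.out.println("cleaned after GC hint: " + cleaner.drain());
  }
}

// Hypothetical cleaner: work happens only after the GC has enqueued the weak reference.
class GcDrivenCleaner {
  private final ReferenceQueue<Object> queue = new ReferenceQueue<>();
  // Keeps the WeakReference objects themselves reachable until they are processed.
  private final Map<Reference<?>, Integer> pending = new ConcurrentHashMap<>();

  void register(Object owner, int shuffleId) {
    pending.put(new WeakReference<>(owner, queue), shuffleId);
  }

  int pendingCount() {
    return pending.size();
  }

  // Processes every reference the GC has enqueued so far; returns how many were cleaned.
  int drain() {
    int cleaned = 0;
    Reference<?> ref;
    while ((ref = queue.poll()) != null) {
      Integer shuffleId = pending.remove(ref);
      if (shuffleId != null) {
        // A real cleaner would unregister shuffle `shuffleId` or remove the broadcast here.
        cleaned++;
      }
    }
    return cleaned;
  }
}
```

   The second count may be 0 or 1 depending on whether the JVM honors the GC hint, which is exactly the timing concern raised about `unregisterShuffle`.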




[GitHub] [incubator-uniffle] zjf2012 commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1122549413


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java:
##########
@@ -190,4 +200,33 @@ public static int getRequiredShuffleServerNumber(SparkConf sparkConf) {
     int taskConcurrencyPerServer = sparkConf.get(RssSparkConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER);
     return (int) Math.ceil(estimateTaskConcurrency * 1.0 / taskConcurrencyPerServer);
   }
+
+  /**
+   * Get current active {@link SparkContext}. It should be called inside Driver since we don't mean to create any
+   * new {@link SparkContext} here.
+   *
+   * Note: We could use "SparkContext.getActive()" instead of "SparkContext.getOrCreate()" if the "getActive" method
+   * is not declared as package private in Scala.
+   * @return Active SparkContext created by Driver.
+   */
+  public static SparkContext getActiveSparkContext() {
+    return SparkContext.getOrCreate();
+  }
+
+  /**
+   * create broadcast variable of {@link ShuffleHandleInfo}
+   *
+   * @param sc                    expose for easy unit-test
+   * @param shuffleId
+   * @param partitionToServers
+   * @param storageInfo
+   * @return Broadcast variable registered for auto cleanup
+   */
+  public static Broadcast<byte[]> createPartShuffleServerMap(SparkContext sc, int shuffleId,
+      Map<Integer, List<ShuffleServerInfo>> partitionToServers, RemoteStorageInfo storageInfo) {
+    ShuffleHandleInfo partServerMap = new ShuffleHandleInfo(shuffleId, partitionToServers, storageInfo);

Review Comment:
   ok, I get you.





[GitHub] [incubator-uniffle] zjf2012 commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1116803431


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssShuffleHandle.java:
##########
@@ -69,19 +69,56 @@ public ShuffleDependency<K, V, C> getDependency() {
     return dependency;
   }
 
-  public Map<Integer, List<ShuffleServerInfo>> getPartitionToServers() {
-    return partitionToServers;
-  }
-
   public int getShuffleId() {
     return shuffleId();
   }
 
-  public Set<ShuffleServerInfo> getShuffleServersForData() {
-    return shuffleServersForData;
+  /**
+   * Tried to get cached {@link ShuffleHandleInfo} from local thread first and then memory if not existing in local.
+   * If not cached, one of competing threads gets chance to deserialize it and caches it for other threads.
+   *
+   * @return
+   */
+  private ShuffleHandleInfo getCurrentHandleInfo() {
+    // local first
+    ShuffleHandleInfo info = _localHandleInfo.get().get();

Review Comment:
   They cache the deserialized HandleInfo for the currently active shuffle. All tasks belonging to a shuffle must complete before another shuffle runs in the next stage, so no two shuffles are active at the same time.
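   A minimal sketch of the lookup order described in the Javadoc in the diff above: thread-local first, then a shared in-memory cache, with only one competing thread doing the deserialization. `LocalFirstCache` and the `IntFunction` deserializer are hypothetical; `ConcurrentHashMap.computeIfAbsent` is used here because it applies the mapping function at most once per key. The sketch keys entries by shuffle ID, so it does not rely on only one shuffle being active at a time.

```java
import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

class LocalFirstCacheDemo {
  public static void main(String[] args) throws InterruptedException {
    AtomicInteger deserializations = new AtomicInteger();
    LocalFirstCache<String> cache = new LocalFirstCache<>(shuffleId -> {
      deserializations.incrementAndGet();  // stands in for deserializing the broadcast bytes
      return "handle-info-" + shuffleId;
    });
    // Several "task threads" ask for the same shuffle concurrently.
    Thread[] tasks = new Thread[8];
    for (int i = 0; i < tasks.length; i++) {
      tasks[i] = new Thread(() -> cache.get(3));
      tasks[i].start();
    }
    for (Thread t : tasks) {
      t.join();
    }
    System.out.println("deserializations: " + deserializations.get());
  }
}

// Hypothetical cache: thread-local lookup first, then a shared map.
class LocalFirstCache<V> {
  private static final class LocalEntry<T> {
    final int shuffleId;
    // Weak, so the thread-local does not keep the value alive on its own.
    final WeakReference<T> value;

    LocalEntry(int shuffleId, T value) {
      this.shuffleId = shuffleId;
      this.value = new WeakReference<>(value);
    }
  }

  private final ConcurrentHashMap<Integer, V> shared = new ConcurrentHashMap<>();
  private final ThreadLocal<LocalEntry<V>> local = new ThreadLocal<>();
  private final IntFunction<V> deserializer;

  LocalFirstCache(IntFunction<V> deserializer) {
    this.deserializer = deserializer;
  }

  V get(int shuffleId) {
    // 1. Thread-local fast path.
    LocalEntry<V> entry = local.get();
    if (entry != null && entry.shuffleId == shuffleId) {
      V cached = entry.value.get();
      if (cached != null) {
        return cached;
      }
    }
    // 2. Shared cache: the deserializer runs at most once per key; concurrent
    //    callers for the same key wait for that result instead of deserializing again.
    V value = shared.computeIfAbsent(shuffleId, id -> deserializer.apply(id));
    local.set(new LocalEntry<>(shuffleId, value));
    return value;
  }
}
```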





[GitHub] [incubator-uniffle] advancedxy commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1455395237

   > > > @advancedxy @xianjingfeng Do you have another suggestion?
   > > 
   > > 
   > > I'm not sure; this PR introduces some quite complex logic to broadcast the shuffle handle info.
   > > If I were implementing this feature, I would just use Kryo by default, and in RssShuffleManager tell users to either turn off `spark.kryo.registerRequired` (which would have been set explicitly by the user) or manually register RssShuffleHandle.
   > 
   > Just to make sure we are on the same page: only registering RssShuffleHandle with the Kryo serializer doesn't resolve this issue. Each task will still have a binary of more than 670KB for a 10000-partition job, and each task will spend noticeable time repeatedly deserializing the partition -> shuffle server mappings, as shown in https://docs.google.com/document/d/1TZ-3Mgwj9j7n1mMyCrS3sskFv_uOtUtXl1oF9Y_oMOw/edit?usp=sharing.
   > 
   > RssShuffleHandle is a field of ShuffleDependency, which is serialized into the task binary by the DAGScheduler code below. Without broadcasting ShuffleHandleInfo, it's hard to pull it out and avoid repeating it in every task.
   > 
   > ```scala
   > RDDCheckpointData.synchronized {
   >   taskBinaryBytes = stage match {
   >     case stage: ShuffleMapStage =>
   >       JavaUtils.bufferToArray(
   >         closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
   >     case stage: ResultStage =>
   >       JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
   >   }
   > 
   >   partitions = stage.rdd.partitions
   > }
   > ```
   > 
   > I think broadcast itself is quite simple and efficient. I don't see other better alternatives.
   
   Sorry, I wasn't clear enough. I agree broadcast is the best way to go. I'm just not sure that we should broadcast binary data rather than simply broadcasting the shuffle handle info. The only problem with broadcasting the shuffle handle info directly is that the job will fail once `spark.kryo.registerRequired` (which is false by default) is set to true. To avoid this problem, we introduce quite complex logic for deserialization, caching, weak references, etc. I'm wondering whether that's worth it.




[GitHub] [incubator-uniffle] advancedxy commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1461376706

   Hi @zjf2012, it seems that the majority of us think it's better to simply broadcast the RssShuffleHandle object here, which requires `spark.kryo.registerRequired=false`. Would you mind changing it back?




[GitHub] [incubator-uniffle] zjf2012 commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1116696515


##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -318,12 +319,13 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, ShuffleDependency<
     if (dependency.partitioner().numPartitions() == 0) {
       LOG.info("RegisterShuffle with ShuffleId[" + shuffleId + "], partitionNum is 0, "
           + "return the empty RssShuffleHandle directly");
+      Broadcast<byte[]> ptsBd = RssSparkShuffleUtils.createPartShuffleServerMap(SparkContext.getOrCreate(),

Review Comment:
   Yes, I tried. But we cannot call the `SparkContext.getActive()` method from Java since it's package-private in Scala. And `registerShuffle` is only called in the driver after the SparkContext has been created, so we should be safe here and always get the active SparkContext.



[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1116822065


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssShuffleHandle.java:
##########
@@ -69,19 +69,56 @@ public ShuffleDependency<K, V, C> getDependency() {
     return dependency;
   }
 
-  public Map<Integer, List<ShuffleServerInfo>> getPartitionToServers() {
-    return partitionToServers;
-  }
-
   public int getShuffleId() {
     return shuffleId();
   }
 
-  public Set<ShuffleServerInfo> getShuffleServersForData() {
-    return shuffleServersForData;
+  /**
+   * Tried to get cached {@link ShuffleHandleInfo} from local thread first and then memory if not existing in local.
+   * If not cached, one of competing threads gets chance to deserialize it and caches it for other threads.
+   *
+   * @return
+   */
+  private ShuffleHandleInfo getCurrentHandleInfo() {
+    // local first
+    ShuffleHandleInfo info = _localHandleInfo.get().get();

Review Comment:
   We can run two shuffles at the same time, for example in a join operation.
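To illustrate the concern: one task thread can read several shuffles (for example, the reduce side of a join reads both parents), so a cache with a single slot per thread can hand back the wrong shuffle's info. Below is a minimal JDK-only sketch that keys the cached value by shuffle ID instead. `HandleInfoCache` and its loader are hypothetical, not code from the PR.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntFunction;

// Hypothetical JVM-wide cache: one entry per shuffle, so concurrent shuffles never collide.
final class HandleInfoCache<T> {
  private final Map<Integer, T> byShuffleId = new ConcurrentHashMap<>();
  // e.g. "deserialize the broadcast that belongs to this shuffle"
  private final IntFunction<T> loader;

  HandleInfoCache(IntFunction<T> loader) {
    this.loader = loader;
  }

  T get(int shuffleId) {
    // ConcurrentHashMap.computeIfAbsent applies the loader at most once per absent key,
    // even when several task threads ask for the same shuffle at the same time.
    return byShuffleId.computeIfAbsent(shuffleId, id -> loader.apply(id));
  }

  // Called when a shuffle is unregistered so stale entries do not accumulate.
  void invalidate(int shuffleId) {
    byShuffleId.remove(shuffleId);
  }

  int size() {
    return byShuffleId.size();
  }
}
```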



[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1116688514


##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -318,12 +319,13 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, ShuffleDependency<
     if (dependency.partitioner().numPartitions() == 0) {
       LOG.info("RegisterShuffle with ShuffleId[" + shuffleId + "], partitionNum is 0, "
           + "return the empty RssShuffleHandle directly");
+      Broadcast<byte[]> ptsBd = RssSparkShuffleUtils.createPartShuffleServerMap(SparkContext.getOrCreate(),

Review Comment:
   `SparkContext.getActive()`?
   If we don't have a SparkContext, we should throw an exception instead of creating one.
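Here is a minimal sketch of the fail-fast behaviour suggested here. Because the thread notes that `SparkContext.getActive()` is package-private in Scala, the sketch takes a `java.util.Optional` as a stand-in for "the active context, if any". `ActiveContexts.requireActive` is a hypothetical helper, not Spark or Uniffle API.

```java
import java.util.Optional;

// Hypothetical helper: fail fast when no context is active instead of creating one.
final class ActiveContexts {
  private ActiveContexts() {}

  static <T> T requireActive(Optional<T> active, String caller) {
    return active.orElseThrow(() -> new IllegalStateException(
        caller + " must be called on the driver after the SparkContext is created"));
  }
}
```

Unlike `getOrCreate()`, this never constructs anything, so a call from the wrong JVM surfaces as an error rather than as a second, silently created context.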



[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' fro…

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1113071138


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/PartitionShuffleServerMap.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+
+import org.apache.uniffle.common.ShuffleServerInfo;
+
+/**
+ * Class for holding partition ID -> shuffle servers mapping.
+ * It's to be broadcast to executors and referenced by shuffle tasks.
+ */
+public class PartitionShuffleServerMap {

Review Comment:
   This class may store other information, such as `RemoteStorageInfo`. I think we should use a better name. `ShuffleHandlerInfo`? I hope you have a better idea.
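For illustration, a holder that carries both the partition-to-servers mapping and the remote storage information could look like the following. The thread later uses the name `ShuffleHandleInfo`. Here `HandleInfoSketch`, `SketchServer`, and `SketchStorage` are hypothetical, simplified stand-ins for that class, `ShuffleServerInfo`, and `RemoteStorageInfo`, so the fields are assumptions rather than the PR's layout.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical, simplified stand-in for ShuffleServerInfo.
final class SketchServer implements Serializable {
  final String host;
  final int port;

  SketchServer(String host, int port) {
    this.host = host;
    this.port = port;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof SketchServer)) {
      return false;
    }
    SketchServer other = (SketchServer) o;
    return host.equals(other.host) && port == other.port;
  }

  @Override
  public int hashCode() {
    return Objects.hash(host, port);
  }
}

// Hypothetical, simplified stand-in for RemoteStorageInfo.
final class SketchStorage implements Serializable {
  final String path;

  SketchStorage(String path) {
    this.path = path;
  }
}

// Everything a task needs about one shuffle, kept out of the task binary and broadcast instead.
final class HandleInfoSketch implements Serializable {
  private final int shuffleId;
  private final Map<Integer, List<SketchServer>> partitionToServers;
  private final SketchStorage remoteStorage;

  HandleInfoSketch(int shuffleId, Map<Integer, List<SketchServer>> partitionToServers,
      SketchStorage remoteStorage) {
    this.shuffleId = shuffleId;
    this.partitionToServers = Collections.unmodifiableMap(new HashMap<>(partitionToServers));
    this.remoteStorage = remoteStorage;
  }

  int getShuffleId() {
    return shuffleId;
  }

  Map<Integer, List<SketchServer>> getPartitionToServers() {
    return partitionToServers;
  }

  SketchStorage getRemoteStorage() {
    return remoteStorage;
  }

  // Distinct servers across all partitions, derived from the map rather than stored twice.
  Set<SketchServer> getShuffleServersForData() {
    Set<SketchServer> servers = new HashSet<>();
    partitionToServers.values().forEach(servers::addAll);
    return servers;
  }
}
```

Deriving `getShuffleServersForData()` from the map, rather than storing a second collection, is one way to keep the broadcast payload smaller. It is not necessarily what the PR does.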



[GitHub] [incubator-uniffle] zjf2012 commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1122540084


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java:
##########
@@ -190,4 +200,33 @@ public static int getRequiredShuffleServerNumber(SparkConf sparkConf) {
     int taskConcurrencyPerServer = sparkConf.get(RssSparkConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER);
     return (int) Math.ceil(estimateTaskConcurrency * 1.0 / taskConcurrencyPerServer);
   }
+
+  /**
+   * Get current active {@link SparkContext}. It should be called inside Driver since we don't mean to create any
+   * new {@link SparkContext} here.
+   *
+   * Note: We could use "SparkContext.getActive()" instead of "SparkContext.getOrCreate()" if the "getActive" method
+   * is not declared as package private in Scala.
+   * @return Active SparkContext created by Driver.
+   */
+  public static SparkContext getActiveSparkContext() {
+    return SparkContext.getOrCreate();
+  }
+
+  /**
+   * create broadcast variable of {@link ShuffleHandleInfo}
+   *
+   * @param sc                    expose for easy unit-test
+   * @param shuffleId
+   * @param partitionToServers
+   * @param storageInfo
+   * @return Broadcast variable registered for auto cleanup
+   */
+  public static Broadcast<byte[]> createPartShuffleServerMap(SparkContext sc, int shuffleId,
+      Map<Integer, List<ShuffleServerInfo>> partitionToServers, RemoteStorageInfo storageInfo) {
+    ShuffleHandleInfo partServerMap = new ShuffleHandleInfo(shuffleId, partitionToServers, storageInfo);

Review Comment:
   I changed it to broadcastShuffleHdlInfo. Please double-check.



[GitHub] [incubator-uniffle] zjf2012 commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1122553996


##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -318,12 +318,14 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, ShuffleDependency<
     if (dependency.partitioner().numPartitions() == 0) {
       LOG.info("RegisterShuffle with ShuffleId[" + shuffleId + "], partitionNum is 0, "
           + "return the empty RssShuffleHandle directly");
+      Broadcast<byte[]> ptsBd = RssSparkShuffleUtils.broadcastShuffleHdlInfo(

Review Comment:
   All done, thanks.



[GitHub] [incubator-uniffle] advancedxy commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1455076963

   > @advancedxy @xianjingfeng Do you have another suggestion?
   
   I'm not sure; this PR introduces some fairly complex logic to broadcast the shuffle handle info.
   
   If I were implementing this feature, I would just use Kryo by default, and have RssShuffleManager tell users to either turn off `spark.kryo.registerRequired` (which is only enabled if the user sets it explicitly) or manually register RssShuffleHandle.
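Below is a minimal sketch of the check suggested here. It is written against a plain `Map<String, String>` of Spark properties so it runs without Spark. `spark.serializer`, `spark.kryo.registerRequired`, and `spark.kryo.classesToRegister` are real Spark configuration keys. The `KryoConfigCheck` class and the idea of calling it from `RssShuffleManager` are assumptions. In practice the classes that `RssShuffleHandle` references may also need registering, so the single-class test is a simplification.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical driver-side check; only the configuration keys are real Spark settings.
final class KryoConfigCheck {
  static final String KRYO = "org.apache.spark.serializer.KryoSerializer";
  static final String HANDLE_CLASS = "org.apache.spark.shuffle.RssShuffleHandle";

  private KryoConfigCheck() {}

  // Returns a message to log when broadcasting the handle object would fail, or empty if it looks safe.
  static Optional<String> check(Map<String, String> sparkConf) {
    boolean usesKryo = KRYO.equals(sparkConf.getOrDefault("spark.serializer", ""));
    boolean registerRequired =
        Boolean.parseBoolean(sparkConf.getOrDefault("spark.kryo.registerRequired", "false"));
    if (!usesKryo || !registerRequired) {
      return Optional.empty(); // registration is only enforced for Kryo with registerRequired=true
    }
    Set<String> registered = new HashSet<>();
    for (String name : sparkConf.getOrDefault("spark.kryo.classesToRegister", "").split(",")) {
      if (!name.trim().isEmpty()) {
        registered.add(name.trim());
      }
    }
    if (registered.contains(HANDLE_CLASS)) {
      return Optional.empty();
    }
    return Optional.of("spark.kryo.registerRequired=true but " + HANDLE_CLASS
        + " is not registered: set spark.kryo.registerRequired=false or add it"
        + " (and the classes it references) to spark.kryo.classesToRegister");
  }
}
```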


[GitHub] [incubator-uniffle] zjf2012 commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' fro…

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1113730069


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java:
##########
@@ -190,4 +196,18 @@ public static int getRequiredShuffleServerNumber(SparkConf sparkConf) {
     int taskConcurrencyPerServer = sparkConf.get(RssSparkConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER);
     return (int) Math.ceil(estimateTaskConcurrency * 1.0 / taskConcurrencyPerServer);
   }
+
+  /**
+   * create broadcast variable for partition -> shuffleserver map
+   * make sure it's called in JVM with active SparkContext created
+   *
+   * @param partitionToServers
+   * @return Broadcast variable registered for auto cleanup
+   */
+  public static Broadcast<PartitionShuffleServerMap> createPartShuffleServerMap(

Review Comment:
   I agree.



[GitHub] [incubator-uniffle] zjf2012 commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' fro…

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1439341813

   > Will the broadcast be cleaned when the shuffle is removed?
   
   Yes. It's registered for cleanup when the SparkContext creates a new broadcast variable. However, the cleaner may not clean it up immediately, just as with the `unregisterShuffle` method below: the cleaner only gets a chance to pick up these objects (shuffles, broadcasts, ...) after a GC has happened. That behavior is fine for this broadcast variable since it's small anyway. But could it cause a problem for the code snippet below? The shuffle servers may not get cleaned up quickly if no GC happens in the driver.
   
   ```java
   @Override
   public boolean unregisterShuffle(int shuffleId) {
     try {
       if (SparkEnv.get().executorId().equals("driver")) {
         shuffleWriteClient.unregisterShuffle(appId, shuffleId);
       }
     } catch (Exception e) {
       LOG.warn("Errors on unregister to remote shuffle-servers", e);
     }
     return true;
   }
   ```
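Some background on the GC dependency: Spark's `ContextCleaner` tracks broadcasts, shuffles, and RDDs through weak references attached to a `ReferenceQueue`. It therefore only acts once the driver JVM has actually collected the owning object. It also forces a periodic driver GC, controlled by `spark.cleaner.periodicGC.interval`. The JDK-only sketch below mimics that pattern. `WeakCleaner` is a hypothetical class, not Spark's implementation.

```java
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of weak-reference-based cleanup, modelled loosely on the
// ContextCleaner pattern; it is not Spark's implementation.
final class WeakCleaner {
  // Remembers which resource to release once the owning object has been collected.
  private static final class Tracked extends WeakReference<Object> {
    final String resourceId;

    Tracked(Object owner, String resourceId, ReferenceQueue<Object> queue) {
      super(owner, queue);
      this.resourceId = resourceId;
    }
  }

  private final ReferenceQueue<Object> queue = new ReferenceQueue<>();
  // The Tracked wrappers must stay strongly reachable until they are processed.
  private final Set<Tracked> pending = ConcurrentHashMap.newKeySet();
  private final List<String> cleaned = new ArrayList<>();

  void register(Object owner, String resourceId) {
    pending.add(new Tracked(owner, resourceId, queue));
  }

  // Processes only references the GC has already enqueued; never blocks or forces a GC.
  synchronized List<String> drain() {
    Reference<?> ref;
    while ((ref = queue.poll()) != null) {
      Tracked tracked = (Tracked) ref;
      pending.remove(tracked);
      cleaned.add(tracked.resourceId); // a real cleaner would delete broadcast or shuffle data here
    }
    return Collections.unmodifiableList(new ArrayList<>(cleaned));
  }

  int pendingCount() {
    return pending.size();
  }
}
```

Because `drain()` only sees what the GC has already collected, cleanup timing follows the driver's GC schedule. This is the delay described above, and it applies both to the broadcast and to `unregisterShuffle`.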


[GitHub] [incubator-uniffle] advancedxy commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' fro…

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1438284523

   PTAL @xianjingfeng first. I will review it later tonight.


[GitHub] [incubator-uniffle] advancedxy commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' fro…

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1438703401

   The code LGTM overall. Some minor questions:
   
   > **How was this patch tested?**
   tested with 10000 partitions shuffle
   
   Do you have any data on the serialized task size before and after? Is there any slowdown when using broadcast, and how many shuffle servers are in your environment?
   
   And just to be safe, how large is the broadcast in your 10000-partition test? I just want to make sure it doesn't put too much memory pressure on the driver.
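One way to estimate the broadcast size without a cluster is to serialize a synthetic partition-to-servers map and count the bytes. This sketch uses plain Java serialization, a hypothetical `Server` class holding only host and port, and made-up host names. Its figures will not match the real `ShuffleServerInfo` or Spark's serializers. It only shows how the payload grows with partitions, servers, and replicas.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

final class HandleSizeEstimate {
  // Hypothetical minimal server descriptor; the real ShuffleServerInfo carries more fields.
  static final class Server implements Serializable {
    final String host;
    final int port;

    Server(String host, int port) {
      this.host = host;
      this.port = port;
    }
  }

  // Builds partition -> servers, assigning replicas round-robin from a shared pool.
  static HashMap<Integer, List<Server>> build(int partitions, int servers, int replicas) {
    List<Server> pool = new ArrayList<>();
    for (int s = 0; s < servers; s++) {
      pool.add(new Server("rss-" + s + ".example", 19999));
    }
    HashMap<Integer, List<Server>> map = new HashMap<>();
    for (int p = 0; p < partitions; p++) {
      List<Server> assigned = new ArrayList<>(replicas);
      for (int r = 0; r < replicas; r++) {
        assigned.add(pool.get((p + r) % servers));
      }
      map.put(p, assigned);
    }
    return map;
  }

  // Java serialization writes each shared Server instance once and back-references it afterwards.
  static int serializedSize(Serializable value) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bos.size();
  }
}
```

For example, comparing `serializedSize(build(1_000, 20, 1))` with `serializedSize(build(10_000, 20, 1))` shows roughly linear growth in the partition count. None of these numbers are measurements from the PR.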
   
   
   


[GitHub] [incubator-uniffle] zjf2012 commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1455397266

   > > > > @advancedxy @xianjingfeng Do you have another suggestion?
   > > > 
   > > > 
   > > > I'm not sure, this pr introduced some quite complex logic to broadcast shuffle handle info.
   > > > If I was implementing this feature, I would just use Kryo by default, and in RSSShuffleManager indicating users to either turn off `spark.kryo.registerRequired` (which is explicitly set by user) or manually register RssShuffleHandle.
   > > 
   > > 
   > > Just to make sure we are on the same page: only registering RssShuffleHandle with the Kryo serializer doesn't resolve this issue. Each task will still have a binary of more than 670KB for a 10000-partition job, and each task will spend quite noticeable time repeatedly deserializing the partition -> shuffle server mappings, as shown in https://docs.google.com/document/d/1TZ-3Mgwj9j7n1mMyCrS3sskFv_uOtUtXl1oF9Y_oMOw/edit?usp=sharing.
   > > RssShuffleHandle is a field of ShuffleDependency, which is serialized into the task binary by the DAGScheduler code below. Without broadcasting ShuffleHandleInfo, it's hard to pull it out and avoid repeating ShuffleHandleInfo in every task.
   > > ```scala
   > > RDDCheckpointData.synchronized {
   > >   taskBinaryBytes = stage match {
   > >     case stage: ShuffleMapStage =>
   > >       JavaUtils.bufferToArray(
   > >         closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
   > >     case stage: ResultStage =>
   > >       JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
   > >   }
   > >
   > >   partitions = stage.rdd.partitions
   > > }
   > > ```
   > > 
   > > I think broadcast itself is quite simple and efficient. I don't see other better alternatives.
   > 
   > Sorry I wasn't clear enough. I agree broadcast is the best way to go. I'm just not sure that we should broadcast binary rather than just broadcast the shuffle handle info simply. The only problem that we don't broadcast shuffle handle info directly is that once `spark.kryo.registerRequired`(which is false by default) is true, the job will fail. To avoid this problem, we introduce quite some complex logic to deserialize and cache and weak ref etc. I'm wondering if that's worth it.
   
   No worries. You can decide which option is better; I can revert it if you like.
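The DAGScheduler snippet quoted above is the crux. `(stage.rdd, stage.shuffleDep)` is serialized into the task binary once per stage, and every task then deserializes it, so each task pays for anything the handle embeds. The JDK-only sketch below compares a dependency that embeds the mapping with one that only stores a broadcast ID. Both `Dep*` classes and the host names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

final class TaskPayloadSketch {
  // Hypothetical dependency whose handle carries the whole partition -> servers map.
  static final class DepWithMap implements Serializable {
    final int shuffleId;
    final HashMap<Integer, List<String>> partitionToServers;

    DepWithMap(int shuffleId, HashMap<Integer, List<String>> partitionToServers) {
      this.shuffleId = shuffleId;
      this.partitionToServers = partitionToServers;
    }
  }

  // Hypothetical dependency that only remembers which broadcast holds the map.
  static final class DepWithBroadcastId implements Serializable {
    final int shuffleId;
    final long broadcastId;

    DepWithBroadcastId(int shuffleId, long broadcastId) {
      this.shuffleId = shuffleId;
      this.broadcastId = broadcastId;
    }
  }

  static HashMap<Integer, List<String>> mapping(int partitions) {
    HashMap<Integer, List<String>> m = new HashMap<>();
    for (int p = 0; p < partitions; p++) {
      m.put(p, new ArrayList<>(Collections.singletonList("rss-" + (p % 20) + ":19999")));
    }
    return m;
  }

  // Bytes each task would fetch and deserialize for this part of the task binary.
  static int serializedSize(Serializable value) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bos.size();
  }
}
```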


[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1118672803


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssShuffleHandle.java:
##########
@@ -69,19 +69,56 @@ public ShuffleDependency<K, V, C> getDependency() {
     return dependency;
   }
 
-  public Map<Integer, List<ShuffleServerInfo>> getPartitionToServers() {
-    return partitionToServers;
-  }
-
   public int getShuffleId() {
     return shuffleId();
   }
 
-  public Set<ShuffleServerInfo> getShuffleServersForData() {
-    return shuffleServersForData;
+  /**
+   * Tried to get cached {@link ShuffleHandleInfo} from local thread first and then memory if not existing in local.
+   * If not cached, one of competing threads gets chance to deserialize it and caches it for other threads.
+   *
+   * @return
+   */
+  private ShuffleHandleInfo getCurrentHandleInfo() {
+    // local first
+    ShuffleHandleInfo info = _localHandleInfo.get().get();

Review Comment:
   Why do we need to deserialize here?
   > The broadcasted ShuffleHandleInfo is not registered to kryoserializer. It means all job will fail if someone set "spark.kryo.registerRequired" to true.
   
   Is it due to the above problem?
   
   If so, I think we could just log a message here and ask users to register the RssShuffleHandle class.
   



[GitHub] [incubator-uniffle] jerqi commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' fro…

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1439333682

   Will the broadcast be cleaned when the shuffle is removed?


[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1122544957


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java:
##########
@@ -190,4 +200,33 @@ public static int getRequiredShuffleServerNumber(SparkConf sparkConf) {
     int taskConcurrencyPerServer = sparkConf.get(RssSparkConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER);
     return (int) Math.ceil(estimateTaskConcurrency * 1.0 / taskConcurrencyPerServer);
   }
+
+  /**
+   * Get current active {@link SparkContext}. It should be called inside Driver since we don't mean to create any
+   * new {@link SparkContext} here.
+   *
+   * Note: We could use "SparkContext.getActive()" instead of "SparkContext.getOrCreate()" if the "getActive" method
+   * is not declared as package private in Scala.
+   * @return Active SparkContext created by Driver.
+   */
+  public static SparkContext getActiveSparkContext() {
+    return SparkContext.getOrCreate();
+  }
+
+  /**
+   * create broadcast variable of {@link ShuffleHandleInfo}
+   *
+   * @param sc                    expose for easy unit-test
+   * @param shuffleId
+   * @param partitionToServers
+   * @param storageInfo
+   * @return Broadcast variable registered for auto cleanup
+   */
+  public static Broadcast<byte[]> createPartShuffleServerMap(SparkContext sc, int shuffleId,
+      Map<Integer, List<ShuffleServerInfo>> partitionToServers, RemoteStorageInfo storageInfo) {
+    ShuffleHandleInfo partServerMap = new ShuffleHandleInfo(shuffleId, partitionToServers, storageInfo);

Review Comment:
   I mean this line: `ShuffleHandleInfo partServerMap = new ShuffleHandleInfo(shuffleId, partitionToServers, storageInfo);`
   Not only the method name.



[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1122538497


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java:
##########
@@ -190,4 +200,33 @@ public static int getRequiredShuffleServerNumber(SparkConf sparkConf) {
     int taskConcurrencyPerServer = sparkConf.get(RssSparkConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER);
     return (int) Math.ceil(estimateTaskConcurrency * 1.0 / taskConcurrencyPerServer);
   }
+
+  /**
+   * Get current active {@link SparkContext}. It should be called inside Driver since we don't mean to create any
+   * new {@link SparkContext} here.
+   *
+   * Note: We could use "SparkContext.getActive()" instead of "SparkContext.getOrCreate()" if the "getActive" method
+   * is not declared as package private in Scala.
+   * @return Active SparkContext created by Driver.
+   */
+  public static SparkContext getActiveSparkContext() {
+    return SparkContext.getOrCreate();
+  }
+
+  /**
+   * create broadcast variable of {@link ShuffleHandleInfo}
+   *
+   * @param sc                    expose for easy unit-test
+   * @param shuffleId
+   * @param partitionToServers
+   * @param storageInfo
+   * @return Broadcast variable registered for auto cleanup
+   */
+  public static Broadcast<byte[]> createPartShuffleServerMap(SparkContext sc, int shuffleId,
+      Map<Integer, List<ShuffleServerInfo>> partitionToServers, RemoteStorageInfo storageInfo) {
+    ShuffleHandleInfo partServerMap = new ShuffleHandleInfo(shuffleId, partitionToServers, storageInfo);

Review Comment:
   This line has not been changed. There are other places with the same problem; please check.



[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1134813893


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java:
##########
@@ -27,22 +27,31 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.deploy.SparkHadoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.reflect.ClassTag;

Review Comment:
   I see. Let's track it in another issue.



[GitHub] [incubator-uniffle] advancedxy commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1465593279

   Thanks @zjf2012. I noticed no UTs were added here. Do you think it's possible to add new UTs, or are the existing UTs sufficient?


[GitHub] [incubator-uniffle] zjf2012 commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1461380971

   > [image: https://user-images.githubusercontent.com/807537/223939198-ebe0e83c-b858-492c-b0b3-58823a581cb3.png]
   > 
   > Hi @zjf2012, it seems that the majority of us think it's better to simply broadcast the RssShuffleHandle object here, which requires `spark.kryo.registrationRequired=false`. Would you mind changing it back?
   
   sure, I'll change it back tomorrow or next week.
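For reference, the Spark property that governs this is `spark.kryo.registrationRequired`, and its default is already `false`, so the constraint only matters for jobs that explicitly turn Kryo registration checks on. A minimal SparkConf fragment, assuming spark-core on the classpath (this is a fragment, not a complete program):

```java
// Fragment: assumes org.apache.spark.SparkConf is on the classpath.
SparkConf conf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Per the discussion above, broadcasting RssShuffleHandle as-is needs
    // Kryo to accept classes that were never registered.
    .set("spark.kryo.registrationRequired", "false");
```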




[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' fro…

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1113075977


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssShuffleHandle.java:
##########
@@ -69,19 +58,15 @@ public ShuffleDependency<K, V, C> getDependency() {
     return dependency;
   }
 
-  public Map<Integer, List<ShuffleServerInfo>> getPartitionToServers() {

Review Comment:
   Can we keep this method? That way, our changes will be smaller.
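One way to keep the existing getter while moving the data out of the handle is to have the handle carry only serialized bytes and rebuild the map lazily on first access. The sketch below uses plain Java serialization and a `byte[]` field as stand-ins for the PR's `Broadcast<byte[]>` and `ShuffleHandleInfo`. `LazyHandleSketch`, `HandleInfo`, and `Handle` are hypothetical names, and server addresses are plain strings instead of `ShuffleServerInfo`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;

public class LazyHandleSketch {

  /** Hypothetical payload, standing in for ShuffleHandleInfo. */
  static final class HandleInfo implements Serializable {
    private static final long serialVersionUID = 1L;
    final Map<Integer, List<String>> partitionToServers;

    HandleInfo(Map<Integer, List<String>> partitionToServers) {
      this.partitionToServers = partitionToServers;
    }
  }

  /** Hypothetical handle: ships only bytes (standing in for Broadcast<byte[]>.value()). */
  static final class Handle implements Serializable {
    private static final long serialVersionUID = 1L;
    private final byte[] infoBytes;
    // Not serialized; rebuilt at most once per handle instance.
    private transient volatile HandleInfo cached;

    Handle(byte[] infoBytes) {
      this.infoBytes = infoBytes;
    }

    /** Same signature as the old getter, so call sites stay unchanged. */
    public Map<Integer, List<String>> getPartitionToServers() {
      return info().partitionToServers;
    }

    // Double-checked locking on the volatile field: decode the bytes once.
    private HandleInfo info() {
      HandleInfo local = cached;
      if (local == null) {
        synchronized (this) {
          local = cached;
          if (local == null) {
            local = deserialize(infoBytes);
            cached = local;
          }
        }
      }
      return local;
    }
  }

  static byte[] serialize(Serializable obj) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(obj);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bos.toByteArray();
  }

  static HandleInfo deserialize(byte[] bytes) {
    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return (HandleInfo) ois.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

Call sites that used `getPartitionToServers()` keep compiling, and only the handle's internals change. Note that in Spark each task typically deserializes its own copy of the handle, so a per-instance cache like this one does not share work across tasks. The shared caches discussed elsewhere in this review exist for that purpose.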





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1116756290


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssShuffleHandle.java:
##########
@@ -69,19 +69,56 @@ public ShuffleDependency<K, V, C> getDependency() {
     return dependency;
   }
 
-  public Map<Integer, List<ShuffleServerInfo>> getPartitionToServers() {
-    return partitionToServers;
-  }
-
   public int getShuffleId() {
     return shuffleId();
   }
 
-  public Set<ShuffleServerInfo> getShuffleServersForData() {
-    return shuffleServersForData;
+  /**
+   * Tried to get cached {@link ShuffleHandleInfo} from local thread first and then memory if not existing in local.
+   * If not cached, one of competing threads gets chance to deserialize it and caches it for other threads.
+   *
+   * @return
+   */
+  private ShuffleHandleInfo getCurrentHandleInfo() {
+    // local first
+    ShuffleHandleInfo info = _localHandleInfo.get().get();

Review Comment:
   Why do we need both localHandleInfo and currentHandleInfo? If we run two shuffles at the same time, this won't reduce serialization time.
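The concern applies when a cache holds a single "current" entry. Keying the cache by shuffle ID avoids the problem, because concurrent shuffles (or concurrent jobs) then get separate entries. Below is a minimal sketch of that approach. `PerShuffleCache` is a hypothetical class and is not part of Uniffle. The sketch relies on `ConcurrentHashMap.computeIfAbsent`, which applies the loader at most once per key.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

/** Hypothetical cache of deserialized per-shuffle info, keyed by shuffle ID. */
public final class PerShuffleCache<V> {
  private final ConcurrentHashMap<Integer, V> byShuffleId = new ConcurrentHashMap<>();
  private final AtomicInteger loads = new AtomicInteger();

  /** Returns the cached value, running the loader at most once per shuffle ID. */
  public V get(int shuffleId, IntFunction<V> loader) {
    return byShuffleId.computeIfAbsent(shuffleId, id -> {
      loads.incrementAndGet();
      return loader.apply(id);
    });
  }

  /** Drops the entry, e.g. when the shuffle is unregistered. */
  public void evict(int shuffleId) {
    byShuffleId.remove(shuffleId);
  }

  /** Number of times a loader actually ran (useful in tests). */
  public int loadCount() {
    return loads.get();
  }
}
```

One caveat: `ConcurrentHashMap` may block some updates by other threads while a loader is running. A slow deserialization of a large payload could therefore delay lookups for other shuffles. A per-key future or lock is an alternative if that matters.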





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1116702567


##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -318,12 +319,13 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, ShuffleDependency<
     if (dependency.partitioner().numPartitions() == 0) {
       LOG.info("RegisterShuffle with ShuffleId[" + shuffleId + "], partitionNum is 0, "
           + "return the empty RssShuffleHandle directly");
+      Broadcast<byte[]> ptsBd = RssSparkShuffleUtils.createPartShuffleServerMap(SparkContext.getOrCreate(),

Review Comment:
   OK. You can extract a method that wraps `SparkContext.getOrCreate()` and add some comments explaining it. Alternatively, we can use reflection to call `SparkContext.getActive()`.





[GitHub] [incubator-uniffle] zjf2012 commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1122513824


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssShuffleHandle.java:
##########
@@ -69,19 +69,56 @@ public ShuffleDependency<K, V, C> getDependency() {
     return dependency;
   }
 
-  public Map<Integer, List<ShuffleServerInfo>> getPartitionToServers() {
-    return partitionToServers;
-  }
-
   public int getShuffleId() {
     return shuffleId();
   }
 
-  public Set<ShuffleServerInfo> getShuffleServersForData() {
-    return shuffleServersForData;
+  /**
+   * Tried to get cached {@link ShuffleHandleInfo} from local thread first and then memory if not existing in local.
+   * If not cached, one of competing threads gets chance to deserialize it and caches it for other threads.
+   *
+   * @return
+   */
+  private ShuffleHandleInfo getCurrentHandleInfo() {
+    // local first
+    ShuffleHandleInfo info = _localHandleInfo.get().get();

Review Comment:
   Yes, it's disabled by default. I forgot to mention another factor: the broadcast variable (ShuffleHandleInfo, not RssShuffleHandle) is about 4MB without JavaSerializer.





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1122547079


##########
client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -277,8 +280,10 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, int numMaps, Shuff
 
     startHeartbeat();
 
+    Broadcast<byte[]> ptsBd = RssSparkShuffleUtils.createPartShuffleServerMap(

Review Comment:
   And this variable.





[GitHub] [incubator-uniffle] zjf2012 commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' fro…

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1441368646

   Please see how task deserialization time disappears with the patch: https://docs.google.com/document/d/1TZ-3Mgwj9j7n1mMyCrS3sskFv_uOtUtXl1oF9Y_oMOw/edit?usp=sharing.
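The following sketch illustrates why embedding the mapping inflates each task's serialized binary. It compares the Java-serialized size of a handle that carries a full partition -> servers map with the size of a handle that carries only a small reference. This is a rough, self-contained stand-in. Spark's real task binary contains more than the handle, and `FatHandle`, `ThinHandle`, and the `host:port` strings are hypothetical. The absolute numbers will differ from the PR's measurement, so only the ratio is of interest.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class TaskBinarySizeSketch {

  /** "Before" shape: the handle embeds the whole partition -> servers map. */
  static final class FatHandle implements Serializable {
    private static final long serialVersionUID = 1L;
    final int shuffleId;
    final HashMap<Integer, List<String>> partitionToServers;

    FatHandle(int shuffleId, HashMap<Integer, List<String>> partitionToServers) {
      this.shuffleId = shuffleId;
      this.partitionToServers = partitionToServers;
    }
  }

  /** "After" shape: the handle carries only a small reference (e.g. a broadcast ID). */
  static final class ThinHandle implements Serializable {
    private static final long serialVersionUID = 1L;
    final int shuffleId;
    final long broadcastId;

    ThinHandle(int shuffleId, long broadcastId) {
      this.shuffleId = shuffleId;
      this.broadcastId = broadcastId;
    }
  }

  /** Builds a made-up assignment: one server per partition, round-robin. */
  static HashMap<Integer, List<String>> fakeAssignment(int partitions, int servers) {
    HashMap<Integer, List<String>> m = new HashMap<>();
    for (int p = 0; p < partitions; p++) {
      List<String> assigned = new ArrayList<>();
      assigned.add("shuffle-server-" + (p % servers) + ":19999");
      m.put(p, assigned);
    }
    return m;
  }

  /** Java-serialized size of an object, in bytes. */
  static int serializedSize(Serializable obj) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(obj);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bos.size();
  }
}
```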




[GitHub] [incubator-uniffle] jerqi commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' fro…

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1443191555

   > …m RssShuffleHandle
   > 
   > ### What changes were proposed in this pull request?
   > move partition -> shuffle servers mapping from direct field of RssShuffleHandle to a broadcast variable to reduce task binary size.
   > 
   > ### Why are the changes needed?
   > to reduce task delay and task serialize/deserialize time by reduce task binary size
   > 
   > ### Does this PR introduce _any_ user-facing change?
   > No.
   > 
   > ### How was this patch tested?
   > 1. tested with 10000 partitions shuffle. Task binary size reduced from more than 670KB to less than 6KB.
   > 2. tested with multiple shuffle stages in same job to verify ShuffleHandleInfo cache logic
   
   Could you modify the title and description? The title seems to be too long, so some of its words spilled over into the description.




[GitHub] [incubator-uniffle] zjf2012 commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1116707316


##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -318,12 +319,13 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, ShuffleDependency<
     if (dependency.partitioner().numPartitions() == 0) {
       LOG.info("RegisterShuffle with ShuffleId[" + shuffleId + "], partitionNum is 0, "
           + "return the empty RssShuffleHandle directly");
+      Broadcast<byte[]> ptsBd = RssSparkShuffleUtils.createPartShuffleServerMap(SparkContext.getOrCreate(),

Review Comment:
   Sounds good. Let me add a method with comments.





[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' fro…

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1113241665


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java:
##########
@@ -190,4 +196,18 @@ public static int getRequiredShuffleServerNumber(SparkConf sparkConf) {
     int taskConcurrencyPerServer = sparkConf.get(RssSparkConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER);
     return (int) Math.ceil(estimateTaskConcurrency * 1.0 / taskConcurrencyPerServer);
   }
+
+  /**
+   * create broadcast variable for partition -> shuffleserver map
+   * make sure it's called in JVM with active SparkContext created
+   *
+   * @param partitionToServers
+   * @return Broadcast variable registered for auto cleanup
+   */
+  public static Broadcast<PartitionShuffleServerMap> createPartShuffleServerMap(

Review Comment:
   I'd prefer to pass `SparkContext` as a parameter here. It's much clearer, and we would need to pass SparkContext anyway if we are going to add a unit test for this method.
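The sketch below shows what passing the context in buys for testing. The helper takes its broadcasting dependency as an argument, so a test can hand it a fake. To keep the example runnable without Spark, `Broadcaster` and `BroadcastRef` are hypothetical stand-ins for `SparkContext` and `Broadcast<T>`, and `broadcastHandleInfo` is a hypothetical helper name. With Spark on the classpath, a test could instead pass a local-mode `SparkContext` directly.

```java
import java.util.ArrayList;
import java.util.List;

public class InjectableBroadcastSketch {

  /** Hypothetical stand-in for Spark's Broadcast<T>. */
  interface BroadcastRef<T> {
    T value();
  }

  /** Hypothetical seam; in production it would delegate to SparkContext.broadcast(...). */
  interface Broadcaster {
    <T> BroadcastRef<T> broadcast(T value);
  }

  /** Hypothetical helper: the dependency is a parameter, not looked up globally. */
  static BroadcastRef<byte[]> broadcastHandleInfo(Broadcaster broadcaster, byte[] serializedInfo) {
    if (broadcaster == null || serializedInfo == null) {
      throw new IllegalArgumentException("broadcaster and serializedInfo must be non-null");
    }
    return broadcaster.broadcast(serializedInfo);
  }

  /** Test double: records broadcast values instead of talking to a cluster. */
  static final class RecordingBroadcaster implements Broadcaster {
    final List<Object> broadcasted = new ArrayList<>();

    @Override
    public <T> BroadcastRef<T> broadcast(T value) {
      broadcasted.add(value);
      return () -> value;
    }
  }
}
```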



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java:
##########
@@ -27,18 +27,24 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.deploy.SparkHadoopUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.CoordinatorClient;
 import org.apache.uniffle.client.factory.CoordinatorClientFactory;
 import org.apache.uniffle.client.util.ClientUtils;
 import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.Constants;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.reflect.ClassTag;

Review Comment:
   The import group order doesn't seem right.
   
   Let's follow the Spark convention:
   ```
   import java
   
   import scala
   
   import third_party
   
   import uniffle
   ```
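Applied to the imports visible in this hunk, the suggested grouping would look roughly like the following. The hunk does not show any `java.*` imports, so that group appears only as a placeholder comment. This illustrates the order and is not the project's checkstyle configuration.

```java
// java.* imports (none visible in this hunk) would come first.

import scala.reflect.ClassTag;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.deploy.SparkHadoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.client.api.CoordinatorClient;
import org.apache.uniffle.client.factory.CoordinatorClientFactory;
import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.Constants;
```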



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssShuffleHandle.java:
##########
@@ -69,19 +58,15 @@ public ShuffleDependency<K, V, C> getDependency() {
     return dependency;
   }
 
-  public Map<Integer, List<ShuffleServerInfo>> getPartitionToServers() {

Review Comment:
   👍 





[GitHub] [incubator-uniffle] xianjingfeng commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1467230126

   Thanks @zjf2012 




[GitHub] [incubator-uniffle] jerqi commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1454796348

   @advancedxy @zjf2012 Do you have any other suggestions?




[GitHub] [incubator-uniffle] smallzhongfeng commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "smallzhongfeng (via GitHub)" <gi...@apache.org>.
smallzhongfeng commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1125418456


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssShuffleHandle.java:
##########
@@ -32,29 +37,27 @@
   private String appId;
   private int numMaps;
   private ShuffleDependency<K, V, C> dependency;
-  private Map<Integer, List<ShuffleServerInfo>> partitionToServers;
-  // shuffle servers which is for store shuffle data
-  private Set<ShuffleServerInfo> shuffleServersForData;
-  // remoteStorage used for this job
-  private RemoteStorageInfo remoteStorage;
+  private Broadcast<byte[]> handlerInfoBytesBd;
+
+  // shuffle ID to ShuffleIdRef
+  // ShuffleIdRef acts as strong reference to prevent cached ShuffleHandleInfo being GCed during shuffle
+  // ShuffleIdRef will be removed when unregisterShuffle()
+  private static Map<Integer, ShuffleIdRef> _globalShuffleIdRefMap = new ConcurrentHashMap<>();
+  // each shuffle has unique ID even for multiple concurrent running shuffles and jobs per application
+  private static ThreadLocal<HandleInfoLocalCache> _localHandleInfoCache =

Review Comment:
   ditto.



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssShuffleHandle.java:
##########
@@ -32,29 +37,27 @@
   private String appId;
   private int numMaps;
   private ShuffleDependency<K, V, C> dependency;
-  private Map<Integer, List<ShuffleServerInfo>> partitionToServers;
-  // shuffle servers which is for store shuffle data
-  private Set<ShuffleServerInfo> shuffleServersForData;
-  // remoteStorage used for this job
-  private RemoteStorageInfo remoteStorage;
+  private Broadcast<byte[]> handlerInfoBytesBd;
+
+  // shuffle ID to ShuffleIdRef
+  // ShuffleIdRef acts as strong reference to prevent cached ShuffleHandleInfo being GCed during shuffle
+  // ShuffleIdRef will be removed when unregisterShuffle()
+  private static Map<Integer, ShuffleIdRef> _globalShuffleIdRefMap = new ConcurrentHashMap<>();

Review Comment:
   It seems that the leading underscore does not conform to Java naming conventions, right?
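The following minimal sketch writes the same fields with conventional camelCase names. It also illustrates the strong-reference idea described in the diff comments above: a registry of `ShuffleIdRef` objects keeps the entries of a weakly keyed cache alive until the shuffle is unregistered. This is a hypothetical simplification built on `java.util.WeakHashMap`. `ShuffleRefCacheSketch` and its methods are not the PR's code.

```java
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntFunction;

public final class ShuffleRefCacheSketch<V> {

  /** Hypothetical identity key; equals/hashCode are deliberately not overridden. */
  static final class ShuffleIdRef {
    final int shuffleId;

    ShuffleIdRef(int shuffleId) {
      this.shuffleId = shuffleId;
    }
  }

  // Strong references: while a ref is here, its WeakHashMap entry cannot be collected.
  private final Map<Integer, ShuffleIdRef> globalShuffleIdRefMap = new ConcurrentHashMap<>();
  // Weakly keyed: once a ShuffleIdRef becomes unreachable, GC may drop the cached value.
  private final Map<ShuffleIdRef, V> cachedInfoByRef = new WeakHashMap<>();

  public V get(int shuffleId, IntFunction<V> loader) {
    ShuffleIdRef ref = globalShuffleIdRefMap.computeIfAbsent(shuffleId, id -> new ShuffleIdRef(id));
    // WeakHashMap is not thread-safe, so guard it explicitly.
    synchronized (cachedInfoByRef) {
      return cachedInfoByRef.computeIfAbsent(ref, r -> loader.apply(r.shuffleId));
    }
  }

  /** Called on unregisterShuffle(): drops the strong ref so the cached value becomes collectable. */
  public void unregister(int shuffleId) {
    globalShuffleIdRefMap.remove(shuffleId);
  }

  public boolean isRegistered(int shuffleId) {
    return globalShuffleIdRefMap.containsKey(shuffleId);
  }
}
```

As the `WeakHashMap` documentation warns, a cached value must not strongly reference its own key. Otherwise the entry will never be cleared.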





[GitHub] [incubator-uniffle] zjf2012 commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1130291443


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssShuffleHandle.java:
##########
@@ -32,29 +37,27 @@
   private String appId;
   private int numMaps;
   private ShuffleDependency<K, V, C> dependency;
-  private Map<Integer, List<ShuffleServerInfo>> partitionToServers;
-  // shuffle servers which is for store shuffle data
-  private Set<ShuffleServerInfo> shuffleServersForData;
-  // remoteStorage used for this job
-  private RemoteStorageInfo remoteStorage;
+  private Broadcast<byte[]> handlerInfoBytesBd;
+
+  // shuffle ID to ShuffleIdRef
+  // ShuffleIdRef acts as strong reference to prevent cached ShuffleHandleInfo being GCed during shuffle
+  // ShuffleIdRef will be removed when unregisterShuffle()
+  private static Map<Integer, ShuffleIdRef> _globalShuffleIdRefMap = new ConcurrentHashMap<>();
+  // each shuffle has unique ID even for multiple concurrent running shuffles and jobs per application
+  private static ThreadLocal<HandleInfoLocalCache> _localHandleInfoCache =

Review Comment:
   done



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssShuffleHandle.java:
##########
@@ -32,29 +37,27 @@
   private String appId;
   private int numMaps;
   private ShuffleDependency<K, V, C> dependency;
-  private Map<Integer, List<ShuffleServerInfo>> partitionToServers;
-  // shuffle servers which is for store shuffle data
-  private Set<ShuffleServerInfo> shuffleServersForData;
-  // remoteStorage used for this job
-  private RemoteStorageInfo remoteStorage;
+  private Broadcast<byte[]> handlerInfoBytesBd;
+
+  // shuffle ID to ShuffleIdRef
+  // ShuffleIdRef acts as strong reference to prevent cached ShuffleHandleInfo being GCed during shuffle
+  // ShuffleIdRef will be removed when unregisterShuffle()
+  private static Map<Integer, ShuffleIdRef> _globalShuffleIdRefMap = new ConcurrentHashMap<>();

Review Comment:
   done





[GitHub] [incubator-uniffle] jerqi commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1447821726

   The PR is OK with me. Let @advancedxy @xianjingfeng take another look.




[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1121242538


##########
client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -277,8 +280,10 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, int numMaps, Shuff
 
     startHeartbeat();
 
+    Broadcast<byte[]> ptsBd = RssSparkShuffleUtils.createPartShuffleServerMap(

Review Comment:
   ditto





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1116828745


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssShuffleHandle.java:
##########
@@ -69,19 +69,56 @@ public ShuffleDependency<K, V, C> getDependency() {
     return dependency;
   }
 
-  public Map<Integer, List<ShuffleServerInfo>> getPartitionToServers() {
-    return partitionToServers;
-  }
-
   public int getShuffleId() {
     return shuffleId();
   }
 
-  public Set<ShuffleServerInfo> getShuffleServersForData() {
-    return shuffleServersForData;
+  /**
+   * Tried to get cached {@link ShuffleHandleInfo} from local thread first and then memory if not existing in local.
+   * If not cached, one of competing threads gets chance to deserialize it and caches it for other threads.
+   *
+   * @return
+   */
+  private ShuffleHandleInfo getCurrentHandleInfo() {
+    // local first
+    ShuffleHandleInfo info = _localHandleInfo.get().get();

Review Comment:
   Spark can also run multiple jobs at the same time. 





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1122548765


##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -318,12 +318,14 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, ShuffleDependency<
     if (dependency.partitioner().numPartitions() == 0) {
       LOG.info("RegisterShuffle with ShuffleId[" + shuffleId + "], partitionNum is 0, "
           + "return the empty RssShuffleHandle directly");
+      Broadcast<byte[]> ptsBd = RssSparkShuffleUtils.broadcastShuffleHdlInfo(

Review Comment:
   ditto





[GitHub] [incubator-uniffle] zjf2012 commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' fro…

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1113729493


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/PartitionShuffleServerMap.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+
+import org.apache.uniffle.common.ShuffleServerInfo;
+
+/**
+ * Class for holding partition ID -> shuffle servers mapping.
+ * It's to be broadcast to executors and referenced by shuffle tasks.
+ */
+public class PartitionShuffleServerMap {

Review Comment:
   sounds good.





[GitHub] [incubator-uniffle] codecov-commenter commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' fro…

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1438158864

   # [Codecov](https://codecov.io/gh/apache/incubator-uniffle/pull/637?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#637](https://codecov.io/gh/apache/incubator-uniffle/pull/637?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (f50380b) into [master](https://codecov.io/gh/apache/incubator-uniffle/commit/4f07b0c3b58db51b5e349bc9c51d39c7c84f55d0?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (4f07b0c) will **increase** coverage by `1.12%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master     #637      +/-   ##
   ============================================
   + Coverage     60.87%   61.99%   +1.12%     
   + Complexity     1802     1704      -98     
   ============================================
     Files           214      191      -23     
     Lines         12388     9842    -2546     
     Branches       1045     1000      -45     
   ============================================
   - Hits           7541     6102    -1439     
   + Misses         4442     3410    -1032     
   + Partials        405      330      -75     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-uniffle/pull/637?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../apache/uniffle/coordinator/CoordinatorServer.java](https://codecov.io/gh/apache/incubator-uniffle/pull/637?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29vcmRpbmF0b3Ivc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3VuaWZmbGUvY29vcmRpbmF0b3IvQ29vcmRpbmF0b3JTZXJ2ZXIuamF2YQ==) | `53.71% <0.00%> (-3.31%)` | :arrow_down: |
   | [.../apache/uniffle/coordinator/ClientConfManager.java](https://codecov.io/gh/apache/incubator-uniffle/pull/637?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29vcmRpbmF0b3Ivc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3VuaWZmbGUvY29vcmRpbmF0b3IvQ2xpZW50Q29uZk1hbmFnZXIuamF2YQ==) | `91.54% <0.00%> (-1.41%)` | :arrow_down: |
   | [...ava/org/apache/spark/shuffle/RssShuffleHandle.java](https://codecov.io/gh/apache/incubator-uniffle/pull/637?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50LXNwYXJrL2NvbW1vbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvc3Bhcmsvc2h1ZmZsZS9Sc3NTaHVmZmxlSGFuZGxlLmphdmE=) | | |
   | [...org/apache/spark/shuffle/RssSparkShuffleUtils.java](https://codecov.io/gh/apache/incubator-uniffle/pull/637?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50LXNwYXJrL2NvbW1vbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvc3Bhcmsvc2h1ZmZsZS9Sc3NTcGFya1NodWZmbGVVdGlscy5qYXZh) | | |
   | [...k/shuffle/writer/WrappedByteArrayOutputStream.java](https://codecov.io/gh/apache/incubator-uniffle/pull/637?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50LXNwYXJrL2NvbW1vbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvc3Bhcmsvc2h1ZmZsZS93cml0ZXIvV3JhcHBlZEJ5dGVBcnJheU91dHB1dFN0cmVhbS5qYXZh) | | |
   | [...bernetes/operator/pkg/controller/controller/rss.go](https://codecov.io/gh/apache/incubator-uniffle/pull/637?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGVwbG95L2t1YmVybmV0ZXMvb3BlcmF0b3IvcGtnL2NvbnRyb2xsZXIvY29udHJvbGxlci9yc3MuZ28=) | | |
   | [...tor/pkg/controller/sync/coordinator/coordinator.go](https://codecov.io/gh/apache/incubator-uniffle/pull/637?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGVwbG95L2t1YmVybmV0ZXMvb3BlcmF0b3IvcGtnL2NvbnRyb2xsZXIvc3luYy9jb29yZGluYXRvci9jb29yZGluYXRvci5nbw==) | | |
   | [...y/kubernetes/operator/pkg/webhook/inspector/pod.go](https://codecov.io/gh/apache/incubator-uniffle/pull/637?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGVwbG95L2t1YmVybmV0ZXMvb3BlcmF0b3IvcGtnL3dlYmhvb2svaW5zcGVjdG9yL3BvZC5nbw==) | | |
   | [...pache/spark/shuffle/writer/WriteBufferManager.java](https://codecov.io/gh/apache/incubator-uniffle/pull/637?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50LXNwYXJrL2NvbW1vbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvc3Bhcmsvc2h1ZmZsZS93cml0ZXIvV3JpdGVCdWZmZXJNYW5hZ2VyLmphdmE=) | | |
   | [.../org/apache/spark/shuffle/writer/WriterBuffer.java](https://codecov.io/gh/apache/incubator-uniffle/pull/637?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50LXNwYXJrL2NvbW1vbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvc3Bhcmsvc2h1ZmZsZS93cml0ZXIvV3JpdGVyQnVmZmVyLmphdmE=) | | |
   | ... and [16 more](https://codecov.io/gh/apache/incubator-uniffle/pull/637?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [incubator-uniffle] zjf2012 commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' fro…

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1441362544

   > > The code lgtm overall. some minor questions:
   > > > **How was this patch tested?**
   > > > tested with 10000 partitions shuffle
   > > 
   > > 
   > > Do you have data on the serialized task size before and after? Is there any slowdown when using broadcast, and how many shuffle servers are in your environment?
   > > And just to be safe, how large is the broadcast in your 10000-partition test? I want to make sure it doesn't put too much memory pressure on the driver.
   > 
   > For now, I don't have more servers, so I only used two shuffle servers. Before my optimization, both map and reduce tasks had a binary size of more than 670KB. After the optimization, it dropped to less than 6KB, a dramatic reduction.
   > 
   > Broadcast uses a BitTorrent-like mechanism to distribute a variable to each executor only once: executors can fetch chunks of the broadcast variable from other executors instead of getting all of it from the driver. Task serialization/deserialization time also drops a lot. So, in theory, it should not slow the job down.
   > 
   > Based on the above, the size of the broadcast should be less than 670KB. I'll try to capture it today.
   
   Before my patch, the size of "task + partition -> shuffle servers" in binary was about 83.6KiB, and its deserialized size was about 659.8KiB.
   
   _2023-02-22 13:36:13,581 INFO memory.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size **83.6 KiB,** free 23.4 GiB)
   2023-02-22 13:36:13,582 INFO broadcast.TorrentBroadcast: Reading broadcast variable 2 took 4 ms
   2023-02-22 13:36:13,583 INFO memory.MemoryStore: Block broadcast_2 stored as values in memory (estimated size **659.8 KiB**, free 23.4 GiB)_
   
   With my patch, the broadcast variable's binary size is about 80.6KiB (< 83.6KiB), and its deserialized size is about 655.0KiB (< 659.8KiB). Both are as expected.
   
   _2023-02-23 12:43:20,015 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size **80.6 KiB,** free 23.4 GiB)
   2023-02-23 12:43:20,015 INFO broadcast.TorrentBroadcast: Reading broadcast variable 1 took 4 ms
   2023-02-23 12:43:20,017 INFO memory.MemoryStore: Block broadcast_1 stored as values in memory (estimated size **655.0 KiB**, free 23.4 GiB)_
   
   




[GitHub] [incubator-uniffle] zjf2012 commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1455318109

   > > @advancedxy @xianjingfeng Do you have another suggestion?
   > 
   > I'm not sure, this pr introduced some quite complex logic to broadcast shuffle handle info.
   > 
   > If I was implementing this feature, I would just use Kryo by default, and in RSSShuffleManager indicating users to either turn off `spark.kryo.registerRequired` (which is explicitly set by user) or manually register RssShuffleHandle.
   
   Just to make sure we are on the same page: registering RssShuffleHandle with the Kryo serializer alone doesn't resolve this issue. For a 10000-partition job, each task binary would still be larger than 670KB, and each task would spend noticeable time repeatedly deserializing the partition -> shuffle server mapping, as shown in https://docs.google.com/document/d/1TZ-3Mgwj9j7n1mMyCrS3sskFv_uOtUtXl1oF9Y_oMOw/edit?usp=sharing.
   
   RssShuffleHandle is a field of ShuffleDependency, which DAGScheduler serializes into the task binary in the code below. Without broadcasting ShuffleHandleInfo, it's hard to pull the mapping out and avoid repeating it in every task.
   
   ```scala
   RDDCheckpointData.synchronized {
     taskBinaryBytes = stage match {
       case stage: ShuffleMapStage =>
         JavaUtils.bufferToArray(
           closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
       case stage: ResultStage =>
         JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
     }

     partitions = stage.rdd.partitions
   }
   ```
   
   I think broadcast itself is quite simple and efficient, and I don't see a better alternative.
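The argument above is that anything reachable from `ShuffleDependency` gets serialized into every task binary. Below is a minimal, Spark-free sketch of that effect using plain Java serialization. Spark's closure serializer uses Java serialization by default, but real byte counts will differ. `Dependency`, `EmbeddedHandle` and `BroadcastRefHandle` are hypothetical stand-ins, not Spark or Uniffle classes. A handle that embeds the mapping grows with the partition count, while a handle holding only a broadcast id stays constant.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TaskBinarySketch {
  private TaskBinarySketch() {}

  /** Hypothetical handle carrying the full partition -> servers mapping (pre-patch shape). */
  static final class EmbeddedHandle implements Serializable {
    private static final long serialVersionUID = 1L;
    final int shuffleId;
    final Map<Integer, List<String>> partitionToServers;

    EmbeddedHandle(int shuffleId, Map<Integer, List<String>> partitionToServers) {
      this.shuffleId = shuffleId;
      this.partitionToServers = partitionToServers;
    }
  }

  /** Hypothetical handle carrying only an id that points at separately distributed data. */
  static final class BroadcastRefHandle implements Serializable {
    private static final long serialVersionUID = 1L;
    final int shuffleId;
    final long broadcastId;

    BroadcastRefHandle(int shuffleId, long broadcastId) {
      this.shuffleId = shuffleId;
      this.broadcastId = broadcastId;
    }
  }

  /** Stand-in for ShuffleDependency: the handle is a field, so it is serialized along with it. */
  static final class Dependency implements Serializable {
    private static final long serialVersionUID = 1L;
    final Serializable handle;

    Dependency(Serializable handle) {
      this.handle = handle;
    }
  }

  /** Builds a mapping where each partition is assigned one of numServers servers round-robin. */
  static Map<Integer, List<String>> mapping(int numPartitions, int numServers) {
    Map<Integer, List<String>> m = new HashMap<>();
    for (int p = 0; p < numPartitions; p++) {
      List<String> servers = new ArrayList<>();
      servers.add("shuffle-server-" + (p % numServers) + ":19999");
      m.put(p, servers);
    }
    return m;
  }

  /** Size in bytes of the object graph under Java serialization. */
  static int serializedSize(Serializable value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.size();
  }
}
```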




[GitHub] [incubator-uniffle] zjf2012 commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1133499378


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java:
##########
@@ -27,22 +27,31 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.deploy.SparkHadoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.reflect.ClassTag;

Review Comment:
   Yes, I tried, but I always get a checkstyle error after moving "scala.reflect.ClassTag" around, e.g. adding an extra blank line before the ClassTag import or moving it to the last import line. I used the command below to check style locally. I think we may need to update the checkstyle configuration for Scala imports.
   
   mvn -B -fae checkstyle:check -Pspark2





[GitHub] [incubator-uniffle] zjf2012 commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1447648158

   Instead of sharing the same shuffleHandleInfo per JVM, I changed it to be shared per thread to make the code simpler. In my tests, performance stays at the same level and is even slightly better.
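Below is a minimal sketch of per-thread sharing. It assumes the broadcast value is a `byte[]` that each executor thread decodes into a handle-info object. `PerThreadHandleCache` and its methods are hypothetical, not the PR's code. The `Supplier` stands in for reading the broadcast value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical cache: each thread decodes a given broadcast at most once and then reuses it. */
final class PerThreadHandleCache<T> {
  private final Function<byte[], T> decoder;
  // One map per thread, so no locking is needed; each thread may hold its own decoded copy.
  private final ThreadLocal<Map<Long, T>> cache = ThreadLocal.withInitial(HashMap::new);

  PerThreadHandleCache(Function<byte[], T> decoder) {
    this.decoder = decoder;
  }

  /** Fetches and decodes the bytes only on a miss in the calling thread. */
  T get(long broadcastId, Supplier<byte[]> bytes) {
    return cache.get().computeIfAbsent(broadcastId, id -> decoder.apply(bytes.get()));
  }

  /** Drops the calling thread's entries, e.g. when they should no longer be retained. */
  void clearCurrentThread() {
    cache.remove();
  }
}
```

Design note: a `ThreadLocal` avoids cross-thread locking and shared eviction logic. The cost is up to one decoded copy per executor thread. Entries also stay until they are cleared, so a real implementation would need to decide when to drop them.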




[GitHub] [incubator-uniffle] zjf2012 commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' fro…

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1113729867


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java:
##########
@@ -27,18 +27,24 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.deploy.SparkHadoopUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.CoordinatorClient;
 import org.apache.uniffle.client.factory.CoordinatorClientFactory;
 import org.apache.uniffle.client.util.ClientUtils;
 import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.Constants;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.reflect.ClassTag;

Review Comment:
   Let me adjust it accordingly.





[GitHub] [incubator-uniffle] zjf2012 commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' fro…

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1113729637


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssShuffleHandle.java:
##########
@@ -69,19 +58,15 @@ public ShuffleDependency<K, V, C> getDependency() {
     return dependency;
   }
 
-  public Map<Integer, List<ShuffleServerInfo>> getPartitionToServers() {

Review Comment:
   Yes, it's better.





[GitHub] [incubator-uniffle] zjf2012 commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' fro…

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1439695185

   I identified two problems:
   - The newly created broadcast variable is larger than the original task binary.
   - The broadcast ShuffleHandleInfo is not registered with KryoSerializer, which means every job will fail if someone sets "spark.kryo.registerRequired" to true.
   
   I need to serialize ShuffleHandleInfo to a byte array with Spark's closure serializer before it is Kryo-serialized and broadcast. Working on it.
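Below is a minimal sketch of the workaround described above. It converts the handle info into a `byte[]` with Java serialization, the format Spark's closure serializer uses, before broadcasting. This way Kryo only has to handle a byte array, which, to our understanding, Spark's KryoSerializer registers by default. `HandleInfoBytes` is a hypothetical helper, not the PR's code, and the Spark broadcast calls themselves are omitted.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical helper: round-trips a Serializable value through a plain byte array. */
final class HandleInfoBytes {
  private HandleInfoBytes() {}

  /** Driver side: serialize before broadcasting, so the broadcast value is just a byte[]. */
  static byte[] toBytes(Serializable value) {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
      out.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return buffer.toByteArray();
  }

  /** Executor side: decode the broadcast bytes back into the expected type. */
  static <T> T fromBytes(byte[] bytes, Class<T> type) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return type.cast(in.readObject());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException("Class of broadcast value not found", e);
    }
  }
}
```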




[GitHub] [incubator-uniffle] advancedxy commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1455508093

   > > > > > @advancedxy @xianjingfeng Do you have another suggestion?
   > > > > 
   > > > > 
   > > > > I'm not sure, this pr introduced some quite complex logic to broadcast shuffle handle info.
   > > > > If I was implementing this feature, I would just use Kryo by default, and in RSSShuffleManager indicating users to either turn off `spark.kryo.registerRequired` (which is explicitly set by user) or manually register RssShuffleHandle.
   > > > 
   > > > 
   > > > Just make sure we are on the same page. Only registering RssShuffleHandle to kryo serializer doesn't help resolve this issue. Each task will still have more than 670KB size in binary. for 10000 partitions job. And each task will have quite noticeable deserialization time to repeatedly deserialize partition -> shuffle server mappings as shown in https://docs.google.com/document/d/1TZ-3Mgwj9j7n1mMyCrS3sskFv_uOtUtXl1oF9Y_oMOw/edit?usp=sharing.
   > > > RssShuffleHandle is a field of ShuffleDependency which is serialized to task binary in below code in DAGScheduler. Without broadcast of ShuffleHandleInfo, it's hard to pull it out and avoid repeat of ShuffleHandleInfo.
   > > > ```scala
   > > > RDDCheckpointData.synchronized {
   > > >   taskBinaryBytes = stage match {
   > > >     case stage: ShuffleMapStage =>
   > > >       JavaUtils.bufferToArray(
   > > >         closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
   > > >     case stage: ResultStage =>
   > > >       JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
   > > >   }
   > > >
   > > >   partitions = stage.rdd.partitions
   > > > }
   > > > ```
   > > > 
   > > > I think broadcast itself is quite simple and efficient. I don't see other better alternatives.
   > > 
   > > 
   > > Sorry I wasn't clear enough. I agree broadcast is the best way to go; I'm just not sure we should broadcast a binary rather than simply broadcasting the shuffle handle info. The only problem with broadcasting the shuffle handle info directly is that the job will fail once `spark.kryo.registerRequired` (false by default) is set to true. To avoid this problem, we introduce quite complex logic to deserialize, cache, hold weak references, etc. I'm wondering if that's worth it.
   > 
   > no worries. You guys can decide which option is better. I can revert back if you like.
   
   Thanks for your understanding. Let's take a quick vote here, cc @zuston @jerqi @xianjingfeng @kaijchen 
   1. if you are going to vote for current impl, please react with 🚀 
   2. if you are going to vote for a simpler impl, i.e. broadcasting the shuffle handle info directly, please react with 🎉 .
   
   




[GitHub] [incubator-uniffle] zjf2012 commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1122514012


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java:
##########
@@ -190,4 +200,33 @@ public static int getRequiredShuffleServerNumber(SparkConf sparkConf) {
     int taskConcurrencyPerServer = sparkConf.get(RssSparkConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER);
     return (int) Math.ceil(estimateTaskConcurrency * 1.0 / taskConcurrencyPerServer);
   }
+
+  /**
+   * Get current active {@link SparkContext}. It should be called inside Driver since we don't mean to create any
+   * new {@link SparkContext} here.
+   *
+   * Note: We could use "SparkContext.getActive()" instead of "SparkContext.getOrCreate()" if the "getActive" method
+   * is not declared as package private in Scala.
+   * @return Active SparkContext created by Driver.
+   */
+  public static SparkContext getActiveSparkContext() {
+    return SparkContext.getOrCreate();
+  }
+
+  /**
+   * create broadcast variable of {@link ShuffleHandleInfo}
+   *
+   * @param sc                    expose for easy unit-test
+   * @param shuffleId
+   * @param partitionToServers
+   * @param storageInfo
+   * @return Broadcast variable registered for auto cleanup
+   */
+  public static Broadcast<byte[]> createPartShuffleServerMap(SparkContext sc, int shuffleId,
+      Map<Integer, List<ShuffleServerInfo>> partitionToServers, RemoteStorageInfo storageInfo) {
+    ShuffleHandleInfo partServerMap = new ShuffleHandleInfo(shuffleId, partitionToServers, storageInfo);

Review Comment:
   Let me change it.





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "xianjingfeng (via GitHub)" <gi...@apache.org>.
xianjingfeng commented on code in PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#discussion_r1122548389


##########
client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -238,12 +239,14 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, int numMaps, Shuff
     if (dependency.partitioner().numPartitions() == 0) {
       LOG.info("RegisterShuffle with ShuffleId[" + shuffleId + "], partitionNum is 0, "
           + "return the empty RssShuffleHandle directly");
+      Broadcast<byte[]> ptsBd = RssSparkShuffleUtils.broadcastShuffleHdlInfo(

Review Comment:
   And this.



##########
client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -277,8 +280,10 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, int numMaps, Shuff
 
     startHeartbeat();
 
+    Broadcast<byte[]> ptsBd = RssSparkShuffleUtils.broadcastShuffleHdlInfo(

Review Comment:
   ditto





[GitHub] [incubator-uniffle] zjf2012 commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "zjf2012 (via GitHub)" <gi...@apache.org>.
zjf2012 commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1465601765

   > thanks @zjf2012. I noticed there's no added UTs here, do you think it's possible to add new UTs or existing UTs are sufficient?
   
   You're welcome. I think the current UTs are sufficient, since my changes only wrap existing functionality and involve no complex logic. Any new UT would mostly mock the broadcast, with little actual business logic to test.




[GitHub] [incubator-uniffle] zuston commented on pull request #637: [#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle

Posted by "zuston (via GitHub)" <gi...@apache.org>.
zuston commented on PR #637:
URL: https://github.com/apache/incubator-uniffle/pull/637#issuecomment-1678583548

   > > Will the broadcast be cleaned when the shuffle is removed?
   > 
   > Yes. It's registered for cleanup when SparkContext creates a new broadcast variable. However, the cleaner may not clean it up immediately, just as with the unregisterShuffle method below: the cleaner only gets a chance to capture them (shuffles, broadcasts, ...) after a GC has happened in the driver. This behavior is fine for this broadcast variable since it's small anyway. Could it cause a problem in the code snippet below? Shuffle servers may not get cleaned up promptly if no GC happens in the driver.
   > 
   > ```java
   > @Override
   > public boolean unregisterShuffle(int shuffleId) {
   >   try {
   >     if (SparkEnv.get().executorId().equals("driver")) {
   >       shuffleWriteClient.unregisterShuffle(appId, shuffleId);
   >     }
   >   } catch (Exception e) {
   >     LOG.warn("Errors on unregister to remote shuffle-servers", e);
   >   }
   >   return true;
   > }
   > ```
   
   I don't see any logic that explicitly destroys the broadcast variable. Do we need to destroy it in the `unregisterShuffle` method?
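One possible answer is sketched here under assumptions. The driver keeps the broadcast for each shuffle ID and destroys it eagerly when the shuffle is unregistered, instead of waiting for the context cleaner to notice it after a GC. `DestroyableBroadcast` is a hypothetical stand-in for Spark's `Broadcast`, which does expose a public `destroy()`. `ShuffleBroadcastRegistry` is also hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the part of Spark's Broadcast used here. */
interface DestroyableBroadcast {
  void destroy();
}

/** Hypothetical driver-side registry: shuffle ID -> broadcast of its handle info. */
final class ShuffleBroadcastRegistry {
  private final Map<Integer, DestroyableBroadcast> byShuffleId = new ConcurrentHashMap<>();

  /** Records the broadcast created in registerShuffle; destroys any stale one for the same ID. */
  void register(int shuffleId, DestroyableBroadcast broadcast) {
    DestroyableBroadcast previous = byShuffleId.put(shuffleId, broadcast);
    if (previous != null && previous != broadcast) {
      previous.destroy();
    }
  }

  /** Called from unregisterShuffle: release the broadcast eagerly. Returns false if none was known. */
  boolean unregister(int shuffleId) {
    DestroyableBroadcast broadcast = byShuffleId.remove(shuffleId);
    if (broadcast == null) {
      return false;
    }
    broadcast.destroy();
    return true;
  }

  int size() {
    return byShuffleId.size();
  }
}
```

A destroyed broadcast can no longer be read. This approach is therefore only safe if no running or retried stage still needs the shuffle's handle info when `unregisterShuffle` is called, and that would need to be verified against how Spark invokes it.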

