You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/09/05 10:34:32 UTC

[incubator-celeborn] branch branch-0.3 updated: [CELEBORN-946][GLUTEN] Record read metric should be compatible with Gluten shuffle dependency

This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new b9bf1a70e [CELEBORN-946][GLUTEN] Record read metric should be compatible with Gluten shuffle dependency
b9bf1a70e is described below

commit b9bf1a70e01c16d4ff040d1ae03437d46b59d644
Author: xiyu.zk <xi...@alibaba-inc.com>
AuthorDate: Tue Sep 5 18:34:12 2023 +0800

    [CELEBORN-946][GLUTEN] Record read metric should be compatible with Gluten shuffle dependency
    
    ### What changes were proposed in this pull request?
    Currently, a Gluten shuffle is detected by checking the serializer (serde) class name, which only works for the Velox backend. To also support the ClickHouse backend, it is more generic to detect a Gluten shuffle by checking whether the shuffle dependency is a ColumnarShuffleDependency.
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #1878 from kerwin-zk/gluten.
    
    Authored-by: xiyu.zk <xi...@alibaba-inc.com>
    Signed-off-by: zky.zhoukeyong <zk...@alibaba-inc.com>
    (cherry picked from commit d53b6e53c719d588da7f68185a2c5b8c453daa48)
    Signed-off-by: zky.zhoukeyong <zk...@alibaba-inc.com>
---
 .../org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala | 4 ++--
 ...BatchSerdeHelper.scala => GlutenShuffleDependencyHelper.scala} | 8 ++++----
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 7a35a1d61..c220a2f04 100644
--- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -146,8 +146,8 @@ class CelebornShuffleReader[K, C](
       serializerInstance.deserializeStream(_).asKeyValueIterator)
 
     val iterWithUpdatedRecordsRead =
-      if (GlutenColumnarBatchSerdeHelper.isGlutenSerde(serializerInstance.getClass.getName)) {
-        GlutenColumnarBatchSerdeHelper.withUpdatedRecordsRead(recordIter, metrics)
+      if (GlutenShuffleDependencyHelper.isGlutenDep(dep.getClass.getName)) {
+        GlutenShuffleDependencyHelper.withUpdatedRecordsRead(recordIter, metrics)
       } else {
         recordIter.map { record =>
           metrics.incRecordsRead(1)
diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenColumnarBatchSerdeHelper.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenShuffleDependencyHelper.scala
similarity index 80%
rename from client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenColumnarBatchSerdeHelper.scala
rename to client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenShuffleDependencyHelper.scala
index 259bb954d..4d743cda8 100644
--- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenColumnarBatchSerdeHelper.scala
+++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenShuffleDependencyHelper.scala
@@ -23,14 +23,14 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 /**
  * A helper class to be compatible with Gluten Celeborn.
  */
-object GlutenColumnarBatchSerdeHelper {
+object GlutenShuffleDependencyHelper {
 
-  def isGlutenSerde(serdeName: String): Boolean = {
+  def isGlutenDep(depName: String): Boolean = {
     // scalastyle:off
     // see Gluten
-    // https://github.com/oap-project/gluten/blob/main/gluten-celeborn/src/main/scala/org/apache/spark/shuffle/CelebornColumnarBatchSerializer.scala
+    // https://github.com/oap-project/gluten/blob/main/gluten-core/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleDependency.scala
     // scalastyle:on
-    "org.apache.spark.shuffle.CelebornColumnarBatchSerializer".equals(serdeName)
+    "org.apache.spark.shuffle.ColumnarShuffleDependency".equals(depName)
   }
 
   def withUpdatedRecordsRead(