Posted to commits@spark.apache.org by li...@apache.org on 2019/07/13 15:45:17 UTC

[spark] branch master updated: [SPARK-28355][CORE][PYTHON] Use Spark conf for threshold at which command is compressed by broadcast

This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 79e2047  [SPARK-28355][CORE][PYTHON] Use Spark conf for threshold at which command is compressed by broadcast
79e2047 is described below

commit 79e204770300dab4a669b9f8e2421ef905236e7b
Author: Jesse Cai <je...@databricks.com>
AuthorDate: Sat Jul 13 08:44:16 2019 -0700

    [SPARK-28355][CORE][PYTHON] Use Spark conf for threshold at which command is compressed by broadcast
    
    ## What changes were proposed in this pull request?
    
    The `_prepare_for_python_RDD` method currently broadcasts a pickled command if its length exceeds the hardcoded value `1 << 20` (1 MiB). This change makes the threshold configurable through a new Spark conf, `spark.broadcast.UDFCompressionThreshold`, instead.
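    
    For illustration, a minimal sketch of how the new conf could be set from PySpark (the 4 MiB value is an arbitrary example, not a recommended setting):
    
    ```python
    from pyspark import SparkConf, SparkContext
    
    # Example value only: raise the threshold to 4 MiB so that pickled
    # commands up to that size are sent inline instead of being broadcast.
    conf = SparkConf().set("spark.broadcast.UDFCompressionThreshold",
                           str(4 * 1024 * 1024))
    sc = SparkContext(conf=conf)
    ```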
    
    ## How was this patch tested?
    
    Unit tests (a new case in `SparkConfSuite` covering the default and an explicit setting), manual tests.
    
    Closes #25123 from jessecai/SPARK-28355.
    
    Authored-by: Jesse Cai <je...@databricks.com>
    Signed-off-by: gatorsmile <ga...@gmail.com>
---
 .../scala/org/apache/spark/api/python/PythonUtils.scala     |  4 ++++
 .../scala/org/apache/spark/internal/config/package.scala    |  8 ++++++++
 core/src/test/scala/org/apache/spark/SparkConfSuite.scala   | 13 +++++++++++++
 python/pyspark/rdd.py                                       |  2 +-
 4 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index eee6e4b..62d6047 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -81,4 +81,8 @@ private[spark] object PythonUtils {
   def isEncryptionEnabled(sc: JavaSparkContext): Boolean = {
     sc.conf.get(org.apache.spark.internal.config.IO_ENCRYPTION_ENABLED)
   }
+
+  def getBroadcastThreshold(sc: JavaSparkContext): Long = {
+    sc.conf.get(org.apache.spark.internal.config.BROADCAST_FOR_UDF_COMPRESSION_THRESHOLD)
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 488886f..76d3d6e 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1246,6 +1246,14 @@ package object config {
       "mechanisms to guarantee data won't be corrupted during broadcast")
     .booleanConf.createWithDefault(true)
 
+  private[spark] val BROADCAST_FOR_UDF_COMPRESSION_THRESHOLD =
+    ConfigBuilder("spark.broadcast.UDFCompressionThreshold")
+      .doc("The threshold at which user-defined functions (UDFs) and Python RDD commands " +
+        "are compressed by broadcast in bytes unless otherwise specified")
+      .bytesConf(ByteUnit.BYTE)
+      .checkValue(v => v >= 0, "The threshold should be non-negative.")
+      .createWithDefault(1L * 1024 * 1024)
+
   private[spark] val RDD_COMPRESS = ConfigBuilder("spark.rdd.compress")
     .doc("Whether to compress serialized RDD partitions " +
       "(e.g. for StorageLevel.MEMORY_ONLY_SER in Scala " +
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 6be1fed..202b85d 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -389,6 +389,19 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
       """.stripMargin.trim)
   }
 
+  test("SPARK-28355: Use Spark conf for threshold at which UDFs are compressed by broadcast") {
+    val conf = new SparkConf()
+
+    // Check the default value
+    assert(conf.get(BROADCAST_FOR_UDF_COMPRESSION_THRESHOLD) === 1L * 1024 * 1024)
+
+    // Set the conf
+    conf.set(BROADCAST_FOR_UDF_COMPRESSION_THRESHOLD, 1L * 1024)
+
+    // Verify that it has been set properly
+    assert(conf.get(BROADCAST_FOR_UDF_COMPRESSION_THRESHOLD) === 1L * 1024)
+  }
+
   val defaultIllegalValue = "SomeIllegalValue"
   val illegalValueTests : Map[String, (SparkConf, String) => Any] = Map(
     "getTimeAsSeconds" -> (_.getTimeAsSeconds(_)),
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 8bcc67a..96fdf5f 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2490,7 +2490,7 @@ def _prepare_for_python_RDD(sc, command):
     # the serialized command will be compressed by broadcast
     ser = CloudPickleSerializer()
     pickled_command = ser.dumps(command)
-    if len(pickled_command) > (1 << 20):  # 1M
+    if len(pickled_command) > sc._jvm.PythonUtils.getBroadcastThreshold(sc._jsc):  # Default 1M
         # The broadcast will have same life cycle as created PythonRDD
         broadcast = sc.broadcast(pickled_command)
         pickled_command = ser.dumps(broadcast)

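To sanity-check the wiring end to end, the configured threshold can be read back through the same Py4J call that `_prepare_for_python_RDD` now uses. A sketch, assuming a local PySpark session:

```python
from pyspark import SparkConf, SparkContext

# Sketch: set a deliberately small threshold (1 KiB) so that almost any
# pickled command would take the broadcast path.
conf = SparkConf().set("spark.broadcast.UDFCompressionThreshold", "1024")
sc = SparkContext(master="local[1]", conf=conf)

# Read the effective value back via the same JVM gateway call used in rdd.py.
print(sc._jvm.PythonUtils.getBroadcastThreshold(sc._jsc))  # 1024
sc.stop()
```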
