You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2019/07/13 15:45:17 UTC
[spark] branch master updated: [SPARK-28355][CORE][PYTHON] Use Spark conf for threshold at which command is compressed by broadcast
This is an automated email from the ASF dual-hosted git repository.
lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 79e2047 [SPARK-28355][CORE][PYTHON] Use Spark conf for threshold at which command is compressed by broadcast
79e2047 is described below
commit 79e204770300dab4a669b9f8e2421ef905236e7b
Author: Jesse Cai <je...@databricks.com>
AuthorDate: Sat Jul 13 08:44:16 2019 -0700
[SPARK-28355][CORE][PYTHON] Use Spark conf for threshold at which command is compressed by broadcast
## What changes were proposed in this pull request?
The `_prepare_for_python_RDD` method currently broadcasts a pickled command if its length is greater than the hardcoded value `1 << 20` bytes (1 MiB). This change replaces the hardcoded value with a Spark conf, `spark.broadcast.UDFCompressionThreshold`, whose default remains 1 MiB.
## How was this patch tested?
Unit tests, manual tests.
Closes #25123 from jessecai/SPARK-28355.
Authored-by: Jesse Cai <je...@databricks.com>
Signed-off-by: gatorsmile <ga...@gmail.com>
---
.../scala/org/apache/spark/api/python/PythonUtils.scala | 4 ++++
.../scala/org/apache/spark/internal/config/package.scala | 8 ++++++++
core/src/test/scala/org/apache/spark/SparkConfSuite.scala | 13 +++++++++++++
python/pyspark/rdd.py | 2 +-
4 files changed, 26 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index eee6e4b..62d6047 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -81,4 +81,8 @@ private[spark] object PythonUtils {
def isEncryptionEnabled(sc: JavaSparkContext): Boolean = {
sc.conf.get(org.apache.spark.internal.config.IO_ENCRYPTION_ENABLED)
}
+
+ def getBroadcastThreshold(sc: JavaSparkContext): Long = {
+ sc.conf.get(org.apache.spark.internal.config.BROADCAST_FOR_UDF_COMPRESSION_THRESHOLD)
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 488886f..76d3d6e 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1246,6 +1246,14 @@ package object config {
"mechanisms to guarantee data won't be corrupted during broadcast")
.booleanConf.createWithDefault(true)
+ private[spark] val BROADCAST_FOR_UDF_COMPRESSION_THRESHOLD =
+ ConfigBuilder("spark.broadcast.UDFCompressionThreshold")
+ .doc("The threshold at which user-defined functions (UDFs) and Python RDD commands " +
+ "are compressed by broadcast in bytes unless otherwise specified")
+ .bytesConf(ByteUnit.BYTE)
+ .checkValue(v => v >= 0, "The threshold should be non-negative.")
+ .createWithDefault(1L * 1024 * 1024)
+
private[spark] val RDD_COMPRESS = ConfigBuilder("spark.rdd.compress")
.doc("Whether to compress serialized RDD partitions " +
"(e.g. for StorageLevel.MEMORY_ONLY_SER in Scala " +
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 6be1fed..202b85d 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -389,6 +389,19 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
""".stripMargin.trim)
}
+ test("SPARK-28355: Use Spark conf for threshold at which UDFs are compressed by broadcast") {
+ val conf = new SparkConf()
+
+ // Check the default value
+ assert(conf.get(BROADCAST_FOR_UDF_COMPRESSION_THRESHOLD) === 1L * 1024 * 1024)
+
+ // Set the conf
+ conf.set(BROADCAST_FOR_UDF_COMPRESSION_THRESHOLD, 1L * 1024)
+
+ // Verify that it has been set properly
+ assert(conf.get(BROADCAST_FOR_UDF_COMPRESSION_THRESHOLD) === 1L * 1024)
+ }
+
val defaultIllegalValue = "SomeIllegalValue"
val illegalValueTests : Map[String, (SparkConf, String) => Any] = Map(
"getTimeAsSeconds" -> (_.getTimeAsSeconds(_)),
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 8bcc67a..96fdf5f 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2490,7 +2490,7 @@ def _prepare_for_python_RDD(sc, command):
# the serialized command will be compressed by broadcast
ser = CloudPickleSerializer()
pickled_command = ser.dumps(command)
- if len(pickled_command) > (1 << 20): # 1M
+ if len(pickled_command) > sc._jvm.PythonUtils.getBroadcastThreshold(sc._jsc): # Default 1M
# The broadcast will have same life cycle as created PythonRDD
broadcast = sc.broadcast(pickled_command)
pickled_command = ser.dumps(broadcast)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org