Posted to commits@spark.apache.org by gu...@apache.org on 2020/12/03 16:44:33 UTC

[spark] branch branch-3.0 updated: [SPARK-33629][PYTHON] Make spark.buffer.size configuration visible on driver side

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c4318a1  [SPARK-33629][PYTHON] Make spark.buffer.size configuration visible on driver side
c4318a1 is described below

commit c4318a1184c2cb8e6a3010bed408be41e569a822
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Fri Dec 4 01:37:44 2020 +0900

    [SPARK-33629][PYTHON] Make spark.buffer.size configuration visible on driver side
    
    `spark.buffer.size` is not applied on the driver side in PySpark, so the
    driver always falls back to the default buffer size (65536 bytes) when it
    opens local sockets. This PR fixes the issue by applying the mentioned
    config on the driver side as well.
    
    No user-facing change. Verified with the existing unit tests plus a manual
    check.
    
    Added the following code temporarily:
    ```
    def local_connect_and_auth(port, auth_secret):
    ...
                sock.connect(sa)
                print("SPARK_BUFFER_SIZE: %d" % int(os.environ.get("SPARK_BUFFER_SIZE", 65536))) <- This is the addition
                sockfile = sock.makefile("rwb", int(os.environ.get("SPARK_BUFFER_SIZE", 65536)))
    ...
    ```
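    
    For context, the second argument to `sock.makefile` is the buffer size, so
    the exported `SPARK_BUFFER_SIZE` value directly controls how much data is
    buffered per read/write on the driver-local socket. A minimal,
    self-contained sketch of that pattern (a plain `socketpair` stands in for
    Spark's real driver-local connection; this is not Spark code):
    ```
    import os
    import socket
    
    # Falls back to 65536 bytes, exactly as in local_connect_and_auth above.
    buffer_size = int(os.environ.get("SPARK_BUFFER_SIZE", "65536"))
    
    left, right = socket.socketpair()
    sockfile = left.makefile("rwb", buffer_size)  # 2nd argument = buffer size
    sockfile.write(b"ping")
    sockfile.flush()          # the buffered write is only sent on flush
    print(right.recv(4))      # b'ping'
    ```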
    
    Test:
    ```
    
    echo "spark.buffer.size 10000" >> conf/spark-defaults.conf
    
    $ ./bin/pyspark
    Python 3.8.5 (default, Jul 21 2020, 10:48:26)
    [Clang 11.0.3 (clang-1103.0.32.62)] on darwin
    Type "help", "copyright", "credits" or "license" for more information.
    20/12/03 13:38:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    20/12/03 13:38:14 WARN SparkEnv: I/O encryption enabled without RPC encryption: keys will be visible on the wire.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 3.1.0-SNAPSHOT
          /_/
    
    Using Python version 3.8.5 (default, Jul 21 2020 10:48:26)
    Spark context Web UI available at http://192.168.0.189:4040
    Spark context available as 'sc' (master = local[*], app id = local-1606999094506).
    SparkSession available as 'spark'.
    >>> sc.setLogLevel("TRACE")
    >>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
    ...
    SPARK_BUFFER_SIZE: 10000
    ...
    [[0], [2], [3], [4], [6]]
    >>>
    ```
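    
    With the patch applied, the exported value can also be checked without the
    temporary print, since the driver now publishes it into its own environment
    (a quick sketch, assuming `spark.buffer.size 10000` is still set):
    ```
    >>> import os
    >>> os.environ.get("SPARK_BUFFER_SIZE")
    '10000'
    ```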
    
    Closes #30592 from gaborgsomogyi/SPARK-33629.
    
    Authored-by: Gabor Somogyi <ga...@gmail.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit bd711863fdcdde21a7d64de8a9b6b7a8bf7c19ec)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala | 4 ++++
 python/pyspark/context.py                                         | 2 ++
 2 files changed, 6 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index 490b487..78eb690 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -85,4 +85,8 @@ private[spark] object PythonUtils {
   def getBroadcastThreshold(sc: JavaSparkContext): Long = {
     sc.conf.get(org.apache.spark.internal.config.BROADCAST_FOR_UDF_COMPRESSION_THRESHOLD)
   }
+
+  def getSparkBufferSize(sc: JavaSparkContext): Int = {
+    sc.conf.get(org.apache.spark.internal.config.BUFFER_SIZE)
+  }
 }
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 72e8e84..25c2a5b 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -211,6 +211,8 @@ class SparkContext(object):
         # data via a socket.
         # scala's mangled names w/ $ in them require special treatment.
         self._encryption_enabled = self._jvm.PythonUtils.isEncryptionEnabled(self._jsc)
+        os.environ["SPARK_BUFFER_SIZE"] = \
+            str(self._jvm.PythonUtils.getSparkBufferSize(self._jsc))
 
         self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python')
         self.pythonVer = "%d.%d" % sys.version_info[:2]

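
For reference, the configuration does not have to come from
conf/spark-defaults.conf. A minimal sketch of setting it programmatically when
building the session (the value 131072 is purely illustrative):

```
import os
from pyspark.sql import SparkSession

# Illustrative only: set spark.buffer.size at session build time instead of
# via conf/spark-defaults.conf. It must be set before the context starts.
spark = (SparkSession.builder
         .master("local[*]")
         .config("spark.buffer.size", "131072")  # bytes; example value
         .getOrCreate())

# The value is visible in the driver's SparkConf...
print(spark.sparkContext.getConf().get("spark.buffer.size"))  # '131072'

# ...and, after this commit, exported to the driver's environment as well:
print(os.environ.get("SPARK_BUFFER_SIZE"))  # '131072'
```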
