You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2019/07/15 03:30:29 UTC
[spark] branch master updated: [SPARK-28381][PYSPARK] Upgraded
version of Pyrolite to 4.30
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 591de42 [SPARK-28381][PYSPARK] Upgraded version of Pyrolite to 4.30
591de42 is described below
commit 591de423517de379b0900fbf5751b48492d15729
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Mon Jul 15 12:29:58 2019 +0900
[SPARK-28381][PYSPARK] Upgraded version of Pyrolite to 4.30
## What changes were proposed in this pull request?
This upgrades Pyrolite to a newer version. Most updates [1] in the newer version are for .NET. For Java, it includes a bug fix to Unpickler regarding cleaning up the Unpickler memo, and support for protocol 5.
After upgrading, we can remove the fix at SPARK-27629 for the bug in Unpickler.
[1] https://github.com/irmen/Pyrolite/compare/pyrolite-4.23...master
## How was this patch tested?
Manually tested on Python 3.6 in local on existing tests.
Closes #25143 from viirya/upgrade-pyrolite.
Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
core/pom.xml | 2 +-
core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala | 3 ---
dev/deps/spark-deps-hadoop-2.7 | 2 +-
dev/deps/spark-deps-hadoop-3.2 | 2 +-
.../main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala | 4 ----
python/pyspark/sql/tests/test_serde.py | 4 ----
.../org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala | 4 ----
7 files changed, 3 insertions(+), 18 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index 8a872de..4446dbd 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -378,7 +378,7 @@
<dependency>
<groupId>net.razorvine</groupId>
<artifactId>pyrolite</artifactId>
- <version>4.23</version>
+ <version>4.30</version>
<exclusions>
<exclusion>
<groupId>net.razorvine</groupId>
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index 9462dfd..01e64b6 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -186,9 +186,6 @@ private[spark] object SerDeUtil extends Logging {
val unpickle = new Unpickler
iter.flatMap { row =>
val obj = unpickle.loads(row)
- // `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) will store objects in internal map
- // of `Unpickler`. This map is cleared when calling `Unpickler.close()`.
- unpickle.close()
if (batched) {
obj match {
case array: Array[Any] => array.toSeq
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 2f660cc..79158bb 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -170,7 +170,7 @@ parquet-hadoop-bundle-1.6.0.jar
parquet-jackson-1.10.1.jar
protobuf-java-2.5.0.jar
py4j-0.10.8.1.jar
-pyrolite-4.23.jar
+pyrolite-4.30.jar
scala-compiler-2.12.8.jar
scala-library-2.12.8.jar
scala-parser-combinators_2.12-1.1.0.jar
diff --git a/dev/deps/spark-deps-hadoop-3.2 b/dev/deps/spark-deps-hadoop-3.2
index e1e114f..5e03a59 100644
--- a/dev/deps/spark-deps-hadoop-3.2
+++ b/dev/deps/spark-deps-hadoop-3.2
@@ -189,7 +189,7 @@ parquet-hadoop-1.10.1.jar
parquet-jackson-1.10.1.jar
protobuf-java-2.5.0.jar
py4j-0.10.8.1.jar
-pyrolite-4.23.jar
+pyrolite-4.30.jar
re2j-1.1.jar
scala-compiler-2.12.8.jar
scala-library-2.12.8.jar
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 4c478a5..4617073 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -1357,10 +1357,6 @@ private[spark] abstract class SerDeBase {
val unpickle = new Unpickler
iter.flatMap { row =>
val obj = unpickle.loads(row)
- // `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) will store objects in internal map
- // of `Unpickler`. This map is cleared when calling `Unpickler.close()`. Pyrolite
- // doesn't clear it up, so we manually clear it.
- unpickle.close()
if (batched) {
obj match {
case list: JArrayList[_] => list.asScala
diff --git a/python/pyspark/sql/tests/test_serde.py b/python/pyspark/sql/tests/test_serde.py
index f9bed76..ea2a686 100644
--- a/python/pyspark/sql/tests/test_serde.py
+++ b/python/pyspark/sql/tests/test_serde.py
@@ -128,10 +128,6 @@ class SerdeTests(ReusedSQLTestCase):
def test_int_array_serialization(self):
# Note that this test seems dependent on parallelism.
- # This issue is because internal object map in Pyrolite is not cleared after op code
- # STOP. If we use protocol 4 to pickle Python objects, op code MEMOIZE will store
- # objects in the map. We need to clear up it to make sure next unpickling works on
- # clear map.
data = self.spark.sparkContext.parallelize([[1, 2, 3, 4]] * 100, numSlices=12)
df = self.spark.createDataFrame(data, "array<integer>")
self.assertEqual(len(list(filter(lambda r: None in r.value, df.collect()))), 0)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
index 4f35278..02bfbc4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
@@ -81,10 +81,6 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute]
outputIterator.flatMap { pickedResult =>
val unpickledBatch = unpickle.loads(pickedResult)
- // `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) will store objects in internal map
- // of `Unpickler`. This map is cleared when calling `Unpickler.close()`. Pyrolite
- // doesn't clear it up, so we manually clear it.
- unpickle.close()
unpickledBatch.asInstanceOf[java.util.ArrayList[Any]].asScala
}.map { result =>
if (udfs.length == 1) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org