Posted to commits@spark.apache.org by jo...@apache.org on 2014/09/16 03:57:32 UTC

git commit: [SPARK-2951] [PySpark] support unpickle array.array for Python 2.6

Repository: spark
Updated Branches:
  refs/heads/master fdb302f49 -> da33acb8b


[SPARK-2951] [PySpark] support unpickle array.array for Python 2.6

Pyrolite cannot unpickle an array.array that was pickled by Python 2.6; this patch fixes that by extending Pyrolite.

There is also a bug in Pyrolite when unpickling arrays of float/double; this patch works around it by reversing the endianness for float/double. The workaround should be removed once Pyrolite ships a release that fixes the issue.
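
For context, a minimal Python sketch of what Python 2.6 actually pickles for an array.array (hedged; the exact tuple shape is an interpreter implementation detail):

    import array

    a = array.array('d', [1.0, 2.0])
    # Under Python 2.6, a.__reduce__() is roughly
    #   (array.array, ('d', <raw native-endian bytes>), <state>)
    # so reconstruction is a two-argument call array.array(typecode, data).
    # The payload is the raw machine representation of the items, which is
    # why endianness matters for the 'f' and 'd' typecodes.
    print(a.__reduce__())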

I have sent a PR to Pyrolite to fix it: https://github.com/irmen/Pyrolite/pull/11

Author: Davies Liu <da...@gmail.com>

Closes #2365 from davies/pickle and squashes the following commits:

f44f771 [Davies Liu] enable tests about array
3908f5c [Davies Liu] Merge branch 'master' into pickle
c77c87b [Davies Liu] cleanup debugging code
60e4e2f [Davies Liu] support unpickle array.array for Python 2.6


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/da33acb8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/da33acb8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/da33acb8

Branch: refs/heads/master
Commit: da33acb8b681eca5e787d546fe922af76a151398
Parents: fdb302f
Author: Davies Liu <da...@gmail.com>
Authored: Mon Sep 15 18:57:25 2014 -0700
Committer: Josh Rosen <jo...@apache.org>
Committed: Mon Sep 15 18:57:25 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/api/python/SerDeUtil.scala | 51 ++++++++++++++++++++
 python/pyspark/context.py                       |  1 +
 python/pyspark/tests.py                         |  2 -
 3 files changed, 52 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/da33acb8/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index efc9009..6668797 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.api.python
 
+import java.nio.ByteOrder
+
 import scala.collection.JavaConversions._
 import scala.util.Failure
 import scala.util.Try
@@ -28,6 +30,55 @@ import org.apache.spark.rdd.RDD
 
 /** Utilities for serialization / deserialization between Python and Java, using Pickle. */
 private[python] object SerDeUtil extends Logging {
+  // Unpickle array.array generated by Python 2.6
+  class ArrayConstructor extends net.razorvine.pickle.objects.ArrayConstructor {
+    //  /* Description of types */
+    //  static struct arraydescr descriptors[] = {
+    //    {'c', sizeof(char), c_getitem, c_setitem},
+    //    {'b', sizeof(char), b_getitem, b_setitem},
+    //    {'B', sizeof(char), BB_getitem, BB_setitem},
+    //    #ifdef Py_USING_UNICODE
+    //      {'u', sizeof(Py_UNICODE), u_getitem, u_setitem},
+    //    #endif
+    //    {'h', sizeof(short), h_getitem, h_setitem},
+    //    {'H', sizeof(short), HH_getitem, HH_setitem},
+    //    {'i', sizeof(int), i_getitem, i_setitem},
+    //    {'I', sizeof(int), II_getitem, II_setitem},
+    //    {'l', sizeof(long), l_getitem, l_setitem},
+    //    {'L', sizeof(long), LL_getitem, LL_setitem},
+    //    {'f', sizeof(float), f_getitem, f_setitem},
+    //    {'d', sizeof(double), d_getitem, d_setitem},
+    //    {'\0', 0, 0, 0} /* Sentinel */
+    //  };
+    // TODO: support Py_UNICODE with 2 bytes
+    // FIXME: Pyrolite unpickles arrays of float/double with the wrong
+    // endianness, so we reverse the machine code for float/double here
+    // to work around it. Remove this once Pyrolite fixes the issue.
+    val machineCodes: Map[Char, Int] = if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
+      Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 3, 'h' -> 5, 'I' -> 7, 'i' -> 9,
+        'L' -> 11, 'l' -> 13, 'f' -> 14, 'd' -> 16, 'u' -> 21
+      )
+    } else {
+      Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 2, 'h' -> 4, 'I' -> 6, 'i' -> 8,
+        'L' -> 10, 'l' -> 12, 'f' -> 15, 'd' -> 17, 'u' -> 20
+      )
+    }
+    override def construct(args: Array[Object]): Object = {
+      if (args.length == 1) {
+        construct(args ++ Array(""))
+      } else if (args.length == 2 && args(1).isInstanceOf[String]) {
+        val typecode = args(0).asInstanceOf[String].charAt(0)
+        val data: String = args(1).asInstanceOf[String]
+        construct(typecode, machineCodes(typecode), data.getBytes("ISO-8859-1"))
+      } else {
+        super.construct(args)
+      }
+    }
+  }
+
+  def initialize() = {
+    Unpickler.registerConstructor("array", "array", new ArrayConstructor())
+  }
 
   private def checkPickle(t: (Any, Any)): (Boolean, Boolean) = {
     val pickle = new Pickler

http://git-wip-us.apache.org/repos/asf/spark/blob/da33acb8/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 3ab98e2..ea28e8c 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -214,6 +214,7 @@ class SparkContext(object):
                 SparkContext._gateway = gateway or launch_gateway()
                 SparkContext._jvm = SparkContext._gateway.jvm
                 SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile
+                SparkContext._jvm.SerDeUtil.initialize()
 
             if instance:
                 if (SparkContext._active_spark_context and

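This one-line change means registration happens exactly once, when the first SparkContext brings up the JVM gateway; a minimal sketch of when that takes effect:

    from pyspark import SparkContext

    # Creating the first SparkContext launches the gateway and, with this
    # patch, calls SerDeUtil.initialize() on the JVM side, so any later
    # unpickling there already sees the array.array constructor.
    sc = SparkContext("local", "demo")
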
http://git-wip-us.apache.org/repos/asf/spark/blob/da33acb8/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index f3309a2..f255b44 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -956,8 +956,6 @@ class TestOutputFormat(PySparkTestCase):
             conf=input_conf).collect())
         self.assertEqual(new_dataset, data)
 
-    @unittest.skipIf(sys.version_info[:2] <= (2, 6) or python_implementation() == "PyPy",
-                     "Skipped on 2.6 and PyPy until SPARK-2951 is fixed")
     def test_newhadoop_with_array(self):
         basepath = self.tempdir.name
         # use custom ArrayWritable types and converters to handle arrays

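With the skip guard removed above, the path this test exercises now works on Python 2.6; a hedged sketch of the user-visible scenario (converter setup elided, as in the test):

    from array import array

    rdd = sc.parallelize([(1, array('d', [1.0, 2.0, 3.0]))])
    # Saving through a Hadoop output format with ArrayWritable converters
    # (as test_newhadoop_with_array does) makes the JVM unpickle the
    # array.array values -- the exact case SPARK-2951 fixes here.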
