Posted to commits@spark.apache.org by ma...@apache.org on 2014/09/28 06:48:11 UTC

git commit: [SPARK-3389] Add Converter for ease of Parquet reading in PySpark

Repository: spark
Updated Branches:
  refs/heads/master 5b922bb45 -> 248232936


[SPARK-3389] Add Converter for ease of Parquet reading in PySpark

https://issues.apache.org/jira/browse/SPARK-3389

Author: Uri Laserson <la...@cloudera.com>

Closes #2256 from laserson/SPARK-3389 and squashes the following commits:

0ed363e [Uri Laserson] PEP8'd the python file
0b4b380 [Uri Laserson] Moved converter to examples and added python example
eecf4dc [Uri Laserson] [SPARK-3389] Add Converter for ease of Parquet reading in PySpark


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24823293
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24823293
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24823293

Branch: refs/heads/master
Commit: 248232936e1bead7f102e59eb8faf3126c582d9d
Parents: 5b922bb
Author: Uri Laserson <la...@cloudera.com>
Authored: Sat Sep 27 21:48:05 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Sat Sep 27 21:48:05 2014 -0700

----------------------------------------------------------------------
 examples/src/main/python/parquet_inputformat.py |  59 ++++++++++++++
 examples/src/main/resources/full_user.avsc      |   1 +
 examples/src/main/resources/users.parquet       | Bin 0 -> 615 bytes
 .../pythonconverters/AvroConverters.scala       |  76 +++++++++++--------
 4 files changed, 106 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
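
For readers skimming the diffstat: the net effect is that PySpark can now
read Avro-backed Parquet files through newAPIHadoopFile, using the new
IndexedRecordToJavaConverter added below. A minimal sketch, assuming the
parquet-avro jar and the Spark examples jar are already on the classpath
(the input path here is illustrative):

    from pyspark import SparkContext

    sc = SparkContext(appName="ParquetViaConverter")

    # Parquet emits no key, so the key class is java.lang.Void; each value
    # is an Avro IndexedRecord that the converter turns into a Java Map,
    # which arrives in Python as a plain dict.
    parquet_rdd = sc.newAPIHadoopFile(
        "examples/src/main/resources/users.parquet",
        "parquet.avro.AvroParquetInputFormat",
        "java.lang.Void",
        "org.apache.avro.generic.IndexedRecord",
        valueConverter="org.apache.spark.examples.pythonconverters.IndexedRecordToJavaConverter")

    print parquet_rdd.map(lambda kv: kv[1]).first()

The full runnable version, including the spark-submit flags, is the new
examples/src/main/python/parquet_inputformat.py below.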


http://git-wip-us.apache.org/repos/asf/spark/blob/24823293/examples/src/main/python/parquet_inputformat.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/parquet_inputformat.py b/examples/src/main/python/parquet_inputformat.py
new file mode 100644
index 0000000..c9b08f8
--- /dev/null
+++ b/examples/src/main/python/parquet_inputformat.py
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+
+from pyspark import SparkContext
+
+"""
+Read the users.parquet data file from a local Spark distribution:
+
+$ cd $SPARK_HOME
+$ export AVRO_PARQUET_JARS=/path/to/parquet-avro-1.5.0.jar
+$ ./bin/spark-submit --driver-class-path /path/to/example/jar \\
+        --jars $AVRO_PARQUET_JARS \\
+        ./examples/src/main/python/parquet_inputformat.py \\
+        examples/src/main/resources/users.parquet
+<...lots of log output...>
+{u'favorite_color': None, u'name': u'Alyssa', u'favorite_numbers': [3, 9, 15, 20]}
+{u'favorite_color': u'red', u'name': u'Ben', u'favorite_numbers': []}
+<...more log output...>
+"""
+if __name__ == "__main__":
+    if len(sys.argv) != 2:
+        print >> sys.stderr, """
+        Usage: parquet_inputformat.py <data_file>
+
+        Run with example jar:
+        ./bin/spark-submit --driver-class-path /path/to/example/jar \\
+                /path/to/examples/parquet_inputformat.py <data_file>
+        Assumes you have Parquet data stored in <data_file>.
+        """
+        exit(-1)
+
+    path = sys.argv[1]
+    sc = SparkContext(appName="ParquetInputFormat")
+
+    parquet_rdd = sc.newAPIHadoopFile(
+        path,
+        'parquet.avro.AvroParquetInputFormat',
+        'java.lang.Void',
+        'org.apache.avro.generic.IndexedRecord',
+        valueConverter='org.apache.spark.examples.pythonconverters.IndexedRecordToJavaConverter')
+    output = parquet_rdd.map(lambda x: x[1]).collect()
+    for k in output:
+        print k

http://git-wip-us.apache.org/repos/asf/spark/blob/24823293/examples/src/main/resources/full_user.avsc
----------------------------------------------------------------------
diff --git a/examples/src/main/resources/full_user.avsc b/examples/src/main/resources/full_user.avsc
new file mode 100644
index 0000000..04e7ba2
--- /dev/null
+++ b/examples/src/main/resources/full_user.avsc
@@ -0,0 +1 @@
+{"type": "record", "namespace": "example.avro", "name": "User", "fields": [{"type": "string", "name": "name"}, {"type": ["string", "null"], "name": "favorite_color"}, {"type": {"items": "int", "type": "array"}, "name": "favorite_numbers"}]}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/24823293/examples/src/main/resources/users.parquet
----------------------------------------------------------------------
diff --git a/examples/src/main/resources/users.parquet b/examples/src/main/resources/users.parquet
new file mode 100644
index 0000000..aa52733
Binary files /dev/null and b/examples/src/main/resources/users.parquet differ

http://git-wip-us.apache.org/repos/asf/spark/blob/24823293/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala
index 1b25983..a11890d 100644
--- a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala
@@ -30,21 +30,28 @@ import org.apache.spark.api.python.Converter
 import org.apache.spark.SparkException
 
 
-/**
- * Implementation of [[org.apache.spark.api.python.Converter]] that converts
- * an Avro Record wrapped in an AvroKey (or AvroValue) to a Java Map. It tries
- * to work with all 3 Avro data mappings (Generic, Specific and Reflect).
- */
-class AvroWrapperToJavaConverter extends Converter[Any, Any] {
-  override def convert(obj: Any): Any = {
+object AvroConversionUtil extends Serializable {
+  def fromAvro(obj: Any, schema: Schema): Any = {
     if (obj == null) {
       return null
     }
-    obj.asInstanceOf[AvroWrapper[_]].datum() match {
-      case null => null
-      case record: IndexedRecord => unpackRecord(record)
-      case other => throw new SparkException(
-        s"Unsupported top-level Avro data type ${other.getClass.getName}")
+    schema.getType match {
+      case UNION   => unpackUnion(obj, schema)
+      case ARRAY   => unpackArray(obj, schema)
+      case FIXED   => unpackFixed(obj, schema)
+      case MAP     => unpackMap(obj, schema)
+      case BYTES   => unpackBytes(obj)
+      case RECORD  => unpackRecord(obj)
+      case STRING  => obj.toString
+      case ENUM    => obj.toString
+      case NULL    => obj
+      case BOOLEAN => obj
+      case DOUBLE  => obj
+      case FLOAT   => obj
+      case INT     => obj
+      case LONG    => obj
+      case other   => throw new SparkException(
+        s"Unknown Avro schema type ${other.getName}")
     }
   }
 
@@ -103,28 +110,37 @@ class AvroWrapperToJavaConverter extends Converter[Any, Any] {
         "Unions may only consist of a concrete type and null")
     }
   }
+}
 
-  def fromAvro(obj: Any, schema: Schema): Any = {
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts
+ * an Avro IndexedRecord (e.g., derived from AvroParquetInputFormat) to a Java Map.
+ */
+class IndexedRecordToJavaConverter extends Converter[IndexedRecord, JMap[String, Any]] {
+  override def convert(record: IndexedRecord): JMap[String, Any] = {
+    if (record == null) {
+      return null
+    }
+    AvroConversionUtil.unpackRecord(record)
+  }
+}
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts
+ * an Avro Record wrapped in an AvroKey (or AvroValue) to a Java Map. It tries
+ * to work with all 3 Avro data mappings (Generic, Specific and Reflect).
+ */
+class AvroWrapperToJavaConverter extends Converter[Any, Any] {
+  override def convert(obj: Any): Any = {
     if (obj == null) {
       return null
     }
-    schema.getType match {
-      case UNION   => unpackUnion(obj, schema)
-      case ARRAY   => unpackArray(obj, schema)
-      case FIXED   => unpackFixed(obj, schema)
-      case MAP     => unpackMap(obj, schema)
-      case BYTES   => unpackBytes(obj)
-      case RECORD  => unpackRecord(obj)
-      case STRING  => obj.toString
-      case ENUM    => obj.toString
-      case NULL    => obj
-      case BOOLEAN => obj
-      case DOUBLE  => obj
-      case FLOAT   => obj
-      case INT     => obj
-      case LONG    => obj
-      case other   => throw new SparkException(
-        s"Unknown Avro schema type ${other.getName}")
+    obj.asInstanceOf[AvroWrapper[_]].datum() match {
+      case null => null
+      case record: IndexedRecord => AvroConversionUtil.unpackRecord(record)
+      case other => throw new SparkException(
+        s"Unsupported top-level Avro data type ${other.getClass.getName}")
     }
   }
 }
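
The pre-existing AvroWrapperToJavaConverter keeps its behavior after the
refactor; only the per-type conversion logic moved into the shared
AvroConversionUtil object so both converters can use it. For comparison, a
hedged sketch of the plain-Avro path that converter serves (the input format
and key/value classes follow Spark's existing avro_inputformat.py example;
the .avro path is illustrative):

    from pyspark import SparkContext

    sc = SparkContext(appName="AvroViaConverter")

    # AvroKeyInputFormat wraps each datum in an AvroKey; the converter
    # unwraps it and unpacks the record into a Java Map (a dict in Python).
    avro_rdd = sc.newAPIHadoopFile(
        "examples/src/main/resources/users.avro",
        "org.apache.avro.mapreduce.AvroKeyInputFormat",
        "org.apache.avro.mapred.AvroKey",
        "org.apache.hadoop.io.NullWritable",
        keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter")

    for record in avro_rdd.map(lambda kv: kv[0]).collect():
        print record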

