Posted to commits@spark.apache.org by ma...@apache.org on 2014/11/06 09:22:23 UTC

git commit: [SPARK-4186] add binaryFiles and binaryRecords in Python

Repository: spark
Updated Branches:
  refs/heads/master 5f27ae16d -> b41a39e24


[SPARK-4186] add binaryFiles and binaryRecords in Python

add binaryFiles() and binaryRecords() in Python
```
binaryFiles(self, path, minPartitions=None):
    :: Experimental ::

    Read a directory of binary files from HDFS, a local file system
    (available on all nodes), or any Hadoop-supported file system URI
    as a byte array. Each file is read as a single record and returned
    in a key-value pair, where the key is the path of each file and the
    value is the content of each file.

    Note: Small files are preferred; large files are also allowed, but
    may cause poor performance.

binaryRecords(self, path, recordLength):
    Load data from a flat binary file, assuming each record is a set of numbers
    with the specified numerical format (see ByteBuffer), and the number of
    bytes per record is constant.

    :param path: Directory to the input data files
    :param recordLength: The length at which to split the records
```
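
A minimal PySpark usage sketch of the two new methods; the paths and the 8-byte
record layout below are hypothetical examples, not part of this commit:
```
from pyspark import SparkContext
import struct

sc = SparkContext(appName="binary-io-example")

# binaryFiles: one (path, content) pair per file; the content is a raw byte string
files = sc.binaryFiles("hdfs:///data/blobs")               # hypothetical path
sizes = files.mapValues(len).collect()                     # [(path, size_in_bytes), ...]

# binaryRecords: fixed-length records; each record here is assumed to hold
# two big-endian 32-bit integers, hence recordLength=8
records = sc.binaryRecords("hdfs:///data/points.bin", 8)   # hypothetical path
points = records.map(lambda r: struct.unpack(">ii", r)).collect()
```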

Author: Davies Liu <da...@databricks.com>

Closes #3078 from davies/binary and squashes the following commits:

cd0bdbd [Davies Liu] Merge branch 'master' of github.com:apache/spark into binary
3aa349b [Davies Liu] add experimental notes
24e84b6 [Davies Liu] Merge branch 'master' of github.com:apache/spark into binary
5ceaa8a [Davies Liu] Merge branch 'master' of github.com:apache/spark into binary
1900085 [Davies Liu] bugfix
bb22442 [Davies Liu] add binaryFiles and binaryRecords in Python


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b41a39e2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b41a39e2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b41a39e2

Branch: refs/heads/master
Commit: b41a39e24038876359aeb7ce2bbbb4de2234e5f3
Parents: 5f27ae1
Author: Davies Liu <da...@databricks.com>
Authored: Thu Nov 6 00:22:19 2014 -0800
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Thu Nov 6 00:22:19 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  4 ++
 .../spark/api/java/JavaSparkContext.scala       | 12 +++---
 .../org/apache/spark/api/python/PythonRDD.scala | 45 +++++++++++++-------
 python/pyspark/context.py                       | 32 +++++++++++++-
 python/pyspark/tests.py                         | 19 +++++++++
 5 files changed, 90 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b41a39e2/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3cdaa6a..03ea672 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -560,6 +560,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
 
 
   /**
+   * :: Experimental ::
+   *
    * Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file
    * (useful for binary data)
    *
@@ -602,6 +604,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
   }
 
   /**
+   * :: Experimental ::
+   *
    * Load data from a flat binary file, assuming the length of each record is constant.
    *
    * @param path Directory to the input data files

http://git-wip-us.apache.org/repos/asf/spark/blob/b41a39e2/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index e3aeba7..5c6e8d3 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -21,11 +21,6 @@ import java.io.Closeable
 import java.util
 import java.util.{Map => JMap}
 
-import java.io.DataInputStream
-
-import org.apache.hadoop.io.{BytesWritable, LongWritable}
-import org.apache.spark.input.{PortableDataStream, FixedLengthBinaryInputFormat}
-
 import scala.collection.JavaConversions
 import scala.collection.JavaConversions._
 import scala.language.implicitConversions
@@ -33,6 +28,7 @@ import scala.reflect.ClassTag
 
 import com.google.common.base.Optional
 import org.apache.hadoop.conf.Configuration
+import org.apache.spark.input.PortableDataStream
 import org.apache.hadoop.mapred.{InputFormat, JobConf}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
@@ -286,6 +282,8 @@ class JavaSparkContext(val sc: SparkContext)
     new JavaPairRDD(sc.binaryFiles(path, minPartitions))
 
   /**
+   * :: Experimental ::
+   *
    * Read a directory of binary files from HDFS, a local file system (available on all nodes),
    * or any Hadoop-supported file system URI as a byte array. Each file is read as a single
    * record and returned in a key-value pair, where the key is the path of each file,
@@ -312,15 +310,19 @@ class JavaSparkContext(val sc: SparkContext)
    *
   * @note Small files are preferred; very large files are also allowed, but may cause bad performance.
    */
+  @Experimental
   def binaryFiles(path: String): JavaPairRDD[String, PortableDataStream] =
     new JavaPairRDD(sc.binaryFiles(path, defaultMinPartitions))
 
   /**
+   * :: Experimental ::
+   *
    * Load data from a flat binary file, assuming the length of each record is constant.
    *
    * @param path Directory to the input data files
    * @return An RDD of data with values, represented as byte arrays
    */
+  @Experimental
   def binaryRecords(path: String, recordLength: Int): JavaRDD[Array[Byte]] = {
     new JavaRDD(sc.binaryRecords(path, recordLength))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/b41a39e2/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index e94ccdc..45beb8f 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -21,6 +21,8 @@ import java.io._
 import java.net._
 import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
 
+import org.apache.spark.input.PortableDataStream
+
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.language.existentials
@@ -395,22 +397,33 @@ private[spark] object PythonRDD extends Logging {
           newIter.asInstanceOf[Iterator[String]].foreach { str =>
             writeUTF(str, dataOut)
           }
-        case pair: Tuple2[_, _] =>
-          pair._1 match {
-            case bytePair: Array[Byte] =>
-              newIter.asInstanceOf[Iterator[Tuple2[Array[Byte], Array[Byte]]]].foreach { pair =>
-                dataOut.writeInt(pair._1.length)
-                dataOut.write(pair._1)
-                dataOut.writeInt(pair._2.length)
-                dataOut.write(pair._2)
-              }
-            case stringPair: String =>
-              newIter.asInstanceOf[Iterator[Tuple2[String, String]]].foreach { pair =>
-                writeUTF(pair._1, dataOut)
-                writeUTF(pair._2, dataOut)
-              }
-            case other =>
-              throw new SparkException("Unexpected Tuple2 element type " + pair._1.getClass)
+        case stream: PortableDataStream =>
+          newIter.asInstanceOf[Iterator[PortableDataStream]].foreach { stream =>
+            val bytes = stream.toArray()
+            dataOut.writeInt(bytes.length)
+            dataOut.write(bytes)
+          }
+        case (key: String, stream: PortableDataStream) =>
+          newIter.asInstanceOf[Iterator[(String, PortableDataStream)]].foreach {
+            case (key, stream) =>
+              writeUTF(key, dataOut)
+              val bytes = stream.toArray()
+              dataOut.writeInt(bytes.length)
+              dataOut.write(bytes)
+          }
+        case (key: String, value: String) =>
+          newIter.asInstanceOf[Iterator[(String, String)]].foreach {
+            case (key, value) =>
+              writeUTF(key, dataOut)
+              writeUTF(value, dataOut)
+          }
+        case (key: Array[Byte], value: Array[Byte]) =>
+          newIter.asInstanceOf[Iterator[(Array[Byte], Array[Byte])]].foreach {
+            case (key, value) =>
+              dataOut.writeInt(key.length)
+              dataOut.write(key)
+              dataOut.writeInt(value.length)
+              dataOut.write(value)
           }
         case other =>
           throw new SparkException("Unexpected element type " + first.getClass)
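
For reference, the framing written above is what the PySpark deserializers consume:
each binary payload is a 4-byte big-endian length followed by the raw bytes
(NoOpSerializer), and writeUTF is assumed here to use the same length-prefixed
framing with UTF-8 text (UTF8Deserializer). A minimal standalone sketch, not Spark
code, of decoding one (path, content) record as written for the
(String, PortableDataStream) case:
```
import struct
from io import BytesIO

def read_chunk(stream):
    # 4-byte big-endian length prefix, then the raw payload
    (length,) = struct.unpack(">i", stream.read(4))
    return stream.read(length)

def read_binary_files_record(stream):
    # One (path, content) pair: a UTF-8 path chunk followed by a raw content chunk
    path = read_chunk(stream).decode("utf-8")
    content = read_chunk(stream)
    return path, content

# Round-trip check against a hand-framed record
framed = (struct.pack(">i", 9) + b"part-0000" +
          struct.pack(">i", 3) + b"\x01\x02\x03")
print(read_binary_files_record(BytesIO(framed)))
```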

http://git-wip-us.apache.org/repos/asf/spark/blob/b41a39e2/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index a0e4821..faa5952 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -29,7 +29,7 @@ from pyspark.conf import SparkConf
 from pyspark.files import SparkFiles
 from pyspark.java_gateway import launch_gateway
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
-    PairDeserializer, CompressedSerializer, AutoBatchedSerializer
+    PairDeserializer, CompressedSerializer, AutoBatchedSerializer, NoOpSerializer
 from pyspark.storagelevel import StorageLevel
 from pyspark.rdd import RDD
 from pyspark.traceback_utils import CallSite, first_spark_call
@@ -388,6 +388,36 @@ class SparkContext(object):
         return RDD(self._jsc.wholeTextFiles(path, minPartitions), self,
                    PairDeserializer(UTF8Deserializer(use_unicode), UTF8Deserializer(use_unicode)))
 
+    def binaryFiles(self, path, minPartitions=None):
+        """
+        :: Experimental ::
+
+        Read a directory of binary files from HDFS, a local file system
+        (available on all nodes), or any Hadoop-supported file system URI
+        as a byte array. Each file is read as a single record and returned
+        in a key-value pair, where the key is the path of each file and the
+        value is the content of each file.
+
+        Note: Small files are preferred; large files are also allowed, but
+        may cause poor performance.
+        """
+        minPartitions = minPartitions or self.defaultMinPartitions
+        return RDD(self._jsc.binaryFiles(path, minPartitions), self,
+                   PairDeserializer(UTF8Deserializer(), NoOpSerializer()))
+
+    def binaryRecords(self, path, recordLength):
+        """
+        :: Experimental ::
+
+        Load data from a flat binary file, assuming each record is a set of numbers
+        with the specified numerical format (see ByteBuffer), and the number of
+        bytes per record is constant.
+
+        :param path: Directory to the input data files
+        :param recordLength: The length at which to split the records
+        """
+        return RDD(self._jsc.binaryRecords(path, recordLength), self, NoOpSerializer())
+
     def _dictToJavaMap(self, d):
         jm = self._jvm.java.util.HashMap()
         if not d:
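
The wrappers above only choose deserializers for the RDD handle returned through the
Java gateway. A sketch of the shapes they yield (the directory, file name, and byte
contents are hypothetical, and the exact key formatting comes from Hadoop):
```
# Assuming /tmp/bindir holds a single file part-0000 containing b"\x00\x01\x02\x03"
pairs = sc.binaryFiles("/tmp/bindir").collect()
# -> one (path, content) pair per file: the key is the file's URI (ending in
#    "part-0000") decoded by UTF8Deserializer; the value is the raw content
#    passed through by NoOpSerializer

recs = sc.binaryRecords("/tmp/bindir/part-0000", 2).collect()
# -> ['\x00\x01', '\x02\x03']   one fixed-length byte string per record
```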

http://git-wip-us.apache.org/repos/asf/spark/blob/b41a39e2/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 7e61b01..9f625c5 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1110,6 +1110,25 @@ class InputFormatTests(ReusedPySparkTestCase):
               (u'\x03', [2.0])]
         self.assertEqual(maps, em)
 
+    def test_binary_files(self):
+        path = os.path.join(self.tempdir.name, "binaryfiles")
+        os.mkdir(path)
+        data = "short binary data"
+        with open(os.path.join(path, "part-0000"), 'w') as f:
+            f.write(data)
+        [(p, d)] = self.sc.binaryFiles(path).collect()
+        self.assertTrue(p.endswith("part-0000"))
+        self.assertEqual(d, data)
+
+    def test_binary_records(self):
+        path = os.path.join(self.tempdir.name, "binaryrecords")
+        os.mkdir(path)
+        with open(os.path.join(path, "part-0000"), 'w') as f:
+            for i in range(100):
+                f.write('%04d' % i)
+        result = self.sc.binaryRecords(path, 4).map(int).collect()
+        self.assertEqual(range(100), result)
+
 
 class OutputFormatTests(ReusedPySparkTestCase):
 

