You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/11/06 09:22:23 UTC
git commit: [SPARK-4186] add binaryFiles and binaryRecords in Python
Repository: spark
Updated Branches:
refs/heads/master 5f27ae16d -> b41a39e24
[SPARK-4186] add binaryFiles and binaryRecords in Python
add binaryFiles() and binaryRecords() in Python
```
binaryFiles(self, path, minPartitions=None):
:: Developer API ::
Read a directory of binary files from HDFS, a local file system
(available on all nodes), or any Hadoop-supported file system URI
as a byte array. Each file is read as a single record and returned
in a key-value pair, where the key is the path of each file, the
value is the content of each file.
Note: Small files are preferred; large files are also allowed, but
may cause poor performance.
binaryRecords(self, path, recordLength):
Load data from a flat binary file, assuming each record is a set of numbers
with the specified numerical format (see ByteBuffer), and the number of
bytes per record is constant.
:param path: Directory to the input data files
:param recordLength: The length at which to split the records
```
Author: Davies Liu <da...@databricks.com>
Closes #3078 from davies/binary and squashes the following commits:
cd0bdbd [Davies Liu] Merge branch 'master' of github.com:apache/spark into binary
3aa349b [Davies Liu] add experimental notes
24e84b6 [Davies Liu] Merge branch 'master' of github.com:apache/spark into binary
5ceaa8a [Davies Liu] Merge branch 'master' of github.com:apache/spark into binary
1900085 [Davies Liu] bugfix
bb22442 [Davies Liu] add binaryFiles and binaryRecords in Python
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b41a39e2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b41a39e2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b41a39e2
Branch: refs/heads/master
Commit: b41a39e24038876359aeb7ce2bbbb4de2234e5f3
Parents: 5f27ae1
Author: Davies Liu <da...@databricks.com>
Authored: Thu Nov 6 00:22:19 2014 -0800
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Thu Nov 6 00:22:19 2014 -0800
----------------------------------------------------------------------
.../scala/org/apache/spark/SparkContext.scala | 4 ++
.../spark/api/java/JavaSparkContext.scala | 12 +++---
.../org/apache/spark/api/python/PythonRDD.scala | 45 +++++++++++++-------
python/pyspark/context.py | 32 +++++++++++++-
python/pyspark/tests.py | 19 +++++++++
5 files changed, 90 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/b41a39e2/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3cdaa6a..03ea672 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -560,6 +560,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
/**
+ * :: Experimental ::
+ *
* Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file
* (useful for binary data)
*
@@ -602,6 +604,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
}
/**
+ * :: Experimental ::
+ *
* Load data from a flat binary file, assuming the length of each record is constant.
*
* @param path Directory to the input data files
http://git-wip-us.apache.org/repos/asf/spark/blob/b41a39e2/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index e3aeba7..5c6e8d3 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -21,11 +21,6 @@ import java.io.Closeable
import java.util
import java.util.{Map => JMap}
-import java.io.DataInputStream
-
-import org.apache.hadoop.io.{BytesWritable, LongWritable}
-import org.apache.spark.input.{PortableDataStream, FixedLengthBinaryInputFormat}
-
import scala.collection.JavaConversions
import scala.collection.JavaConversions._
import scala.language.implicitConversions
@@ -33,6 +28,7 @@ import scala.reflect.ClassTag
import com.google.common.base.Optional
import org.apache.hadoop.conf.Configuration
+import org.apache.spark.input.PortableDataStream
import org.apache.hadoop.mapred.{InputFormat, JobConf}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
@@ -286,6 +282,8 @@ class JavaSparkContext(val sc: SparkContext)
new JavaPairRDD(sc.binaryFiles(path, minPartitions))
/**
+ * :: Experimental ::
+ *
* Read a directory of binary files from HDFS, a local file system (available on all nodes),
* or any Hadoop-supported file system URI as a byte array. Each file is read as a single
* record and returned in a key-value pair, where the key is the path of each file,
@@ -312,15 +310,19 @@ class JavaSparkContext(val sc: SparkContext)
*
* @note Small files are preferred; very large files are allowed but may cause bad performance.
*/
+ @Experimental
def binaryFiles(path: String): JavaPairRDD[String, PortableDataStream] =
new JavaPairRDD(sc.binaryFiles(path, defaultMinPartitions))
/**
+ * :: Experimental ::
+ *
* Load data from a flat binary file, assuming the length of each record is constant.
*
* @param path Directory to the input data files
* @return An RDD of data with values, represented as byte arrays
*/
+ @Experimental
def binaryRecords(path: String, recordLength: Int): JavaRDD[Array[Byte]] = {
new JavaRDD(sc.binaryRecords(path, recordLength))
}
http://git-wip-us.apache.org/repos/asf/spark/blob/b41a39e2/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index e94ccdc..45beb8f 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -21,6 +21,8 @@ import java.io._
import java.net._
import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
+import org.apache.spark.input.PortableDataStream
+
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.language.existentials
@@ -395,22 +397,33 @@ private[spark] object PythonRDD extends Logging {
newIter.asInstanceOf[Iterator[String]].foreach { str =>
writeUTF(str, dataOut)
}
- case pair: Tuple2[_, _] =>
- pair._1 match {
- case bytePair: Array[Byte] =>
- newIter.asInstanceOf[Iterator[Tuple2[Array[Byte], Array[Byte]]]].foreach { pair =>
- dataOut.writeInt(pair._1.length)
- dataOut.write(pair._1)
- dataOut.writeInt(pair._2.length)
- dataOut.write(pair._2)
- }
- case stringPair: String =>
- newIter.asInstanceOf[Iterator[Tuple2[String, String]]].foreach { pair =>
- writeUTF(pair._1, dataOut)
- writeUTF(pair._2, dataOut)
- }
- case other =>
- throw new SparkException("Unexpected Tuple2 element type " + pair._1.getClass)
+ case stream: PortableDataStream =>
+ newIter.asInstanceOf[Iterator[PortableDataStream]].foreach { stream =>
+ val bytes = stream.toArray()
+ dataOut.writeInt(bytes.length)
+ dataOut.write(bytes)
+ }
+ case (key: String, stream: PortableDataStream) =>
+ newIter.asInstanceOf[Iterator[(String, PortableDataStream)]].foreach {
+ case (key, stream) =>
+ writeUTF(key, dataOut)
+ val bytes = stream.toArray()
+ dataOut.writeInt(bytes.length)
+ dataOut.write(bytes)
+ }
+ case (key: String, value: String) =>
+ newIter.asInstanceOf[Iterator[(String, String)]].foreach {
+ case (key, value) =>
+ writeUTF(key, dataOut)
+ writeUTF(value, dataOut)
+ }
+ case (key: Array[Byte], value: Array[Byte]) =>
+ newIter.asInstanceOf[Iterator[(Array[Byte], Array[Byte])]].foreach {
+ case (key, value) =>
+ dataOut.writeInt(key.length)
+ dataOut.write(key)
+ dataOut.writeInt(value.length)
+ dataOut.write(value)
}
case other =>
throw new SparkException("Unexpected element type " + first.getClass)
http://git-wip-us.apache.org/repos/asf/spark/blob/b41a39e2/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index a0e4821..faa5952 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -29,7 +29,7 @@ from pyspark.conf import SparkConf
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
- PairDeserializer, CompressedSerializer, AutoBatchedSerializer
+ PairDeserializer, CompressedSerializer, AutoBatchedSerializer, NoOpSerializer
from pyspark.storagelevel import StorageLevel
from pyspark.rdd import RDD
from pyspark.traceback_utils import CallSite, first_spark_call
@@ -388,6 +388,36 @@ class SparkContext(object):
return RDD(self._jsc.wholeTextFiles(path, minPartitions), self,
PairDeserializer(UTF8Deserializer(use_unicode), UTF8Deserializer(use_unicode)))
+ def binaryFiles(self, path, minPartitions=None):
+ """
+ :: Experimental ::
+
+ Read a directory of binary files from HDFS, a local file system
+ (available on all nodes), or any Hadoop-supported file system URI
+ as a byte array. Each file is read as a single record and returned
+ in a key-value pair, where the key is the path of each file, the
+ value is the content of each file.
+
+ Note: Small files are preferred; large files are also allowed, but
+ may cause poor performance.
+ """
+ minPartitions = minPartitions or self.defaultMinPartitions
+ return RDD(self._jsc.binaryFiles(path, minPartitions), self,
+ PairDeserializer(UTF8Deserializer(), NoOpSerializer()))
+
+ def binaryRecords(self, path, recordLength):
+ """
+ :: Experimental ::
+
+ Load data from a flat binary file, assuming each record is a set of numbers
+ with the specified numerical format (see ByteBuffer), and the number of
+ bytes per record is constant.
+
+ :param path: Directory to the input data files
+ :param recordLength: The length at which to split the records
+ """
+ return RDD(self._jsc.binaryRecords(path, recordLength), self, NoOpSerializer())
+
def _dictToJavaMap(self, d):
jm = self._jvm.java.util.HashMap()
if not d:
http://git-wip-us.apache.org/repos/asf/spark/blob/b41a39e2/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 7e61b01..9f625c5 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1110,6 +1110,25 @@ class InputFormatTests(ReusedPySparkTestCase):
(u'\x03', [2.0])]
self.assertEqual(maps, em)
+ def test_binary_files(self):
+ path = os.path.join(self.tempdir.name, "binaryfiles")
+ os.mkdir(path)
+ data = "short binary data"
+ with open(os.path.join(path, "part-0000"), 'w') as f:
+ f.write(data)
+ [(p, d)] = self.sc.binaryFiles(path).collect()
+ self.assertTrue(p.endswith("part-0000"))
+ self.assertEqual(d, data)
+
+ def test_binary_records(self):
+ path = os.path.join(self.tempdir.name, "binaryrecords")
+ os.mkdir(path)
+ with open(os.path.join(path, "part-0000"), 'w') as f:
+ for i in range(100):
+ f.write('%04d' % i)
+ result = self.sc.binaryRecords(path, 4).map(int).collect()
+ self.assertEqual(range(100), result)
+
class OutputFormatTests(ReusedPySparkTestCase):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org