You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Nick Pentreath <ni...@gmail.com> on 2013/10/25 08:18:03 UTC

[PySpark]: reading arbitrary Hadoop InputFormats

Hi Spark Devs

I was wondering what appetite there may be to add the ability for PySpark
users to create RDDs from (somewhat) arbitrary Hadoop InputFormats.

In my data pipeline for example, I'm currently just using Scala (partly
because I love it but also because I am heavily reliant on quite custom
Hadoop InputFormats for reading data). However, many users may prefer to
use PySpark as much as possible (if not for everything). Reasons might
include the need to use some Python library. While I don't do it yet, I can
certainly see an attractive use case for using say scikit-learn / numpy to
do data analysis & machine learning in Python. Added to this my cofounder
knows Python well but not Scala so it can be very beneficial to do a lot of
stuff in Python.

For text-based data this is fine, but reading data in from more complex
Hadoop formats is an issue.

The current approach would of course be to write an ETL-style Java/Scala
job and then process in Python. Nothing wrong with this, but I was thinking
about ways to allow Python to access arbitrary Hadoop InputFormats.

Here is a quick proof of concept: https://gist.github.com/MLnick/7150058

This works for simple stuff like SequenceFile with simple Writable
key/values.

To work with more complex files, perhaps an approach is to manipulate
Hadoop JobConf via Python and pass that in. The one downside is of course
that the InputFormat (well actually the Key/Value classes) must have a
toString that makes sense so very custom stuff might not work.

I wonder if it would be possible to take the objects that are yielded via
the InputFormat and convert them into some representation like ProtoBuf,
MsgPack, Avro, JSON, that can be read relatively more easily from Python?

Another approach could be to allow a simple "wrapper API" such that one can
write a wrapper function T => String and pass that into an
InputFormatWrapper that takes an arbitrary InputFormat and yields Strings
for the keys and values. Then all that is required is to compile that
function and add it to the SPARK_CLASSPATH and away you go!

Thoughts?

Nick

Re: [PySpark]: reading arbitrary Hadoop InputFormats

Posted by Nick Pentreath <ni...@gmail.com>.
Ok - I'll work something up and reopen a PR against the new spark mirror.


The API itself mirrors the newHadoopFile etc methods, so that should be quite stable once finalised.




It's the "wrapper" stuff of how to serialize custom classes and read them in Python that is the potential tricky part.



—
Sent from Mailbox for iPhone

On Wed, Mar 19, 2014 at 8:55 PM, Matei Zaharia <ma...@gmail.com>
wrote:

> Hey Nick, no worries if this can’t be done in time. It’s probably better to test it thoroughly. If you do have something partially working though, the main concern will be the API, i.e. whether it’s an API we want to support indefinitely. It would be bad to add this and then make major changes to what it returns. But if we’re comfortable with the API, we can mark it as experimental and include it. It might be better to support fewer data types at first and then add some just to keep the API small.
> Matei
> On Mar 18, 2014, at 11:44 PM, Nick Pentreath <ni...@gmail.com> wrote:
>> Hi Matei
>> 
>> 
>> I'm afraid I haven't had enough time to focus on this as work has just been crazy. It's still something I want to get to a mergeable status. 
>> 
>> 
>> 
>> 
>> Actually it was working fine it was just a bit rough and needs to be updated to HEAD.
>> 
>> 
>> 
>> 
>> I'll absolutely try my utmost to get something ready to merge before the window for 1.0 closes. Perhaps we can put it in there (once I've updated and cleaned up) as a more experimental feature? What is the view on having such more untested (as in production) stuff in 1.0?
>> —
>> Sent from Mailbox for iPhone
>> 
>> On Wed, Mar 19, 2014 at 12:15 AM, Matei Zaharia <ma...@gmail.com>
>> wrote:
>> 
>>> Hey Nick, I’m curious, have you been doing any further development on this? It would be good to get expanded InputFormat support in Spark 1.0. To start with we don’t have to do SequenceFiles in particular, we can do stuff like Avro (if it’s easy to read in Python) or some kind of WholeFileInputFormat.
>>> Matei
>>> On Dec 19, 2013, at 10:57 AM, Nick Pentreath <ni...@gmail.com> wrote:
>>>> Hi
>>>> 
>>>> 
>>>> I managed to find the time to put together a PR on this: https://github.com/apache/incubator-spark/pull/263
>>>> 
>>>> 
>>>> 
>>>> 
>>>> Josh has had a look over it - if anyone else with an interest could give some feedback that would be great.
>>>> 
>>>> 
>>>> 
>>>> 
>>>> As mentioned in the PR it's more of an RFC and certainly still needs a bit of clean up work, and I need to add the concept of "wrapper functions" to deserialize classes that MsgPack can't handle out the box.
>>>> 
>>>> 
>>>> 
>>>> 
>>>> N
>>>> —
>>>> Sent from Mailbox for iPhone
>>>> 
>>>> On Fri, Nov 8, 2013 at 12:20 PM, Nick Pentreath <ni...@gmail.com>
>>>> wrote:
>>>> 
>>>>> Wow Josh, that looks great. I've been a bit swamped this week but as soon
>>>>> as I get a chance I'll test out the PR in more detail and port over the
>>>>> InputFormat stuff to use the new framework (including the changes you
>>>>> suggested).
>>>>> I can then look deeper into the MsgPack functionality to see if it can be
>>>>> made to work in a generic enough manner without requiring huge amounts of
>>>>> custom Templates to be written by users.
>>>>> Will feed back asap.
>>>>> N
>>>>> On Thu, Nov 7, 2013 at 5:03 AM, Josh Rosen <ro...@gmail.com> wrote:
>>>>>> I opened a pull request to add custom serializer support to PySpark:
>>>>>> https://github.com/apache/incubator-spark/pull/146
>>>>>> 
>>>>>> My pull request adds the plumbing for transferring data from Java to Python
>>>>>> using formats other than Pickle.  For example, look at how textFile() uses
>>>>>> MUTF8Deserializer to read strings from Java.  Hopefully this provides all
>>>>>> of the functionality needed to support MsgPack.
>>>>>> 
>>>>>> - Josh
>>>>>> 
>>>>>> 
>>>>>> On Thu, Oct 31, 2013 at 11:11 AM, Josh Rosen <ro...@gmail.com> wrote:
>>>>>> 
>>>>>>> Hi Nick,
>>>>>>> 
>>>>>>> This is a nice start.  I'd prefer to keep the Java sequenceFileAsText()
>>>>>>> and newHadoopFileAsText() methods inside PythonRDD instead of adding them
>>>>>>> to JavaSparkContext, since I think these methods are unlikely to be used
>>>>>>> directly by Java users (you can add these methods to the PythonRDD
>>>>>>> companion object, which is how readRDDFromPickleFile is implemented:
>>>>>>> 
>>>>>> https://github.com/apache/incubator-spark/blob/branch-0.8/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L255
>>>>>>> )
>>>>>>> 
>>>>>>> For MsgPack, the UnpicklingError is because the Python worker expects to
>>>>>>> receive its input in a pickled format.  In my prototype of custom
>>>>>>> serializers, I modified the PySpark worker to receive its
>>>>>>> serialization/deserialization function as input (
>>>>>>> 
>>>>>> https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/worker.py#L41
>>>>>> )
>>>>>>> and added logic to pass the appropriate serializers based on each stage's
>>>>>>> input and output formats (
>>>>>>> 
>>>>>> https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/rdd.py#L42
>>>>>>> ).
>>>>>>> 
>>>>>>> At some point, I'd like to port my custom serializers code to PySpark; if
>>>>>>> anyone's interested in helping, I'd be glad to write up some additional
>>>>>>> notes on how this should work.
>>>>>>> 
>>>>>>> - Josh
>>>>>>> 
>>>>>>> On Wed, Oct 30, 2013 at 2:25 PM, Nick Pentreath <
>>>>>> nick.pentreath@gmail.com>wrote:
>>>>>>> 
>>>>>>>> Thanks Josh, Patrick for the feedback.
>>>>>>>> 
>>>>>>>> Based on Josh's pointers I have something working for JavaPairRDD ->
>>>>>>>> PySpark RDD[(String, String)]. This just calls the toString method on
>>>>>> each
>>>>>>>> key and value as before, but without the need for a delimiter. For
>>>>>>>> SequenceFile, it uses SequenceFileAsTextInputFormat which itself calls
>>>>>>>> toString to convert to Text for keys and values. We then call toString
>>>>>>>> (again) ourselves to get Strings to feed to writeAsPickle.
>>>>>>>> 
>>>>>>>> Details here: https://gist.github.com/MLnick/7230588
>>>>>>>> 
>>>>>>>> This also illustrates where the "wrapper function" api would fit in. All
>>>>>>>> that is required is to define a T => String for key and value.
>>>>>>>> 
>>>>>>>> I started playing around with MsgPack and can sort of get things to work
>>>>>>>> in
>>>>>>>> Scala, but am struggling with getting the raw bytes to be written
>>>>>> properly
>>>>>>>> in PythonRDD (I think it is treating them as pickled byte arrays when
>>>>>> they
>>>>>>>> are not, but when I removed the 'stripPickle' calls and amended the
>>>>>> length
>>>>>>>> (-6) I got "UnpicklingError: invalid load key, ' '. ").
>>>>>>>> 
>>>>>>>> Another issue is that MsgPack does well at writing "structures" - like
>>>>>>>> Java
>>>>>>>> classes with public fields that are fairly simple - but for example the
>>>>>>>> Writables have private fields so you end up with nothing being written.
>>>>>>>> This looks like it would require custom "Templates" (serialization
>>>>>>>> functions effectively) for many classes, which means a lot of custom
>>>>>> code
>>>>>>>> for a user to write to use it. Fortunately for most of the common
>>>>>>>> Writables
>>>>>>>> a toString does the job. Will keep looking into it though.
>>>>>>>> 
>>>>>>>> Anyway, Josh if you have ideas or examples on the "Wrapper API from
>>>>>>>> Python"
>>>>>>>> that you mentioned, I'd be interested to hear them.
>>>>>>>> 
>>>>>>>> If you think this is worth working up as a Pull Request covering
>>>>>>>> SequenceFiles and custom InputFormats with default toString conversions
>>>>>>>> and
>>>>>>>> the ability to specify Wrapper functions, I can clean things up more,
>>>>>> add
>>>>>>>> some functionality and tests, and also test to see if common things like
>>>>>>>> the "normal" Writables and reading from things like HBase and Cassandra
>>>>>>>> can
>>>>>>>> be made to work nicely (any other common use cases that you think make
>>>>>>>> sense?).
>>>>>>>> 
>>>>>>>> Thoughts, comments etc welcome.
>>>>>>>> 
>>>>>>>> Nick
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Fri, Oct 25, 2013 at 11:03 PM, Patrick Wendell <pwendell@gmail.com
>>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> As a starting point, a version where people just write their own
>>>>>>>> "wrapper"
>>>>>>>>> functions to convert various HadoopFiles into String <K, V> files
>>>>>> could
>>>>>>>> go
>>>>>>>>> a long way. We could even have a few built-in versions, such as
>>>>>> dealing
>>>>>>>>> with Sequence files that are <String, String>. Basically, the user
>>>>>>>> needs to
>>>>>>>>> write a translator in Java/Scala that produces textual records from
>>>>>>>>> whatever format that want. Then, they make sure this is included in
>>>>>> the
>>>>>>>>> classpath when running PySpark.
>>>>>>>>> 
>>>>>>>>> As Josh is saying, I'm pretty sure this is already possible, but we
>>>>>> may
>>>>>>>>> want to document it for users. In many organizations they might have
>>>>>> 1-2
>>>>>>>>> people who can write the Java/Scala to do this but then many more
>>>>>> people
>>>>>>>>> who are comfortable using python once it's setup.
>>>>>>>>> 
>>>>>>>>> - Patrick
>>>>>>>>> 
>>>>>>>>> On Fri, Oct 25, 2013 at 11:00 AM, Josh Rosen <ro...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi Nick,
>>>>>>>>>> 
>>>>>>>>>> I've seen several requests for SequenceFile support in PySpark, so
>>>>>>>>> there's
>>>>>>>>>> definitely demand for this feature.
>>>>>>>>>> 
>>>>>>>>>> I like the idea of passing MsgPack'ed data (or some other structured
>>>>>>>>>> format) from Java to the Python workers.  My early prototype of
>>>>>> custom
>>>>>>>>>> serializers (described at
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>> https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals#PySparkInternals-customserializers
>>>>>>>>>> )
>>>>>>>>>> might be useful for implementing this.  Proper custom serializer
>>>>>>>> support
>>>>>>>>>> would handle the bookkeeping for tracking each stage's input and
>>>>>>>> output
>>>>>>>>>> formats and supplying the appropriate deserialization functions to
>>>>>> the
>>>>>>>>>> Python worker, so the Python worker would be able to directly read
>>>>>> the
>>>>>>>>>> MsgPack'd data that's sent to it.
>>>>>>>>>> 
>>>>>>>>>> Regarding a wrapper API, it's actually possible to initially
>>>>>> transform
>>>>>>>>> data
>>>>>>>>>> using Scala/Java and perform the remainder of the processing in
>>>>>>>> PySpark.
>>>>>>>>>> This involves adding the appropriate compiled to the Java classpath
>>>>>>>> and
>>>>>>>>> a
>>>>>>>>>> bit of work in Py4J to create the Java/Scala RDD and wrap it for use
>>>>>>>> by
>>>>>>>>>> PySpark.  I can hack together a rough example of this if anyone's
>>>>>>>>>> interested, but it would need some work to be developed into a
>>>>>>>>>> user-friendly API.
>>>>>>>>>> 
>>>>>>>>>> If you wanted to extend your proof-of-concept to handle the cases
>>>>>>>> where
>>>>>>>>>> keys and values have parseable toString() values, I think you could
>>>>>>>>> remove
>>>>>>>>>> the need for a delimiter by creating a PythonRDD from the
>>>>>>>> newHadoopFile
>>>>>>>>>> JavaPairRDD and adding a new method to writeAsPickle (
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L224
>>>>>>>>>> )
>>>>>>>>>> to dump its contents as a pickled pair of strings.  (Aside: most of
>>>>>>>>>> writeAsPickle() would probably need be eliminated or refactored when
>>>>>>>>> adding
>>>>>>>>>> general custom serializer support).
>>>>>>>>>> 
>>>>>>>>>> - Josh
>>>>>>>>>> 
>>>>>>>>>> On Thu, Oct 24, 2013 at 11:18 PM, Nick Pentreath
>>>>>>>>>> <ni...@gmail.com>wrote:
>>>>>>>>>> 
>>>>>>>>>>> Hi Spark Devs
>>>>>>>>>>> 
>>>>>>>>>>> I was wondering what appetite there may be to add the ability for
>>>>>>>>> PySpark
>>>>>>>>>>> users to create RDDs from (somewhat) arbitrary Hadoop
>>>>>> InputFormats.
>>>>>>>>>>> 
>>>>>>>>>>> In my data pipeline for example, I'm currently just using Scala
>>>>>>>> (partly
>>>>>>>>>>> because I love it but also because I am heavily reliant on quite
>>>>>>>> custom
>>>>>>>>>>> Hadoop InputFormats for reading data). However, many users may
>>>>>>>> prefer
>>>>>>>>> to
>>>>>>>>>>> use PySpark as much as possible (if not for everything). Reasons
>>>>>>>> might
>>>>>>>>>>> include the need to use some Python library. While I don't do it
>>>>>>>> yet, I
>>>>>>>>>> can
>>>>>>>>>>> certainly see an attractive use case for using say scikit-learn /
>>>>>>>> numpy
>>>>>>>>>> to
>>>>>>>>>>> do data analysis & machine learning in Python. Added to this my
>>>>>>>>> cofounder
>>>>>>>>>>> knows Python well but not Scala so it can be very beneficial to
>>>>>> do a
>>>>>>>>> lot
>>>>>>>>>> of
>>>>>>>>>>> stuff in Python.
>>>>>>>>>>> 
>>>>>>>>>>> For text-based data this is fine, but reading data in from more
>>>>>>>> complex
>>>>>>>>>>> Hadoop formats is an issue.
>>>>>>>>>>> 
>>>>>>>>>>> The current approach would of course be to write an ETL-style
>>>>>>>>> Java/Scala
>>>>>>>>>>> job and then process in Python. Nothing wrong with this, but I was
>>>>>>>>>> thinking
>>>>>>>>>>> about ways to allow Python to access arbitrary Hadoop
>>>>>> InputFormats.
>>>>>>>>>>> 
>>>>>>>>>>> Here is a quick proof of concept:
>>>>>>>>> https://gist.github.com/MLnick/7150058
>>>>>>>>>>> 
>>>>>>>>>>> This works for simple stuff like SequenceFile with simple Writable
>>>>>>>>>>> key/values.
>>>>>>>>>>> 
>>>>>>>>>>> To work with more complex files, perhaps an approach is to
>>>>>>>> manipulate
>>>>>>>>>>> Hadoop JobConf via Python and pass that in. The one downside is of
>>>>>>>>> course
>>>>>>>>>>> that the InputFormat (well actually the Key/Value classes) must
>>>>>>>> have a
>>>>>>>>>>> toString that makes sense so very custom stuff might not work.
>>>>>>>>>>> 
>>>>>>>>>>> I wonder if it would be possible to take the objects that are
>>>>>>>> yielded
>>>>>>>>> via
>>>>>>>>>>> the InputFormat and convert them into some representation like
>>>>>>>>> ProtoBuf,
>>>>>>>>>>> MsgPack, Avro, JSON, that can be read relatively more easily from
>>>>>>>>> Python?
>>>>>>>>>>> 
>>>>>>>>>>> Another approach could be to allow a simple "wrapper API" such
>>>>>> that
>>>>>>>> one
>>>>>>>>>> can
>>>>>>>>>>> write a wrapper function T => String and pass that into an
>>>>>>>>>>> InputFormatWrapper that takes an arbitrary InputFormat and yields
>>>>>>>>> Strings
>>>>>>>>>>> for the keys and values. Then all that is required is to compile
>>>>>>>> that
>>>>>>>>>>> function and add it to the SPARK_CLASSPATH and away you go!
>>>>>>>>>>> 
>>>>>>>>>>> Thoughts?
>>>>>>>>>>> 
>>>>>>>>>>> Nick
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 

Re: [PySpark]: reading arbitrary Hadoop InputFormats

Posted by Matei Zaharia <ma...@gmail.com>.
Hey Nick, no worries if this can’t be done in time. It’s probably better to test it thoroughly. If you do have something partially working though, the main concern will be the API, i.e. whether it’s an API we want to support indefinitely. It would be bad to add this and then make major changes to what it returns. But if we’re comfortable with the API, we can mark it as experimental and include it. It might be better to support fewer data types at first and then add some just to keep the API small.

Matei

On Mar 18, 2014, at 11:44 PM, Nick Pentreath <ni...@gmail.com> wrote:

> Hi Matei
> 
> 
> I'm afraid I haven't had enough time to focus on this as work has just been crazy. It's still something I want to get to a mergeable status. 
> 
> 
> 
> 
> Actually it was working fine it was just a bit rough and needs to be updated to HEAD.
> 
> 
> 
> 
> I'll absolutely try my utmost to get something ready to merge before the window for 1.0 closes. Perhaps we can put it in there (once I've updated and cleaned up) as a more experimental feature? What is the view on having such more untested (as in production) stuff in 1.0?
> —
> Sent from Mailbox for iPhone
> 
> On Wed, Mar 19, 2014 at 12:15 AM, Matei Zaharia <ma...@gmail.com>
> wrote:
> 
>> Hey Nick, I’m curious, have you been doing any further development on this? It would be good to get expanded InputFormat support in Spark 1.0. To start with we don’t have to do SequenceFiles in particular, we can do stuff like Avro (if it’s easy to read in Python) or some kind of WholeFileInputFormat.
>> Matei
>> On Dec 19, 2013, at 10:57 AM, Nick Pentreath <ni...@gmail.com> wrote:
>>> Hi
>>> 
>>> 
>>> I managed to find the time to put together a PR on this: https://github.com/apache/incubator-spark/pull/263
>>> 
>>> 
>>> 
>>> 
>>> Josh has had a look over it - if anyone else with an interest could give some feedback that would be great.
>>> 
>>> 
>>> 
>>> 
>>> As mentioned in the PR it's more of an RFC and certainly still needs a bit of clean up work, and I need to add the concept of "wrapper functions" to deserialize classes that MsgPack can't handle out the box.
>>> 
>>> 
>>> 
>>> 
>>> N
>>> —
>>> Sent from Mailbox for iPhone
>>> 
>>> On Fri, Nov 8, 2013 at 12:20 PM, Nick Pentreath <ni...@gmail.com>
>>> wrote:
>>> 
>>>> Wow Josh, that looks great. I've been a bit swamped this week but as soon
>>>> as I get a chance I'll test out the PR in more detail and port over the
>>>> InputFormat stuff to use the new framework (including the changes you
>>>> suggested).
>>>> I can then look deeper into the MsgPack functionality to see if it can be
>>>> made to work in a generic enough manner without requiring huge amounts of
>>>> custom Templates to be written by users.
>>>> Will feed back asap.
>>>> N
>>>> On Thu, Nov 7, 2013 at 5:03 AM, Josh Rosen <ro...@gmail.com> wrote:
>>>>> I opened a pull request to add custom serializer support to PySpark:
>>>>> https://github.com/apache/incubator-spark/pull/146
>>>>> 
>>>>> My pull request adds the plumbing for transferring data from Java to Python
>>>>> using formats other than Pickle.  For example, look at how textFile() uses
>>>>> MUTF8Deserializer to read strings from Java.  Hopefully this provides all
>>>>> of the functionality needed to support MsgPack.
>>>>> 
>>>>> - Josh
>>>>> 
>>>>> 
>>>>> On Thu, Oct 31, 2013 at 11:11 AM, Josh Rosen <ro...@gmail.com> wrote:
>>>>> 
>>>>>> Hi Nick,
>>>>>> 
>>>>>> This is a nice start.  I'd prefer to keep the Java sequenceFileAsText()
>>>>>> and newHadoopFileAsText() methods inside PythonRDD instead of adding them
>>>>>> to JavaSparkContext, since I think these methods are unlikely to be used
>>>>>> directly by Java users (you can add these methods to the PythonRDD
>>>>>> companion object, which is how readRDDFromPickleFile is implemented:
>>>>>> 
>>>>> https://github.com/apache/incubator-spark/blob/branch-0.8/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L255
>>>>>> )
>>>>>> 
>>>>>> For MsgPack, the UnpicklingError is because the Python worker expects to
>>>>>> receive its input in a pickled format.  In my prototype of custom
>>>>>> serializers, I modified the PySpark worker to receive its
>>>>>> serialization/deserialization function as input (
>>>>>> 
>>>>> https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/worker.py#L41
>>>>> )
>>>>>> and added logic to pass the appropriate serializers based on each stage's
>>>>>> input and output formats (
>>>>>> 
>>>>> https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/rdd.py#L42
>>>>>> ).
>>>>>> 
>>>>>> At some point, I'd like to port my custom serializers code to PySpark; if
>>>>>> anyone's interested in helping, I'd be glad to write up some additional
>>>>>> notes on how this should work.
>>>>>> 
>>>>>> - Josh
>>>>>> 
>>>>>> On Wed, Oct 30, 2013 at 2:25 PM, Nick Pentreath <
>>>>> nick.pentreath@gmail.com>wrote:
>>>>>> 
>>>>>>> Thanks Josh, Patrick for the feedback.
>>>>>>> 
>>>>>>> Based on Josh's pointers I have something working for JavaPairRDD ->
>>>>>>> PySpark RDD[(String, String)]. This just calls the toString method on
>>>>> each
>>>>>>> key and value as before, but without the need for a delimiter. For
>>>>>>> SequenceFile, it uses SequenceFileAsTextInputFormat which itself calls
>>>>>>> toString to convert to Text for keys and values. We then call toString
>>>>>>> (again) ourselves to get Strings to feed to writeAsPickle.
>>>>>>> 
>>>>>>> Details here: https://gist.github.com/MLnick/7230588
>>>>>>> 
>>>>>>> This also illustrates where the "wrapper function" api would fit in. All
>>>>>>> that is required is to define a T => String for key and value.
>>>>>>> 
>>>>>>> I started playing around with MsgPack and can sort of get things to work
>>>>>>> in
>>>>>>> Scala, but am struggling with getting the raw bytes to be written
>>>>> properly
>>>>>>> in PythonRDD (I think it is treating them as pickled byte arrays when
>>>>> they
>>>>>>> are not, but when I removed the 'stripPickle' calls and amended the
>>>>> length
>>>>>>> (-6) I got "UnpicklingError: invalid load key, ' '. ").
>>>>>>> 
>>>>>>> Another issue is that MsgPack does well at writing "structures" - like
>>>>>>> Java
>>>>>>> classes with public fields that are fairly simple - but for example the
>>>>>>> Writables have private fields so you end up with nothing being written.
>>>>>>> This looks like it would require custom "Templates" (serialization
>>>>>>> functions effectively) for many classes, which means a lot of custom
>>>>> code
>>>>>>> for a user to write to use it. Fortunately for most of the common
>>>>>>> Writables
>>>>>>> a toString does the job. Will keep looking into it though.
>>>>>>> 
>>>>>>> Anyway, Josh if you have ideas or examples on the "Wrapper API from
>>>>>>> Python"
>>>>>>> that you mentioned, I'd be interested to hear them.
>>>>>>> 
>>>>>>> If you think this is worth working up as a Pull Request covering
>>>>>>> SequenceFiles and custom InputFormats with default toString conversions
>>>>>>> and
>>>>>>> the ability to specify Wrapper functions, I can clean things up more,
>>>>> add
>>>>>>> some functionality and tests, and also test to see if common things like
>>>>>>> the "normal" Writables and reading from things like HBase and Cassandra
>>>>>>> can
>>>>>>> be made to work nicely (any other common use cases that you think make
>>>>>>> sense?).
>>>>>>> 
>>>>>>> Thoughts, comments etc welcome.
>>>>>>> 
>>>>>>> Nick
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On Fri, Oct 25, 2013 at 11:03 PM, Patrick Wendell <pwendell@gmail.com
>>>>>>>> wrote:
>>>>>>> 
>>>>>>>> As a starting point, a version where people just write their own
>>>>>>> "wrapper"
>>>>>>>> functions to convert various HadoopFiles into String <K, V> files
>>>>> could
>>>>>>> go
>>>>>>>> a long way. We could even have a few built-in versions, such as
>>>>> dealing
>>>>>>>> with Sequence files that are <String, String>. Basically, the user
>>>>>>> needs to
>>>>>>>> write a translator in Java/Scala that produces textual records from
>>>>>>>> whatever format that want. Then, they make sure this is included in
>>>>> the
>>>>>>>> classpath when running PySpark.
>>>>>>>> 
>>>>>>>> As Josh is saying, I'm pretty sure this is already possible, but we
>>>>> may
>>>>>>>> want to document it for users. In many organizations they might have
>>>>> 1-2
>>>>>>>> people who can write the Java/Scala to do this but then many more
>>>>> people
>>>>>>>> who are comfortable using python once it's setup.
>>>>>>>> 
>>>>>>>> - Patrick
>>>>>>>> 
>>>>>>>> On Fri, Oct 25, 2013 at 11:00 AM, Josh Rosen <ro...@gmail.com>
>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Hi Nick,
>>>>>>>>> 
>>>>>>>>> I've seen several requests for SequenceFile support in PySpark, so
>>>>>>>> there's
>>>>>>>>> definitely demand for this feature.
>>>>>>>>> 
>>>>>>>>> I like the idea of passing MsgPack'ed data (or some other structured
>>>>>>>>> format) from Java to the Python workers.  My early prototype of
>>>>> custom
>>>>>>>>> serializers (described at
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals#PySparkInternals-customserializers
>>>>>>>>> )
>>>>>>>>> might be useful for implementing this.  Proper custom serializer
>>>>>>> support
>>>>>>>>> would handle the bookkeeping for tracking each stage's input and
>>>>>>> output
>>>>>>>>> formats and supplying the appropriate deserialization functions to
>>>>> the
>>>>>>>>> Python worker, so the Python worker would be able to directly read
>>>>> the
>>>>>>>>> MsgPack'd data that's sent to it.
>>>>>>>>> 
>>>>>>>>> Regarding a wrapper API, it's actually possible to initially
>>>>> transform
>>>>>>>> data
>>>>>>>>> using Scala/Java and perform the remainder of the processing in
>>>>>>> PySpark.
>>>>>>>>> This involves adding the appropriate compiled to the Java classpath
>>>>>>> and
>>>>>>>> a
>>>>>>>>> bit of work in Py4J to create the Java/Scala RDD and wrap it for use
>>>>>>> by
>>>>>>>>> PySpark.  I can hack together a rough example of this if anyone's
>>>>>>>>> interested, but it would need some work to be developed into a
>>>>>>>>> user-friendly API.
>>>>>>>>> 
>>>>>>>>> If you wanted to extend your proof-of-concept to handle the cases
>>>>>>> where
>>>>>>>>> keys and values have parseable toString() values, I think you could
>>>>>>>> remove
>>>>>>>>> the need for a delimiter by creating a PythonRDD from the
>>>>>>> newHadoopFile
>>>>>>>>> JavaPairRDD and adding a new method to writeAsPickle (
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L224
>>>>>>>>> )
>>>>>>>>> to dump its contents as a pickled pair of strings.  (Aside: most of
>>>>>>>>> writeAsPickle() would probably need be eliminated or refactored when
>>>>>>>> adding
>>>>>>>>> general custom serializer support).
>>>>>>>>> 
>>>>>>>>> - Josh
>>>>>>>>> 
>>>>>>>>> On Thu, Oct 24, 2013 at 11:18 PM, Nick Pentreath
>>>>>>>>> <ni...@gmail.com>wrote:
>>>>>>>>> 
>>>>>>>>>> Hi Spark Devs
>>>>>>>>>> 
>>>>>>>>>> I was wondering what appetite there may be to add the ability for
>>>>>>>> PySpark
>>>>>>>>>> users to create RDDs from (somewhat) arbitrary Hadoop
>>>>> InputFormats.
>>>>>>>>>> 
>>>>>>>>>> In my data pipeline for example, I'm currently just using Scala
>>>>>>> (partly
>>>>>>>>>> because I love it but also because I am heavily reliant on quite
>>>>>>> custom
>>>>>>>>>> Hadoop InputFormats for reading data). However, many users may
>>>>>>> prefer
>>>>>>>> to
>>>>>>>>>> use PySpark as much as possible (if not for everything). Reasons
>>>>>>> might
>>>>>>>>>> include the need to use some Python library. While I don't do it
>>>>>>> yet, I
>>>>>>>>> can
>>>>>>>>>> certainly see an attractive use case for using say scikit-learn /
>>>>>>> numpy
>>>>>>>>> to
>>>>>>>>>> do data analysis & machine learning in Python. Added to this my
>>>>>>>> cofounder
>>>>>>>>>> knows Python well but not Scala so it can be very beneficial to
>>>>> do a
>>>>>>>> lot
>>>>>>>>> of
>>>>>>>>>> stuff in Python.
>>>>>>>>>> 
>>>>>>>>>> For text-based data this is fine, but reading data in from more
>>>>>>> complex
>>>>>>>>>> Hadoop formats is an issue.
>>>>>>>>>> 
>>>>>>>>>> The current approach would of course be to write an ETL-style
>>>>>>>> Java/Scala
>>>>>>>>>> job and then process in Python. Nothing wrong with this, but I was
>>>>>>>>> thinking
>>>>>>>>>> about ways to allow Python to access arbitrary Hadoop
>>>>> InputFormats.
>>>>>>>>>> 
>>>>>>>>>> Here is a quick proof of concept:
>>>>>>>> https://gist.github.com/MLnick/7150058
>>>>>>>>>> 
>>>>>>>>>> This works for simple stuff like SequenceFile with simple Writable
>>>>>>>>>> key/values.
>>>>>>>>>> 
>>>>>>>>>> To work with more complex files, perhaps an approach is to
>>>>>>> manipulate
>>>>>>>>>> Hadoop JobConf via Python and pass that in. The one downside is of
>>>>>>>> course
>>>>>>>>>> that the InputFormat (well actually the Key/Value classes) must
>>>>>>> have a
>>>>>>>>>> toString that makes sense so very custom stuff might not work.
>>>>>>>>>> 
>>>>>>>>>> I wonder if it would be possible to take the objects that are
>>>>>>> yielded
>>>>>>>> via
>>>>>>>>>> the InputFormat and convert them into some representation like
>>>>>>>> ProtoBuf,
>>>>>>>>>> MsgPack, Avro, JSON, that can be read relatively more easily from
>>>>>>>> Python?
>>>>>>>>>> 
>>>>>>>>>> Another approach could be to allow a simple "wrapper API" such
>>>>> that
>>>>>>> one
>>>>>>>>> can
>>>>>>>>>> write a wrapper function T => String and pass that into an
>>>>>>>>>> InputFormatWrapper that takes an arbitrary InputFormat and yields
>>>>>>>> Strings
>>>>>>>>>> for the keys and values. Then all that is required is to compile
>>>>>>> that
>>>>>>>>>> function and add it to the SPARK_CLASSPATH and away you go!
>>>>>>>>>> 
>>>>>>>>>> Thoughts?
>>>>>>>>>> 
>>>>>>>>>> Nick
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 


Re: [PySpark]: reading arbitrary Hadoop InputFormats

Posted by Nick Pentreath <ni...@gmail.com>.
Hi Matei


I'm afraid I haven't had enough time to focus on this as work has just been crazy. It's still something I want to get to a mergeable status. 




Actually it was working fine it was just a bit rough and needs to be updated to HEAD.




I'll absolutely try my utmost to get something ready to merge before the window for 1.0 closes. Perhaps we can put it in there (once I've updated and cleaned up) as a more experimental feature? What is the view on having such more untested (as in production) stuff in 1.0?
—
Sent from Mailbox for iPhone

On Wed, Mar 19, 2014 at 12:15 AM, Matei Zaharia <ma...@gmail.com>
wrote:

> Hey Nick, I’m curious, have you been doing any further development on this? It would be good to get expanded InputFormat support in Spark 1.0. To start with we don’t have to do SequenceFiles in particular, we can do stuff like Avro (if it’s easy to read in Python) or some kind of WholeFileInputFormat.
> Matei
> On Dec 19, 2013, at 10:57 AM, Nick Pentreath <ni...@gmail.com> wrote:
>> Hi
>> 
>> 
>> I managed to find the time to put together a PR on this: https://github.com/apache/incubator-spark/pull/263
>> 
>> 
>> 
>> 
>> Josh has had a look over it - if anyone else with an interest could give some feedback that would be great.
>> 
>> 
>> 
>> 
>> As mentioned in the PR it's more of an RFC and certainly still needs a bit of clean up work, and I need to add the concept of "wrapper functions" to deserialize classes that MsgPack can't handle out the box.
>> 
>> 
>> 
>> 
>> N
>> —
>> Sent from Mailbox for iPhone
>> 
>> On Fri, Nov 8, 2013 at 12:20 PM, Nick Pentreath <ni...@gmail.com>
>> wrote:
>> 
>>> Wow Josh, that looks great. I've been a bit swamped this week but as soon
>>> as I get a chance I'll test out the PR in more detail and port over the
>>> InputFormat stuff to use the new framework (including the changes you
>>> suggested).
>>> I can then look deeper into the MsgPack functionality to see if it can be
>>> made to work in a generic enough manner without requiring huge amounts of
>>> custom Templates to be written by users.
>>> Will feed back asap.
>>> N
>>> On Thu, Nov 7, 2013 at 5:03 AM, Josh Rosen <ro...@gmail.com> wrote:
>>>> I opened a pull request to add custom serializer support to PySpark:
>>>> https://github.com/apache/incubator-spark/pull/146
>>>> 
>>>> My pull request adds the plumbing for transferring data from Java to Python
>>>> using formats other than Pickle.  For example, look at how textFile() uses
>>>> MUTF8Deserializer to read strings from Java.  Hopefully this provides all
>>>> of the functionality needed to support MsgPack.
>>>> 
>>>> - Josh
>>>> 
>>>> 
>>>> On Thu, Oct 31, 2013 at 11:11 AM, Josh Rosen <ro...@gmail.com> wrote:
>>>> 
>>>>> Hi Nick,
>>>>> 
>>>>> This is a nice start.  I'd prefer to keep the Java sequenceFileAsText()
>>>>> and newHadoopFileAsText() methods inside PythonRDD instead of adding them
>>>>> to JavaSparkContext, since I think these methods are unlikely to be used
>>>>> directly by Java users (you can add these methods to the PythonRDD
>>>>> companion object, which is how readRDDFromPickleFile is implemented:
>>>>> 
>>>> https://github.com/apache/incubator-spark/blob/branch-0.8/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L255
>>>>> )
>>>>> 
>>>>> For MsgPack, the UnpicklingError is because the Python worker expects to
>>>>> receive its input in a pickled format.  In my prototype of custom
>>>>> serializers, I modified the PySpark worker to receive its
>>>>> serialization/deserialization function as input (
>>>>> 
>>>> https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/worker.py#L41
>>>> )
>>>>> and added logic to pass the appropriate serializers based on each stage's
>>>>> input and output formats (
>>>>> 
>>>> https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/rdd.py#L42
>>>>> ).
>>>>> 
>>>>> At some point, I'd like to port my custom serializers code to PySpark; if
>>>>> anyone's interested in helping, I'd be glad to write up some additional
>>>>> notes on how this should work.
>>>>> 
>>>>> - Josh
>>>>> 
>>>>> On Wed, Oct 30, 2013 at 2:25 PM, Nick Pentreath <
>>>> nick.pentreath@gmail.com>wrote:
>>>>> 
>>>>>> Thanks Josh, Patrick for the feedback.
>>>>>> 
>>>>>> Based on Josh's pointers I have something working for JavaPairRDD ->
>>>>>> PySpark RDD[(String, String)]. This just calls the toString method on
>>>> each
>>>>>> key and value as before, but without the need for a delimiter. For
>>>>>> SequenceFile, it uses SequenceFileAsTextInputFormat which itself calls
>>>>>> toString to convert to Text for keys and values. We then call toString
>>>>>> (again) ourselves to get Strings to feed to writeAsPickle.
>>>>>> 
>>>>>> Details here: https://gist.github.com/MLnick/7230588
>>>>>> 
>>>>>> This also illustrates where the "wrapper function" api would fit in. All
>>>>>> that is required is to define a T => String for key and value.
>>>>>> 
>>>>>> I started playing around with MsgPack and can sort of get things to work
>>>>>> in
>>>>>> Scala, but am struggling with getting the raw bytes to be written
>>>> properly
>>>>>> in PythonRDD (I think it is treating them as pickled byte arrays when
>>>> they
>>>>>> are not, but when I removed the 'stripPickle' calls and amended the
>>>> length
>>>>>> (-6) I got "UnpicklingError: invalid load key, ' '. ").
>>>>>> 
>>>>>> Another issue is that MsgPack does well at writing "structures" - like
>>>>>> Java
>>>>>> classes with public fields that are fairly simple - but for example the
>>>>>> Writables have private fields so you end up with nothing being written.
>>>>>> This looks like it would require custom "Templates" (serialization
>>>>>> functions effectively) for many classes, which means a lot of custom
>>>> code
>>>>>> for a user to write to use it. Fortunately for most of the common
>>>>>> Writables
>>>>>> a toString does the job. Will keep looking into it though.
>>>>>> 
>>>>>> Anyway, Josh if you have ideas or examples on the "Wrapper API from
>>>>>> Python"
>>>>>> that you mentioned, I'd be interested to hear them.
>>>>>> 
>>>>>> If you think this is worth working up as a Pull Request covering
>>>>>> SequenceFiles and custom InputFormats with default toString conversions
>>>>>> and
>>>>>> the ability to specify Wrapper functions, I can clean things up more,
>>>> add
>>>>>> some functionality and tests, and also test to see if common things like
>>>>>> the "normal" Writables and reading from things like HBase and Cassandra
>>>>>> can
>>>>>> be made to work nicely (any other common use cases that you think make
>>>>>> sense?).
>>>>>> 
>>>>>> Thoughts, comments etc welcome.
>>>>>> 
>>>>>> Nick
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Fri, Oct 25, 2013 at 11:03 PM, Patrick Wendell <pwendell@gmail.com
>>>>>>> wrote:
>>>>>> 
>>>>>>> As a starting point, a version where people just write their own
>>>>>> "wrapper"
>>>>>>> functions to convert various HadoopFiles into String <K, V> files
>>>> could
>>>>>> go
>>>>>>> a long way. We could even have a few built-in versions, such as
>>>> dealing
>>>>>>> with Sequence files that are <String, String>. Basically, the user
>>>>>> needs to
>>>>>>> write a translator in Java/Scala that produces textual records from
>>>>>>> whatever format that want. Then, they make sure this is included in
>>>> the
>>>>>>> classpath when running PySpark.
>>>>>>> 
>>>>>>> As Josh is saying, I'm pretty sure this is already possible, but we
>>>> may
>>>>>>> want to document it for users. In many organizations they might have
>>>> 1-2
>>>>>>> people who can write the Java/Scala to do this but then many more
>>>> people
>>>>>>> who are comfortable using python once it's setup.
>>>>>>> 
>>>>>>> - Patrick
>>>>>>> 
>>>>>>> On Fri, Oct 25, 2013 at 11:00 AM, Josh Rosen <ro...@gmail.com>
>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi Nick,
>>>>>>>> 
>>>>>>>> I've seen several requests for SequenceFile support in PySpark, so
>>>>>>> there's
>>>>>>>> definitely demand for this feature.
>>>>>>>> 
>>>>>>>> I like the idea of passing MsgPack'ed data (or some other structured
>>>>>>>> format) from Java to the Python workers.  My early prototype of
>>>> custom
>>>>>>>> serializers (described at
>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals#PySparkInternals-customserializers
>>>>>>>> )
>>>>>>>> might be useful for implementing this.  Proper custom serializer
>>>>>> support
>>>>>>>> would handle the bookkeeping for tracking each stage's input and
>>>>>> output
>>>>>>>> formats and supplying the appropriate deserialization functions to
>>>> the
>>>>>>>> Python worker, so the Python worker would be able to directly read
>>>> the
>>>>>>>> MsgPack'd data that's sent to it.
>>>>>>>> 
>>>>>>>> Regarding a wrapper API, it's actually possible to initially
>>>> transform
>>>>>>> data
>>>>>>>> using Scala/Java and perform the remainder of the processing in
>>>>>> PySpark.
>>>>>>>> This involves adding the appropriate compiled to the Java classpath
>>>>>> and
>>>>>>> a
>>>>>>>> bit of work in Py4J to create the Java/Scala RDD and wrap it for use
>>>>>> by
>>>>>>>> PySpark.  I can hack together a rough example of this if anyone's
>>>>>>>> interested, but it would need some work to be developed into a
>>>>>>>> user-friendly API.
>>>>>>>> 
>>>>>>>> If you wanted to extend your proof-of-concept to handle the cases
>>>>>> where
>>>>>>>> keys and values have parseable toString() values, I think you could
>>>>>>> remove
>>>>>>>> the need for a delimiter by creating a PythonRDD from the
>>>>>> newHadoopFile
>>>>>>>> JavaPairRDD and adding a new method to writeAsPickle (
>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L224
>>>>>>>> )
>>>>>>>> to dump its contents as a pickled pair of strings.  (Aside: most of
>>>>>>>> writeAsPickle() would probably need be eliminated or refactored when
>>>>>>> adding
>>>>>>>> general custom serializer support).
>>>>>>>> 
>>>>>>>> - Josh
>>>>>>>> 
>>>>>>>> On Thu, Oct 24, 2013 at 11:18 PM, Nick Pentreath
>>>>>>>> <ni...@gmail.com>wrote:
>>>>>>>> 
>>>>>>>>> Hi Spark Devs
>>>>>>>>> 
>>>>>>>>> I was wondering what appetite there may be to add the ability for
>>>>>>> PySpark
>>>>>>>>> users to create RDDs from (somewhat) arbitrary Hadoop
>>>> InputFormats.
>>>>>>>>> 
>>>>>>>>> In my data pipeline for example, I'm currently just using Scala
>>>>>> (partly
>>>>>>>>> because I love it but also because I am heavily reliant on quite
>>>>>> custom
>>>>>>>>> Hadoop InputFormats for reading data). However, many users may
>>>>>> prefer
>>>>>>> to
>>>>>>>>> use PySpark as much as possible (if not for everything). Reasons
>>>>>> might
>>>>>>>>> include the need to use some Python library. While I don't do it
>>>>>> yet, I
>>>>>>>> can
>>>>>>>>> certainly see an attractive use case for using say scikit-learn /
>>>>>> numpy
>>>>>>>> to
>>>>>>>>> do data analysis & machine learning in Python. Added to this my
>>>>>>> cofounder
>>>>>>>>> knows Python well but not Scala so it can be very beneficial to
>>>> do a
>>>>>>> lot
>>>>>>>> of
>>>>>>>>> stuff in Python.
>>>>>>>>> 
>>>>>>>>> For text-based data this is fine, but reading data in from more
>>>>>> complex
>>>>>>>>> Hadoop formats is an issue.
>>>>>>>>> 
>>>>>>>>> The current approach would of course be to write an ETL-style
>>>>>>> Java/Scala
>>>>>>>>> job and then process in Python. Nothing wrong with this, but I was
>>>>>>>> thinking
>>>>>>>>> about ways to allow Python to access arbitrary Hadoop
>>>> InputFormats.
>>>>>>>>> 
>>>>>>>>> Here is a quick proof of concept:
>>>>>>> https://gist.github.com/MLnick/7150058
>>>>>>>>> 
>>>>>>>>> This works for simple stuff like SequenceFile with simple Writable
>>>>>>>>> key/values.
>>>>>>>>> 
>>>>>>>>> To work with more complex files, perhaps an approach is to
>>>>>> manipulate
>>>>>>>>> Hadoop JobConf via Python and pass that in. The one downside is of
>>>>>>> course
>>>>>>>>> that the InputFormat (well actually the Key/Value classes) must
>>>>>> have a
>>>>>>>>> toString that makes sense so very custom stuff might not work.
>>>>>>>>> 
>>>>>>>>> I wonder if it would be possible to take the objects that are
>>>>>> yielded
>>>>>>> via
>>>>>>>>> the InputFormat and convert them into some representation like
>>>>>>> ProtoBuf,
>>>>>>>>> MsgPack, Avro, JSON, that can be read relatively more easily from
>>>>>>> Python?
>>>>>>>>> 
>>>>>>>>> Another approach could be to allow a simple "wrapper API" such
>>>> that
>>>>>> one
>>>>>>>> can
>>>>>>>>> write a wrapper function T => String and pass that into an
>>>>>>>>> InputFormatWrapper that takes an arbitrary InputFormat and yields
>>>>>>> Strings
>>>>>>>>> for the keys and values. Then all that is required is to compile
>>>>>> that
>>>>>>>>> function and add it to the SPARK_CLASSPATH and away you go!
>>>>>>>>> 
>>>>>>>>> Thoughts?
>>>>>>>>> 
>>>>>>>>> Nick
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 

Re: [PySpark]: reading arbitrary Hadoop InputFormats

Posted by Matei Zaharia <ma...@gmail.com>.
Hey Nick, I’m curious, have you been doing any further development on this? It would be good to get expanded InputFormat support in Spark 1.0. To start with we don’t have to do SequenceFiles in particular, we can do stuff like Avro (if it’s easy to read in Python) or some kind of WholeFileInputFormat.

Matei

On Dec 19, 2013, at 10:57 AM, Nick Pentreath <ni...@gmail.com> wrote:

> Hi
> 
> 
> I managed to find the time to put together a PR on this: https://github.com/apache/incubator-spark/pull/263
> 
> 
> 
> 
> Josh has had a look over it - if anyone else with an interest could give some feedback that would be great.
> 
> 
> 
> 
> As mentioned in the PR it's more of an RFC and certainly still needs a bit of clean up work, and I need to add the concept of "wrapper functions" to deserialize classes that MsgPack can't handle out the box.
> 
> 
> 
> 
> N
> —
> Sent from Mailbox for iPhone
> 
> On Fri, Nov 8, 2013 at 12:20 PM, Nick Pentreath <ni...@gmail.com>
> wrote:
> 
>> Wow Josh, that looks great. I've been a bit swamped this week but as soon
>> as I get a chance I'll test out the PR in more detail and port over the
>> InputFormat stuff to use the new framework (including the changes you
>> suggested).
>> I can then look deeper into the MsgPack functionality to see if it can be
>> made to work in a generic enough manner without requiring huge amounts of
>> custom Templates to be written by users.
>> Will feed back asap.
>> N
>> On Thu, Nov 7, 2013 at 5:03 AM, Josh Rosen <ro...@gmail.com> wrote:
>>> I opened a pull request to add custom serializer support to PySpark:
>>> https://github.com/apache/incubator-spark/pull/146
>>> 
>>> My pull request adds the plumbing for transferring data from Java to Python
>>> using formats other than Pickle.  For example, look at how textFile() uses
>>> MUTF8Deserializer to read strings from Java.  Hopefully this provides all
>>> of the functionality needed to support MsgPack.
>>> 
>>> - Josh
>>> 
>>> 
>>> On Thu, Oct 31, 2013 at 11:11 AM, Josh Rosen <ro...@gmail.com> wrote:
>>> 
>>>> Hi Nick,
>>>> 
>>>> This is a nice start.  I'd prefer to keep the Java sequenceFileAsText()
>>>> and newHadoopFileAsText() methods inside PythonRDD instead of adding them
>>>> to JavaSparkContext, since I think these methods are unlikely to be used
>>>> directly by Java users (you can add these methods to the PythonRDD
>>>> companion object, which is how readRDDFromPickleFile is implemented:
>>>> 
>>> https://github.com/apache/incubator-spark/blob/branch-0.8/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L255
>>>> )
>>>> 
>>>> For MsgPack, the UnpicklingError is because the Python worker expects to
>>>> receive its input in a pickled format.  In my prototype of custom
>>>> serializers, I modified the PySpark worker to receive its
>>>> serialization/deserialization function as input (
>>>> 
>>> https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/worker.py#L41
>>> )
>>>> and added logic to pass the appropriate serializers based on each stage's
>>>> input and output formats (
>>>> 
>>> https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/rdd.py#L42
>>>> ).
>>>> 
>>>> At some point, I'd like to port my custom serializers code to PySpark; if
>>>> anyone's interested in helping, I'd be glad to write up some additional
>>>> notes on how this should work.
>>>> 
>>>> - Josh
>>>> 
>>>> On Wed, Oct 30, 2013 at 2:25 PM, Nick Pentreath <
>>> nick.pentreath@gmail.com>wrote:
>>>> 
>>>>> Thanks Josh, Patrick for the feedback.
>>>>> 
>>>>> Based on Josh's pointers I have something working for JavaPairRDD ->
>>>>> PySpark RDD[(String, String)]. This just calls the toString method on
>>> each
>>>>> key and value as before, but without the need for a delimiter. For
>>>>> SequenceFile, it uses SequenceFileAsTextInputFormat which itself calls
>>>>> toString to convert to Text for keys and values. We then call toString
>>>>> (again) ourselves to get Strings to feed to writeAsPickle.
>>>>> 
>>>>> Details here: https://gist.github.com/MLnick/7230588
>>>>> 
>>>>> This also illustrates where the "wrapper function" api would fit in. All
>>>>> that is required is to define a T => String for key and value.
>>>>> 
>>>>> I started playing around with MsgPack and can sort of get things to work
>>>>> in
>>>>> Scala, but am struggling with getting the raw bytes to be written
>>> properly
>>>>> in PythonRDD (I think it is treating them as pickled byte arrays when
>>> they
>>>>> are not, but when I removed the 'stripPickle' calls and amended the
>>> length
>>>>> (-6) I got "UnpicklingError: invalid load key, ' '. ").
>>>>> 
>>>>> Another issue is that MsgPack does well at writing "structures" - like
>>>>> Java
>>>>> classes with public fields that are fairly simple - but for example the
>>>>> Writables have private fields so you end up with nothing being written.
>>>>> This looks like it would require custom "Templates" (serialization
>>>>> functions effectively) for many classes, which means a lot of custom
>>> code
>>>>> for a user to write to use it. Fortunately for most of the common
>>>>> Writables
>>>>> a toString does the job. Will keep looking into it though.
>>>>> 
>>>>> Anyway, Josh if you have ideas or examples on the "Wrapper API from
>>>>> Python"
>>>>> that you mentioned, I'd be interested to hear them.
>>>>> 
>>>>> If you think this is worth working up as a Pull Request covering
>>>>> SequenceFiles and custom InputFormats with default toString conversions
>>>>> and
>>>>> the ability to specify Wrapper functions, I can clean things up more,
>>> add
>>>>> some functionality and tests, and also test to see if common things like
>>>>> the "normal" Writables and reading from things like HBase and Cassandra
>>>>> can
>>>>> be made to work nicely (any other common use cases that you think make
>>>>> sense?).
>>>>> 
>>>>> Thoughts, comments etc welcome.
>>>>> 
>>>>> Nick
>>>>> 
>>>>> 
>>>>> 
>>>>> On Fri, Oct 25, 2013 at 11:03 PM, Patrick Wendell <pwendell@gmail.com
>>>>>> wrote:
>>>>> 
>>>>>> As a starting point, a version where people just write their own
>>>>> "wrapper"
>>>>>> functions to convert various HadoopFiles into String <K, V> files
>>> could
>>>>> go
>>>>>> a long way. We could even have a few built-in versions, such as
>>> dealing
>>>>>> with Sequence files that are <String, String>. Basically, the user
>>>>> needs to
>>>>>> write a translator in Java/Scala that produces textual records from
>>>>>> whatever format that want. Then, they make sure this is included in
>>> the
>>>>>> classpath when running PySpark.
>>>>>> 
>>>>>> As Josh is saying, I'm pretty sure this is already possible, but we
>>> may
>>>>>> want to document it for users. In many organizations they might have
>>> 1-2
>>>>>> people who can write the Java/Scala to do this but then many more
>>> people
>>>>>> who are comfortable using python once it's setup.
>>>>>> 
>>>>>> - Patrick
>>>>>> 
>>>>>> On Fri, Oct 25, 2013 at 11:00 AM, Josh Rosen <ro...@gmail.com>
>>>>> wrote:
>>>>>> 
>>>>>>> Hi Nick,
>>>>>>> 
>>>>>>> I've seen several requests for SequenceFile support in PySpark, so
>>>>>> there's
>>>>>>> definitely demand for this feature.
>>>>>>> 
>>>>>>> I like the idea of passing MsgPack'ed data (or some other structured
>>>>>>> format) from Java to the Python workers.  My early prototype of
>>> custom
>>>>>>> serializers (described at
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>> https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals#PySparkInternals-customserializers
>>>>>>> )
>>>>>>> might be useful for implementing this.  Proper custom serializer
>>>>> support
>>>>>>> would handle the bookkeeping for tracking each stage's input and
>>>>> output
>>>>>>> formats and supplying the appropriate deserialization functions to
>>> the
>>>>>>> Python worker, so the Python worker would be able to directly read
>>> the
>>>>>>> MsgPack'd data that's sent to it.
>>>>>>> 
>>>>>>> Regarding a wrapper API, it's actually possible to initially
>>> transform
>>>>>> data
>>>>>>> using Scala/Java and perform the remainder of the processing in
>>>>> PySpark.
>>>>>>> This involves adding the appropriate compiled to the Java classpath
>>>>> and
>>>>>> a
>>>>>>> bit of work in Py4J to create the Java/Scala RDD and wrap it for use
>>>>> by
>>>>>>> PySpark.  I can hack together a rough example of this if anyone's
>>>>>>> interested, but it would need some work to be developed into a
>>>>>>> user-friendly API.
>>>>>>> 
>>>>>>> If you wanted to extend your proof-of-concept to handle the cases
>>>>> where
>>>>>>> keys and values have parseable toString() values, I think you could
>>>>>> remove
>>>>>>> the need for a delimiter by creating a PythonRDD from the
>>>>> newHadoopFile
>>>>>>> JavaPairRDD and adding a new method to writeAsPickle (
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L224
>>>>>>> )
>>>>>>> to dump its contents as a pickled pair of strings.  (Aside: most of
>>>>>>> writeAsPickle() would probably need be eliminated or refactored when
>>>>>> adding
>>>>>>> general custom serializer support).
>>>>>>> 
>>>>>>> - Josh
>>>>>>> 
>>>>>>> On Thu, Oct 24, 2013 at 11:18 PM, Nick Pentreath
>>>>>>> <ni...@gmail.com>wrote:
>>>>>>> 
>>>>>>>> Hi Spark Devs
>>>>>>>> 
>>>>>>>> I was wondering what appetite there may be to add the ability for
>>>>>> PySpark
>>>>>>>> users to create RDDs from (somewhat) arbitrary Hadoop
>>> InputFormats.
>>>>>>>> 
>>>>>>>> In my data pipeline for example, I'm currently just using Scala
>>>>> (partly
>>>>>>>> because I love it but also because I am heavily reliant on quite
>>>>> custom
>>>>>>>> Hadoop InputFormats for reading data). However, many users may
>>>>> prefer
>>>>>> to
>>>>>>>> use PySpark as much as possible (if not for everything). Reasons
>>>>> might
>>>>>>>> include the need to use some Python library. While I don't do it
>>>>> yet, I
>>>>>>> can
>>>>>>>> certainly see an attractive use case for using say scikit-learn /
>>>>> numpy
>>>>>>> to
>>>>>>>> do data analysis & machine learning in Python. Added to this my
>>>>>> cofounder
>>>>>>>> knows Python well but not Scala so it can be very beneficial to
>>> do a
>>>>>> lot
>>>>>>> of
>>>>>>>> stuff in Python.
>>>>>>>> 
>>>>>>>> For text-based data this is fine, but reading data in from more
>>>>> complex
>>>>>>>> Hadoop formats is an issue.
>>>>>>>> 
>>>>>>>> The current approach would of course be to write an ETL-style
>>>>>> Java/Scala
>>>>>>>> job and then process in Python. Nothing wrong with this, but I was
>>>>>>> thinking
>>>>>>>> about ways to allow Python to access arbitrary Hadoop
>>> InputFormats.
>>>>>>>> 
>>>>>>>> Here is a quick proof of concept:
>>>>>> https://gist.github.com/MLnick/7150058
>>>>>>>> 
>>>>>>>> This works for simple stuff like SequenceFile with simple Writable
>>>>>>>> key/values.
>>>>>>>> 
>>>>>>>> To work with more complex files, perhaps an approach is to
>>>>> manipulate
>>>>>>>> Hadoop JobConf via Python and pass that in. The one downside is of
>>>>>> course
>>>>>>>> that the InputFormat (well actually the Key/Value classes) must
>>>>> have a
>>>>>>>> toString that makes sense so very custom stuff might not work.
>>>>>>>> 
>>>>>>>> I wonder if it would be possible to take the objects that are
>>>>> yielded
>>>>>> via
>>>>>>>> the InputFormat and convert them into some representation like
>>>>>> ProtoBuf,
>>>>>>>> MsgPack, Avro, JSON, that can be read relatively more easily from
>>>>>> Python?
>>>>>>>> 
>>>>>>>> Another approach could be to allow a simple "wrapper API" such
>>> that
>>>>> one
>>>>>>> can
>>>>>>>> write a wrapper function T => String and pass that into an
>>>>>>>> InputFormatWrapper that takes an arbitrary InputFormat and yields
>>>>>> Strings
>>>>>>>> for the keys and values. Then all that is required is to compile
>>>>> that
>>>>>>>> function and add it to the SPARK_CLASSPATH and away you go!
>>>>>>>> 
>>>>>>>> Thoughts?
>>>>>>>> 
>>>>>>>> Nick
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>>> 


Re: [PySpark]: reading arbitrary Hadoop InputFormats

Posted by Nick Pentreath <ni...@gmail.com>.
Hi


I managed to find the time to put together a PR on this: https://github.com/apache/incubator-spark/pull/263




Josh has had a look over it - if anyone else with an interest could give some feedback that would be great.




As mentioned in the PR it's more of an RFC and certainly still needs a bit of clean up work, and I need to add the concept of "wrapper functions" to deserialize classes that MsgPack can't handle out the box.




N
—
Sent from Mailbox for iPhone

On Fri, Nov 8, 2013 at 12:20 PM, Nick Pentreath <ni...@gmail.com>
wrote:

> Wow Josh, that looks great. I've been a bit swamped this week but as soon
> as I get a chance I'll test out the PR in more detail and port over the
> InputFormat stuff to use the new framework (including the changes you
> suggested).
> I can then look deeper into the MsgPack functionality to see if it can be
> made to work in a generic enough manner without requiring huge amounts of
> custom Templates to be written by users.
> Will feed back asap.
> N
> On Thu, Nov 7, 2013 at 5:03 AM, Josh Rosen <ro...@gmail.com> wrote:
>> I opened a pull request to add custom serializer support to PySpark:
>> https://github.com/apache/incubator-spark/pull/146
>>
>> My pull request adds the plumbing for transferring data from Java to Python
>> using formats other than Pickle.  For example, look at how textFile() uses
>> MUTF8Deserializer to read strings from Java.  Hopefully this provides all
>> of the functionality needed to support MsgPack.
>>
>> - Josh
>>
>>
>> On Thu, Oct 31, 2013 at 11:11 AM, Josh Rosen <ro...@gmail.com> wrote:
>>
>> > Hi Nick,
>> >
>> > This is a nice start.  I'd prefer to keep the Java sequenceFileAsText()
>> > and newHadoopFileAsText() methods inside PythonRDD instead of adding them
>> > to JavaSparkContext, since I think these methods are unlikely to be used
>> > directly by Java users (you can add these methods to the PythonRDD
>> > companion object, which is how readRDDFromPickleFile is implemented:
>> >
>> https://github.com/apache/incubator-spark/blob/branch-0.8/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L255
>> > )
>> >
>> > For MsgPack, the UnpicklingError is because the Python worker expects to
>> > receive its input in a pickled format.  In my prototype of custom
>> > serializers, I modified the PySpark worker to receive its
>> > serialization/deserialization function as input (
>> >
>> https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/worker.py#L41
>> )
>> > and added logic to pass the appropriate serializers based on each stage's
>> > input and output formats (
>> >
>> https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/rdd.py#L42
>> > ).
>> >
>> > At some point, I'd like to port my custom serializers code to PySpark; if
>> > anyone's interested in helping, I'd be glad to write up some additional
>> > notes on how this should work.
>> >
>> > - Josh
>> >
>> > On Wed, Oct 30, 2013 at 2:25 PM, Nick Pentreath <
>> nick.pentreath@gmail.com>wrote:
>> >
>> >> Thanks Josh, Patrick for the feedback.
>> >>
>> >> Based on Josh's pointers I have something working for JavaPairRDD ->
>> >> PySpark RDD[(String, String)]. This just calls the toString method on
>> each
>> >> key and value as before, but without the need for a delimiter. For
>> >> SequenceFile, it uses SequenceFileAsTextInputFormat which itself calls
>> >> toString to convert to Text for keys and values. We then call toString
>> >> (again) ourselves to get Strings to feed to writeAsPickle.
>> >>
>> >> Details here: https://gist.github.com/MLnick/7230588
>> >>
>> >> This also illustrates where the "wrapper function" api would fit in. All
>> >> that is required is to define a T => String for key and value.
>> >>
>> >> I started playing around with MsgPack and can sort of get things to work
>> >> in
>> >> Scala, but am struggling with getting the raw bytes to be written
>> properly
>> >> in PythonRDD (I think it is treating them as pickled byte arrays when
>> they
>> >> are not, but when I removed the 'stripPickle' calls and amended the
>> length
>> >> (-6) I got "UnpicklingError: invalid load key, ' '. ").
>> >>
>> >> Another issue is that MsgPack does well at writing "structures" - like
>> >> Java
>> >> classes with public fields that are fairly simple - but for example the
>> >> Writables have private fields so you end up with nothing being written.
>> >> This looks like it would require custom "Templates" (serialization
>> >> functions effectively) for many classes, which means a lot of custom
>> code
>> >> for a user to write to use it. Fortunately for most of the common
>> >> Writables
>> >> a toString does the job. Will keep looking into it though.
>> >>
>> >> Anyway, Josh if you have ideas or examples on the "Wrapper API from
>> >> Python"
>> >> that you mentioned, I'd be interested to hear them.
>> >>
>> >> If you think this is worth working up as a Pull Request covering
>> >> SequenceFiles and custom InputFormats with default toString conversions
>> >> and
>> >> the ability to specify Wrapper functions, I can clean things up more,
>> add
>> >> some functionality and tests, and also test to see if common things like
>> >> the "normal" Writables and reading from things like HBase and Cassandra
>> >> can
>> >> be made to work nicely (any other common use cases that you think make
>> >> sense?).
>> >>
>> >> Thoughts, comments etc welcome.
>> >>
>> >> Nick
>> >>
>> >>
>> >>
>> >> On Fri, Oct 25, 2013 at 11:03 PM, Patrick Wendell <pwendell@gmail.com
>> >> >wrote:
>> >>
>> >> > As a starting point, a version where people just write their own
>> >> "wrapper"
>> >> > functions to convert various HadoopFiles into String <K, V> files
>> could
>> >> go
>> >> > a long way. We could even have a few built-in versions, such as
>> dealing
>> >> > with Sequence files that are <String, String>. Basically, the user
>> >> needs to
>> >> > write a translator in Java/Scala that produces textual records from
>> >> > whatever format that want. Then, they make sure this is included in
>> the
>> >> > classpath when running PySpark.
>> >> >
>> >> > As Josh is saying, I'm pretty sure this is already possible, but we
>> may
>> >> > want to document it for users. In many organizations they might have
>> 1-2
>> >> > people who can write the Java/Scala to do this but then many more
>> people
>> >> > who are comfortable using python once it's setup.
>> >> >
>> >> > - Patrick
>> >> >
>> >> > On Fri, Oct 25, 2013 at 11:00 AM, Josh Rosen <ro...@gmail.com>
>> >> wrote:
>> >> >
>> >> > > Hi Nick,
>> >> > >
>> >> > > I've seen several requests for SequenceFile support in PySpark, so
>> >> > there's
>> >> > > definitely demand for this feature.
>> >> > >
>> >> > > I like the idea of passing MsgPack'ed data (or some other structured
>> >> > > format) from Java to the Python workers.  My early prototype of
>> custom
>> >> > > serializers (described at
>> >> > >
>> >> > >
>> >> >
>> >>
>> https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals#PySparkInternals-customserializers
>> >> > > )
>> >> > > might be useful for implementing this.  Proper custom serializer
>> >> support
>> >> > > would handle the bookkeeping for tracking each stage's input and
>> >> output
>> >> > > formats and supplying the appropriate deserialization functions to
>> the
>> >> > > Python worker, so the Python worker would be able to directly read
>> the
>> >> > > MsgPack'd data that's sent to it.
>> >> > >
>> >> > > Regarding a wrapper API, it's actually possible to initially
>> transform
>> >> > data
>> >> > > using Scala/Java and perform the remainder of the processing in
>> >> PySpark.
>> >> > >  This involves adding the appropriate compiled to the Java classpath
>> >> and
>> >> > a
>> >> > > bit of work in Py4J to create the Java/Scala RDD and wrap it for use
>> >> by
>> >> > > PySpark.  I can hack together a rough example of this if anyone's
>> >> > > interested, but it would need some work to be developed into a
>> >> > > user-friendly API.
>> >> > >
>> >> > > If you wanted to extend your proof-of-concept to handle the cases
>> >> where
>> >> > > keys and values have parseable toString() values, I think you could
>> >> > remove
>> >> > > the need for a delimiter by creating a PythonRDD from the
>> >> newHadoopFile
>> >> > > JavaPairRDD and adding a new method to writeAsPickle (
>> >> > >
>> >> > >
>> >> >
>> >>
>> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L224
>> >> > > )
>> >> > > to dump its contents as a pickled pair of strings.  (Aside: most of
>> >> > > writeAsPickle() would probably need be eliminated or refactored when
>> >> > adding
>> >> > > general custom serializer support).
>> >> > >
>> >> > > - Josh
>> >> > >
>> >> > > On Thu, Oct 24, 2013 at 11:18 PM, Nick Pentreath
>> >> > > <ni...@gmail.com>wrote:
>> >> > >
>> >> > > > Hi Spark Devs
>> >> > > >
>> >> > > > I was wondering what appetite there may be to add the ability for
>> >> > PySpark
>> >> > > > users to create RDDs from (somewhat) arbitrary Hadoop
>> InputFormats.
>> >> > > >
>> >> > > > In my data pipeline for example, I'm currently just using Scala
>> >> (partly
>> >> > > > because I love it but also because I am heavily reliant on quite
>> >> custom
>> >> > > > Hadoop InputFormats for reading data). However, many users may
>> >> prefer
>> >> > to
>> >> > > > use PySpark as much as possible (if not for everything). Reasons
>> >> might
>> >> > > > include the need to use some Python library. While I don't do it
>> >> yet, I
>> >> > > can
>> >> > > > certainly see an attractive use case for using say scikit-learn /
>> >> numpy
>> >> > > to
>> >> > > > do data analysis & machine learning in Python. Added to this my
>> >> > cofounder
>> >> > > > knows Python well but not Scala so it can be very beneficial to
>> do a
>> >> > lot
>> >> > > of
>> >> > > > stuff in Python.
>> >> > > >
>> >> > > > For text-based data this is fine, but reading data in from more
>> >> complex
>> >> > > > Hadoop formats is an issue.
>> >> > > >
>> >> > > > The current approach would of course be to write an ETL-style
>> >> > Java/Scala
>> >> > > > job and then process in Python. Nothing wrong with this, but I was
>> >> > > thinking
>> >> > > > about ways to allow Python to access arbitrary Hadoop
>> InputFormats.
>> >> > > >
>> >> > > > Here is a quick proof of concept:
>> >> > https://gist.github.com/MLnick/7150058
>> >> > > >
>> >> > > > This works for simple stuff like SequenceFile with simple Writable
>> >> > > > key/values.
>> >> > > >
>> >> > > > To work with more complex files, perhaps an approach is to
>> >> manipulate
>> >> > > > Hadoop JobConf via Python and pass that in. The one downside is of
>> >> > course
>> >> > > > that the InputFormat (well actually the Key/Value classes) must
>> >> have a
>> >> > > > toString that makes sense so very custom stuff might not work.
>> >> > > >
>> >> > > > I wonder if it would be possible to take the objects that are
>> >> yielded
>> >> > via
>> >> > > > the InputFormat and convert them into some representation like
>> >> > ProtoBuf,
>> >> > > > MsgPack, Avro, JSON, that can be read relatively more easily from
>> >> > Python?
>> >> > > >
>> >> > > > Another approach could be to allow a simple "wrapper API" such
>> that
>> >> one
>> >> > > can
>> >> > > > write a wrapper function T => String and pass that into an
>> >> > > > InputFormatWrapper that takes an arbitrary InputFormat and yields
>> >> > Strings
>> >> > > > for the keys and values. Then all that is required is to compile
>> >> that
>> >> > > > function and add it to the SPARK_CLASSPATH and away you go!
>> >> > > >
>> >> > > > Thoughts?
>> >> > > >
>> >> > > > Nick
>> >> > > >
>> >> > >
>> >> >
>> >>
>> >
>> >
>>

Re: [PySpark]: reading arbitrary Hadoop InputFormats

Posted by Nick Pentreath <ni...@gmail.com>.
Wow Josh, that looks great. I've been a bit swamped this week but as soon
as I get a chance I'll test out the PR in more detail and port over the
InputFormat stuff to use the new framework (including the changes you
suggested).

I can then look deeper into the MsgPack functionality to see if it can be
made to work in a generic enough manner without requiring huge amounts of
custom Templates to be written by users.

Will feed back asap.
N


On Thu, Nov 7, 2013 at 5:03 AM, Josh Rosen <ro...@gmail.com> wrote:

> I opened a pull request to add custom serializer support to PySpark:
> https://github.com/apache/incubator-spark/pull/146
>
> My pull request adds the plumbing for transferring data from Java to Python
> using formats other than Pickle.  For example, look at how textFile() uses
> MUTF8Deserializer to read strings from Java.  Hopefully this provides all
> of the functionality needed to support MsgPack.
>
> - Josh
>
>
> On Thu, Oct 31, 2013 at 11:11 AM, Josh Rosen <ro...@gmail.com> wrote:
>
> > Hi Nick,
> >
> > This is a nice start.  I'd prefer to keep the Java sequenceFileAsText()
> > and newHadoopFileAsText() methods inside PythonRDD instead of adding them
> > to JavaSparkContext, since I think these methods are unlikely to be used
> > directly by Java users (you can add these methods to the PythonRDD
> > companion object, which is how readRDDFromPickleFile is implemented:
> >
> https://github.com/apache/incubator-spark/blob/branch-0.8/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L255
> > )
> >
> > For MsgPack, the UnpicklingError is because the Python worker expects to
> > receive its input in a pickled format.  In my prototype of custom
> > serializers, I modified the PySpark worker to receive its
> > serialization/deserialization function as input (
> >
> https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/worker.py#L41
> )
> > and added logic to pass the appropriate serializers based on each stage's
> > input and output formats (
> >
> https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/rdd.py#L42
> > ).
> >
> > At some point, I'd like to port my custom serializers code to PySpark; if
> > anyone's interested in helping, I'd be glad to write up some additional
> > notes on how this should work.
> >
> > - Josh
> >
> > On Wed, Oct 30, 2013 at 2:25 PM, Nick Pentreath <
> nick.pentreath@gmail.com>wrote:
> >
> >> Thanks Josh, Patrick for the feedback.
> >>
> >> Based on Josh's pointers I have something working for JavaPairRDD ->
> >> PySpark RDD[(String, String)]. This just calls the toString method on
> each
> >> key and value as before, but without the need for a delimiter. For
> >> SequenceFile, it uses SequenceFileAsTextInputFormat which itself calls
> >> toString to convert to Text for keys and values. We then call toString
> >> (again) ourselves to get Strings to feed to writeAsPickle.
> >>
> >> Details here: https://gist.github.com/MLnick/7230588
> >>
> >> This also illustrates where the "wrapper function" api would fit in. All
> >> that is required is to define a T => String for key and value.
> >>
> >> I started playing around with MsgPack and can sort of get things to work
> >> in
> >> Scala, but am struggling with getting the raw bytes to be written
> properly
> >> in PythonRDD (I think it is treating them as pickled byte arrays when
> they
> >> are not, but when I removed the 'stripPickle' calls and amended the
> length
> >> (-6) I got "UnpicklingError: invalid load key, ' '. ").
> >>
> >> Another issue is that MsgPack does well at writing "structures" - like
> >> Java
> >> classes with public fields that are fairly simple - but for example the
> >> Writables have private fields so you end up with nothing being written.
> >> This looks like it would require custom "Templates" (serialization
> >> functions effectively) for many classes, which means a lot of custom
> code
> >> for a user to write to use it. Fortunately for most of the common
> >> Writables
> >> a toString does the job. Will keep looking into it though.
> >>
> >> Anyway, Josh if you have ideas or examples on the "Wrapper API from
> >> Python"
> >> that you mentioned, I'd be interested to hear them.
> >>
> >> If you think this is worth working up as a Pull Request covering
> >> SequenceFiles and custom InputFormats with default toString conversions
> >> and
> >> the ability to specify Wrapper functions, I can clean things up more,
> add
> >> some functionality and tests, and also test to see if common things like
> >> the "normal" Writables and reading from things like HBase and Cassandra
> >> can
> >> be made to work nicely (any other common use cases that you think make
> >> sense?).
> >>
> >> Thoughts, comments etc welcome.
> >>
> >> Nick
> >>
> >>
> >>
> >> On Fri, Oct 25, 2013 at 11:03 PM, Patrick Wendell <pwendell@gmail.com
> >> >wrote:
> >>
> >> > As a starting point, a version where people just write their own
> >> "wrapper"
> >> > functions to convert various HadoopFiles into String <K, V> files
> could
> >> go
> >> > a long way. We could even have a few built-in versions, such as
> dealing
> >> > with Sequence files that are <String, String>. Basically, the user
> >> needs to
> >> > write a translator in Java/Scala that produces textual records from
> >> > whatever format that want. Then, they make sure this is included in
> the
> >> > classpath when running PySpark.
> >> >
> >> > As Josh is saying, I'm pretty sure this is already possible, but we
> may
> >> > want to document it for users. In many organizations they might have
> 1-2
> >> > people who can write the Java/Scala to do this but then many more
> people
> >> > who are comfortable using python once it's setup.
> >> >
> >> > - Patrick
> >> >
> >> > On Fri, Oct 25, 2013 at 11:00 AM, Josh Rosen <ro...@gmail.com>
> >> wrote:
> >> >
> >> > > Hi Nick,
> >> > >
> >> > > I've seen several requests for SequenceFile support in PySpark, so
> >> > there's
> >> > > definitely demand for this feature.
> >> > >
> >> > > I like the idea of passing MsgPack'ed data (or some other structured
> >> > > format) from Java to the Python workers.  My early prototype of
> custom
> >> > > serializers (described at
> >> > >
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals#PySparkInternals-customserializers
> >> > > )
> >> > > might be useful for implementing this.  Proper custom serializer
> >> support
> >> > > would handle the bookkeeping for tracking each stage's input and
> >> output
> >> > > formats and supplying the appropriate deserialization functions to
> the
> >> > > Python worker, so the Python worker would be able to directly read
> the
> >> > > MsgPack'd data that's sent to it.
> >> > >
> >> > > Regarding a wrapper API, it's actually possible to initially
> transform
> >> > data
> >> > > using Scala/Java and perform the remainder of the processing in
> >> PySpark.
> >> > >  This involves adding the appropriate compiled to the Java classpath
> >> and
> >> > a
> >> > > bit of work in Py4J to create the Java/Scala RDD and wrap it for use
> >> by
> >> > > PySpark.  I can hack together a rough example of this if anyone's
> >> > > interested, but it would need some work to be developed into a
> >> > > user-friendly API.
> >> > >
> >> > > If you wanted to extend your proof-of-concept to handle the cases
> >> where
> >> > > keys and values have parseable toString() values, I think you could
> >> > remove
> >> > > the need for a delimiter by creating a PythonRDD from the
> >> newHadoopFile
> >> > > JavaPairRDD and adding a new method to writeAsPickle (
> >> > >
> >> > >
> >> >
> >>
> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L224
> >> > > )
> >> > > to dump its contents as a pickled pair of strings.  (Aside: most of
> >> > > writeAsPickle() would probably need be eliminated or refactored when
> >> > adding
> >> > > general custom serializer support).
> >> > >
> >> > > - Josh
> >> > >
> >> > > On Thu, Oct 24, 2013 at 11:18 PM, Nick Pentreath
> >> > > <ni...@gmail.com>wrote:
> >> > >
> >> > > > Hi Spark Devs
> >> > > >
> >> > > > I was wondering what appetite there may be to add the ability for
> >> > PySpark
> >> > > > users to create RDDs from (somewhat) arbitrary Hadoop
> InputFormats.
> >> > > >
> >> > > > In my data pipeline for example, I'm currently just using Scala
> >> (partly
> >> > > > because I love it but also because I am heavily reliant on quite
> >> custom
> >> > > > Hadoop InputFormats for reading data). However, many users may
> >> prefer
> >> > to
> >> > > > use PySpark as much as possible (if not for everything). Reasons
> >> might
> >> > > > include the need to use some Python library. While I don't do it
> >> yet, I
> >> > > can
> >> > > > certainly see an attractive use case for using say scikit-learn /
> >> numpy
> >> > > to
> >> > > > do data analysis & machine learning in Python. Added to this my
> >> > cofounder
> >> > > > knows Python well but not Scala so it can be very beneficial to
> do a
> >> > lot
> >> > > of
> >> > > > stuff in Python.
> >> > > >
> >> > > > For text-based data this is fine, but reading data in from more
> >> complex
> >> > > > Hadoop formats is an issue.
> >> > > >
> >> > > > The current approach would of course be to write an ETL-style
> >> > Java/Scala
> >> > > > job and then process in Python. Nothing wrong with this, but I was
> >> > > thinking
> >> > > > about ways to allow Python to access arbitrary Hadoop
> InputFormats.
> >> > > >
> >> > > > Here is a quick proof of concept:
> >> > https://gist.github.com/MLnick/7150058
> >> > > >
> >> > > > This works for simple stuff like SequenceFile with simple Writable
> >> > > > key/values.
> >> > > >
> >> > > > To work with more complex files, perhaps an approach is to
> >> manipulate
> >> > > > Hadoop JobConf via Python and pass that in. The one downside is of
> >> > course
> >> > > > that the InputFormat (well actually the Key/Value classes) must
> >> have a
> >> > > > toString that makes sense so very custom stuff might not work.
> >> > > >
> >> > > > I wonder if it would be possible to take the objects that are
> >> yielded
> >> > via
> >> > > > the InputFormat and convert them into some representation like
> >> > ProtoBuf,
> >> > > > MsgPack, Avro, JSON, that can be read relatively more easily from
> >> > Python?
> >> > > >
> >> > > > Another approach could be to allow a simple "wrapper API" such
> that
> >> one
> >> > > can
> >> > > > write a wrapper function T => String and pass that into an
> >> > > > InputFormatWrapper that takes an arbitrary InputFormat and yields
> >> > Strings
> >> > > > for the keys and values. Then all that is required is to compile
> >> that
> >> > > > function and add it to the SPARK_CLASSPATH and away you go!
> >> > > >
> >> > > > Thoughts?
> >> > > >
> >> > > > Nick
> >> > > >
> >> > >
> >> >
> >>
> >
> >
>

Re: [PySpark]: reading arbitrary Hadoop InputFormats

Posted by Josh Rosen <ro...@gmail.com>.
I opened a pull request to add custom serializer support to PySpark:
https://github.com/apache/incubator-spark/pull/146

My pull request adds the plumbing for transferring data from Java to Python
using formats other than Pickle.  For example, look at how textFile() uses
MUTF8Deserializer to read strings from Java.  Hopefully this provides all
of the functionality needed to support MsgPack.

- Josh


On Thu, Oct 31, 2013 at 11:11 AM, Josh Rosen <ro...@gmail.com> wrote:

> Hi Nick,
>
> This is a nice start.  I'd prefer to keep the Java sequenceFileAsText()
> and newHadoopFileAsText() methods inside PythonRDD instead of adding them
> to JavaSparkContext, since I think these methods are unlikely to be used
> directly by Java users (you can add these methods to the PythonRDD
> companion object, which is how readRDDFromPickleFile is implemented:
> https://github.com/apache/incubator-spark/blob/branch-0.8/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L255
> )
>
> For MsgPack, the UnpicklingError is because the Python worker expects to
> receive its input in a pickled format.  In my prototype of custom
> serializers, I modified the PySpark worker to receive its
> serialization/deserialization function as input (
> https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/worker.py#L41)
> and added logic to pass the appropriate serializers based on each stage's
> input and output formats (
> https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/rdd.py#L42
> ).
>
> At some point, I'd like to port my custom serializers code to PySpark; if
> anyone's interested in helping, I'd be glad to write up some additional
> notes on how this should work.
>
> - Josh
>
> On Wed, Oct 30, 2013 at 2:25 PM, Nick Pentreath <ni...@gmail.com>wrote:
>
>> Thanks Josh, Patrick for the feedback.
>>
>> Based on Josh's pointers I have something working for JavaPairRDD ->
>> PySpark RDD[(String, String)]. This just calls the toString method on each
>> key and value as before, but without the need for a delimiter. For
>> SequenceFile, it uses SequenceFileAsTextInputFormat which itself calls
>> toString to convert to Text for keys and values. We then call toString
>> (again) ourselves to get Strings to feed to writeAsPickle.
>>
>> Details here: https://gist.github.com/MLnick/7230588
>>
>> This also illustrates where the "wrapper function" api would fit in. All
>> that is required is to define a T => String for key and value.
>>
>> I started playing around with MsgPack and can sort of get things to work
>> in
>> Scala, but am struggling with getting the raw bytes to be written properly
>> in PythonRDD (I think it is treating them as pickled byte arrays when they
>> are not, but when I removed the 'stripPickle' calls and amended the length
>> (-6) I got "UnpicklingError: invalid load key, ' '. ").
>>
>> Another issue is that MsgPack does well at writing "structures" - like
>> Java
>> classes with public fields that are fairly simple - but for example the
>> Writables have private fields so you end up with nothing being written.
>> This looks like it would require custom "Templates" (serialization
>> functions effectively) for many classes, which means a lot of custom code
>> for a user to write to use it. Fortunately for most of the common
>> Writables
>> a toString does the job. Will keep looking into it though.
>>
>> Anyway, Josh if you have ideas or examples on the "Wrapper API from
>> Python"
>> that you mentioned, I'd be interested to hear them.
>>
>> If you think this is worth working up as a Pull Request covering
>> SequenceFiles and custom InputFormats with default toString conversions
>> and
>> the ability to specify Wrapper functions, I can clean things up more, add
>> some functionality and tests, and also test to see if common things like
>> the "normal" Writables and reading from things like HBase and Cassandra
>> can
>> be made to work nicely (any other common use cases that you think make
>> sense?).
>>
>> Thoughts, comments etc welcome.
>>
>> Nick
>>
>>
>>
>> On Fri, Oct 25, 2013 at 11:03 PM, Patrick Wendell <pwendell@gmail.com
>> >wrote:
>>
>> > As a starting point, a version where people just write their own
>> "wrapper"
>> > functions to convert various HadoopFiles into String <K, V> files could
>> go
>> > a long way. We could even have a few built-in versions, such as dealing
>> > with Sequence files that are <String, String>. Basically, the user
>> needs to
>> > write a translator in Java/Scala that produces textual records from
>> > whatever format that want. Then, they make sure this is included in the
>> > classpath when running PySpark.
>> >
>> > As Josh is saying, I'm pretty sure this is already possible, but we may
>> > want to document it for users. In many organizations they might have 1-2
>> > people who can write the Java/Scala to do this but then many more people
>> > who are comfortable using python once it's setup.
>> >
>> > - Patrick
>> >
>> > On Fri, Oct 25, 2013 at 11:00 AM, Josh Rosen <ro...@gmail.com>
>> wrote:
>> >
>> > > Hi Nick,
>> > >
>> > > I've seen several requests for SequenceFile support in PySpark, so
>> > there's
>> > > definitely demand for this feature.
>> > >
>> > > I like the idea of passing MsgPack'ed data (or some other structured
>> > > format) from Java to the Python workers.  My early prototype of custom
>> > > serializers (described at
>> > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals#PySparkInternals-customserializers
>> > > )
>> > > might be useful for implementing this.  Proper custom serializer
>> support
>> > > would handle the bookkeeping for tracking each stage's input and
>> output
>> > > formats and supplying the appropriate deserialization functions to the
>> > > Python worker, so the Python worker would be able to directly read the
>> > > MsgPack'd data that's sent to it.
>> > >
>> > > Regarding a wrapper API, it's actually possible to initially transform
>> > data
>> > > using Scala/Java and perform the remainder of the processing in
>> PySpark.
>> > >  This involves adding the appropriate compiled to the Java classpath
>> and
>> > a
>> > > bit of work in Py4J to create the Java/Scala RDD and wrap it for use
>> by
>> > > PySpark.  I can hack together a rough example of this if anyone's
>> > > interested, but it would need some work to be developed into a
>> > > user-friendly API.
>> > >
>> > > If you wanted to extend your proof-of-concept to handle the cases
>> where
>> > > keys and values have parseable toString() values, I think you could
>> > remove
>> > > the need for a delimiter by creating a PythonRDD from the
>> newHadoopFile
>> > > JavaPairRDD and adding a new method to writeAsPickle (
>> > >
>> > >
>> >
>> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L224
>> > > )
>> > > to dump its contents as a pickled pair of strings.  (Aside: most of
>> > > writeAsPickle() would probably need be eliminated or refactored when
>> > adding
>> > > general custom serializer support).
>> > >
>> > > - Josh
>> > >
>> > > On Thu, Oct 24, 2013 at 11:18 PM, Nick Pentreath
>> > > <ni...@gmail.com>wrote:
>> > >
>> > > > Hi Spark Devs
>> > > >
>> > > > I was wondering what appetite there may be to add the ability for
>> > PySpark
>> > > > users to create RDDs from (somewhat) arbitrary Hadoop InputFormats.
>> > > >
>> > > > In my data pipeline for example, I'm currently just using Scala
>> (partly
>> > > > because I love it but also because I am heavily reliant on quite
>> custom
>> > > > Hadoop InputFormats for reading data). However, many users may
>> prefer
>> > to
>> > > > use PySpark as much as possible (if not for everything). Reasons
>> might
>> > > > include the need to use some Python library. While I don't do it
>> yet, I
>> > > can
>> > > > certainly see an attractive use case for using say scikit-learn /
>> numpy
>> > > to
>> > > > do data analysis & machine learning in Python. Added to this my
>> > cofounder
>> > > > knows Python well but not Scala so it can be very beneficial to do a
>> > lot
>> > > of
>> > > > stuff in Python.
>> > > >
>> > > > For text-based data this is fine, but reading data in from more
>> complex
>> > > > Hadoop formats is an issue.
>> > > >
>> > > > The current approach would of course be to write an ETL-style
>> > Java/Scala
>> > > > job and then process in Python. Nothing wrong with this, but I was
>> > > thinking
>> > > > about ways to allow Python to access arbitrary Hadoop InputFormats.
>> > > >
>> > > > Here is a quick proof of concept:
>> > https://gist.github.com/MLnick/7150058
>> > > >
>> > > > This works for simple stuff like SequenceFile with simple Writable
>> > > > key/values.
>> > > >
>> > > > To work with more complex files, perhaps an approach is to
>> manipulate
>> > > > Hadoop JobConf via Python and pass that in. The one downside is of
>> > course
>> > > > that the InputFormat (well actually the Key/Value classes) must
>> have a
>> > > > toString that makes sense so very custom stuff might not work.
>> > > >
>> > > > I wonder if it would be possible to take the objects that are
>> yielded
>> > via
>> > > > the InputFormat and convert them into some representation like
>> > ProtoBuf,
>> > > > MsgPack, Avro, JSON, that can be read relatively more easily from
>> > Python?
>> > > >
>> > > > Another approach could be to allow a simple "wrapper API" such that
>> one
>> > > can
>> > > > write a wrapper function T => String and pass that into an
>> > > > InputFormatWrapper that takes an arbitrary InputFormat and yields
>> > Strings
>> > > > for the keys and values. Then all that is required is to compile
>> that
>> > > > function and add it to the SPARK_CLASSPATH and away you go!
>> > > >
>> > > > Thoughts?
>> > > >
>> > > > Nick
>> > > >
>> > >
>> >
>>
>
>

Re: [PySpark]: reading arbitrary Hadoop InputFormats

Posted by Josh Rosen <ro...@gmail.com>.
Hi Nick,

This is a nice start.  I'd prefer to keep the Java sequenceFileAsText() and
newHadoopFileAsText() methods inside PythonRDD instead of adding them to
JavaSparkContext, since I think these methods are unlikely to be used
directly by Java users (you can add these methods to the PythonRDD
companion object, which is how readRDDFromPickleFile is implemented:
https://github.com/apache/incubator-spark/blob/branch-0.8/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L255
)

For MsgPack, the UnpicklingError is because the Python worker expects to
receive its input in a pickled format.  In my prototype of custom
serializers, I modified the PySpark worker to receive its
serialization/deserialization function as input (
https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/worker.py#L41)
and added logic to pass the appropriate serializers based on each stage's
input and output formats (
https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/rdd.py#L42
).

At some point, I'd like to port my custom serializers code to PySpark; if
anyone's interested in helping, I'd be glad to write up some additional
notes on how this should work.

- Josh

On Wed, Oct 30, 2013 at 2:25 PM, Nick Pentreath <ni...@gmail.com>wrote:

> Thanks Josh, Patrick for the feedback.
>
> Based on Josh's pointers I have something working for JavaPairRDD ->
> PySpark RDD[(String, String)]. This just calls the toString method on each
> key and value as before, but without the need for a delimiter. For
> SequenceFile, it uses SequenceFileAsTextInputFormat which itself calls
> toString to convert to Text for keys and values. We then call toString
> (again) ourselves to get Strings to feed to writeAsPickle.
>
> Details here: https://gist.github.com/MLnick/7230588
>
> This also illustrates where the "wrapper function" api would fit in. All
> that is required is to define a T => String for key and value.
>
> I started playing around with MsgPack and can sort of get things to work in
> Scala, but am struggling with getting the raw bytes to be written properly
> in PythonRDD (I think it is treating them as pickled byte arrays when they
> are not, but when I removed the 'stripPickle' calls and amended the length
> (-6) I got "UnpicklingError: invalid load key, ' '. ").
>
> Another issue is that MsgPack does well at writing "structures" - like Java
> classes with public fields that are fairly simple - but for example the
> Writables have private fields so you end up with nothing being written.
> This looks like it would require custom "Templates" (serialization
> functions effectively) for many classes, which means a lot of custom code
> for a user to write to use it. Fortunately for most of the common Writables
> a toString does the job. Will keep looking into it though.
>
> Anyway, Josh if you have ideas or examples on the "Wrapper API from Python"
> that you mentioned, I'd be interested to hear them.
>
> If you think this is worth working up as a Pull Request covering
> SequenceFiles and custom InputFormats with default toString conversions and
> the ability to specify Wrapper functions, I can clean things up more, add
> some functionality and tests, and also test to see if common things like
> the "normal" Writables and reading from things like HBase and Cassandra can
> be made to work nicely (any other common use cases that you think make
> sense?).
>
> Thoughts, comments etc welcome.
>
> Nick
>
>
>
> On Fri, Oct 25, 2013 at 11:03 PM, Patrick Wendell <pwendell@gmail.com
> >wrote:
>
> > As a starting point, a version where people just write their own
> "wrapper"
> > functions to convert various HadoopFiles into String <K, V> files could
> go
> > a long way. We could even have a few built-in versions, such as dealing
> > with Sequence files that are <String, String>. Basically, the user needs
> to
> > write a translator in Java/Scala that produces textual records from
> > whatever format that want. Then, they make sure this is included in the
> > classpath when running PySpark.
> >
> > As Josh is saying, I'm pretty sure this is already possible, but we may
> > want to document it for users. In many organizations they might have 1-2
> > people who can write the Java/Scala to do this but then many more people
> > who are comfortable using python once it's setup.
> >
> > - Patrick
> >
> > On Fri, Oct 25, 2013 at 11:00 AM, Josh Rosen <ro...@gmail.com>
> wrote:
> >
> > > Hi Nick,
> > >
> > > I've seen several requests for SequenceFile support in PySpark, so
> > there's
> > > definitely demand for this feature.
> > >
> > > I like the idea of passing MsgPack'ed data (or some other structured
> > > format) from Java to the Python workers.  My early prototype of custom
> > > serializers (described at
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals#PySparkInternals-customserializers
> > > )
> > > might be useful for implementing this.  Proper custom serializer
> support
> > > would handle the bookkeeping for tracking each stage's input and output
> > > formats and supplying the appropriate deserialization functions to the
> > > Python worker, so the Python worker would be able to directly read the
> > > MsgPack'd data that's sent to it.
> > >
> > > Regarding a wrapper API, it's actually possible to initially transform
> > data
> > > using Scala/Java and perform the remainder of the processing in
> PySpark.
> > >  This involves adding the appropriate compiled to the Java classpath
> and
> > a
> > > bit of work in Py4J to create the Java/Scala RDD and wrap it for use by
> > > PySpark.  I can hack together a rough example of this if anyone's
> > > interested, but it would need some work to be developed into a
> > > user-friendly API.
> > >
> > > If you wanted to extend your proof-of-concept to handle the cases where
> > > keys and values have parseable toString() values, I think you could
> > remove
> > > the need for a delimiter by creating a PythonRDD from the newHadoopFile
> > > JavaPairRDD and adding a new method to writeAsPickle (
> > >
> > >
> >
> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L224
> > > )
> > > to dump its contents as a pickled pair of strings.  (Aside: most of
> > > writeAsPickle() would probably need be eliminated or refactored when
> > adding
> > > general custom serializer support).
> > >
> > > - Josh
> > >
> > > On Thu, Oct 24, 2013 at 11:18 PM, Nick Pentreath
> > > <ni...@gmail.com>wrote:
> > >
> > > > Hi Spark Devs
> > > >
> > > > I was wondering what appetite there may be to add the ability for
> > PySpark
> > > > users to create RDDs from (somewhat) arbitrary Hadoop InputFormats.
> > > >
> > > > In my data pipeline for example, I'm currently just using Scala
> (partly
> > > > because I love it but also because I am heavily reliant on quite
> custom
> > > > Hadoop InputFormats for reading data). However, many users may prefer
> > to
> > > > use PySpark as much as possible (if not for everything). Reasons
> might
> > > > include the need to use some Python library. While I don't do it
> yet, I
> > > can
> > > > certainly see an attractive use case for using say scikit-learn /
> numpy
> > > to
> > > > do data analysis & machine learning in Python. Added to this my
> > cofounder
> > > > knows Python well but not Scala so it can be very beneficial to do a
> > lot
> > > of
> > > > stuff in Python.
> > > >
> > > > For text-based data this is fine, but reading data in from more
> complex
> > > > Hadoop formats is an issue.
> > > >
> > > > The current approach would of course be to write an ETL-style
> > Java/Scala
> > > > job and then process in Python. Nothing wrong with this, but I was
> > > thinking
> > > > about ways to allow Python to access arbitrary Hadoop InputFormats.
> > > >
> > > > Here is a quick proof of concept:
> > https://gist.github.com/MLnick/7150058
> > > >
> > > > This works for simple stuff like SequenceFile with simple Writable
> > > > key/values.
> > > >
> > > > To work with more complex files, perhaps an approach is to manipulate
> > > > Hadoop JobConf via Python and pass that in. The one downside is of
> > course
> > > > that the InputFormat (well actually the Key/Value classes) must have
> a
> > > > toString that makes sense so very custom stuff might not work.
> > > >
> > > > I wonder if it would be possible to take the objects that are yielded
> > via
> > > > the InputFormat and convert them into some representation like
> > ProtoBuf,
> > > > MsgPack, Avro, JSON, that can be read relatively more easily from
> > Python?
> > > >
> > > > Another approach could be to allow a simple "wrapper API" such that
> one
> > > can
> > > > write a wrapper function T => String and pass that into an
> > > > InputFormatWrapper that takes an arbitrary InputFormat and yields
> > Strings
> > > > for the keys and values. Then all that is required is to compile that
> > > > function and add it to the SPARK_CLASSPATH and away you go!
> > > >
> > > > Thoughts?
> > > >
> > > > Nick
> > > >
> > >
> >
>

Re: [PySpark]: reading arbitrary Hadoop InputFormats

Posted by Nick Pentreath <ni...@gmail.com>.
Thanks Josh, Patrick for the feedback.

Based on Josh's pointers I have something working for JavaPairRDD ->
PySpark RDD[(String, String)]. This just calls the toString method on each
key and value as before, but without the need for a delimiter. For
SequenceFile, it uses SequenceFileAsTextInputFormat which itself calls
toString to convert to Text for keys and values. We then call toString
(again) ourselves to get Strings to feed to writeAsPickle.

Details here: https://gist.github.com/MLnick/7230588

This also illustrates where the "wrapper function" api would fit in. All
that is required is to define a T => String for key and value.

I started playing around with MsgPack and can sort of get things to work in
Scala, but am struggling with getting the raw bytes to be written properly
in PythonRDD (I think it is treating them as pickled byte arrays when they
are not, but when I removed the 'stripPickle' calls and amended the length
(-6) I got "UnpicklingError: invalid load key, ' '. ").

Another issue is that MsgPack does well at writing "structures" - like Java
classes with public fields that are fairly simple - but for example the
Writables have private fields so you end up with nothing being written.
This looks like it would require custom "Templates" (serialization
functions effectively) for many classes, which means a lot of custom code
for a user to write to use it. Fortunately for most of the common Writables
a toString does the job. Will keep looking into it though.

Anyway, Josh if you have ideas or examples on the "Wrapper API from Python"
that you mentioned, I'd be interested to hear them.

If you think this is worth working up as a Pull Request covering
SequenceFiles and custom InputFormats with default toString conversions and
the ability to specify Wrapper functions, I can clean things up more, add
some functionality and tests, and also test to see if common things like
the "normal" Writables and reading from things like HBase and Cassandra can
be made to work nicely (any other common use cases that you think make
sense?).

Thoughts, comments etc welcome.

Nick



On Fri, Oct 25, 2013 at 11:03 PM, Patrick Wendell <pw...@gmail.com>wrote:

> As a starting point, a version where people just write their own "wrapper"
> functions to convert various HadoopFiles into String <K, V> files could go
> a long way. We could even have a few built-in versions, such as dealing
> with Sequence files that are <String, String>. Basically, the user needs to
> write a translator in Java/Scala that produces textual records from
> whatever format that want. Then, they make sure this is included in the
> classpath when running PySpark.
>
> As Josh is saying, I'm pretty sure this is already possible, but we may
> want to document it for users. In many organizations they might have 1-2
> people who can write the Java/Scala to do this but then many more people
> who are comfortable using python once it's setup.
>
> - Patrick
>
> On Fri, Oct 25, 2013 at 11:00 AM, Josh Rosen <ro...@gmail.com> wrote:
>
> > Hi Nick,
> >
> > I've seen several requests for SequenceFile support in PySpark, so
> there's
> > definitely demand for this feature.
> >
> > I like the idea of passing MsgPack'ed data (or some other structured
> > format) from Java to the Python workers.  My early prototype of custom
> > serializers (described at
> >
> >
> https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals#PySparkInternals-customserializers
> > )
> > might be useful for implementing this.  Proper custom serializer support
> > would handle the bookkeeping for tracking each stage's input and output
> > formats and supplying the appropriate deserialization functions to the
> > Python worker, so the Python worker would be able to directly read the
> > MsgPack'd data that's sent to it.
> >
> > Regarding a wrapper API, it's actually possible to initially transform
> data
> > using Scala/Java and perform the remainder of the processing in PySpark.
> >  This involves adding the appropriate compiled to the Java classpath and
> a
> > bit of work in Py4J to create the Java/Scala RDD and wrap it for use by
> > PySpark.  I can hack together a rough example of this if anyone's
> > interested, but it would need some work to be developed into a
> > user-friendly API.
> >
> > If you wanted to extend your proof-of-concept to handle the cases where
> > keys and values have parseable toString() values, I think you could
> remove
> > the need for a delimiter by creating a PythonRDD from the newHadoopFile
> > JavaPairRDD and adding a new method to writeAsPickle (
> >
> >
> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L224
> > )
> > to dump its contents as a pickled pair of strings.  (Aside: most of
> > writeAsPickle() would probably need be eliminated or refactored when
> adding
> > general custom serializer support).
> >
> > - Josh
> >
> > On Thu, Oct 24, 2013 at 11:18 PM, Nick Pentreath
> > <ni...@gmail.com>wrote:
> >
> > > Hi Spark Devs
> > >
> > > I was wondering what appetite there may be to add the ability for
> PySpark
> > > users to create RDDs from (somewhat) arbitrary Hadoop InputFormats.
> > >
> > > In my data pipeline for example, I'm currently just using Scala (partly
> > > because I love it but also because I am heavily reliant on quite custom
> > > Hadoop InputFormats for reading data). However, many users may prefer
> to
> > > use PySpark as much as possible (if not for everything). Reasons might
> > > include the need to use some Python library. While I don't do it yet, I
> > can
> > > certainly see an attractive use case for using say scikit-learn / numpy
> > to
> > > do data analysis & machine learning in Python. Added to this my
> cofounder
> > > knows Python well but not Scala so it can be very beneficial to do a
> lot
> > of
> > > stuff in Python.
> > >
> > > For text-based data this is fine, but reading data in from more complex
> > > Hadoop formats is an issue.
> > >
> > > The current approach would of course be to write an ETL-style
> Java/Scala
> > > job and then process in Python. Nothing wrong with this, but I was
> > thinking
> > > about ways to allow Python to access arbitrary Hadoop InputFormats.
> > >
> > > Here is a quick proof of concept:
> https://gist.github.com/MLnick/7150058
> > >
> > > This works for simple stuff like SequenceFile with simple Writable
> > > key/values.
> > >
> > > To work with more complex files, perhaps an approach is to manipulate
> > > Hadoop JobConf via Python and pass that in. The one downside is of
> course
> > > that the InputFormat (well actually the Key/Value classes) must have a
> > > toString that makes sense so very custom stuff might not work.
> > >
> > > I wonder if it would be possible to take the objects that are yielded
> via
> > > the InputFormat and convert them into some representation like
> ProtoBuf,
> > > MsgPack, Avro, JSON, that can be read relatively more easily from
> Python?
> > >
> > > Another approach could be to allow a simple "wrapper API" such that one
> > can
> > > write a wrapper function T => String and pass that into an
> > > InputFormatWrapper that takes an arbitrary InputFormat and yields
> Strings
> > > for the keys and values. Then all that is required is to compile that
> > > function and add it to the SPARK_CLASSPATH and away you go!
> > >
> > > Thoughts?
> > >
> > > Nick
> > >
> >
>

Re: [PySpark]: reading arbitrary Hadoop InputFormats

Posted by Patrick Wendell <pw...@gmail.com>.
As a starting point, a version where people just write their own "wrapper"
functions to convert various HadoopFiles into String <K, V> files could go
a long way. We could even have a few built-in versions, such as dealing
with Sequence files that are <String, String>. Basically, the user needs to
write a translator in Java/Scala that produces textual records from
whatever format that want. Then, they make sure this is included in the
classpath when running PySpark.

As Josh is saying, I'm pretty sure this is already possible, but we may
want to document it for users. In many organizations they might have 1-2
people who can write the Java/Scala to do this but then many more people
who are comfortable using python once it's setup.

- Patrick

On Fri, Oct 25, 2013 at 11:00 AM, Josh Rosen <ro...@gmail.com> wrote:

> Hi Nick,
>
> I've seen several requests for SequenceFile support in PySpark, so there's
> definitely demand for this feature.
>
> I like the idea of passing MsgPack'ed data (or some other structured
> format) from Java to the Python workers.  My early prototype of custom
> serializers (described at
>
> https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals#PySparkInternals-customserializers
> )
> might be useful for implementing this.  Proper custom serializer support
> would handle the bookkeeping for tracking each stage's input and output
> formats and supplying the appropriate deserialization functions to the
> Python worker, so the Python worker would be able to directly read the
> MsgPack'd data that's sent to it.
>
> Regarding a wrapper API, it's actually possible to initially transform data
> using Scala/Java and perform the remainder of the processing in PySpark.
>  This involves adding the appropriate compiled to the Java classpath and a
> bit of work in Py4J to create the Java/Scala RDD and wrap it for use by
> PySpark.  I can hack together a rough example of this if anyone's
> interested, but it would need some work to be developed into a
> user-friendly API.
>
> If you wanted to extend your proof-of-concept to handle the cases where
> keys and values have parseable toString() values, I think you could remove
> the need for a delimiter by creating a PythonRDD from the newHadoopFile
> JavaPairRDD and adding a new method to writeAsPickle (
>
> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L224
> )
> to dump its contents as a pickled pair of strings.  (Aside: most of
> writeAsPickle() would probably need be eliminated or refactored when adding
> general custom serializer support).
>
> - Josh
>
> On Thu, Oct 24, 2013 at 11:18 PM, Nick Pentreath
> <ni...@gmail.com>wrote:
>
> > Hi Spark Devs
> >
> > I was wondering what appetite there may be to add the ability for PySpark
> > users to create RDDs from (somewhat) arbitrary Hadoop InputFormats.
> >
> > In my data pipeline for example, I'm currently just using Scala (partly
> > because I love it but also because I am heavily reliant on quite custom
> > Hadoop InputFormats for reading data). However, many users may prefer to
> > use PySpark as much as possible (if not for everything). Reasons might
> > include the need to use some Python library. While I don't do it yet, I
> can
> > certainly see an attractive use case for using say scikit-learn / numpy
> to
> > do data analysis & machine learning in Python. Added to this my cofounder
> > knows Python well but not Scala so it can be very beneficial to do a lot
> of
> > stuff in Python.
> >
> > For text-based data this is fine, but reading data in from more complex
> > Hadoop formats is an issue.
> >
> > The current approach would of course be to write an ETL-style Java/Scala
> > job and then process in Python. Nothing wrong with this, but I was
> thinking
> > about ways to allow Python to access arbitrary Hadoop InputFormats.
> >
> > Here is a quick proof of concept: https://gist.github.com/MLnick/7150058
> >
> > This works for simple stuff like SequenceFile with simple Writable
> > key/values.
> >
> > To work with more complex files, perhaps an approach is to manipulate
> > Hadoop JobConf via Python and pass that in. The one downside is of course
> > that the InputFormat (well actually the Key/Value classes) must have a
> > toString that makes sense so very custom stuff might not work.
> >
> > I wonder if it would be possible to take the objects that are yielded via
> > the InputFormat and convert them into some representation like ProtoBuf,
> > MsgPack, Avro, JSON, that can be read relatively more easily from Python?
> >
> > Another approach could be to allow a simple "wrapper API" such that one
> can
> > write a wrapper function T => String and pass that into an
> > InputFormatWrapper that takes an arbitrary InputFormat and yields Strings
> > for the keys and values. Then all that is required is to compile that
> > function and add it to the SPARK_CLASSPATH and away you go!
> >
> > Thoughts?
> >
> > Nick
> >
>

Re: [PySpark]: reading arbitrary Hadoop InputFormats

Posted by Josh Rosen <ro...@gmail.com>.
Hi Nick,

I've seen several requests for SequenceFile support in PySpark, so there's
definitely demand for this feature.

I like the idea of passing MsgPack'ed data (or some other structured
format) from Java to the Python workers.  My early prototype of custom
serializers (described at
https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals#PySparkInternals-customserializers)
might be useful for implementing this.  Proper custom serializer support
would handle the bookkeeping for tracking each stage's input and output
formats and supplying the appropriate deserialization functions to the
Python worker, so the Python worker would be able to directly read the
MsgPack'd data that's sent to it.

Regarding a wrapper API, it's actually possible to initially transform data
using Scala/Java and perform the remainder of the processing in PySpark.
 This involves adding the appropriate compiled to the Java classpath and a
bit of work in Py4J to create the Java/Scala RDD and wrap it for use by
PySpark.  I can hack together a rough example of this if anyone's
interested, but it would need some work to be developed into a
user-friendly API.

If you wanted to extend your proof-of-concept to handle the cases where
keys and values have parseable toString() values, I think you could remove
the need for a delimiter by creating a PythonRDD from the newHadoopFile
JavaPairRDD and adding a new method to writeAsPickle (
https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L224)
to dump its contents as a pickled pair of strings.  (Aside: most of
writeAsPickle() would probably need be eliminated or refactored when adding
general custom serializer support).

- Josh

On Thu, Oct 24, 2013 at 11:18 PM, Nick Pentreath
<ni...@gmail.com>wrote:

> Hi Spark Devs
>
> I was wondering what appetite there may be to add the ability for PySpark
> users to create RDDs from (somewhat) arbitrary Hadoop InputFormats.
>
> In my data pipeline for example, I'm currently just using Scala (partly
> because I love it but also because I am heavily reliant on quite custom
> Hadoop InputFormats for reading data). However, many users may prefer to
> use PySpark as much as possible (if not for everything). Reasons might
> include the need to use some Python library. While I don't do it yet, I can
> certainly see an attractive use case for using say scikit-learn / numpy to
> do data analysis & machine learning in Python. Added to this my cofounder
> knows Python well but not Scala so it can be very beneficial to do a lot of
> stuff in Python.
>
> For text-based data this is fine, but reading data in from more complex
> Hadoop formats is an issue.
>
> The current approach would of course be to write an ETL-style Java/Scala
> job and then process in Python. Nothing wrong with this, but I was thinking
> about ways to allow Python to access arbitrary Hadoop InputFormats.
>
> Here is a quick proof of concept: https://gist.github.com/MLnick/7150058
>
> This works for simple stuff like SequenceFile with simple Writable
> key/values.
>
> To work with more complex files, perhaps an approach is to manipulate
> Hadoop JobConf via Python and pass that in. The one downside is of course
> that the InputFormat (well actually the Key/Value classes) must have a
> toString that makes sense so very custom stuff might not work.
>
> I wonder if it would be possible to take the objects that are yielded via
> the InputFormat and convert them into some representation like ProtoBuf,
> MsgPack, Avro, JSON, that can be read relatively more easily from Python?
>
> Another approach could be to allow a simple "wrapper API" such that one can
> write a wrapper function T => String and pass that into an
> InputFormatWrapper that takes an arbitrary InputFormat and yields Strings
> for the keys and values. Then all that is required is to compile that
> function and add it to the SPARK_CLASSPATH and away you go!
>
> Thoughts?
>
> Nick
>