Posted to user@spark.apache.org by Diana Carroll <dc...@cloudera.com> on 2014/03/17 16:18:50 UTC

example of non-line oriented input data?

Has anyone got a working example of a Spark application that analyzes data
in a non-line-oriented format, such as XML or JSON?  I'd like to do this
without re-inventing the wheel...anyone care to share?  Thanks!

Diana

Re: example of non-line oriented input data?

Posted by Matei Zaharia <ma...@gmail.com>.
FYI, one thing we’ve added now is support for reading multiple text files from a directory as separate records: https://github.com/apache/spark/pull/327. This should remove the need for mapPartitions discussed here.

Avro and SequenceFiles look like they may not make it for 1.0, but there’s a chance that Parquet support with Spark SQL will, which should let you store binary data a bit better.

Matei
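
For readers following along, here is a minimal sketch of how per-file records could be consumed once whole files arrive as single records. It assumes each record is a (path, content) pair, which is how later PySpark releases expose this through SparkContext.wholeTextFiles; whether the linked pull request shipped under that exact name is an assumption. The country_names helper and the <data><country name="..."/></data> layout are illustrative, borrowed from the XML example quoted further down.

```python
import xml.etree.ElementTree as ET


def country_names(record):
    """Parse one (path, content) record and return the country names in it."""
    _path, content = record          # the path is unused in this sketch
    root = ET.fromstring(content)    # the whole file is one XML document
    return [c.get("name") for c in root.iter("country")]


# Hypothetical Spark usage (needs a live SparkContext `sc`; not run here):
#   sc.wholeTextFiles("file:/home/training/").flatMap(country_names).collect()
```

Because every record already holds a complete file, an ordinary flatMap is enough and no per-partition concatenation is needed.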

On Mar 19, 2014, at 3:12 PM, Jeremy Freeman <fr...@gmail.com> wrote:

> Another vote on this, support for simple SequenceFiles and/or Avro would be terrific, as using plain text can be very space-inefficient, especially for numerical data.
> 
> -- Jeremy
> 
> On Mar 19, 2014, at 5:24 PM, Nicholas Chammas <ni...@gmail.com> wrote:
> 
>> I'd second the request for Avro support in Python first, followed by Parquet.
>> 
>> 
>> On Wed, Mar 19, 2014 at 2:14 PM, Evgeny Shishkin <it...@gmail.com> wrote:
>> 
>> On 19 Mar 2014, at 19:54, Diana Carroll <dc...@cloudera.com> wrote:
>> 
>>> Actually, thinking more on this question, Matei: I'd definitely say support for Avro.  There's a lot of interest in this!!
>>> 
>> 
>> Agree, and parquet as default Cloudera Impala format.
>> 
>> 
>> 
>> 
>>> On Tue, Mar 18, 2014 at 8:14 PM, Matei Zaharia <ma...@gmail.com> wrote:
>>> BTW one other thing — in your experience, Diana, which non-text InputFormats would be most useful to support in Python first? Would it be Parquet or Avro, simple SequenceFiles with the Hadoop Writable types, or something else? I think a per-file text input format that does the stuff we did here would also be good.
>>> 
>>> Matei
>>> 
>>> 
>>> On Mar 18, 2014, at 3:27 PM, Matei Zaharia <ma...@gmail.com> wrote:
>>> 
>>>> Hi Diana,
>>>> 
>>>> This seems to work without the iter() in front if you just return treeiterator. What happened when you didn’t include that? Treeiterator should return an iterator.
>>>> 
>>>> Anyway, this is a good example of mapPartitions. It’s one where you want to view the whole file as one object (one XML here), so you couldn’t implement this using a flatMap, but you still want to return multiple values. The MLlib example you saw needs Python 2.7 because unfortunately that is a requirement for our Python MLlib support (see http://spark.incubator.apache.org/docs/0.9.0/python-programming-guide.html#libraries). We’d like to relax this later but we’re using some newer features of NumPy and Python. The rest of PySpark works on 2.6.
>>>> 
>>>> In terms of the size in memory, here both the string s and the XML tree constructed from it need to fit in, so you can’t work on very large individual XML files. You may be able to use a streaming XML parser instead to extract elements from the data in a streaming fashion, without ever materializing the whole tree. http://docs.python.org/2/library/xml.sax.reader.html#module-xml.sax.xmlreader is one example.
>>>> 
>>>> Matei
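
A rough sketch of the streaming idea, under stated assumptions: it swaps in xml.etree.ElementTree.XMLPullParser (Python 3.4 and later) rather than the xml.sax reader linked above, and the function name iter_country_attribs is made up for illustration. The function takes the iterator of lines that mapPartitions would hand over and yields one attribute dict per <country> element, so neither the concatenated string nor the full tree has to be built first.

```python
import xml.etree.ElementTree as ET


def iter_country_attribs(lines):
    """Yield the attribute dict of each <country> element, one line at a time."""
    parser = ET.XMLPullParser(events=("end",))
    for line in lines:
        # textFile strips line endings; put one back so tokens on
        # neighbouring lines cannot run together.
        parser.feed(line + "\n")
        for _event, elem in parser.read_events():
            if elem.tag == "country":
                yield dict(elem.attrib)   # copy before clearing the element
                elem.clear()              # release children and text
    parser.close()


# Hypothetical Spark usage (not run here):
#   mydata.mapPartitions(iter_country_attribs).collect()
```

The root element still keeps the emptied child elements, so memory use grows slowly with the number of records; it is much smaller than the full tree but not constant.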
>>>> 
>>>> On Mar 18, 2014, at 7:49 AM, Diana Carroll <dc...@cloudera.com> wrote:
>>>> 
>>>>> Well, if anyone is still following this, I've gotten the following code working which in theory should allow me to parse whole XML files: (the problem was that I can't return the tree iterator directly.  I have to call iter().  Why?)
>>>>> 
>>>>> import xml.etree.ElementTree as ET
>>>>> 
>>>>> # two source files, format <data> <country name="...">...</country>...</data>
>>>>> mydata=sc.textFile("file:/home/training/countries*.xml") 
>>>>> 
>>>>> def parsefile(iterator):
>>>>>     s = ''
>>>>>     for i in iterator: s = s + str(i)
>>>>>     tree = ET.fromstring(s)
>>>>>     treeiterator = tree.getiterator("country")
>>>>>     # why do I have to convert an iterator to an iterator?  not sure but required
>>>>>     return iter(treeiterator)
>>>>> 
>>>>> mydata.mapPartitions(lambda x: parsefile(x)).map(lambda element: element.attrib).collect()
>>>>> 
>>>>> The output is what I expect:
>>>>> [{'name': 'Liechtenstein'}, {'name': 'Singapore'}, {'name': 'Panama'}]
>>>>> 
>>>>> BUT I'm a bit concerned about the construction of the string "s".  How big can my file be before converting it to a string becomes problematic?
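
On the iter() question, one plausible reading (not confirmed anywhere in the thread) is that the value being returned was an iterable, such as a list, rather than an iterator. The two are different things in Python, and the short sketch below shows the distinction using a plain list as a stand-in; it says nothing definite about what ElementTree returned in her setup.

```python
from collections.abc import Iterator

elements = ["a", "b"]   # stand-in for a call that returns a list

# A list can be looped over, but it is not itself an iterator:
# it has no __next__ method.
assert not isinstance(elements, Iterator)

# iter() wraps it in an object that does support next().
it = iter(elements)
assert isinstance(it, Iterator)
first = next(it)
```

Code that calls next() directly on a return value will accept `it` but reject `elements`, which would match the symptom described.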
>>>>> 
>>>>> 
>>>>> 
>>>>> On Tue, Mar 18, 2014 at 9:41 AM, Diana Carroll <dc...@cloudera.com> wrote:
>>>>> Thanks, Matei.
>>>>> 
>>>>> In the context of this discussion, it would seem mapPartitions is essential, because it's the only way I'm going to be able to process each file as a whole, in our example of a large number of small XML files which need to be parsed as a whole file because records are not required to be on a single line.
>>>>> 
>>>>> The theory makes sense but I'm still utterly lost as to how to implement it.  Unfortunately there's only a single example of the use of mapPartitions in any of the Python example programs, which is the log regression example, which I can't run because it requires Python 2.7 and I'm on Python 2.6.  (aside: I couldn't find any statement that Python 2.6 is unsupported...is it?)
>>>>> 
>>>>> I'd really really love to see a real life example of a Python use of mapPartitions.  I do appreciate the very simple examples you provided, but (perhaps because of my novice status on Python) I can't figure out how to translate those to a real world situation in which I'm building RDDs from files, not inline collections like [(1,2),(2,3)].
>>>>> 
>>>>> Also, you say that the function called in mapPartitions can return a collection OR an iterator.  I tried returning an iterator by calling ElementTree getiterator function, but still got the error telling me my object was not an iterator. 
>>>>> 
>>>>> If anyone has a real life example of mapPartitions returning a Python iterator, that would be fabulous.
>>>>> 
>>>>> Diana
>>>>> 
>>>>> 
>>>>> On Mon, Mar 17, 2014 at 6:17 PM, Matei Zaharia <ma...@gmail.com> wrote:
>>>>> Oh, I see, the problem is that the function you pass to mapPartitions must itself return an iterator or a collection. This is used so that you can return multiple output records for each input record. You can implement most of the existing map-like operations in Spark, such as map, filter, flatMap, etc, with mapPartitions, as well as new ones that might do a sliding window over each partition for example, or accumulate data across elements (e.g. to compute a sum).
>>>>> 
>>>>> For example, if you have data = sc.parallelize([1, 2, 3, 4], 2), this will work:
>>>>> 
>>>>> >>> data.mapPartitions(lambda x: x).collect()
>>>>> [1, 2, 3, 4]   # Just return the same iterator, doing nothing
>>>>> 
>>>>> >>> data.mapPartitions(lambda x: [list(x)]).collect()
>>>>> [[1, 2], [3, 4]]   # Group together the elements of each partition in a single list (like glom)
>>>>> 
>>>>> >>> data.mapPartitions(lambda x: [sum(x)]).collect()
>>>>> [3, 7]   # Sum each partition separately
>>>>> 
>>>>> However something like data.mapPartitions(lambda x: sum(x)).collect() will *not* work because sum returns a number, not an iterator. That’s why I put sum(x) inside a list above.
>>>>> 
>>>>> In practice mapPartitions is most useful if you want to share some data or work across the elements. For example maybe you want to load a lookup table once from an external file and then check each element in it, or sum up a bunch of elements without allocating a lot of vector objects.
>>>>> 
>>>>> Matei
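
To make the contract above easier to experiment with, here is a small local stand-in that needs no cluster. The map_partitions helper is hypothetical and is not Spark code; it only mimics "call the function once per partition with an iterator, then concatenate whatever it returns", which is the behaviour described above.

```python
from itertools import chain


def map_partitions(partitions, func):
    """Local stand-in for rdd.mapPartitions(func).collect(); not Spark code."""
    return list(chain.from_iterable(func(iter(p)) for p in partitions))


# Two partitions, as produced by sc.parallelize([1, 2, 3, 4], 2) above.
data = [[1, 2], [3, 4]]

same = map_partitions(data, lambda x: x)             # [1, 2, 3, 4]
grouped = map_partitions(data, lambda x: [list(x)])  # [[1, 2], [3, 4]]
sums = map_partitions(data, lambda x: [sum(x)])      # [3, 7]


def running_total(values):
    """Generator form: state (total) is shared across one partition's elements."""
    total = 0
    for v in values:
        total += v
        yield total


running = map_partitions(data, running_total)        # [1, 3, 3, 7]
```

Passing `lambda x: sum(x)` to this helper raises a TypeError, because an int cannot be iterated; that mirrors the failing case mentioned above. A generator function such as running_total is often the most convenient way to return an iterator.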
>>>>> 
>>>>> 
>>>>> On Mar 17, 2014, at 11:25 AM, Diana Carroll <dc...@cloudera.com> wrote:
>>>>> 
>>>>> > "There’s also mapPartitions, which gives you an iterator for each partition instead of an array. You can then return an iterator or list of objects to produce from that."
>>>>> >
>>>>> > I confess, I was hoping for an example of just that, because i've not yet been able to figure out how to use mapPartitions.  No doubt this is because i'm a rank newcomer to Python, and haven't fully wrapped my head around iterators.  All I get so far in my attempts to use mapPartitions is the darned "suchnsuch is not an iterator" error.
>>>>> >
>>>>> > def myfunction(iterator): return [1,2,3]
>>>>> > mydata.mapPartitions(lambda x: myfunction(x)).take(2)
>>>>> >
>>>>> >
>>>>> >
>>>>> >
>>>>> >
>>>>> > On Mon, Mar 17, 2014 at 1:57 PM, Matei Zaharia <ma...@gmail.com> wrote:
>>>>> > Here’s an example of getting together all lines in a file as one string:
>>>>> >
>>>>> > $ cat dir/a.txt
>>>>> > Hello
>>>>> > world!
>>>>> >
>>>>> > $ cat dir/b.txt
>>>>> > What's
>>>>> > up??
>>>>> >
>>>>> > $ bin/pyspark
>>>>> > >>> files = sc.textFile("dir")
>>>>> >
>>>>> > >>> files.collect()
>>>>> > [u'Hello', u'world!', u"What's", u'up??']   # one element per line, not what we want
>>>>> >
>>>>> > >>> files.glom().collect()
>>>>> > [[u'Hello', u'world!'], [u"What's", u'up??']]   # one element per file, which is an array of lines
>>>>> >
>>>>> > >>> files.glom().map(lambda a: "\n".join(a)).collect()
>>>>> > [u'Hello\nworld!', u"What's\nup??"]    # join back each file into a single string
>>>>> >
>>>>> > The glom() method groups all the elements of each partition of an RDD into an array, giving you an RDD of arrays of objects. If your input is small files, you always have one partition per file.
>>>>> >
>>>>> > There’s also mapPartitions, which gives you an iterator for each partition instead of an array. You can then return an iterator or list of objects to produce from that.
>>>>> >
>>>>> > Matei
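
The glom-and-join step above can also be written as a per-partition function. The sketch below is a guess at what that might look like; join_lines is an illustrative name, and the partitions are simulated with plain lists so it can be checked without Spark.

```python
def join_lines(lines):
    """Per-partition function: collapse all lines of one partition into one string."""
    yield "\n".join(lines)


# Simulate two single-file partitions locally (no Spark involved).
partitions = [["Hello", "world!"], ["What's", "up??"]]
joined = [s for part in partitions for s in join_lines(iter(part))]

# Hypothetical Spark usage (not run here):
#   files.mapPartitions(join_lines).collect()
```

This only gives one string per file if each file really lands in exactly one partition, as stated above for small files; that is worth checking on your own data. An empty partition would yield an empty string.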
>>>>> >
>>>>> >
>>>>> > On Mar 17, 2014, at 10:46 AM, Diana Carroll <dc...@cloudera.com> wrote:
>>>>> >
>>>>> > > Thanks Matei.  That makes sense.  I have here a dataset of many many smallish XML files, so using mapPartitions that way would make sense.  I'd love to see a code example though ...It's not as obvious to me how to do that as it probably should be.
>>>>> > >
>>>>> > > Thanks,
>>>>> > > Diana
>>>>> > >
>>>>> > >
>>>>> > > On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia <ma...@gmail.com> wrote:
>>>>> > > Hi Diana,
>>>>> > >
>>>>> > > Non-text input formats are only supported in Java and Scala right now, where you can use sparkContext.hadoopFile or .hadoopDataset to load data with any InputFormat that Hadoop MapReduce supports. In Python, you unfortunately only have textFile, which gives you one record per line. For JSON, you’d have to fit the whole JSON object on one line as you said. Hopefully we’ll also have some other forms of input soon.
>>>>> > >
>>>>> > > If your input is a collection of separate files (say many .xml files), you can also use mapPartitions on it to group together the lines because each input file will end up being a single dataset partition (or map task). This will let you concatenate the lines in each file and parse them as one XML object.
>>>>> > >
>>>>> > > Matei
>>>>> > >
>>>>> > > On Mar 17, 2014, at 9:52 AM, Diana Carroll <dc...@cloudera.com> wrote:
>>>>> > >
>>>>> > >> Thanks, Krakna, very helpful.  The way I read the code, it looks like you are assuming that each line in foo.log contains a complete json object?  (That is, that the data doesn't contain any records that are split into multiple lines.)  If so, is that because you know that to be true of your data?  Or did you do as Nicholas suggests and have some preprocessing on the text input to flatten the data in that way?
>>>>> > >>
>>>>> > >> Thanks,
>>>>> > >> Diana
>>>>> > >>
>>>>> > >>
>>>>> > >> On Mon, Mar 17, 2014 at 12:09 PM, Krakna H <sh...@gmail.com> wrote:
>>>>> > >> Katrina,
>>>>> > >>
>>>>> > >> Not sure if this is what you had in mind, but here's some simple pyspark code that I recently wrote to deal with JSON files.
>>>>> > >>
>>>>> > >> from pyspark import SparkContext, SparkConf
>>>>> > >> from operator import add
>>>>> > >> import json
>>>>> > >> import random
>>>>> > >> import numpy as np
>>>>> > >>
>>>>> > >> def concatenate_paragraphs(sentence_array):
>>>>> > >>     return ' '.join(sentence_array).split(' ')
>>>>> > >>
>>>>> > >> logFile = 'foo.json'
>>>>> > >> conf = SparkConf()
>>>>> > >> conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")
>>>>> > >> sc = SparkContext(conf=conf)
>>>>> > >> logData = sc.textFile(logFile).cache()
>>>>> > >>
>>>>> > >> num_lines = logData.count()
>>>>> > >> print 'Number of lines: %d' % num_lines
>>>>> > >>
>>>>> > >> # JSON object has the structure: {"key": {'paragraphs': [sentence1, sentence2, ...]}}
>>>>> > >> tm = logData.map(lambda s: (json.loads(s)['key'], len(concatenate_paragraphs(json.loads(s)['paragraphs']))))
>>>>> > >> tm = tm.reduceByKey(lambda _, x: _ + x)
>>>>> > >>
>>>>> > >> op = tm.collect()
>>>>> > >> for key, num_words in op:
>>>>> > >>     # the loop variable is `key`; the original printed an undefined `state`
>>>>> > >>     print 'state: %s, num_words: %d' % (key, num_words)
>>>>> > >>
>>>>> > >> On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark User List] <[hidden email]> wrote:
>>>>> > >> I don't actually have any data.  I'm writing a course that teaches students how to do this sort of thing and am interested in looking at a variety of real life examples of people doing things like that.  I'd love to see some working code implementing the "obvious work-around" you mention...do you have any to share?  It's an approach that makes a lot of sense, and as I said, I'd love to not have to re-invent the wheel if someone else has already written that code.  Thanks!
>>>>> > >>
>>>>> > >> Diana
>>>>> > >>
>>>>> > >>
>>>>> > >> On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <[hidden email]> wrote:
>>>>> > >> There was a previous discussion about this here:
>>>>> > >>
>>>>> > >> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
>>>>> > >>
>>>>> > >> How big are the XML or JSON files you're looking to deal with?
>>>>> > >>
>>>>> > >> It may not be practical to deserialize the entire document at once. In that case an obvious work-around would be to have some kind of pre-processing step that separates XML nodes/JSON objects with newlines so that you can analyze the data with Spark in a "line-oriented format". Your preprocessor wouldn't have to parse/deserialize the massive document; it would just have to track open/closed tags/braces to know when to insert a newline.
>>>>> > >>
>>>>> > >> Then you'd just open the line-delimited result and deserialize the individual objects/nodes with map().
>>>>> > >>
>>>>> > >> Nick
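
A minimal sketch of the pre-processing step described above, restricted to JSON and written under several assumptions: the input is a stream of characters holding top-level {...} objects (optionally wrapped in an array), and the name split_json_objects is invented for illustration. It does no deserialization; it only tracks brace depth and string state to decide where one object ends.

```python
import json


def split_json_objects(chars):
    """Yield each top-level {...} object from a character stream as one line."""
    depth = 0
    in_string = False
    escaped = False
    buf = []
    for ch in chars:
        if depth == 0 and ch != "{":
            continue              # skip commas, brackets, whitespace between objects
        if in_string:
            buf.append(ch)        # braces inside strings must not count
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
            continue
        if ch in "\r\n":
            continue              # drop line breaks outside strings
        buf.append(ch)
        if ch == '"':
            in_string = True
        elif ch == "{":
            depth += 1
        elif ch == "}":
            depth -= 1
            if depth == 0:        # a complete top-level object
                yield "".join(buf)
                buf = []


# Hypothetical follow-up in Spark once the output is written one object per line:
#   sc.textFile("flattened.json").map(json.loads)
```

The same depth-tracking idea would need extra care for XML (comments, CDATA, self-closing tags), which this sketch does not attempt.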
>>>>> > >>
>>>>> > >>
>>>>> > >> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <[hidden email]> wrote:
>>>>> > >> Has anyone got a working example of a Spark application that analyzes data in a non-line-oriented format, such as XML or JSON?  I'd like to do this without re-inventing the wheel...anyone care to share?  Thanks!
>>>>> > >>
>>>>> > >> Diana
>>>>> > >>
>>>>> > >>
>>>>> > >
>>>>> > >
>>>>> >
>>>>> >
>>>>> 
>>>>> 
>>>>> 
>>>> 
>>> 
>>> 
>> 
>> 
> 


Re: example of non-line oriented input data?

Posted by Jeremy Freeman <fr...@gmail.com>.
Another vote on this, support for simple SequenceFiles and/or Avro would be terrific, as using plain text can be very space-inefficient, especially for numerical data.

-- Jeremy



Re: example of non-line oriented input data?

Posted by Nicholas Chammas <ni...@gmail.com>.
I'd second the request for Avro support in Python first, followed by
Parquet.


On Wed, Mar 19, 2014 at 2:14 PM, Evgeny Shishkin <it...@gmail.com> wrote:

>
> On 19 Mar 2014, at 19:54, Diana Carroll <dc...@cloudera.com> wrote:
>
> Actually, thinking more on this question, Matei: I'd definitely say
> support for Avro.  There's a lot of interest in this!!
>
>
> Agree, and parquet as default Cloudera Impala format.
>
>
>
>
> On Tue, Mar 18, 2014 at 8:14 PM, Matei Zaharia <ma...@gmail.com>wrote:
>
>> BTW one other thing -- in your experience, Diana, which non-text
>> InputFormats would be most useful to support in Python first? Would it be
>> Parquet or Avro, simple SequenceFiles with the Hadoop Writable types, or
>> something else? I think a per-file text input format that does the stuff we
>> did here would also be good.
>>
>> Matei
>>
>>
>> On Mar 18, 2014, at 3:27 PM, Matei Zaharia <ma...@gmail.com>
>> wrote:
>>
>> Hi Diana,
>>
>> This seems to work without the iter() in front if you just return
>> treeiterator. What happened when you didn't include that? Treeiterator
>> should return an iterator.
>>
>> Anyway, this is a good example of mapPartitions. It's one where you want
>> to view the whole file as one object (one XML here), so you couldn't
>> implement this using a flatMap, but you still want to return multiple
>> values. The MLlib example you saw needs Python 2.7 because unfortunately
>> that is a requirement for our Python MLlib support (see
>> http://spark.incubator.apache.org/docs/0.9.0/python-programming-guide.html#libraries).
>> We'd like to relax this later but we're using some newer features of NumPy
>> and Python. The rest of PySpark works on 2.6.
>>
>> In terms of the size in memory, here both the string s and the XML tree
>> constructed from it need to fit in, so you can't work on very large
>> individual XML files. You may be able to use a streaming XML parser instead
>> to extract elements from the data in a streaming fashion, without ever
>> materializing the whole tree.
>> http://docs.python.org/2/library/xml.sax.reader.html#module-xml.sax.xmlreader is one example.
>>
>> Matei
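A minimal sketch of the streaming idea above, assuming the same <data><country name="...">...</country></data> layout as Diana's files. The names CountryHandler and stream_country_names are invented for this example. It feeds lines to the standard-library SAX parser one at a time, so neither the whole string nor a full tree is ever built.

```python
import xml.sax


class CountryHandler(xml.sax.ContentHandler):
    """Collects the name attribute of each <country> start tag."""

    def __init__(self):
        xml.sax.ContentHandler.__init__(self)
        self.pending = []  # names seen but not yet handed to the caller

    def startElement(self, name, attrs):
        if name == "country":
            self.pending.append(attrs.get("name"))


def stream_country_names(lines):
    """Generator suitable for mapPartitions: yields names as they are parsed."""
    handler = CountryHandler()
    parser = xml.sax.make_parser()
    parser.setContentHandler(handler)
    for line in lines:
        parser.feed(line)  # incremental parse of this piece only
        while handler.pending:
            yield handler.pending.pop(0)
    parser.close()
    while handler.pending:
        yield handler.pending.pop(0)
```

With an RDD built from one file per partition, this would be used as mydata.mapPartitions(stream_country_names).collect().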
>>
>> On Mar 18, 2014, at 7:49 AM, Diana Carroll <dc...@cloudera.com> wrote:
>>
>> Well, if anyone is still following this, I've gotten the following code
>> working which in theory should allow me to parse whole XML files: (the
>> problem was that I can't return the tree iterator directly.  I have to call
>> iter().  Why?)
>>
>> import xml.etree.ElementTree as ET
>>
>> # two source files, format <data> <country name="...">...</country>...</data>
>> mydata=sc.textFile("file:/home/training/countries*.xml")
>>
>> def parsefile(iterator):
>>     s = ''
>>     for i in iterator: s = s + str(i)
>>     tree = ET.fromstring(s)
>>     treeiterator = tree.getiterator("country")
>>     # why do I have to convert an iterator to an iterator?  not sure but required
>>     return iter(treeiterator)
>>
>> mydata.mapPartitions(lambda x: parsefile(x)).map(lambda element: element.attrib).collect()
>>
>> The output is what I expect:
>> [{'name': 'Liechtenstein'}, {'name': 'Singapore'}, {'name': 'Panama'}]
>>
>> BUT I'm a bit concerned about the construction of the string "s".  How
>> big can my file be before converting it to a string becomes problematic?
>>
>>
>>
>> On Tue, Mar 18, 2014 at 9:41 AM, Diana Carroll <dc...@cloudera.com>wrote:
>>
>>> Thanks, Matei.
>>>
>>> In the context of this discussion, it would seem mapPartitions is
>>> essential, because it's the only way I'm going to be able to process each
>>> file as a whole, in our example of a large number of small XML files which
>>> need to be parsed as a whole file because records are not required to be on
>>> a single line.
>>>
>>> The theory makes sense but I'm still utterly lost as to how to implement
>>> it.  Unfortunately there's only a single example of the use of
>>> mapPartitions in any of the Python example programs, which is the log
>>> regression example, which I can't run because it requires Python 2.7 and
>>> I'm on Python 2.6.  (aside: I couldn't find any statement that Python 2.6
>>> is unsupported...is it?)
>>>
>>> I'd really really love to see a real life example of a Python use of
>>> mapPartitions.  I do appreciate the very simple examples you provided, but
>>> (perhaps because of my novice status on Python) I can't figure out how to
>>> translate those to a real world situation in which I'm building RDDs from
>>> files, not inline collections like [(1,2),(2,3)].
>>>
>>> Also, you say that the function called in mapPartitions can return a
>>> collection OR an iterator.  I tried returning an iterator by calling
>>> ElementTree getiterator function, but still got the error telling me my
>>> object was not an iterator.
>>>
>>> If anyone has a real life example of mapPartitions returning a Python
>>> iterator, that would be fabulous.
>>>
>>> Diana
>>>
>>>
>>> On Mon, Mar 17, 2014 at 6:17 PM, Matei Zaharia <ma...@gmail.com>wrote:
>>>
>>>> Oh, I see, the problem is that the function you pass to mapPartitions
>>>> must itself return an iterator or a collection. This is used so that you
>>>> can return multiple output records for each input record. You can implement
>>>> most of the existing map-like operations in Spark, such as map, filter,
>>>> flatMap, etc, with mapPartitions, as well as new ones that might do a
>>>> sliding window over each partition for example, or accumulate data across
>>>> elements (e.g. to compute a sum).
>>>>
>>>> For example, if you have data = sc.parallelize([1, 2, 3, 4], 2), this
>>>> will work:
>>>>
>>>> >>> data.mapPartitions(lambda x: x).collect()
>>>> [1, 2, 3, 4]   # Just return the same iterator, doing nothing
>>>>
>>>> >>> data.mapPartitions(lambda x: [list(x)]).collect()
>>>> [[1, 2], [3, 4]]   # Group together the elements of each partition in a
>>>> single list (like glom)
>>>>
>>>> >>> data.mapPartitions(lambda x: [sum(x)]).collect()
>>>> [3, 7]   # Sum each partition separately
>>>>
>>>> However something like data.mapPartitions(lambda x: sum(x)).collect()
>>>> will *not* work because sum returns a number, not an iterator. That's why I
>>>> put sum(x) inside a list above.
>>>>
>>>> In practice mapPartitions is most useful if you want to share some data
>>>> or work across the elements. For example maybe you want to load a lookup
>>>> table once from an external file and then check each element in it, or sum
>>>> up a bunch of elements without allocating a lot of vector objects.
>>>>
>>>> Matei
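A small sketch of the lookup-table case mentioned above. The table contents and the names load_lookup and expand_codes are made up for illustration; in practice load_lookup would read an external file. The point is that the table is loaded once per partition rather than once per element, and that a generator function is an easy way to return an iterator from mapPartitions.

```python
def load_lookup():
    # Hypothetical stand-in for reading a lookup table from an external file.
    return {"LI": "Liechtenstein", "SG": "Singapore", "PA": "Panama"}


def expand_codes(partition):
    """Function for mapPartitions: receives an iterator over one partition's
    elements and yields one output record per input record."""
    lookup = load_lookup()  # runs once per partition, not once per element
    for code in partition:
        yield lookup.get(code, "unknown")
```

With codes = sc.parallelize(["LI", "SG", "PA", "XX"], 2), this would be called as codes.mapPartitions(expand_codes).collect().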
>>>>
>>>>
>>>> On Mar 17, 2014, at 11:25 AM, Diana Carroll <dc...@cloudera.com>
>>>> wrote:
>>>>
>>>> > "There's also mapPartitions, which gives you an iterator for each
>>>> partition instead of an array. You can then return an iterator or list of
>>>> objects to produce from that."
>>>> >
>>>> > I confess, I was hoping for an example of just that, because i've not
>>>> yet been able to figure out how to use mapPartitions.  No doubt this is
>>>> because i'm a rank newcomer to Python, and haven't fully wrapped my head
>>>> around iterators.  All I get so far in my attempts to use mapPartitions is
>>>> the darned "suchnsuch is not an iterator" error.
>>>> >
>>>> > def myfunction(iterator): return [1,2,3]
>>>> > mydata.mapPartitions(lambda x: myfunction(x)).take(2)
>>>> >
>>>> >
>>>> >
>>>> >
>>>> >
>>>> > On Mon, Mar 17, 2014 at 1:57 PM, Matei Zaharia <
>>>> matei.zaharia@gmail.com> wrote:
>>>> > Here's an example of getting together all lines in a file as one
>>>> string:
>>>> >
>>>> > $ cat dir/a.txt
>>>> > Hello
>>>> > world!
>>>> >
>>>> > $ cat dir/b.txt
>>>> > What's
>>>> > up??
>>>> >
>>>> > $ bin/pyspark
>>>> > >>> files = sc.textFile("dir")
>>>> >
>>>> > >>> files.collect()
>>>> > [u'Hello', u'world!', u"What's", u'up??']   # one element per line,
>>>> not what we want
>>>> >
>>>> > >>> files.glom().collect()
>>>> > [[u'Hello', u'world!'], [u"What's", u'up??']]   # one element per
>>>> file, which is an array of lines
>>>> >
>>>> > >>> files.glom().map(lambda a: "\n".join(a)).collect()
>>>> > [u'Hello\nworld!', u"What's\nup??"]    # join back each file into a
>>>> single string
>>>> >
>>>> > The glom() method groups all the elements of each partition of an RDD
>>>> into an array, giving you an RDD of arrays of objects. If your input is
>>>> small files, you always have one partition per file.
>>>> >
>>>> > There's also mapPartitions, which gives you an iterator for each
>>>> partition instead of an array. You can then return an iterator or list of
>>>> objects to produce from that.
>>>> >
>>>> > Matei
>>>> >
>>>> >
>>>> > On Mar 17, 2014, at 10:46 AM, Diana Carroll <dc...@cloudera.com>
>>>> wrote:
>>>> >
>>>> > > Thanks Matei.  That makes sense.  I have here a dataset of many
>>>> many smallish XML files, so using mapPartitions that way would make sense.
>>>>  I'd love to see a code example though ...It's not as obvious to me how to
>>>> do that as it probably should be.
>>>> > >
>>>> > > Thanks,
>>>> > > Diana
>>>> > >
>>>> > >
>>>> > > On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia <
>>>> matei.zaharia@gmail.com> wrote:
>>>> > > Hi Diana,
>>>> > >
>>>> > > Non-text input formats are only supported in Java and Scala right
>>>> now, where you can use sparkContext.hadoopFile or .hadoopDataset to load
>>>> data with any InputFormat that Hadoop MapReduce supports. In Python, you
>>>> unfortunately only have textFile, which gives you one record per line. For
>>>> JSON, you'd have to fit the whole JSON object on one line as you said.
>>>> Hopefully we'll also have some other forms of input soon.
>>>> > >
>>>> > > If your input is a collection of separate files (say many .xml
>>>> files), you can also use mapPartitions on it to group together the lines
>>>> because each input file will end up being a single dataset partition (or
>>>> map task). This will let you concatenate the lines in each file and parse
>>>> them as one XML object.
>>>> > >
>>>> > > Matei
>>>> > >
>>>> > > On Mar 17, 2014, at 9:52 AM, Diana Carroll <dc...@cloudera.com>
>>>> wrote:
>>>> > >
>>>> > >> Thanks, Krakna, very helpful.  The way I read the code, it looks
>>>> like you are assuming that each line in foo.log contains a complete json
>>>> object?  (That is, that the data doesn't contain any records that are split
>>>> into multiple lines.)  If so, is that because you know that to be true of
>>>> your data?  Or did you do as Nicholas suggests and have some preprocessing
>>>> on the text input to flatten the data in that way?
>>>> > >>
>>>> > >> Thanks,
>>>> > >> Diana
>>>> > >>
>>>> > >>
>>>> > >> On Mon, Mar 17, 2014 at 12:09 PM, Krakna H <sh...@gmail.com>
>>>> wrote:
>>>> > >> Katrina,
>>>> > >>
>>>> > >> Not sure if this is what you had in mind, but here's some simple
>>>> pyspark code that I recently wrote to deal with JSON files.
>>>> > >>
>>>> > >> from pyspark import SparkContext, SparkConf
>>>> > >> from operator import add
>>>> > >> import json
>>>> > >> import random
>>>> > >> import numpy as np
>>>> > >>
>>>> > >> def concatenate_paragraphs(sentence_array):
>>>> > >>     # join the sentences, then split on spaces to get a list of words
>>>> > >>     return ' '.join(sentence_array).split(' ')
>>>> > >>
>>>> > >> logFile = 'foo.json'
>>>> > >> conf = SparkConf()
>>>> > >> conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")
>>>> > >> sc = SparkContext(conf=conf)
>>>> > >>
>>>> > >> logData = sc.textFile(logFile).cache()
>>>> > >> num_lines = logData.count()
>>>> > >> print 'Number of lines: %d' % num_lines
>>>> > >>
>>>> > >> # JSON object has the structure: {"key": {'paragraphs': [sentence1, sentence2, ...]}}
>>>> > >> tm = logData.map(lambda s: (json.loads(s)['key'], len(concatenate_paragraphs(json.loads(s)['paragraphs']))))
>>>> > >> tm = tm.reduceByKey(lambda _, x: _ + x)
>>>> > >>
>>>> > >> op = tm.collect()
>>>> > >> for key, num_words in op:
>>>> > >>     print 'state: %s, num_words: %d' % (key, num_words)
>>>> > >>
>>>> > >> On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark
>>>> User List] <[hidden email]> wrote:
>>>> > >> I don't actually have any data.  I'm writing a course that teaches
>>>> students how to do this sort of thing and am interested in looking at a
>>>> variety of real life examples of people doing things like that.  I'd love
>>>> to see some working code implementing the "obvious work-around" you
>>>> mention...do you have any to share?  It's an approach that makes a lot of
>>>> sense, and as I said, I'd love to not have to re-invent the wheel if
>>>> someone else has already written that code.  Thanks!
>>>> > >>
>>>> > >> Diana
>>>> > >>
>>>> > >>
>>>> > >> On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <[hidden
>>>> email]> wrote:
>>>> > >> There was a previous discussion about this here:
>>>> > >>
>>>> > >>
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
>>>> > >>
>>>> > >> How big are the XML or JSON files you're looking to deal with?
>>>> > >>
>>>> > >> It may not be practical to deserialize the entire document at
>>>> once. In that case an obvious work-around would be to have some kind of
>>>> pre-processing step that separates XML nodes/JSON objects with newlines so
>>>> that you can analyze the data with Spark in a "line-oriented format". Your
>>>> preprocessor wouldn't have to parse/deserialize the massive document; it
>>>> would just have to track open/closed tags/braces to know when to insert a
>>>> newline.
>>>> > >>
>>>> > >> Then you'd just open the line-delimited result and deserialize the
>>>> individual objects/nodes with map().
>>>> > >>
>>>> > >> Nick
>>>> > >>
>>>> > >>
>>>> > >> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <[hidden email]>
>>>> wrote:
>>>> > >> Has anyone got a working example of a Spark application that
>>>> analyzes data in a non-line-oriented format, such as XML or JSON?  I'd like
>>>> to do this without re-inventing the wheel...anyone care to share?  Thanks!
>>>> > >>
>>>> > >> Diana
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >
>>>> > >
>>>> >
>>>> >
>>>>
>>>>
>>>
>>
>>
>>
>
>

Re: example of non-line oriented input data?

Posted by Evgeny Shishkin <it...@gmail.com>.
On 19 Mar 2014, at 19:54, Diana Carroll <dc...@cloudera.com> wrote:

> Actually, thinking more on this question, Matei: I'd definitely say support for Avro.  There's a lot of interest in this!!
> 

Agree, and parquet as default Cloudera Impala format.



> On Tue, Mar 18, 2014 at 8:14 PM, Matei Zaharia <ma...@gmail.com> wrote:
> BTW one other thing — in your experience, Diana, which non-text InputFormats would be most useful to support in Python first? Would it be Parquet or Avro, simple SequenceFiles with the Hadoop Writable types, or something else? I think a per-file text input format that does the stuff we did here would also be good.
> 
> Matei
> 
> 
> On Mar 18, 2014, at 3:27 PM, Matei Zaharia <ma...@gmail.com> wrote:
> 
>> Hi Diana,
>> 
>> This seems to work without the iter() in front if you just return treeiterator. What happened when you didn’t include that? Treeiterator should return an iterator.
>> 
>> Anyway, this is a good example of mapPartitions. It’s one where you want to view the whole file as one object (one XML here), so you couldn’t implement this using a flatMap, but you still want to return multiple values. The MLlib example you saw needs Python 2.7 because unfortunately that is a requirement for our Python MLlib support (see http://spark.incubator.apache.org/docs/0.9.0/python-programming-guide.html#libraries). We’d like to relax this later but we’re using some newer features of NumPy and Python. The rest of PySpark works on 2.6.
>> 
>> In terms of the size in memory, here both the string s and the XML tree constructed from it need to fit in, so you can’t work on very large individual XML files. You may be able to use a streaming XML parser instead to extract elements from the data in a streaming fashion, without ever materializing the whole tree. http://docs.python.org/2/library/xml.sax.reader.html#module-xml.sax.xmlreader is one example.
>> 
>> Matei
>> 
>> On Mar 18, 2014, at 7:49 AM, Diana Carroll <dc...@cloudera.com> wrote:
>> 
>>> Well, if anyone is still following this, I've gotten the following code working which in theory should allow me to parse whole XML files: (the problem was that I can't return the tree iterator directly.  I have to call iter().  Why?)
>>> 
>>> import xml.etree.ElementTree as ET
>>> 
>>> # two source files, format <data> <country name="...">...</country>...</data>
>>> mydata=sc.textFile("file:/home/training/countries*.xml") 
>>> 
>>> def parsefile(iterator):
>>>     s = ''
>>>     for i in iterator: s = s + str(i)
>>>     tree = ET.fromstring(s)
>>>     treeiterator = tree.getiterator("country")
>>>     # why do I have to convert an iterator to an iterator?  not sure but required
>>>     return iter(treeiterator)
>>> 
>>> mydata.mapPartitions(lambda x: parsefile(x)).map(lambda element: element.attrib).collect()
>>> 
>>> The output is what I expect:
>>> [{'name': 'Liechtenstein'}, {'name': 'Singapore'}, {'name': 'Panama'}]
>>> 
>>> BUT I'm a bit concerned about the construction of the string "s".  How big can my file be before converting it to a string becomes problematic?
>>> 
>>> 
>>> 
>>> On Tue, Mar 18, 2014 at 9:41 AM, Diana Carroll <dc...@cloudera.com> wrote:
>>> Thanks, Matei.
>>> 
>>> In the context of this discussion, it would seem mapPartitions is essential, because it's the only way I'm going to be able to process each file as a whole, in our example of a large number of small XML files which need to be parsed as a whole file because records are not required to be on a single line.
>>> 
>>> The theory makes sense but I'm still utterly lost as to how to implement it.  Unfortunately there's only a single example of the use of mapPartitions in any of the Python example programs, which is the log regression example, which I can't run because it requires Python 2.7 and I'm on Python 2.6.  (aside: I couldn't find any statement that Python 2.6 is unsupported...is it?)
>>> 
>>> I'd really really love to see a real life example of a Python use of mapPartitions.  I do appreciate the very simple examples you provided, but (perhaps because of my novice status on Python) I can't figure out how to translate those to a real world situation in which I'm building RDDs from files, not inline collections like [(1,2),(2,3)].
>>> 
>>> Also, you say that the function called in mapPartitions can return a collection OR an iterator.  I tried returning an iterator by calling ElementTree getiterator function, but still got the error telling me my object was not an iterator. 
>>> 
>>> If anyone has a real life example of mapPartitions returning a Python iterator, that would be fabulous.
>>> 
>>> Diana
>>> 
>>> 
>>> On Mon, Mar 17, 2014 at 6:17 PM, Matei Zaharia <ma...@gmail.com> wrote:
>>> Oh, I see, the problem is that the function you pass to mapPartitions must itself return an iterator or a collection. This is used so that you can return multiple output records for each input record. You can implement most of the existing map-like operations in Spark, such as map, filter, flatMap, etc, with mapPartitions, as well as new ones that might do a sliding window over each partition for example, or accumulate data across elements (e.g. to compute a sum).
>>> 
>>> For example, if you have data = sc.parallelize([1, 2, 3, 4], 2), this will work:
>>> 
>>> >>> data.mapPartitions(lambda x: x).collect()
>>> [1, 2, 3, 4]   # Just return the same iterator, doing nothing
>>> 
>>> >>> data.mapPartitions(lambda x: [list(x)]).collect()
>>> [[1, 2], [3, 4]]   # Group together the elements of each partition in a single list (like glom)
>>> 
>>> >>> data.mapPartitions(lambda x: [sum(x)]).collect()
>>> [3, 7]   # Sum each partition separately
>>> 
>>> However something like data.mapPartitions(lambda x: sum(x)).collect() will *not* work because sum returns a number, not an iterator. That’s why I put sum(x) inside a list above.
>>> 
>>> In practice mapPartitions is most useful if you want to share some data or work across the elements. For example maybe you want to load a lookup table once from an external file and then check each element in it, or sum up a bunch of elements without allocating a lot of vector objects.
>>> 
>>> Matei
>>> 
>>> 
>>> On Mar 17, 2014, at 11:25 AM, Diana Carroll <dc...@cloudera.com> wrote:
>>> 
>>> > "There’s also mapPartitions, which gives you an iterator for each partition instead of an array. You can then return an iterator or list of objects to produce from that."
>>> >
>>> > I confess, I was hoping for an example of just that, because i've not yet been able to figure out how to use mapPartitions.  No doubt this is because i'm a rank newcomer to Python, and haven't fully wrapped my head around iterators.  All I get so far in my attempts to use mapPartitions is the darned "suchnsuch is not an iterator" error.
>>> >
>>> > def myfunction(iterator): return [1,2,3]
>>> > mydata.mapPartitions(lambda x: myfunction(x)).take(2)
>>> >
>>> >
>>> >
>>> >
>>> >
>>> > On Mon, Mar 17, 2014 at 1:57 PM, Matei Zaharia <ma...@gmail.com> wrote:
>>> > Here’s an example of getting together all lines in a file as one string:
>>> >
>>> > $ cat dir/a.txt
>>> > Hello
>>> > world!
>>> >
>>> > $ cat dir/b.txt
>>> > What's
>>> > up??
>>> >
>>> > $ bin/pyspark
>>> > >>> files = sc.textFile("dir")
>>> >
>>> > >>> files.collect()
>>> > [u'Hello', u'world!', u"What's", u'up??']   # one element per line, not what we want
>>> >
>>> > >>> files.glom().collect()
>>> > [[u'Hello', u'world!'], [u"What's", u'up??']]   # one element per file, which is an array of lines
>>> >
>>> > >>> files.glom().map(lambda a: "\n".join(a)).collect()
>>> > [u'Hello\nworld!', u"What's\nup??"]    # join back each file into a single string
>>> >
>>> > The glom() method groups all the elements of each partition of an RDD into an array, giving you an RDD of arrays of objects. If your input is small files, you always have one partition per file.
>>> >
>>> > There’s also mapPartitions, which gives you an iterator for each partition instead of an array. You can then return an iterator or list of objects to produce from that.
>>> >
>>> > Matei
>>> >
>>> >
>>> > On Mar 17, 2014, at 10:46 AM, Diana Carroll <dc...@cloudera.com> wrote:
>>> >
>>> > > Thanks Matei.  That makes sense.  I have here a dataset of many many smallish XML files, so using mapPartitions that way would make sense.  I'd love to see a code example though ...It's not as obvious to me how to do that as it probably should be.
>>> > >
>>> > > Thanks,
>>> > > Diana
>>> > >
>>> > >
>>> > > On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia <ma...@gmail.com> wrote:
>>> > > Hi Diana,
>>> > >
>>> > > Non-text input formats are only supported in Java and Scala right now, where you can use sparkContext.hadoopFile or .hadoopDataset to load data with any InputFormat that Hadoop MapReduce supports. In Python, you unfortunately only have textFile, which gives you one record per line. For JSON, you’d have to fit the whole JSON object on one line as you said. Hopefully we’ll also have some other forms of input soon.
>>> > >
>>> > > If your input is a collection of separate files (say many .xml files), you can also use mapPartitions on it to group together the lines because each input file will end up being a single dataset partition (or map task). This will let you concatenate the lines in each file and parse them as one XML object.
>>> > >
>>> > > Matei
>>> > >
>>> > > On Mar 17, 2014, at 9:52 AM, Diana Carroll <dc...@cloudera.com> wrote:
>>> > >
>>> > >> Thanks, Krakna, very helpful.  The way I read the code, it looks like you are assuming that each line in foo.log contains a complete json object?  (That is, that the data doesn't contain any records that are split into multiple lines.)  If so, is that because you know that to be true of your data?  Or did you do as Nicholas suggests and have some preprocessing on the text input to flatten the data in that way?
>>> > >>
>>> > >> Thanks,
>>> > >> Diana
>>> > >>
>>> > >>
>>> > >> On Mon, Mar 17, 2014 at 12:09 PM, Krakna H <sh...@gmail.com> wrote:
>>> > >> Katrina,
>>> > >>
>>> > >> Not sure if this is what you had in mind, but here's some simple pyspark code that I recently wrote to deal with JSON files.
>>> > >>
>>> > >> from pyspark import SparkContext, SparkConf
>>> > >> from operator import add
>>> > >> import json
>>> > >> import random
>>> > >> import numpy as np
>>> > >>
>>> > >> def concatenate_paragraphs(sentence_array):
>>> > >>     # join the sentences, then split on spaces to get a list of words
>>> > >>     return ' '.join(sentence_array).split(' ')
>>> > >>
>>> > >> logFile = 'foo.json'
>>> > >> conf = SparkConf()
>>> > >> conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")
>>> > >> sc = SparkContext(conf=conf)
>>> > >>
>>> > >> logData = sc.textFile(logFile).cache()
>>> > >> num_lines = logData.count()
>>> > >> print 'Number of lines: %d' % num_lines
>>> > >>
>>> > >> # JSON object has the structure: {"key": {'paragraphs': [sentence1, sentence2, ...]}}
>>> > >> tm = logData.map(lambda s: (json.loads(s)['key'], len(concatenate_paragraphs(json.loads(s)['paragraphs']))))
>>> > >> tm = tm.reduceByKey(lambda _, x: _ + x)
>>> > >>
>>> > >> op = tm.collect()
>>> > >> for key, num_words in op:
>>> > >>     print 'state: %s, num_words: %d' % (key, num_words)
>>> > >>
>>> > >> On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark User List] <[hidden email]> wrote:
>>> > >> I don't actually have any data.  I'm writing a course that teaches students how to do this sort of thing and am interested in looking at a variety of real life examples of people doing things like that.  I'd love to see some working code implementing the "obvious work-around" you mention...do you have any to share?  It's an approach that makes a lot of sense, and as I said, I'd love to not have to re-invent the wheel if someone else has already written that code.  Thanks!
>>> > >>
>>> > >> Diana
>>> > >>
>>> > >>
>>> > >> On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <[hidden email]> wrote:
>>> > >> There was a previous discussion about this here:
>>> > >>
>>> > >> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
>>> > >>
>>> > >> How big are the XML or JSON files you're looking to deal with?
>>> > >>
>>> > >> It may not be practical to deserialize the entire document at once. In that case an obvious work-around would be to have some kind of pre-processing step that separates XML nodes/JSON objects with newlines so that you can analyze the data with Spark in a "line-oriented format". Your preprocessor wouldn't have to parse/deserialize the massive document; it would just have to track open/closed tags/braces to know when to insert a newline.
>>> > >>
>>> > >> Then you'd just open the line-delimited result and deserialize the individual objects/nodes with map().
>>> > >>
>>> > >> Nick
>>> > >>
>>> > >>
>>> > >> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <[hidden email]> wrote:
>>> > >> Has anyone got a working example of a Spark application that analyzes data in a non-line-oriented format, such as XML or JSON?  I'd like to do this without re-inventing the wheel...anyone care to share?  Thanks!
>>> > >>
>>> > >> Diana
>>> > >>
>>> > >>
>>> > >>
>>> > >>
>>> > >>
>>> > >
>>> > >
>>> >
>>> >
>>> 
>>> 
>>> 
>> 
> 
> 


Re: example of non-line oriented input data?

Posted by Diana Carroll <dc...@cloudera.com>.
Actually, thinking more on this question, Matei: I'd definitely say support
for Avro.  There's a lot of interest in this!!


On Tue, Mar 18, 2014 at 8:14 PM, Matei Zaharia <ma...@gmail.com>wrote:

> BTW one other thing -- in your experience, Diana, which non-text
> InputFormats would be most useful to support in Python first? Would it be
> Parquet or Avro, simple SequenceFiles with the Hadoop Writable types, or
> something else? I think a per-file text input format that does the stuff we
> did here would also be good.
>
> Matei
>
>
> On Mar 18, 2014, at 3:27 PM, Matei Zaharia <ma...@gmail.com>
> wrote:
>
> Hi Diana,
>
> This seems to work without the iter() in front if you just return
> treeiterator. What happened when you didn't include that? Treeiterator
> should return an iterator.
>
> Anyway, this is a good example of mapPartitions. It's one where you want
> to view the whole file as one object (one XML here), so you couldn't
> implement this using a flatMap, but you still want to return multiple
> values. The MLlib example you saw needs Python 2.7 because unfortunately
> that is a requirement for our Python MLlib support (see
> http://spark.incubator.apache.org/docs/0.9.0/python-programming-guide.html#libraries).
> We'd like to relax this later but we're using some newer features of NumPy
> and Python. The rest of PySpark works on 2.6.
>
> In terms of the size in memory, here both the string s and the XML tree
> constructed from it need to fit in, so you can't work on very large
> individual XML files. You may be able to use a streaming XML parser instead
> to extract elements from the data in a streaming fashion, without ever
> materializing the whole tree.
> http://docs.python.org/2/library/xml.sax.reader.html#module-xml.sax.xmlreader is one example.
>
> Matei
>
> On Mar 18, 2014, at 7:49 AM, Diana Carroll <dc...@cloudera.com> wrote:
>
> Well, if anyone is still following this, I've gotten the following code
> working which in theory should allow me to parse whole XML files: (the
> problem was that I can't return the tree iterator directly.  I have to call
> iter().  Why?)
>
> import xml.etree.ElementTree as ET
>
> # two source files, format <data> <country name="...">...</country>...</data>
> mydata=sc.textFile("file:/home/training/countries*.xml")
>
> def parsefile(iterator):
>     s = ''
>     for i in iterator: s = s + str(i)
>     tree = ET.fromstring(s)
>     treeiterator = tree.getiterator("country")
>     # why do I have to convert an iterator to an iterator?  not sure but required
>     return iter(treeiterator)
>
> mydata.mapPartitions(lambda x: parsefile(x)).map(lambda element: element.attrib).collect()
>
> The output is what I expect:
> [{'name': 'Liechtenstein'}, {'name': 'Singapore'}, {'name': 'Panama'}]
>
> BUT I'm a bit concerned about the construction of the string "s".  How big
> can my file be before converting it to a string becomes problematic?
>
>
>
> On Tue, Mar 18, 2014 at 9:41 AM, Diana Carroll <dc...@cloudera.com> wrote:
>
>> Thanks, Matei.
>>
>> In the context of this discussion, it would seem mapPartitions is
>> essential, because it's the only way I'm going to be able to process each
>> file as a whole, in our example of a large number of small XML files which
>> need to be parsed as a whole file because records are not required to be on
>> a single line.
>>
>> The theory makes sense but I'm still utterly lost as to how to implement
>> it.  Unfortunately there's only a single example of the use of
>> mapPartitions in any of the Python example programs, which is the logistic
>> regression example, which I can't run because it requires Python 2.7 and
>> I'm on Python 2.6.  (aside: I couldn't find any statement that Python 2.6
>> is unsupported...is it?)
>>
>> I'd really really love to see a real life example of a Python use of
>> mapPartitions.  I do appreciate the very simple examples you provided, but
>> (perhaps because of my novice status on Python) I can't figure out how to
>> translate those to a real world situation in which I'm building RDDs from
>> files, not inline collections like [(1,2),(2,3)].
>>
>> Also, you say that the function called in mapPartitions can return a
>> collection OR an iterator.  I tried returning an iterator by calling
>> ElementTree getiterator function, but still got the error telling me my
>> object was not an iterator.
>>
>> If anyone has a real life example of mapPartitions returning a Python
>> iterator, that would be fabulous.
>>
>> Diana
>>
>>
>> On Mon, Mar 17, 2014 at 6:17 PM, Matei Zaharia <ma...@gmail.com> wrote:
>>
>>> Oh, I see, the problem is that the function you pass to mapPartitions
>>> must itself return an iterator or a collection. This is used so that you
>>> can return multiple output records for each input record. You can implement
>>> most of the existing map-like operations in Spark, such as map, filter,
>>> flatMap, etc, with mapPartitions, as well as new ones that might do a
>>> sliding window over each partition for example, or accumulate data across
>>> elements (e.g. to compute a sum).
>>>
>>> For example, if you have data = sc.parallelize([1, 2, 3, 4], 2), this
>>> will work:
>>>
>>> >>> data.mapPartitions(lambda x: x).collect()
>>> [1, 2, 3, 4]   # Just return the same iterator, doing nothing
>>>
>>> >>> data.mapPartitions(lambda x: [list(x)]).collect()
>>> [[1, 2], [3, 4]]   # Group together the elements of each partition in a single list (like glom)
>>>
>>> >>> data.mapPartitions(lambda x: [sum(x)]).collect()
>>> [3, 7]   # Sum each partition separately
>>>
>>> However something like data.mapPartitions(lambda x: sum(x)).collect()
>>> will *not* work because sum returns a number, not an iterator. That's why I
>>> put sum(x) inside a list above.
>>>
>>> In practice mapPartitions is most useful if you want to share some data
>>> or work across the elements. For example maybe you want to load a lookup
>>> table once from an external file and then check each element in it, or sum
>>> up a bunch of elements without allocating a lot of vector objects.
>>>
>>> Matei
>>>
>>>
>>> On Mar 17, 2014, at 11:25 AM, Diana Carroll <dc...@cloudera.com>
>>> wrote:
>>>
>>> > "There's also mapPartitions, which gives you an iterator for each
>>> partition instead of an array. You can then return an iterator or list of
>>> objects to produce from that."
>>> >
>>> > I confess, I was hoping for an example of just that, because i've not
>>> yet been able to figure out how to use mapPartitions.  No doubt this is
>>> because i'm a rank newcomer to Python, and haven't fully wrapped my head
>>> around iterators.  All I get so far in my attempts to use mapPartitions is
>>> the darned "suchnsuch is not an iterator" error.
>>> >
>>> > def myfunction(iterator): return [1,2,3]
>>> > mydata.mapPartitions(lambda x: myfunction(x)).take(2)
>>> >
>>> >
>>> >
>>> >
>>> >
>>> > On Mon, Mar 17, 2014 at 1:57 PM, Matei Zaharia <
>>> matei.zaharia@gmail.com> wrote:
>>> > Here's an example of getting together all lines in a file as one
>>> string:
>>> >
>>> > $ cat dir/a.txt
>>> > Hello
>>> > world!
>>> >
>>> > $ cat dir/b.txt
>>> > What's
>>> > up??
>>> >
>>> > $ bin/pyspark
>>> > >>> files = sc.textFile("dir")
>>> >
>>> > >>> files.collect()
>>> > [u'Hello', u'world!', u"What's", u'up??']   # one element per line, not what we want
>>> >
>>> > >>> files.glom().collect()
>>> > [[u'Hello', u'world!'], [u"What's", u'up??']]   # one element per file, which is an array of lines
>>> >
>>> > >>> files.glom().map(lambda a: "\n".join(a)).collect()
>>> > [u'Hello\nworld!', u"What's\nup??"]    # join back each file into a single string
>>> >
>>> > The glom() method groups all the elements of each partition of an RDD
>>> into an array, giving you an RDD of arrays of objects. If your input is
>>> small files, you always have one partition per file.
>>> >
>>> > There's also mapPartitions, which gives you an iterator for each
>>> partition instead of an array. You can then return an iterator or list of
>>> objects to produce from that.
>>> >
>>> > Matei
>>> >
>>> >
>>> > On Mar 17, 2014, at 10:46 AM, Diana Carroll <dc...@cloudera.com>
>>> wrote:
>>> >
>>> > > Thanks Matei.  That makes sense.  I have here a dataset of many many
>>> smallish XML files, so using mapPartitions that way would make sense.  I'd
>>> love to see a code example though... It's not as obvious to me how to do
>>> that as it probably should be.
>>> > >
>>> > > Thanks,
>>> > > Diana
>>> > >
>>> > >
>>> > > On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia <
>>> matei.zaharia@gmail.com> wrote:
>>> > > Hi Diana,
>>> > >
>>> > > Non-text input formats are only supported in Java and Scala right
>>> now, where you can use sparkContext.hadoopFile or .hadoopDataset to load
>>> data with any InputFormat that Hadoop MapReduce supports. In Python, you
>>> unfortunately only have textFile, which gives you one record per line. For
>>> JSON, you'd have to fit the whole JSON object on one line as you said.
>>> Hopefully we'll also have some other forms of input soon.
>>> > >
>>> > > If your input is a collection of separate files (say many .xml
>>> files), you can also use mapPartitions on it to group together the lines
>>> because each input file will end up being a single dataset partition (or
>>> map task). This will let you concatenate the lines in each file and parse
>>> them as one XML object.
>>> > >
>>> > > Matei
>>> > >
>>> > > On Mar 17, 2014, at 9:52 AM, Diana Carroll <dc...@cloudera.com>
>>> wrote:
>>> > >
>>> > >> Thanks, Krakna, very helpful.  The way I read the code, it looks
>>> like you are assuming that each line in foo.log contains a complete json
>>> object?  (That is, that the data doesn't contain any records that are split
>>> into multiple lines.)  If so, is that because you know that to be true of
>>> your data?  Or did you do as Nicholas suggests and have some preprocessing
>>> on the text input to flatten the data in that way?
>>> > >>
>>> > >> Thanks,
>>> > >> Diana
>>> > >>
>>> > >>
>>> > >> On Mon, Mar 17, 2014 at 12:09 PM, Krakna H <sh...@gmail.com>
>>> wrote:
>>> > >> Katrina,
>>> > >>
>>> > >> Not sure if this is what you had in mind, but here's some simple
>>> pyspark code that I recently wrote to deal with JSON files.
>>> > >>
>>> > >> from pyspark import SparkContext, SparkConf
>>> > >> from operator import add
>>> > >> import json
>>> > >> import random
>>> > >> import numpy as np
>>> > >>
>>> > >> def concatenate_paragraphs(sentence_array):
>>> > >>     return ' '.join(sentence_array).split(' ')
>>> > >>
>>> > >> logFile = 'foo.json'
>>> > >> conf = SparkConf()
>>> > >> conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")
>>> > >> sc = SparkContext(conf=conf)
>>> > >> logData = sc.textFile(logFile).cache()
>>> > >>
>>> > >> num_lines = logData.count()
>>> > >> print 'Number of lines: %d' % num_lines
>>> > >>
>>> > >> # JSON object has the structure: {"key": {'paragraphs': [sentence1, sentence2, ...]}}
>>> > >> tm = logData.map(lambda s: (json.loads(s)['key'], len(concatenate_paragraphs(json.loads(s)['paragraphs']))))
>>> > >> tm = tm.reduceByKey(lambda _, x: _ + x)
>>> > >>
>>> > >> op = tm.collect()
>>> > >> for key, num_words in op:
>>> > >>     print 'state: %s, num_words: %d' % (key, num_words)
>>> > >>
>>> > >> On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark
>>> User List] <[hidden email]> wrote:
>>> > >> I don't actually have any data.  I'm writing a course that teaches
>>> students how to do this sort of thing and am interested in looking at a
>>> variety of real life examples of people doing things like that.  I'd love
>>> to see some working code implementing the "obvious work-around" you
>>> mention...do you have any to share?  It's an approach that makes a lot of
>>> sense, and as I said, I'd love to not have to re-invent the wheel if
>>> someone else has already written that code.  Thanks!
>>> > >>
>>> > >> Diana
>>> > >>
>>> > >>
>>> > >> On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <[hidden email]>
>>> wrote:
>>> > >> There was a previous discussion about this here:
>>> > >>
>>> > >>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
>>> > >>
>>> > >> How big are the XML or JSON files you're looking to deal with?
>>> > >>
>>> > >> It may not be practical to deserialize the entire document at once.
>>> In that case an obvious work-around would be to have some kind of
>>> pre-processing step that separates XML nodes/JSON objects with newlines so
>>> that you can analyze the data with Spark in a "line-oriented format". Your
>>> preprocessor wouldn't have to parse/deserialize the massive document; it
>>> would just have to track open/closed tags/braces to know when to insert a
>>> newline.
>>> > >>
>>> > >> Then you'd just open the line-delimited result and deserialize the
>>> individual objects/nodes with map().
>>> > >>
>>> > >> Nick
>>> > >>
>>> > >>
>>> > >> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <[hidden email]>
>>> wrote:
>>> > >> Has anyone got a working example of a Spark application that
>>> analyzes data in a non-line-oriented format, such as XML or JSON?  I'd like
>>> to do this without re-inventing the wheel...anyone care to share?  Thanks!
>>> > >>
>>> > >> Diana
>>> > >>
>>> > >>
>>> > >>
>>> > >>
>>> > >>
>>> > >
>>> > >
>>> >
>>> >
>>>
>>>
>>
>
>
>

Re: example of non-line oriented input data?

Posted by Matei Zaharia <ma...@gmail.com>.
BTW one other thing — in your experience, Diana, which non-text InputFormats would be most useful to support in Python first? Would it be Parquet or Avro, simple SequenceFiles with the Hadoop Writable types, or something else? I think a per-file text input format that does the stuff we did here would also be good.

Matei




Re: example of non-line oriented input data?

Posted by Diana Carroll <dc...@cloudera.com>.
If I don't call iter(), and just return treeiterator directly, I get an
error message that the object is not of an iterator type.  This is in
Python 2.6...perhaps a bug?
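
One possible explanation, not confirmed anywhere in this thread: in Python an iterable (such as a list) is not the same thing as an iterator, and code that calls next() directly on the returned object only works with the latter. The sketch below is plain Python 3 with no Spark involved. The sample XML is made up, and it uses Element.iter() rather than getiterator(), which is an assumption made because getiterator() is no longer available in recent Python versions.

```python
import xml.etree.ElementTree as ET
from collections.abc import Iterator

# Made-up sample in the same <data><country .../></data> shape used in the thread.
SAMPLE = '<data><country name="Liechtenstein"/><country name="Singapore"/></data>'

tree = ET.fromstring(SAMPLE)
countries = list(tree.iter("country"))   # a list: iterable, but not an iterator

list_is_iterator = isinstance(countries, Iterator)
wrapped_is_iterator = isinstance(iter(countries), Iterator)

# next() works on the wrapped object but raises TypeError on the bare list.
first_name = next(iter(countries)).attrib["name"]
try:
    next(countries)
    bare_list_error = None
except TypeError as exc:
    bare_list_error = type(exc).__name__

print(list_is_iterator, wrapped_is_iterator, first_name, bare_list_error)
```

If the object returned by getiterator() on Python 2.6 was a list of this kind, wrapping it in iter() would be exactly the conversion that made the error go away.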

BUT I also realized my code was wrong.  It results in an RDD containing all
the tags in all the files.  What I really want is an RDD where each record
corresponds to a single file.  So if I have a thousand files, I should have
a thousand elements in my RDD, each of which is an ElementTree.  (Which I
can then use to map or flatMap to pull out the data I actually care about.)

So, this works:

def parsefile(iterator):
    s = ''
    for i in iterator: s = s + str(i)
    yield ET.fromstring(s)
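
A minimal sketch of the same idea that can be tried without a cluster. It assumes, as discussed earlier in the thread, that each partition holds the lines of exactly one small file. The name parse_whole_file and the sample lines are illustrative only and are not part of Spark. It is written for Python 3 and joins the lines once instead of growing a string inside the loop.

```python
import xml.etree.ElementTree as ET

def parse_whole_file(lines):
    """Join every line of one partition and yield a single parsed tree."""
    text = "\n".join(lines)          # one join instead of repeated s = s + ...
    if text.strip():                 # skip empty partitions rather than fail
        yield ET.fromstring(text)

# Stand-in for the iterator that mapPartitions would hand to the function.
partition = iter([
    "<data>",
    '  <country name="Liechtenstein"><rank>1</rank></country>',
    '  <country name="Panama"><rank>68</rank></country>',
    "</data>",
])

trees = list(parse_whole_file(partition))
names = [c.attrib["name"] for c in trees[0].iter("country")]
print(len(trees), names)
```

With an RDD like mydata from the earlier message, the function would be passed as mydata.mapPartitions(parse_whole_file), giving one tree per partition. Because it is a generator, it already returns an iterator, so no iter() call is needed.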

I would think the ability to process very large numbers of smallish XML
files is pretty common. The use case I'm playing with right now is using a
knowledge base of HTML documents.  Each document in the KB is a single
file, which in my experience is not an unusual configuration.  I'd like to
be able to suck the whole KB into an RDD and then do analysis such as
"which keywords are most commonly used in the KB" or "is there a
correlation between certain user attributes and the KB articles they
request" and so on.
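
As a rough illustration of the "most common keywords" question, here is a sketch in plain Python 3 over already-parsed documents. It assumes the KB pages are well-formed enough for ElementTree to parse, which real HTML often is not. The function keyword_counts and the two sample pages are made up for the example.

```python
import re
import xml.etree.ElementTree as ET
from collections import Counter

def keyword_counts(documents, top=3):
    """Count lowercase words across the text of all parsed documents."""
    counts = Counter()
    for root in documents:
        text = " ".join(root.itertext())
        counts.update(re.findall(r"[a-z]+", text.lower()))
    return counts.most_common(top)

# Two made-up knowledge base pages, one per file.
docs = [
    ET.fromstring("<html><body><p>Reset password</p><p>Password rules</p></body></html>"),
    ET.fromstring("<html><body><p>Change password</p></body></html>"),
]
top_words = keyword_counts(docs, top=1)
print(top_words)
```

On an RDD of trees, the same counting would more naturally be a flatMap over the words followed by reduceByKey, as in the JSON example quoted below.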

Unfortunately I'm not sure I'm the best person to answer your question about non-text
InputFormats to support.  I'm fairly new to Hadoop (about 8 months) and I'm
not in the field.  My background is in app servers, ecommerce and business
process management, so that's my bias.  From that perspective, it would be
really useful to be able to work with XML/HTML and CSV files...but are
those what big data analysts are actually using Spark for?  I dunno.  And,
really, if I were actually in those fields, I'd be getting the data from a
DB using Shark, right?

Diana


On Tue, Mar 18, 2014 at 6:27 PM, Matei Zaharia <ma...@gmail.com> wrote:

> Hi Diana,
>
> This seems to work without the iter() in front if you just return
> treeiterator. What happened when you didn't include that? Treeiterator
> should return an iterator.
>
> Anyway, this is a good example of mapPartitions. It's one where you want
> to view the whole file as one object (one XML here), so you couldn't
> implement this using a flatMap, but you still want to return multiple
> values. The MLlib example you saw needs Python 2.7 because unfortunately
> that is a requirement for our Python MLlib support (see
> http://spark.incubator.apache.org/docs/0.9.0/python-programming-guide.html#libraries).
> We'd like to relax this later but we're using some newer features of NumPy
> and Python. The rest of PySpark works on 2.6.
>
> In terms of the size in memory, here both the string s and the XML tree
> constructed from it need to fit in, so you can't work on very large
> individual XML files. You may be able to use a streaming XML parser instead
> to extract elements from the data in a streaming fashion, without ever
> materializing the whole tree.
> http://docs.python.org/2/library/xml.sax.reader.html#module-xml.sax.xmlreader is one example.
>
> Matei
>
> On Mar 18, 2014, at 7:49 AM, Diana Carroll <dc...@cloudera.com> wrote:
>
> Well, if anyone is still following this, I've gotten the following code
> working which in theory should allow me to parse whole XML files: (the
> problem was that I can't return the tree iterator directly.  I have to call
> iter().  Why?)
>
> import xml.etree.ElementTree as ET
>
> # two source files, format <data> <country name="...">...</country>...</data>
> mydata=sc.textFile("file:/home/training/countries*.xml")
>
> def parsefile(iterator):
>     s = ''
>     for i in iterator: s = s + str(i)
>     tree = ET.fromstring(s)
>     treeiterator = tree.getiterator("country")
>     # why do I have to convert an iterator to an iterator?  not sure but required
>     return iter(treeiterator)
>
> mydata.mapPartitions(lambda x: parsefile(x)).map(lambda element: element.attrib).collect()
>
> The output is what I expect:
> [{'name': 'Liechtenstein'}, {'name': 'Singapore'}, {'name': 'Panama'}]
>
> BUT I'm a bit concerned about the construction of the string "s".  How big
> can my file be before converting it to a string becomes problematic?
>
>
>
> On Tue, Mar 18, 2014 at 9:41 AM, Diana Carroll <dc...@cloudera.com> wrote:
>
>> Thanks, Matei.
>>
>> In the context of this discussion, it would seem mapPartitions is
>> essential, because it's the only way I'm going to be able to process each
>> file as a whole, in our example of a large number of small XML files which
>> need to be parsed as a whole file because records are not required to be on
>> a single line.
>>
>> The theory makes sense but I'm still utterly lost as to how to implement
>> it.  Unfortunately there's only a single example of the use of
>> mapPartitions in any of the Python example programs, which is the logistic
>> regression example, which I can't run because it requires Python 2.7 and
>> I'm on Python 2.6.  (aside: I couldn't find any statement that Python 2.6
>> is unsupported...is it?)
>>
>> I'd really really love to see a real life example of a Python use of
>> mapPartitions.  I do appreciate the very simple examples you provided, but
>> (perhaps because of my novice status on Python) I can't figure out how to
>> translate those to a real world situation in which I'm building RDDs from
>> files, not inline collections like [(1,2),(2,3)].
>>
>> Also, you say that the function called in mapPartitions can return a
>> collection OR an iterator.  I tried returning an iterator by calling
>> ElementTree getiterator function, but still got the error telling me my
>> object was not an iterator.
>>
>> If anyone has a real life example of mapPartitions returning a Python
>> iterator, that would be fabulous.
>>
>> Diana
>>
>>
>> On Mon, Mar 17, 2014 at 6:17 PM, Matei Zaharia <ma...@gmail.com> wrote:
>>
>>> Oh, I see, the problem is that the function you pass to mapPartitions
>>> must itself return an iterator or a collection. This is used so that you
>>> can return multiple output records for each input record. You can implement
>>> most of the existing map-like operations in Spark, such as map, filter,
>>> flatMap, etc, with mapPartitions, as well as new ones that might do a
>>> sliding window over each partition for example, or accumulate data across
>>> elements (e.g. to compute a sum).
>>>
>>> For example, if you have data = sc.parallelize([1, 2, 3, 4], 2), this
>>> will work:
>>>
>>> >>> data.mapPartitions(lambda x: x).collect()
>>> [1, 2, 3, 4]   # Just return the same iterator, doing nothing
>>>
>>> >>> data.mapPartitions(lambda x: [list(x)]).collect()
>>> [[1, 2], [3, 4]]   # Group together the elements of each partition in a
>>> single list (like glom)
>>>
>>> >>> data.mapPartitions(lambda x: [sum(x)]).collect()
>>> [3, 7]   # Sum each partition separately
>>>
>>> However something like data.mapPartitions(lambda x: sum(x)).collect()
>>> will *not* work because sum returns a number, not an iterator. That's why I
>>> put sum(x) inside a list above.
>>>
>>> In practice mapPartitions is most useful if you want to share some data
>>> or work across the elements. For example maybe you want to load a lookup
>>> table once from an external file and then check each element in it, or sum
>>> up a bunch of elements without allocating a lot of vector objects.
>>>
>>> Matei
>>>
>>>
>>> On Mar 17, 2014, at 11:25 AM, Diana Carroll <dc...@cloudera.com>
>>> wrote:
>>>
>>> > "There's also mapPartitions, which gives you an iterator for each
>>> partition instead of an array. You can then return an iterator or list of
>>> objects to produce from that."
>>> >
>>> > I confess, I was hoping for an example of just that, because i've not
>>> yet been able to figure out how to use mapPartitions.  No doubt this is
>>> because i'm a rank newcomer to Python, and haven't fully wrapped my head
>>> around iterators.  All I get so far in my attempts to use mapPartitions is
>>> the darned "suchnsuch is not an iterator" error.
>>> >
>>> > def myfunction(iterator): return [1,2,3]
>>> > mydata.mapPartitions(lambda x: myfunction(x)).take(2)
>>> >
>>> >
>>> >
>>> >
>>> >
>>> > On Mon, Mar 17, 2014 at 1:57 PM, Matei Zaharia <
>>> matei.zaharia@gmail.com> wrote:
>>> > Here's an example of getting together all lines in a file as one
>>> string:
>>> >
>>> > $ cat dir/a.txt
>>> > Hello
>>> > world!
>>> >
>>> > $ cat dir/b.txt
>>> > What's
>>> > up??
>>> >
>>> > $ bin/pyspark
>>> > >>> files = sc.textFile("dir")
>>> >
>>> > >>> files.collect()
>>> > [u'Hello', u'world!', u"What's", u'up??']   # one element per line,
>>> not what we want
>>> >
>>> > >>> files.glom().collect()
>>> > [[u'Hello', u'world!'], [u"What's", u'up??']]   # one element per
>>> file, which is an array of lines
>>> >
>>> > >>> files.glom().map(lambda a: "\n".join(a)).collect()
>>> > [u'Hello\nworld!', u"What's\nup??"]    # join back each file into a
>>> single string
>>> >
>>> > The glom() method groups all the elements of each partition of an RDD
>>> into an array, giving you an RDD of arrays of objects. If your input is
>>> small files, you always have one partition per file.
>>> >
>>> > There's also mapPartitions, which gives you an iterator for each
>>> partition instead of an array. You can then return an iterator or list of
>>> objects to produce from that.
>>> >
>>> > Matei
>>> >
>>> >
>>> > On Mar 17, 2014, at 10:46 AM, Diana Carroll <dc...@cloudera.com>
>>> wrote:
>>> >
>>> > > Thanks Matei.  That makes sense.  I have here a dataset of many many
>>> smallish XML files, so using mapPartitions that way would make sense.  I'd
>>> love to see a code example though ...It's not as obvious to me how to do
>>> that as it probably should be.
>>> > >
>>> > > Thanks,
>>> > > Diana
>>> > >
>>> > >
>>> > > On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia <
>>> matei.zaharia@gmail.com> wrote:
>>> > > Hi Diana,
>>> > >
>>> > > Non-text input formats are only supported in Java and Scala right
>>> now, where you can use sparkContext.hadoopFile or .hadoopDataset to load
>>> data with any InputFormat that Hadoop MapReduce supports. In Python, you
>>> unfortunately only have textFile, which gives you one record per line. For
>>> JSON, you'd have to fit the whole JSON object on one line as you said.
>>> Hopefully we'll also have some other forms of input soon.
>>> > >
>>> > > If your input is a collection of separate files (say many .xml
>>> files), you can also use mapPartitions on it to group together the lines
>>> because each input file will end up being a single dataset partition (or
>>> map task). This will let you concatenate the lines in each file and parse
>>> them as one XML object.
>>> > >
>>> > > Matei
>>> > >
>>> > > On Mar 17, 2014, at 9:52 AM, Diana Carroll <dc...@cloudera.com>
>>> wrote:
>>> > >
>>> > >> Thanks, Krakna, very helpful.  The way I read the code, it looks
>>> like you are assuming that each line in foo.log contains a complete json
>>> object?  (That is, that the data doesn't contain any records that are split
>>> into multiple lines.)  If so, is that because you know that to be true of
>>> your data?  Or did you do as Nicholas suggests and have some preprocessing
>>> on the text input to flatten the data in that way?
>>> > >>
>>> > >> Thanks,
>>> > >> Diana
>>> > >>
>>> > >>
>>> > >> On Mon, Mar 17, 2014 at 12:09 PM, Krakna H <sh...@gmail.com>
>>> wrote:
>>> > >> Katrina,
>>> > >>
>>> > >> Not sure if this is what you had in mind, but here's some simple
>>> pyspark code that I recently wrote to deal with JSON files.
>>> > >>
>>> > >> from pyspark import SparkContext, SparkConf
>>> > >> from operator import add
>>> > >> import json
>>> > >> import random
>>> > >> import numpy as np
>>> > >>
>>> > >> def concatenate_paragraphs(sentence_array):
>>> > >>     return ' '.join(sentence_array).split(' ')
>>> > >>
>>> > >> logFile = 'foo.json'
>>> > >> conf = SparkConf()
>>> > >> conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")
>>> > >> sc = SparkContext(conf=conf)
>>> > >> logData = sc.textFile(logFile).cache()
>>> > >> num_lines = logData.count()
>>> > >> print 'Number of lines: %d' % num_lines
>>> > >>
>>> > >> # JSON object has the structure: {"key": {'paragraphs': [sentence1, sentence2, ...]}}
>>> > >> tm = logData.map(lambda s: (json.loads(s)['key'], len(concatenate_paragraphs(json.loads(s)['paragraphs']))))
>>> > >> tm = tm.reduceByKey(lambda _, x: _ + x)
>>> > >>
>>> > >> op = tm.collect()
>>> > >> for key, num_words in op:
>>> > >>     print 'state: %s, num_words: %d' % (key, num_words)
>>> > >>
>>> > >> On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark
>>> User List] <[hidden email]> wrote:
>>> > >> I don't actually have any data.  I'm writing a course that teaches
>>> students how to do this sort of thing and am interested in looking at a
>>> variety of real life examples of people doing things like that.  I'd love
>>> to see some working code implementing the "obvious work-around" you
>>> mention...do you have any to share?  It's an approach that makes a lot of
>>> sense, and as I said, I'd love to not have to re-invent the wheel if
>>> someone else has already written that code.  Thanks!
>>> > >>
>>> > >> Diana
>>> > >>
>>> > >>
>>> > >> On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <[hidden email]>
>>> wrote:
>>> > >> There was a previous discussion about this here:
>>> > >>
>>> > >>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
>>> > >>
>>> > >> How big are the XML or JSON files you're looking to deal with?
>>> > >>
>>> > >> It may not be practical to deserialize the entire document at once.
>>> In that case an obvious work-around would be to have some kind of
>>> pre-processing step that separates XML nodes/JSON objects with newlines so
>>> that you can analyze the data with Spark in a "line-oriented format". Your
>>> preprocessor wouldn't have to parse/deserialize the massive document; it
>>> would just have to track open/closed tags/braces to know when to insert a
>>> newline.
>>> > >>
>>> > >> Then you'd just open the line-delimited result and deserialize the
>>> individual objects/nodes with map().
>>> > >>
>>> > >> Nick
>>> > >>
>>> > >>
>>> > >> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <[hidden email]>
>>> wrote:
>>> > >> Has anyone got a working example of a Spark application that
>>> analyzes data in a non-line-oriented format, such as XML or JSON?  I'd like
>>> to do this without re-inventing the wheel...anyone care to share?  Thanks!
>>> > >>
>>> > >> Diana
>>> > >>
>>> > >>
>>> > >>
>>> > >>
>>> > >>
>>> > >
>>> > >
>>> >
>>> >
>>>
>>>
>>
>
>

Re: example of non-line oriented input data?

Posted by Matei Zaharia <ma...@gmail.com>.
Hi Diana,

This seems to work without the iter() in front if you just return treeiterator. What happened when you didn’t include that? Treeiterator should return an iterator.

Anyway, this is a good example of mapPartitions. It’s one where you want to view the whole file as one object (one XML here), so you couldn’t implement this using a flatMap, but you still want to return multiple values. The MLlib example you saw needs Python 2.7 because unfortunately that is a requirement for our Python MLlib support (see http://spark.incubator.apache.org/docs/0.9.0/python-programming-guide.html#libraries). We’d like to relax this later but we’re using some newer features of NumPy and Python. The rest of PySpark works on 2.6.
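
For anyone who wants a version that does not depend on what getiterator returns, here is a minimal sketch that writes the per-partition function as a generator. Because it uses yield, calling it always gives back an iterator. It assumes each partition holds exactly one complete XML file, and it uses Element.iter, which needs Python 2.7 or later (on 2.6, getiterator would be the equivalent). The name parse_countries is made up for illustration.

```python
import xml.etree.ElementTree as ET


def parse_countries(lines):
    """Take an iterator over the lines of one XML file; yield one dict per <country>."""
    # textFile drops the newline from each record, so put one back when joining.
    document = "\n".join(lines)
    if not document.strip():
        return  # empty partition: nothing to parse, yield nothing
    root = ET.fromstring(document)
    for element in root.iter("country"):
        # Each yielded value becomes one record of the resulting RDD.
        yield element.attrib
```

With the RDD from Diana's example this could be plugged in as mydata.mapPartitions(parse_countries).collect(), with no lambda or iter() wrapper needed.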

In terms of the size in memory, here both the string s and the XML tree constructed from it need to fit in, so you can’t work on very large individual XML files. You may be able to use a streaming XML parser instead to extract elements from the data in a streaming fashion, without ever materializing the whole tree. http://docs.python.org/2/library/xml.sax.reader.html#module-xml.sax.xmlreader is one example.
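
As a rough sketch of that streaming idea, the function below feeds the lines of a partition to an incremental parser and yields each country once its closing tag has been seen. It uses xml.etree.ElementTree.XMLPullParser, which only exists in Python 3.4 and later, so it assumes a newer environment than the one in this thread and has not been tried against PySpark 0.9. The function name stream_countries is hypothetical.

```python
import xml.etree.ElementTree as ET


def stream_countries(lines):
    """Feed lines to an incremental parser; yield the attributes of each <country>."""
    parser = ET.XMLPullParser(events=("end",))

    def drain():
        # Hand out every element whose closing tag the parser has reported so far.
        for _event, elem in parser.read_events():
            if elem.tag == "country":
                yield dict(elem.attrib)  # copy first, clear() empties attrib
                elem.clear()             # release the children of this country

    for line in lines:
        parser.feed(line + "\n")
        yield from drain()

    # The parser may hold events back until it is closed, so drain once more.
    parser.close()
    yield from drain()
```

The input is never joined into one string here, and the children of each country are released as soon as it has been yielded. The emptied country elements do stay attached to the root, so memory use still grows slowly with the number of records.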

Matei

On Mar 18, 2014, at 7:49 AM, Diana Carroll <dc...@cloudera.com> wrote:

> Well, if anyone is still following this, I've gotten the following code working which in theory should allow me to parse whole XML files: (the problem was that I can't return the tree iterator directly.  I have to call iter().  Why?)
> 
> import xml.etree.ElementTree as ET
> 
> # two source files, format <data> <country name="...">...</country>...</data>
> mydata=sc.textFile("file:/home/training/countries*.xml") 
> 
> def parsefile(iterator):
>     s = ''
>     for i in iterator: s = s + str(i)
>     tree = ET.fromstring(s)
>     treeiterator = tree.getiterator("country")
>     # why do I have to convert an iterator to an iterator?  not sure but required
>     return iter(treeiterator)
> 
> mydata.mapPartitions(lambda x: parsefile(x)).map(lambda element: element.attrib).collect()
> 
> The output is what I expect:
> [{'name': 'Liechtenstein'}, {'name': 'Singapore'}, {'name': 'Panama'}]
> 
> BUT I'm a bit concerned about the construction of the string "s".  How big can my file be before converting it to a string becomes problematic?
> 
> 
> 
> On Tue, Mar 18, 2014 at 9:41 AM, Diana Carroll <dc...@cloudera.com> wrote:
> Thanks, Matei.
> 
> In the context of this discussion, it would seem mapPartitions is essential, because it's the only way I'm going to be able to process each file as a whole, in our example of a large number of small XML files which need to be parsed as a whole file because records are not required to be on a single line.
> 
> The theory makes sense but I'm still utterly lost as to how to implement it.  Unfortunately there's only a single example of the use of mapPartitions in any of the Python example programs, which is the log regression example, which I can't run because it requires Python 2.7 and I'm on Python 2.6.  (aside: I couldn't find any statement that Python 2.6 is unsupported...is it?)
> 
> I'd really really love to see a real life example of a Python use of mapPartitions.  I do appreciate the very simple examples you provided, but (perhaps because of my novice status on Python) I can't figure out how to translate those to a real world situation in which I'm building RDDs from files, not inline collections like [(1,2),(2,3)].
> 
> Also, you say that the function called in mapPartitions can return a collection OR an iterator.  I tried returning an iterator by calling ElementTree getiterator function, but still got the error telling me my object was not an iterator. 
> 
> If anyone has a real life example of mapPartitions returning a Python iterator, that would be fabulous.
> 
> Diana
> 
> 
> On Mon, Mar 17, 2014 at 6:17 PM, Matei Zaharia <ma...@gmail.com> wrote:
> Oh, I see, the problem is that the function you pass to mapPartitions must itself return an iterator or a collection. This is used so that you can return multiple output records for each input record. You can implement most of the existing map-like operations in Spark, such as map, filter, flatMap, etc, with mapPartitions, as well as new ones that might do a sliding window over each partition for example, or accumulate data across elements (e.g. to compute a sum).
> 
> For example, if you have data = sc.parallelize([1, 2, 3, 4], 2), this will work:
> 
> >>> data.mapPartitions(lambda x: x).collect()
> [1, 2, 3, 4]   # Just return the same iterator, doing nothing
> 
> >>> data.mapPartitions(lambda x: [list(x)]).collect()
> [[1, 2], [3, 4]]   # Group together the elements of each partition in a single list (like glom)
> 
> >>> data.mapPartitions(lambda x: [sum(x)]).collect()
> [3, 7]   # Sum each partition separately
> 
> However something like data.mapPartitions(lambda x: sum(x)).collect() will *not* work because sum returns a number, not an iterator. That’s why I put sum(x) inside a list above.
> 
> In practice mapPartitions is most useful if you want to share some data or work across the elements. For example maybe you want to load a lookup table once from an external file and then check each element in it, or sum up a bunch of elements without allocating a lot of vector objects.
> 
> Matei
> 
> 
> On Mar 17, 2014, at 11:25 AM, Diana Carroll <dc...@cloudera.com> wrote:
> 
> > "There’s also mapPartitions, which gives you an iterator for each partition instead of an array. You can then return an iterator or list of objects to produce from that."
> >
> > I confess, I was hoping for an example of just that, because i've not yet been able to figure out how to use mapPartitions.  No doubt this is because i'm a rank newcomer to Python, and haven't fully wrapped my head around iterators.  All I get so far in my attempts to use mapPartitions is the darned "suchnsuch is not an iterator" error.
> >
> > def myfunction(iterator): return [1,2,3]
> > mydata.mapPartitions(lambda x: myfunction(x)).take(2)
> >
> >
> >
> >
> >
> > On Mon, Mar 17, 2014 at 1:57 PM, Matei Zaharia <ma...@gmail.com> wrote:
> > Here’s an example of getting together all lines in a file as one string:
> >
> > $ cat dir/a.txt
> > Hello
> > world!
> >
> > $ cat dir/b.txt
> > What's
> > up??
> >
> > $ bin/pyspark
> > >>> files = sc.textFile("dir")
> >
> > >>> files.collect()
> > [u'Hello', u'world!', u"What's", u'up??']   # one element per line, not what we want
> >
> > >>> files.glom().collect()
> > [[u'Hello', u'world!'], [u"What's", u'up??']]   # one element per file, which is an array of lines
> >
> > >>> files.glom().map(lambda a: "\n".join(a)).collect()
> > [u'Hello\nworld!', u"What's\nup??"]    # join back each file into a single string
> >
> > The glom() method groups all the elements of each partition of an RDD into an array, giving you an RDD of arrays of objects. If your input is small files, you always have one partition per file.
> >
> > There’s also mapPartitions, which gives you an iterator for each partition instead of an array. You can then return an iterator or list of objects to produce from that.
> >
> > Matei
> >
> >
> > On Mar 17, 2014, at 10:46 AM, Diana Carroll <dc...@cloudera.com> wrote:
> >
> > > Thanks Matei.  That makes sense.  I have here a dataset of many many smallish XML files, so using mapPartitions that way would make sense.  I'd love to see a code example though ...It's not as obvious to me how to do that as it probably should be.
> > >
> > > Thanks,
> > > Diana
> > >
> > >
> > > On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia <ma...@gmail.com> wrote:
> > > Hi Diana,
> > >
> > > Non-text input formats are only supported in Java and Scala right now, where you can use sparkContext.hadoopFile or .hadoopDataset to load data with any InputFormat that Hadoop MapReduce supports. In Python, you unfortunately only have textFile, which gives you one record per line. For JSON, you’d have to fit the whole JSON object on one line as you said. Hopefully we’ll also have some other forms of input soon.
> > >
> > > If your input is a collection of separate files (say many .xml files), you can also use mapPartitions on it to group together the lines because each input file will end up being a single dataset partition (or map task). This will let you concatenate the lines in each file and parse them as one XML object.
> > >
> > > Matei
> > >
> > > On Mar 17, 2014, at 9:52 AM, Diana Carroll <dc...@cloudera.com> wrote:
> > >
> > >> Thanks, Krakna, very helpful.  The way I read the code, it looks like you are assuming that each line in foo.log contains a complete json object?  (That is, that the data doesn't contain any records that are split into multiple lines.)  If so, is that because you know that to be true of your data?  Or did you do as Nicholas suggests and have some preprocessing on the text input to flatten the data in that way?
> > >>
> > >> Thanks,
> > >> Diana
> > >>
> > >>
> > >> On Mon, Mar 17, 2014 at 12:09 PM, Krakna H <sh...@gmail.com> wrote:
> > >> Katrina,
> > >>
> > >> Not sure if this is what you had in mind, but here's some simple pyspark code that I recently wrote to deal with JSON files.
> > >>
> > >> from pyspark import SparkContext, SparkConf
> > >> from operator import add
> > >> import json
> > >> import random
> > >> import numpy as np
> > >>
> > >> def concatenate_paragraphs(sentence_array):
> > >>     return ' '.join(sentence_array).split(' ')
> > >>
> > >> logFile = 'foo.json'
> > >> conf = SparkConf()
> > >> conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")
> > >> sc = SparkContext(conf=conf)
> > >> logData = sc.textFile(logFile).cache()
> > >> num_lines = logData.count()
> > >> print 'Number of lines: %d' % num_lines
> > >>
> > >> # JSON object has the structure: {"key": {'paragraphs': [sentence1, sentence2, ...]}}
> > >> tm = logData.map(lambda s: (json.loads(s)['key'], len(concatenate_paragraphs(json.loads(s)['paragraphs']))))
> > >> tm = tm.reduceByKey(lambda _, x: _ + x)
> > >>
> > >> op = tm.collect()
> > >> for key, num_words in op:
> > >>     print 'state: %s, num_words: %d' % (key, num_words)
> > >>
> > >> On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark User List] <[hidden email]> wrote:
> > >> I don't actually have any data.  I'm writing a course that teaches students how to do this sort of thing and am interested in looking at a variety of real life examples of people doing things like that.  I'd love to see some working code implementing the "obvious work-around" you mention...do you have any to share?  It's an approach that makes a lot of sense, and as I said, I'd love to not have to re-invent the wheel if someone else has already written that code.  Thanks!
> > >>
> > >> Diana
> > >>
> > >>
> > >> On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <[hidden email]> wrote:
> > >> There was a previous discussion about this here:
> > >>
> > >> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
> > >>
> > >> How big are the XML or JSON files you're looking to deal with?
> > >>
> > >> It may not be practical to deserialize the entire document at once. In that case an obvious work-around would be to have some kind of pre-processing step that separates XML nodes/JSON objects with newlines so that you can analyze the data with Spark in a "line-oriented format". Your preprocessor wouldn't have to parse/deserialize the massive document; it would just have to track open/closed tags/braces to know when to insert a newline.
> > >>
> > >> Then you'd just open the line-delimited result and deserialize the individual objects/nodes with map().
> > >>
> > >> Nick
> > >>
> > >>
> > >> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <[hidden email]> wrote:
> > >> Has anyone got a working example of a Spark application that analyzes data in a non-line-oriented format, such as XML or JSON?  I'd like to do this without re-inventing the wheel...anyone care to share?  Thanks!
> > >>
> > >> Diana
> > >>
> > >>
> > >>
> > >>
> > >>
> > >
> > >
> >
> >
> 
> 
> 


Re: example of non-line oriented input data?

Posted by Diana Carroll <dc...@cloudera.com>.
Well, if anyone is still following this, I've gotten the following code
working which in theory should allow me to parse whole XML files: (the
problem was that I can't return the tree iterator directly.  I have to call
iter().  Why?)

import xml.etree.ElementTree as ET

# two source files, format <data> <country name="...">...</country>...</data>
mydata=sc.textFile("file:/home/training/countries*.xml")

def parsefile(iterator):
    s = ''
    for i in iterator: s = s + str(i)
    tree = ET.fromstring(s)
    treeiterator = tree.getiterator("country")
    # why do I have to convert an iterator to an iterator?  not sure but required
    return iter(treeiterator)

mydata.mapPartitions(lambda x: parsefile(x)).map(lambda element: element.attrib).collect()

The output is what I expect:
[{'name': 'Liechtenstein'}, {'name': 'Singapore'}, {'name': 'Panama'}]

BUT I'm a bit concerned about the construction of the string "s".  How big
can my file be before converting it to a string becomes problematic?
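
One possible way around building the string by hand, assuming a Spark release that includes a whole-file reader (exposed in PySpark as sc.wholeTextFiles, which returns (path, content) pairs): each record is then already one complete file, so a plain flatMap is enough. The file still has to fit in memory as a single string, along with the tree parsed from it. The sketch below only shows the per-record function and runs it on hand-made sample records instead of a real RDD; countries_in_file and the two sample paths are made up for illustration, and Element.iter needs Python 2.7 or later.

```python
import xml.etree.ElementTree as ET


def countries_in_file(path_and_content):
    """Parse one (path, content) record and return a list of (path, name) pairs."""
    path, content = path_and_content
    root = ET.fromstring(content)
    return [(path, country.get("name")) for country in root.iter("country")]


# Hand-made stand-in for the (path, content) records; no Spark is needed to run this.
sample_records = [
    ("file:/home/training/countries1.xml",
     '<data><country name="Liechtenstein"/><country name="Singapore"/></data>'),
    ("file:/home/training/countries2.xml",
     '<data><country name="Panama"/></data>'),
]

# Equivalent of a flatMap over the records: flatten the per-file lists into one.
pairs = [pair for record in sample_records for pair in countries_in_file(record)]
print(pairs)
```

On a cluster the matching call would be something along the lines of sc.wholeTextFiles("file:/home/training/countries*.xml").flatMap(countries_in_file).collect(), again assuming that method is available in the Spark version in use.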



On Tue, Mar 18, 2014 at 9:41 AM, Diana Carroll <dc...@cloudera.com>wrote:

> Thanks, Matei.
>
> In the context of this discussion, it would seem mapPartitions is
> essential, because it's the only way I'm going to be able to process each
> file as a whole, in our example of a large number of small XML files which
> need to be parsed as a whole file because records are not required to be on
> a single line.
>
> The theory makes sense but I'm still utterly lost as to how to implement
> it.  Unfortunately there's only a single example of the use of
> mapPartitions in any of the Python example programs, which is the log
> regression example, which I can't run because it requires Python 2.7 and
> I'm on Python 2.6.  (aside: I couldn't find any statement that Python 2.6
> is unsupported...is it?)
>
> I'd really really love to see a real life example of a Python use of
> mapPartitions.  I do appreciate the very simple examples you provided, but
> (perhaps because of my novice status on Python) I can't figure out how to
> translate those to a real world situation in which I'm building RDDs from
> files, not inline collections like [(1,2),(2,3)].
>
> Also, you say that the function called in mapPartitions can return a
> collection OR an iterator.  I tried returning an iterator by calling
> ElementTree getiterator function, but still got the error telling me my
> object was not an iterator.
>
> If anyone has a real life example of mapPartitions returning a Python
> iterator, that would be fabulous.
>
> Diana
>
>
> On Mon, Mar 17, 2014 at 6:17 PM, Matei Zaharia <ma...@gmail.com>wrote:
>
>> Oh, I see, the problem is that the function you pass to mapPartitions
>> must itself return an iterator or a collection. This is used so that you
>> can return multiple output records for each input record. You can implement
>> most of the existing map-like operations in Spark, such as map, filter,
>> flatMap, etc, with mapPartitions, as well as new ones that might do a
>> sliding window over each partition for example, or accumulate data across
>> elements (e.g. to compute a sum).
>>
>> For example, if you have data = sc.parallelize([1, 2, 3, 4], 2), this
>> will work:
>>
>> >>> data.mapPartitions(lambda x: x).collect()
>> [1, 2, 3, 4]   # Just return the same iterator, doing nothing
>>
>> >>> data.mapPartitions(lambda x: [list(x)]).collect()
>> [[1, 2], [3, 4]]   # Group together the elements of each partition in a
>> single list (like glom)
>>
>> >>> data.mapPartitions(lambda x: [sum(x)]).collect()
>> [3, 7]   # Sum each partition separately
>>
>> However something like data.mapPartitions(lambda x: sum(x)).collect()
>> will *not* work because sum returns a number, not an iterator. That's why I
>> put sum(x) inside a list above.
>>
>> In practice mapPartitions is most useful if you want to share some data
>> or work across the elements. For example maybe you want to load a lookup
>> table once from an external file and then check each element in it, or sum
>> up a bunch of elements without allocating a lot of vector objects.
>>
>> Matei
>>
>>
>> On Mar 17, 2014, at 11:25 AM, Diana Carroll <dc...@cloudera.com>
>> wrote:
>>
>> > "There's also mapPartitions, which gives you an iterator for each
>> partition instead of an array. You can then return an iterator or list of
>> objects to produce from that."
>> >
>> > I confess, I was hoping for an example of just that, because i've not
>> yet been able to figure out how to use mapPartitions.  No doubt this is
>> because i'm a rank newcomer to Python, and haven't fully wrapped my head
>> around iterators.  All I get so far in my attempts to use mapPartitions is
>> the darned "suchnsuch is not an iterator" error.
>> >
>> > def myfunction(iterator): return [1,2,3]
>> > mydata.mapPartitions(lambda x: myfunction(x)).take(2)
>> >
>> >
>> >
>> >
>> >
>> > On Mon, Mar 17, 2014 at 1:57 PM, Matei Zaharia <ma...@gmail.com>
>> wrote:
>> > Here's an example of getting together all lines in a file as one string:
>> >
>> > $ cat dir/a.txt
>> > Hello
>> > world!
>> >
>> > $ cat dir/b.txt
>> > What's
>> > up??
>> >
>> > $ bin/pyspark
>> > >>> files = sc.textFile("dir")
>> >
>> > >>> files.collect()
>> > [u'Hello', u'world!', u"What's", u'up??']   # one element per line, not
>> what we want
>> >
>> > >>> files.glom().collect()
>> > [[u'Hello', u'world!'], [u"What's", u'up??']]   # one element per file,
>> which is an array of lines
>> >
>> > >>> files.glom().map(lambda a: "\n".join(a)).collect()
>> > [u'Hello\nworld!', u"What's\nup??"]    # join back each file into a
>> single string
>> >
>> > The glom() method groups all the elements of each partition of an RDD
>> into an array, giving you an RDD of arrays of objects. If your input is
>> small files, you always have one partition per file.
>> >
>> > There's also mapPartitions, which gives you an iterator for each
>> partition instead of an array. You can then return an iterator or list of
>> objects to produce from that.
>> >
>> > Matei
>> >
>> >
>> > On Mar 17, 2014, at 10:46 AM, Diana Carroll <dc...@cloudera.com>
>> wrote:
>> >
>> > > Thanks Matei.  That makes sense.  I have here a dataset of many many
>> smallish XML files, so using mapPartitions that way would make sense.  I'd
>> love to see a code example though ...It's not as obvious to me how to do
>> that as it probably should be.
>> > >
>> > > Thanks,
>> > > Diana
>> > >
>> > >
>> > > On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia <
>> matei.zaharia@gmail.com> wrote:
>> > > Hi Diana,
>> > >
>> > > Non-text input formats are only supported in Java and Scala right
>> now, where you can use sparkContext.hadoopFile or .hadoopDataset to load
>> data with any InputFormat that Hadoop MapReduce supports. In Python, you
>> unfortunately only have textFile, which gives you one record per line. For
>> JSON, you'd have to fit the whole JSON object on one line as you said.
>> Hopefully we'll also have some other forms of input soon.
>> > >
>> > > If your input is a collection of separate files (say many .xml
>> files), you can also use mapPartitions on it to group together the lines
>> because each input file will end up being a single dataset partition (or
>> map task). This will let you concatenate the lines in each file and parse
>> them as one XML object.
>> > >
>> > > Matei
>> > >
>> > > On Mar 17, 2014, at 9:52 AM, Diana Carroll <dc...@cloudera.com>
>> wrote:
>> > >
>> > >> Thanks, Krakna, very helpful.  The way I read the code, it looks
>> like you are assuming that each line in foo.log contains a complete json
>> object?  (That is, that the data doesn't contain any records that are split
>> into multiple lines.)  If so, is that because you know that to be true of
>> your data?  Or did you do as Nicholas suggests and have some preprocessing
>> on the text input to flatten the data in that way?
>> > >>
>> > >> Thanks,
>> > >> Diana
>> > >>
>> > >>
>> > >> On Mon, Mar 17, 2014 at 12:09 PM, Krakna H <sh...@gmail.com>
>> wrote:
>> > >> Katrina,
>> > >>
>> > >> Not sure if this is what you had in mind, but here's some simple
>> pyspark code that I recently wrote to deal with JSON files.
>> > >>
>> > >> from pyspark import SparkContext, SparkConf
>> > >> from operator import add
>> > >> import json
>> > >> import random
>> > >> import numpy as np
>> > >>
>> > >> def concatenate_paragraphs(sentence_array):
>> > >>     return ' '.join(sentence_array).split(' ')
>> > >>
>> > >> logFile = 'foo.json'
>> > >> conf = SparkConf()
>> > >> conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")
>> > >> sc = SparkContext(conf=conf)
>> > >> logData = sc.textFile(logFile).cache()
>> > >> num_lines = logData.count()
>> > >> print 'Number of lines: %d' % num_lines
>> > >>
>> > >> # JSON object has the structure: {"key": {'paragraphs': [sentence1, sentence2, ...]}}
>> > >> tm = logData.map(lambda s: (json.loads(s)['key'], len(concatenate_paragraphs(json.loads(s)['paragraphs']))))
>> > >> tm = tm.reduceByKey(lambda _, x: _ + x)
>> > >>
>> > >> op = tm.collect()
>> > >> for key, num_words in op:
>> > >>     print 'state: %s, num_words: %d' % (key, num_words)
>> > >>
>> > >> On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark
>> User List] <[hidden email]> wrote:
>> > >> I don't actually have any data.  I'm writing a course that teaches
>> students how to do this sort of thing and am interested in looking at a
>> variety of real life examples of people doing things like that.  I'd love
>> to see some working code implementing the "obvious work-around" you
>> mention...do you have any to share?  It's an approach that makes a lot of
>> sense, and as I said, I'd love to not have to re-invent the wheel if
>> someone else has already written that code.  Thanks!
>> > >>
>> > >> Diana
>> > >>
>> > >>
>> > >> On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <[hidden email]>
>> wrote:
>> > >> There was a previous discussion about this here:
>> > >>
>> > >>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
>> > >>
>> > >> How big are the XML or JSON files you're looking to deal with?
>> > >>
>> > >> It may not be practical to deserialize the entire document at once.
>> In that case an obvious work-around would be to have some kind of
>> pre-processing step that separates XML nodes/JSON objects with newlines so
>> that you can analyze the data with Spark in a "line-oriented format". Your
>> preprocessor wouldn't have to parse/deserialize the massive document; it
>> would just have to track open/closed tags/braces to know when to insert a
>> newline.
>> > >>
>> > >> Then you'd just open the line-delimited result and deserialize the
>> individual objects/nodes with map().
>> > >>
>> > >> Nick
>> > >>
>> > >>
>> > >> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <[hidden email]>
>> wrote:
>> > >> Has anyone got a working example of a Spark application that
>> analyzes data in a non-line-oriented format, such as XML or JSON?  I'd like
>> to do this without re-inventing the wheel...anyone care to share?  Thanks!
>> > >>
>> > >> Diana
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >
>> > >
>> >
>> >
>>
>>
>

Re: example of non-line oriented input data?

Posted by Diana Carroll <dc...@cloudera.com>.
Thanks, Matei.

In the context of this discussion, it would seem mapPartitions is essential,
because it's the only way I'm going to be able to process each file as a
whole, in our example of a large number of small XML files which need to be
parsed as a whole file because records are not required to be on a single
line.

The theory makes sense but I'm still utterly lost as to how to implement
it.  Unfortunately there's only a single example of the use of
mapPartitions in any of the Python example programs, which is the logistic
regression example, which I can't run because it requires Python 2.7 and
I'm on Python 2.6.  (aside: I couldn't find any statement that Python 2.6
is unsupported...is it?)

I'd really really love to see a real life example of a Python use of
mapPartitions.  I do appreciate the very simple examples you provided, but
(perhaps because of my novice status on Python) I can't figure out how to
translate those to a real world situation in which I'm building RDDs from
files, not inline collections like [(1,2),(2,3)].

Also, you say that the function called in mapPartitions can return a
collection OR an iterator.  I tried returning an iterator by calling
ElementTree's getiterator function, but still got the error telling me my
object was not an iterator.

If anyone has a real life example of mapPartitions returning a Python
iterator, that would be fabulous.

Diana


On Mon, Mar 17, 2014 at 6:17 PM, Matei Zaharia <ma...@gmail.com>wrote:

> Oh, I see, the problem is that the function you pass to mapPartitions must
> itself return an iterator or a collection. This is used so that you can
> return multiple output records for each input record. You can implement
> most of the existing map-like operations in Spark, such as map, filter,
> flatMap, etc, with mapPartitions, as well as new ones that might do a
> sliding window over each partition for example, or accumulate data across
> elements (e.g. to compute a sum).
>
> For example, if you have data = sc.parallelize([1, 2, 3, 4], 2), this will
> work:
>
> >>> data.mapPartitions(lambda x: x).collect()
> [1, 2, 3, 4]   # Just return the same iterator, doing nothing
>
> >>> data.mapPartitions(lambda x: [list(x)]).collect()
> [[1, 2], [3, 4]]   # Group together the elements of each partition in a
> single list (like glom)
>
> >>> data.mapPartitions(lambda x: [sum(x)]).collect()
> [3, 7]   # Sum each partition separately
>
> However something like data.mapPartitions(lambda x: sum(x)).collect() will
> *not* work because sum returns a number, not an iterator. That's why I put
> sum(x) inside a list above.
>
> In practice mapPartitions is most useful if you want to share some data or
> work across the elements. For example maybe you want to load a lookup table
> once from an external file and then check each element in it, or sum up a
> bunch of elements without allocating a lot of vector objects.
>
> Matei
>
>
> On Mar 17, 2014, at 11:25 AM, Diana Carroll <dc...@cloudera.com> wrote:
>
> > "There's also mapPartitions, which gives you an iterator for each
> partition instead of an array. You can then return an iterator or list of
> objects to produce from that."
> >
> > I confess, I was hoping for an example of just that, because i've not
> yet been able to figure out how to use mapPartitions.  No doubt this is
> because i'm a rank newcomer to Python, and haven't fully wrapped my head
> around iterators.  All I get so far in my attempts to use mapPartitions is
> the darned "suchnsuch is not an iterator" error.
> >
> > def myfunction(iterator): return [1,2,3]
> > mydata.mapPartitions(lambda x: myfunction(x)).take(2)
> >
> >
> >
> >
> >
> > On Mon, Mar 17, 2014 at 1:57 PM, Matei Zaharia <ma...@gmail.com>
> wrote:
> > Here's an example of getting together all lines in a file as one string:
> >
> > $ cat dir/a.txt
> > Hello
> > world!
> >
> > $ cat dir/b.txt
> > What's
> > up??
> >
> > $ bin/pyspark
> > >>> files = sc.textFile("dir")
> >
> > >>> files.collect()
> > [u'Hello', u'world!', u"What's", u'up??']   # one element per line, not
> what we want
> >
> > >>> files.glom().collect()
> > [[u'Hello', u'world!'], [u"What's", u'up??']]   # one element per file,
> which is an array of lines
> >
> > >>> files.glom().map(lambda a: "\n".join(a)).collect()
> > [u'Hello\nworld!', u"What's\nup??"]    # join back each file into a
> single string
> >
> > The glom() method groups all the elements of each partition of an RDD
> into an array, giving you an RDD of arrays of objects. If your input is
> small files, you always have one partition per file.
> >
> > There's also mapPartitions, which gives you an iterator for each
> partition instead of an array. You can then return an iterator or list of
> objects to produce from that.
> >
> > Matei
> >
> >
> > On Mar 17, 2014, at 10:46 AM, Diana Carroll <dc...@cloudera.com>
> wrote:
> >
> > > Thanks Matei.  That makes sense.  I have here a dataset of many many
> smallish XML files, so using mapPartitions that way would make sense.  I'd
> love to see a code example though ... It's not as obvious to me how to do
> that as it probably should be.
> > >
> > > Thanks,
> > > Diana
> > >
> > >
> > > On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia <
> matei.zaharia@gmail.com> wrote:
> > > Hi Diana,
> > >
> > > Non-text input formats are only supported in Java and Scala right now,
> where you can use sparkContext.hadoopFile or .hadoopDataset to load data
> with any InputFormat that Hadoop MapReduce supports. In Python, you
> unfortunately only have textFile, which gives you one record per line. For
> JSON, you'd have to fit the whole JSON object on one line as you said.
> Hopefully we'll also have some other forms of input soon.
> > >
> > > If your input is a collection of separate files (say many .xml files),
> you can also use mapPartitions on it to group together the lines because
> each input file will end up being a single dataset partition (or map task).
> This will let you concatenate the lines in each file and parse them as one
> XML object.
> > >
> > > Matei
> > >
> > > On Mar 17, 2014, at 9:52 AM, Diana Carroll <dc...@cloudera.com>
> wrote:
> > >
> > >> Thanks, Krakna, very helpful.  The way I read the code, it looks like
> you are assuming that each line in foo.log contains a complete json object?
>  (That is, that the data doesn't contain any records that are split into
> multiple lines.)  If so, is that because you know that to be true of your
> data?  Or did you do as Nicholas suggests and have some preprocessing on
> the text input to flatten the data in that way?
> > >>
> > >> Thanks,
> > >> Diana
> > >>
> > >>
> > >> On Mon, Mar 17, 2014 at 12:09 PM, Krakna H <sh...@gmail.com>
> wrote:
> > >> Katrina,
> > >>
> > >> Not sure if this is what you had in mind, but here's some simple
> pyspark code that I recently wrote to deal with JSON files.
> > >>
> > >> from pyspark import SparkContext, SparkConf
> > >> from operator import add
> > >> import json
> > >> import random
> > >> import numpy as np
> > >>
> > >> def concatenate_paragraphs(sentence_array):
> > >>     return ' '.join(sentence_array).split(' ')
> > >>
> > >> logFile = 'foo.json'
> > >> conf = SparkConf()
> > >> conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")
> > >>
> > >> sc = SparkContext(conf=conf)
> > >> logData = sc.textFile(logFile).cache()
> > >>
> > >> num_lines = logData.count()
> > >> print 'Number of lines: %d' % num_lines
> > >>
> > >> # JSON object has the structure: {"key": {'paragraphs': [sentence1, sentence2, ...]}}
> > >> tm = logData.map(lambda s: (json.loads(s)['key'], len(concatenate_paragraphs(json.loads(s)['paragraphs']))))
> > >> tm = tm.reduceByKey(lambda _, x: _ + x)
> > >>
> > >> op = tm.collect()
> > >> for key, num_words in op:
> > >>     print 'state: %s, num_words: %d' % (key, num_words)
> > >>
> > >> On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark
> User List] <[hidden email]> wrote:
> > >> I don't actually have any data.  I'm writing a course that teaches
> students how to do this sort of thing and am interested in looking at a
> variety of real life examples of people doing things like that.  I'd love
> to see some working code implementing the "obvious work-around" you
> mention...do you have any to share?  It's an approach that makes a lot of
> sense, and as I said, I'd love to not have to re-invent the wheel if
> someone else has already written that code.  Thanks!
> > >>
> > >> Diana
> > >>
> > >>
> > >> On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <[hidden email]>
> wrote:
> > >> There was a previous discussion about this here:
> > >>
> > >>
> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
> > >>
> > >> How big are the XML or JSON files you're looking to deal with?
> > >>
> > >> It may not be practical to deserialize the entire document at once.
> In that case an obvious work-around would be to have some kind of
> pre-processing step that separates XML nodes/JSON objects with newlines so
> that you can analyze the data with Spark in a "line-oriented format". Your
> preprocessor wouldn't have to parse/deserialize the massive document; it
> would just have to track open/closed tags/braces to know when to insert a
> newline.
> > >>
> > >> Then you'd just open the line-delimited result and deserialize the
> individual objects/nodes with map().
> > >>
> > >> Nick
> > >>
> > >>
> > >> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <[hidden email]>
> wrote:
> > >> Has anyone got a working example of a Spark application that analyzes
> data in a non-line-oriented format, such as XML or JSON?  I'd like to do
> this without re-inventing the wheel...anyone care to share?  Thanks!
> > >>
> > >> Diana
> > >>
> > >>
> > >>
> > >>
> > >>
> > >
> > >
> >
> >
>
>

Re: example of non-line oriented input data?

Posted by Matei Zaharia <ma...@gmail.com>.
Oh, I see, the problem is that the function you pass to mapPartitions must itself return an iterator or a collection. This is used so that you can return multiple output records for each input record. You can implement most of the existing map-like operations in Spark, such as map, filter, flatMap, etc, with mapPartitions, as well as new ones that might do a sliding window over each partition for example, or accumulate data across elements (e.g. to compute a sum).

For example, if you have data = sc.parallelize([1, 2, 3, 4], 2), this will work:

>>> data.mapPartitions(lambda x: x).collect()
[1, 2, 3, 4]   # Just return the same iterator, doing nothing

>>> data.mapPartitions(lambda x: [list(x)]).collect()
[[1, 2], [3, 4]]   # Group together the elements of each partition in a single list (like glom)

>>> data.mapPartitions(lambda x: [sum(x)]).collect()
[3, 7]   # Sum each partition separately

However something like data.mapPartitions(lambda x: sum(x)).collect() will *not* work because sum returns a number, not an iterator. That’s why I put sum(x) inside a list above.

In practice mapPartitions is most useful if you want to share some data or work across the elements. For example maybe you want to load a lookup table once from an external file and then check each element in it, or sum up a bunch of elements without allocating a lot of vector objects.
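A minimal sketch of those two patterns, written as generator functions (a generator is itself an iterator, so it satisfies the "must return an iterator" rule). The names make_lookup_mapper, load_table and sum_partition are made up for illustration, and plain Python iterators stand in for partitions so the code can be tried without a cluster:

```python
def make_lookup_mapper(load_table):
    """Build a function that could be passed to rdd.mapPartitions().

    load_table is a hypothetical zero-argument callable that reads the
    lookup table (for example from a file) and returns a dict.
    """
    def map_partition(elements):
        table = load_table()          # runs once per partition, not once per element
        for element in elements:      # 'elements' is the partition's iterator
            yield (element, table.get(element))
    return map_partition


def sum_partition(elements):
    """Sum a partition with a running total and yield the single result."""
    total = 0
    for element in elements:
        total += element
    yield total


# Local check: plain iterators stand in for partitions.
load_calls = []

def load_table():
    # Stand-in for reading an external file; records each call.
    load_calls.append(1)
    return {1: "one", 2: "two"}

mapper = make_lookup_mapper(load_table)
looked_up = list(mapper(iter([1, 2, 3])))
partition_sums = [list(sum_partition(iter(p)))[0] for p in ([1, 2], [3, 4])]
```

With a real RDD these would be passed directly, e.g. data.mapPartitions(sum_partition).collect(), which for the data above should give the same result as the [sum(x)] version.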

Matei


On Mar 17, 2014, at 11:25 AM, Diana Carroll <dc...@cloudera.com> wrote:

> "There’s also mapPartitions, which gives you an iterator for each partition instead of an array. You can then return an iterator or list of objects to produce from that."
> 
> I confess, I was hoping for an example of just that, because i've not yet been able to figure out how to use mapPartitions.  No doubt this is because i'm a rank newcomer to Python, and haven't fully wrapped my head around iterators.  All I get so far in my attempts to use mapPartitions is the darned "suchnsuch is not an iterator" error.   
> 
> def myfunction(iterator): return [1,2,3]
> mydata.mapPartitions(lambda x: myfunction(x)).take(2)
> 
> 
> 
> 
> 
> On Mon, Mar 17, 2014 at 1:57 PM, Matei Zaharia <ma...@gmail.com> wrote:
> Here’s an example of getting together all lines in a file as one string:
> 
> $ cat dir/a.txt
> Hello
> world!
> 
> $ cat dir/b.txt
> What's
> up??
> 
> $ bin/pyspark
> >>> files = sc.textFile("dir")
> 
> >>> files.collect()
> [u'Hello', u'world!', u"What's", u'up??']   # one element per line, not what we want
> 
> >>> files.glom().collect()
> [[u'Hello', u'world!'], [u"What's", u'up??']]   # one element per file, which is an array of lines
> 
> >>> files.glom().map(lambda a: "\n".join(a)).collect()
> [u'Hello\nworld!', u"What's\nup??"]    # join back each file into a single string
> 
> The glom() method groups all the elements of each partition of an RDD into an array, giving you an RDD of arrays of objects. If your input is small files, you always have one partition per file.
> 
> There’s also mapPartitions, which gives you an iterator for each partition instead of an array. You can then return an iterator or list of objects to produce from that.
> 
> Matei
> 
> 
> On Mar 17, 2014, at 10:46 AM, Diana Carroll <dc...@cloudera.com> wrote:
> 
> > Thanks Matei.  That makes sense.  I have here a dataset of many many smallish XML files, so using mapPartitions that way would make sense.  I'd love to see a code example though ... It's not as obvious to me how to do that as it probably should be.
> >
> > Thanks,
> > Diana
> >
> >
> > On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia <ma...@gmail.com> wrote:
> > Hi Diana,
> >
> > Non-text input formats are only supported in Java and Scala right now, where you can use sparkContext.hadoopFile or .hadoopDataset to load data with any InputFormat that Hadoop MapReduce supports. In Python, you unfortunately only have textFile, which gives you one record per line. For JSON, you’d have to fit the whole JSON object on one line as you said. Hopefully we’ll also have some other forms of input soon.
> >
> > If your input is a collection of separate files (say many .xml files), you can also use mapPartitions on it to group together the lines because each input file will end up being a single dataset partition (or map task). This will let you concatenate the lines in each file and parse them as one XML object.
> >
> > Matei
> >
> > On Mar 17, 2014, at 9:52 AM, Diana Carroll <dc...@cloudera.com> wrote:
> >
> >> Thanks, Krakna, very helpful.  The way I read the code, it looks like you are assuming that each line in foo.log contains a complete json object?  (That is, that the data doesn't contain any records that are split into multiple lines.)  If so, is that because you know that to be true of your data?  Or did you do as Nicholas suggests and have some preprocessing on the text input to flatten the data in that way?
> >>
> >> Thanks,
> >> Diana
> >>
> >>
> >> On Mon, Mar 17, 2014 at 12:09 PM, Krakna H <sh...@gmail.com> wrote:
> >> Katrina,
> >>
> >> Not sure if this is what you had in mind, but here's some simple pyspark code that I recently wrote to deal with JSON files.
> >>
> >> from pyspark import SparkContext, SparkConf
> >> from operator import add
> >> import json
> >> import random
> >> import numpy as np
> >>
> >> def concatenate_paragraphs(sentence_array):
> >>     return ' '.join(sentence_array).split(' ')
> >>
> >> logFile = 'foo.json'
> >> conf = SparkConf()
> >> conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")
> >>
> >> sc = SparkContext(conf=conf)
> >> logData = sc.textFile(logFile).cache()
> >>
> >> num_lines = logData.count()
> >> print 'Number of lines: %d' % num_lines
> >>
> >> # JSON object has the structure: {"key": {'paragraphs': [sentence1, sentence2, ...]}}
> >> tm = logData.map(lambda s: (json.loads(s)['key'], len(concatenate_paragraphs(json.loads(s)['paragraphs']))))
> >> tm = tm.reduceByKey(lambda _, x: _ + x)
> >>
> >> op = tm.collect()
> >> for key, num_words in op:
> >>     print 'state: %s, num_words: %d' % (key, num_words)
> >>
> >> On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark User List] <[hidden email]> wrote:
> >> I don't actually have any data.  I'm writing a course that teaches students how to do this sort of thing and am interested in looking at a variety of real life examples of people doing things like that.  I'd love to see some working code implementing the "obvious work-around" you mention...do you have any to share?  It's an approach that makes a lot of sense, and as I said, I'd love to not have to re-invent the wheel if someone else has already written that code.  Thanks!
> >>
> >> Diana
> >>
> >>
> >> On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <[hidden email]> wrote:
> >> There was a previous discussion about this here:
> >>
> >> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
> >>
> >> How big are the XML or JSON files you're looking to deal with?
> >>
> >> It may not be practical to deserialize the entire document at once. In that case an obvious work-around would be to have some kind of pre-processing step that separates XML nodes/JSON objects with newlines so that you can analyze the data with Spark in a "line-oriented format". Your preprocessor wouldn't have to parse/deserialize the massive document; it would just have to track open/closed tags/braces to know when to insert a newline.
> >>
> >> Then you'd just open the line-delimited result and deserialize the individual objects/nodes with map().
> >>
> >> Nick
> >>
> >>
> >> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <[hidden email]> wrote:
> >> Has anyone got a working example of a Spark application that analyzes data in a non-line-oriented format, such as XML or JSON?  I'd like to do this without re-inventing the wheel...anyone care to share?  Thanks!
> >>
> >> Diana
> >>
> >>
> >>
> >>
> >>
> >
> >
> 
> 


Re: example of non-line oriented input data?

Posted by Diana Carroll <dc...@cloudera.com>.
"There's also mapPartitions, which gives you an iterator for each partition
instead of an array. You can then return an iterator or list of objects to
produce from that."

I confess, I was hoping for an example of just that, because i've not yet
been able to figure out how to use mapPartitions.  No doubt this is because
i'm a rank newcomer to Python, and haven't fully wrapped my head around
iterators.  All I get so far in my attempts to use mapPartitions is the
darned "suchnsuch is not an iterator" error.

def myfunction(iterator): return [1,2,3]
mydata.mapPartitions(lambda x: myfunction(x)).take(2)





On Mon, Mar 17, 2014 at 1:57 PM, Matei Zaharia <ma...@gmail.com>wrote:

> Here's an example of getting together all lines in a file as one string:
>
> $ cat dir/a.txt
> Hello
> world!
>
> $ cat dir/b.txt
> What's
> up??
>
> $ bin/pyspark
> >>> files = sc.textFile("dir")
>
> >>> files.collect()
> [u'Hello', u'world!', u"What's", u'up??']   # one element per line, not
> what we want
>
> >>> files.glom().collect()
> [[u'Hello', u'world!'], [u"What's", u'up??']]   # one element per file,
> which is an array of lines
>
> >>> files.glom().map(lambda a: "\n".join(a)).collect()
> [u'Hello\nworld!', u"What's\nup??"]    # join back each file into a single
> string
>
> The glom() method groups all the elements of each partition of an RDD into
> an array, giving you an RDD of arrays of objects. If your input is small
> files, you always have one partition per file.
>
> There's also mapPartitions, which gives you an iterator for each partition
> instead of an array. You can then return an iterator or list of objects to
> produce from that.
>
> Matei
>
>
> On Mar 17, 2014, at 10:46 AM, Diana Carroll <dc...@cloudera.com> wrote:
>
> > Thanks Matei.  That makes sense.  I have here a dataset of many many
> smallish XML files, so using mapPartitions that way would make sense.  I'd
> love to see a code example though ... It's not as obvious to me how to do
> that as it probably should be.
> >
> > Thanks,
> > Diana
> >
> >
> > On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia <ma...@gmail.com>
> wrote:
> > Hi Diana,
> >
> > Non-text input formats are only supported in Java and Scala right now,
> where you can use sparkContext.hadoopFile or .hadoopDataset to load data
> with any InputFormat that Hadoop MapReduce supports. In Python, you
> unfortunately only have textFile, which gives you one record per line. For
> JSON, you'd have to fit the whole JSON object on one line as you said.
> Hopefully we'll also have some other forms of input soon.
> >
> > If your input is a collection of separate files (say many .xml files),
> you can also use mapPartitions on it to group together the lines because
> each input file will end up being a single dataset partition (or map task).
> This will let you concatenate the lines in each file and parse them as one
> XML object.
> >
> > Matei
> >
> > On Mar 17, 2014, at 9:52 AM, Diana Carroll <dc...@cloudera.com>
> wrote:
> >
> >> Thanks, Krakna, very helpful.  The way I read the code, it looks like
> you are assuming that each line in foo.log contains a complete json object?
>  (That is, that the data doesn't contain any records that are split into
> multiple lines.)  If so, is that because you know that to be true of your
> data?  Or did you do as Nicholas suggests and have some preprocessing on
> the text input to flatten the data in that way?
> >>
> >> Thanks,
> >> Diana
> >>
> >>
> >> On Mon, Mar 17, 2014 at 12:09 PM, Krakna H <sh...@gmail.com>
> wrote:
> >> Katrina,
> >>
> >> Not sure if this is what you had in mind, but here's some simple
> pyspark code that I recently wrote to deal with JSON files.
> >>
> >> from pyspark import SparkContext, SparkConf
> >> from operator import add
> >> import json
> >> import random
> >> import numpy as np
> >>
> >> def concatenate_paragraphs(sentence_array):
> >>     return ' '.join(sentence_array).split(' ')
> >>
> >> logFile = 'foo.json'
> >> conf = SparkConf()
> >> conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")
> >>
> >> sc = SparkContext(conf=conf)
> >> logData = sc.textFile(logFile).cache()
> >>
> >> num_lines = logData.count()
> >> print 'Number of lines: %d' % num_lines
> >>
> >> # JSON object has the structure: {"key": {'paragraphs': [sentence1, sentence2, ...]}}
> >> tm = logData.map(lambda s: (json.loads(s)['key'], len(concatenate_paragraphs(json.loads(s)['paragraphs']))))
> >> tm = tm.reduceByKey(lambda _, x: _ + x)
> >>
> >> op = tm.collect()
> >> for key, num_words in op:
> >>     print 'state: %s, num_words: %d' % (key, num_words)
> >>
> >> On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark User
> List] <[hidden email]> wrote:
> >> I don't actually have any data.  I'm writing a course that teaches
> students how to do this sort of thing and am interested in looking at a
> variety of real life examples of people doing things like that.  I'd love
> to see some working code implementing the "obvious work-around" you
> mention...do you have any to share?  It's an approach that makes a lot of
> sense, and as I said, I'd love to not have to re-invent the wheel if
> someone else has already written that code.  Thanks!
> >>
> >> Diana
> >>
> >>
> >> On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <[hidden email]>
> wrote:
> >> There was a previous discussion about this here:
> >>
> >>
> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
> >>
> >> How big are the XML or JSON files you're looking to deal with?
> >>
> >> It may not be practical to deserialize the entire document at once. In
> that case an obvious work-around would be to have some kind of
> pre-processing step that separates XML nodes/JSON objects with newlines so
> that you can analyze the data with Spark in a "line-oriented format". Your
> preprocessor wouldn't have to parse/deserialize the massive document; it
> would just have to track open/closed tags/braces to know when to insert a
> newline.
> >>
> >> Then you'd just open the line-delimited result and deserialize the
> individual objects/nodes with map().
> >>
> >> Nick
> >>
> >>
> >> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <[hidden email]> wrote:
> >> Has anyone got a working example of a Spark application that analyzes
> data in a non-line-oriented format, such as XML or JSON?  I'd like to do
> this without re-inventing the wheel...anyone care to share?  Thanks!
> >>
> >> Diana
> >>
> >>
> >>
> >>
> >>
> >
> >
>
>

Re: example of non-line oriented input data?

Posted by Matei Zaharia <ma...@gmail.com>.
Here’s an example of getting together all lines in a file as one string:

$ cat dir/a.txt 
Hello
world!

$ cat dir/b.txt 
What's
up??

$ bin/pyspark
>>> files = sc.textFile("dir")

>>> files.collect()
[u'Hello', u'world!', u"What's", u'up??']   # one element per line, not what we want

>>> files.glom().collect()
[[u'Hello', u'world!'], [u"What's", u'up??']]   # one element per file, which is an array of lines

>>> files.glom().map(lambda a: "\n".join(a)).collect()
[u'Hello\nworld!', u"What's\nup??"]    # join back each file into a single string

The glom() method groups all the elements of each partition of an RDD into an array, giving you an RDD of arrays of objects. If your input is small files, you always have one partition per file.

There’s also mapPartitions, which gives you an iterator for each partition instead of an array. You can then return an iterator or list of objects to produce from that.
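For the many-small-XML-files case, a function passed to mapPartitions might look roughly like the sketch below. It assumes each partition holds exactly one complete file (the small-files case described above); the function name parse_xml_partition and the element and attribute names in the sample are invented for illustration. A plain Python iterator stands in for the partition so it can be tried without Spark:

```python
import xml.etree.ElementTree as ET


def parse_xml_partition(lines):
    """Join the lines of one partition and yield one record per child element.

    Assumption: the partition contains exactly one whole XML document.
    """
    text = "\n".join(lines)
    if not text.strip():
        return  # empty partition: produce no records
    root = ET.fromstring(text)
    for child in root:
        # Yielding makes this function a generator, which is an iterator.
        yield (child.tag, child.attrib.get("id"), (child.text or "").strip())


# Local check: the list of lines plays the role of one file / one partition.
sample_partition = iter([
    "<people>",
    "  <person id='1'>Ann</person>",
    "  <person id='2'>",
    "Bob",
    "  </person>",
    "</people>",
])
records = list(parse_xml_partition(sample_partition))
```

On a real RDD this would be used as sc.textFile("dir").mapPartitions(parse_xml_partition). If a file were ever split across partitions, ET.fromstring would raise a parse error, so the one-file-per-partition assumption matters.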

Matei


On Mar 17, 2014, at 10:46 AM, Diana Carroll <dc...@cloudera.com> wrote:

> Thanks Matei.  That makes sense.  I have here a dataset of many many smallish XML files, so using mapPartitions that way would make sense.  I'd love to see a code example though ... It's not as obvious to me how to do that as it probably should be.
> 
> Thanks,
> Diana
> 
> 
> On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia <ma...@gmail.com> wrote:
> Hi Diana,
> 
> Non-text input formats are only supported in Java and Scala right now, where you can use sparkContext.hadoopFile or .hadoopDataset to load data with any InputFormat that Hadoop MapReduce supports. In Python, you unfortunately only have textFile, which gives you one record per line. For JSON, you’d have to fit the whole JSON object on one line as you said. Hopefully we’ll also have some other forms of input soon.
> 
> If your input is a collection of separate files (say many .xml files), you can also use mapPartitions on it to group together the lines because each input file will end up being a single dataset partition (or map task). This will let you concatenate the lines in each file and parse them as one XML object.
> 
> Matei
> 
> On Mar 17, 2014, at 9:52 AM, Diana Carroll <dc...@cloudera.com> wrote:
> 
>> Thanks, Krakna, very helpful.  The way I read the code, it looks like you are assuming that each line in foo.log contains a complete json object?  (That is, that the data doesn't contain any records that are split into multiple lines.)  If so, is that because you know that to be true of your data?  Or did you do as Nicholas suggests and have some preprocessing on the text input to flatten the data in that way?
>> 
>> Thanks,
>> Diana
>> 
>> 
>> On Mon, Mar 17, 2014 at 12:09 PM, Krakna H <sh...@gmail.com> wrote:
>> Katrina, 
>> 
>> Not sure if this is what you had in mind, but here's some simple pyspark code that I recently wrote to deal with JSON files.
>> 
>> from pyspark import SparkContext, SparkConf
>> from operator import add
>> import json
>>
>> def concatenate_paragraphs(sentence_array):
>>     # Flatten the list of sentences into a list of words.
>>     return ' '.join(sentence_array).split(' ')
>>
>> def key_and_word_count(line):
>>     # Parse each line once and return (key, number of words).
>>     record = json.loads(line)
>>     return (record['key'], len(concatenate_paragraphs(record['paragraphs'])))
>>
>> logFile = 'foo.json'
>> conf = SparkConf()
>> conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")
>> sc = SparkContext(conf=conf)
>> logData = sc.textFile(logFile).cache()
>> num_lines = logData.count()
>> print('Number of lines: %d' % num_lines)
>>
>> # Each line is assumed to hold one complete JSON object of the form
>> # {"key": ..., "paragraphs": [sentence1, sentence2, ...]}
>> tm = logData.map(key_and_word_count)
>> tm = tm.reduceByKey(add)
>> op = tm.collect()
>> for key, num_words in op:
>>     print('state: %s, num_words: %d' % (key, num_words))
>>
>> On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark User List] <[hidden email]> wrote:
>> I don't actually have any data.  I'm writing a course that teaches students how to do this sort of thing and am interested in looking at a variety of real life examples of people doing things like that.  I'd love to see some working code implementing the "obvious work-around" you mention...do you have any to share?  It's an approach that makes a lot of sense, and as I said, I'd love to not have to re-invent the wheel if someone else has already written that code.  Thanks!
>> 
>> Diana
>> 
>> 
>> On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <[hidden email]> wrote:
>> There was a previous discussion about this here:
>> 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
>> 
>> How big are the XML or JSON files you're looking to deal with? 
>> 
>> It may not be practical to deserialize the entire document at once. In that case an obvious work-around would be to have some kind of pre-processing step that separates XML nodes/JSON objects with newlines so that you can analyze the data with Spark in a "line-oriented format". Your preprocessor wouldn't have to parse/deserialize the massive document; it would just have to track open/closed tags/braces to know when to insert a newline.
>> 
>> Then you'd just open the line-delimited result and deserialize the individual objects/nodes with map().
>> 
>> Nick
>> 
>> 
>> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <[hidden email]> wrote:
>> Has anyone got a working example of a Spark application that analyzes data in a non-line-oriented format, such as XML or JSON?  I'd like to do this without re-inventing the wheel...anyone care to share?  Thanks!
>> 
>> Diana
>> 
>> 
>> 
>> 
>> 
> 
> 


Re: example of non-line oriented input data?

Posted by Diana Carroll <dc...@cloudera.com>.
Thanks Matei.  That makes sense.  I have here a dataset of many many
smallish XML files, so using mapPartitions that way would make sense.  I'd
love to see a code example though ...It's not as obvious to me how to do
that as it probably should be.

Thanks,
Diana


On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia <ma...@gmail.com> wrote:

> Hi Diana,
>
> Non-text input formats are only supported in Java and Scala right now,
> where you can use sparkContext.hadoopFile or .hadoopRDD to load data
> with any InputFormat that Hadoop MapReduce supports. In Python, you
> unfortunately only have textFile, which gives you one record per line. For
> JSON, you'd have to fit the whole JSON object on one line as you said.
> Hopefully we'll also have some other forms of input soon.
>
> If your input is a collection of separate files (say many .xml files), you
> can also use mapPartitions on it to group together the lines because each
> input file will end up being a single dataset partition (or map task). This
> will let you concatenate the lines in each file and parse them as one XML
> object.
>
> Matei
>
> On Mar 17, 2014, at 9:52 AM, Diana Carroll <dc...@cloudera.com> wrote:
>
> Thanks, Krakna, very helpful.  The way I read the code, it looks like you
> are assuming that each line in foo.log contains a complete json object?
>  (That is, that the data doesn't contain any records that are split into
> multiple lines.)  If so, is that because you know that to be true of your
> data?  Or did you do as Nicholas suggests and have some preprocessing on
> the text input to flatten the data in that way?
>
> Thanks,
> Diana
>
>
> On Mon, Mar 17, 2014 at 12:09 PM, Krakna H <sh...@gmail.com> wrote:
>
>> Katrina,
>>
>> Not sure if this is what you had in mind, but here's some simple pyspark
>> code that I recently wrote to deal with JSON files.
>>
>> from pyspark import SparkContext, SparkConf
>> from operator import add
>> import json
>>
>> def concatenate_paragraphs(sentence_array):
>>     # Flatten the list of sentences into a list of words.
>>     return ' '.join(sentence_array).split(' ')
>>
>> def key_and_word_count(line):
>>     # Parse each line once and return (key, number of words).
>>     record = json.loads(line)
>>     return (record['key'], len(concatenate_paragraphs(record['paragraphs'])))
>>
>> logFile = 'foo.json'
>> conf = SparkConf()
>> conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")
>> sc = SparkContext(conf=conf)
>> logData = sc.textFile(logFile).cache()
>> num_lines = logData.count()
>> print('Number of lines: %d' % num_lines)
>>
>> # Each line is assumed to hold one complete JSON object of the form
>> # {"key": ..., "paragraphs": [sentence1, sentence2, ...]}
>> tm = logData.map(key_and_word_count)
>> tm = tm.reduceByKey(add)
>> op = tm.collect()
>> for key, num_words in op:
>>     print('state: %s, num_words: %d' % (key, num_words))
>>
>> On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark User
>> List] <[hidden email]> wrote:
>>
>>> I don't actually have any data.  I'm writing a course that teaches
>>> students how to do this sort of thing and am interested in looking at a
>>> variety of real life examples of people doing things like that.  I'd love
>>> to see some working code implementing the "obvious work-around" you
>>> mention...do you have any to share?  It's an approach that makes a lot of
>>> sense, and as I said, I'd love to not have to re-invent the wheel if
>>> someone else has already written that code.  Thanks!
>>>
>>> Diana
>>>
>>>
>>> On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <[hidden email]> wrote:
>>>
>>>> There was a previous discussion about this here:
>>>>
>>>>
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
>>>>
>>>> How big are the XML or JSON files you're looking to deal with?
>>>>
>>>> It may not be practical to deserialize the entire document at once. In
>>>> that case an obvious work-around would be to have some kind of
>>>> pre-processing step that separates XML nodes/JSON objects with newlines so
>>>> that you *can* analyze the data with Spark in a "line-oriented
>>>> format". Your preprocessor wouldn't have to parse/deserialize the massive
>>>> document; it would just have to track open/closed tags/braces to know when
>>>> to insert a newline.
>>>>
>>>> Then you'd just open the line-delimited result and deserialize the
>>>> individual objects/nodes with map().
>>>>
>>>> Nick
>>>>
>>>>
>>>> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <[hidden email]> wrote:
>>>>
>>>>> Has anyone got a working example of a Spark application that analyzes
>>>>> data in a non-line-oriented format, such as XML or JSON?  I'd like to do
>>>>> this without re-inventing the wheel...anyone care to share?  Thanks!
>>>>>
>>>>> Diana
>>>>>
>>>>
>>>>
>>>
>>>
>>
>
>
>

Re: example of non-line oriented input data?

Posted by Matei Zaharia <ma...@gmail.com>.
Hi Diana,

Non-text input formats are only supported in Java and Scala right now, where you can use sparkContext.hadoopFile or .hadoopRDD to load data with any InputFormat that Hadoop MapReduce supports. In Python, you unfortunately only have textFile, which gives you one record per line. For JSON, you’d have to fit the whole JSON object on one line as you said. Hopefully we’ll also have some other forms of input soon.

If your input is a collection of separate files (say many .xml files), you can also use mapPartitions on it to group together the lines because each input file will end up being a single dataset partition (or map task). This will let you concatenate the lines in each file and parse them as one XML object.

Matei

On Mar 17, 2014, at 9:52 AM, Diana Carroll <dc...@cloudera.com> wrote:

> Thanks, Krakna, very helpful.  The way I read the code, it looks like you are assuming that each line in foo.log contains a complete json object?  (That is, that the data doesn't contain any records that are split into multiple lines.)  If so, is that because you know that to be true of your data?  Or did you do as Nicholas suggests and have some preprocessing on the text input to flatten the data in that way?
> 
> Thanks,
> Diana
> 
> 
> On Mon, Mar 17, 2014 at 12:09 PM, Krakna H <sh...@gmail.com> wrote:
> Katrina, 
> 
> Not sure if this is what you had in mind, but here's some simple pyspark code that I recently wrote to deal with JSON files.
> 
> from pyspark import SparkContext, SparkConf
> from operator import add
> import json
>
> def concatenate_paragraphs(sentence_array):
>     # Flatten the list of sentences into a list of words.
>     return ' '.join(sentence_array).split(' ')
>
> def key_and_word_count(line):
>     # Parse each line once and return (key, number of words).
>     record = json.loads(line)
>     return (record['key'], len(concatenate_paragraphs(record['paragraphs'])))
>
> logFile = 'foo.json'
> conf = SparkConf()
> conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")
> sc = SparkContext(conf=conf)
> logData = sc.textFile(logFile).cache()
> num_lines = logData.count()
> print('Number of lines: %d' % num_lines)
>
> # Each line is assumed to hold one complete JSON object of the form
> # {"key": ..., "paragraphs": [sentence1, sentence2, ...]}
> tm = logData.map(key_and_word_count)
> tm = tm.reduceByKey(add)
> op = tm.collect()
> for key, num_words in op:
>     print('state: %s, num_words: %d' % (key, num_words))
>
> On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark User List] <[hidden email]> wrote:
> I don't actually have any data.  I'm writing a course that teaches students how to do this sort of thing and am interested in looking at a variety of real life examples of people doing things like that.  I'd love to see some working code implementing the "obvious work-around" you mention...do you have any to share?  It's an approach that makes a lot of sense, and as I said, I'd love to not have to re-invent the wheel if someone else has already written that code.  Thanks!
> 
> Diana
> 
> 
> On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <[hidden email]> wrote:
> There was a previous discussion about this here:
> 
> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
> 
> How big are the XML or JSON files you're looking to deal with? 
> 
> It may not be practical to deserialize the entire document at once. In that case an obvious work-around would be to have some kind of pre-processing step that separates XML nodes/JSON objects with newlines so that you can analyze the data with Spark in a "line-oriented format". Your preprocessor wouldn't have to parse/deserialize the massive document; it would just have to track open/closed tags/braces to know when to insert a newline.
> 
> Then you'd just open the line-delimited result and deserialize the individual objects/nodes with map().
> 
> Nick
> 
> 
> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <[hidden email]> wrote:
> Has anyone got a working example of a Spark application that analyzes data in a non-line-oriented format, such as XML or JSON?  I'd like to do this without re-inventing the wheel...anyone care to share?  Thanks!
> 
> Diana
> 
> 
> 
> 
> 


Re: example of non-line oriented input data?

Posted by Krakna H <sh...@gmail.com>.
Diana, that's correct (and I apologize for calling you Katrina mistakenly
in my earlier e-mail) -- I had to do some kind of pre-processing to split
up the original JSON object, although this is not that hard. Especially if
your JSON data is coming from something like Mongodb where you can just
spew out newline-separated data for each item in the dataset.


On Mon, Mar 17, 2014 at 12:53 PM, Diana Carroll [via Apache Spark User
List] <ml...@n3.nabble.com> wrote:

> Thanks, Krakna, very helpful.  The way I read the code, it looks like you
> are assuming that each line in foo.log contains a complete json object?
>  (That is, that the data doesn't contain any records that are split into
> multiple lines.)  If so, is that because you know that to be true of your
> data?  Or did you do as Nicholas suggests and have some preprocessing on
> the text input to flatten the data in that way?
>
> Thanks,
> Diana
>
>
> On Mon, Mar 17, 2014 at 12:09 PM, Krakna H <[hidden email]> wrote:
>
>> Katrina,
>>
>> Not sure if this is what you had in mind, but here's some simple pyspark
>> code that I recently wrote to deal with JSON files.
>>
>> from pyspark import SparkContext, SparkConf
>> from operator import add
>> import json
>>
>> def concatenate_paragraphs(sentence_array):
>>     # Flatten the list of sentences into a list of words.
>>     return ' '.join(sentence_array).split(' ')
>>
>> def key_and_word_count(line):
>>     # Parse each line once and return (key, number of words).
>>     record = json.loads(line)
>>     return (record['key'], len(concatenate_paragraphs(record['paragraphs'])))
>>
>> logFile = 'foo.json'
>> conf = SparkConf()
>> conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")
>> sc = SparkContext(conf=conf)
>> logData = sc.textFile(logFile).cache()
>> num_lines = logData.count()
>> print('Number of lines: %d' % num_lines)
>>
>> # Each line is assumed to hold one complete JSON object of the form
>> # {"key": ..., "paragraphs": [sentence1, sentence2, ...]}
>> tm = logData.map(key_and_word_count)
>> tm = tm.reduceByKey(add)
>> op = tm.collect()
>> for key, num_words in op:
>>     print('state: %s, num_words: %d' % (key, num_words))
>>
>> On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark User
>> List] <[hidden email]> wrote:
>>
>>> I don't actually have any data.  I'm writing a course that teaches
>>> students how to do this sort of thing and am interested in looking at a
>>> variety of real life examples of people doing things like that.  I'd love
>>> to see some working code implementing the "obvious work-around" you
>>> mention...do you have any to share?  It's an approach that makes a lot of
>>> sense, and as I said, I'd love to not have to re-invent the wheel if
>>> someone else has already written that code.  Thanks!
>>>
>>> Diana
>>>
>>>
>>> On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <[hidden email]> wrote:
>>>
>>>> There was a previous discussion about this here:
>>>>
>>>>
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
>>>>
>>>> How big are the XML or JSON files you're looking to deal with?
>>>>
>>>> It may not be practical to deserialize the entire document at once. In
>>>> that case an obvious work-around would be to have some kind of
>>>> pre-processing step that separates XML nodes/JSON objects with newlines so
>>>> that you *can* analyze the data with Spark in a "line-oriented
>>>> format". Your preprocessor wouldn't have to parse/deserialize the massive
>>>> document; it would just have to track open/closed tags/braces to know when
>>>> to insert a newline.
>>>>
>>>> Then you'd just open the line-delimited result and deserialize the
>>>> individual objects/nodes with map().
>>>>
>>>> Nick
>>>>
>>>>
>>>> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <[hidden email]> wrote:
>>>>
>>>>> Has anyone got a working example of a Spark application that analyzes
>>>>> data in a non-line-oriented format, such as XML or JSON?  I'd like to do
>>>>> this without re-inventing the wheel...anyone care to share?  Thanks!
>>>>>
>>>>> Diana
>>>>>
>>>>
>>>>
>>>
>>>
>>
>
>
>
>




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/example-of-non-line-oriented-input-data-tp2750p2761.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: example of non-line oriented input data?

Posted by Diana Carroll <dc...@cloudera.com>.
Thanks, Krakna, very helpful.  The way I read the code, it looks like you
are assuming that each line in foo.json contains a complete JSON object?
(That is, that the data doesn't contain any records that are split into
multiple lines.)  If so, is that because you know that to be true of your
data?  Or did you do as Nicholas suggests and have some preprocessing on
the text input to flatten the data in that way?

Thanks,
Diana


On Mon, Mar 17, 2014 at 12:09 PM, Krakna H <sh...@gmail.com> wrote:

> Katrina,
>
> Not sure if this is what you had in mind, but here's some simple pyspark
> code that I recently wrote to deal with JSON files.
>
> from pyspark import SparkContext, SparkConf
> from operator import add
> import json
>
> def concatenate_paragraphs(sentence_array):
>     # Flatten the list of sentences into a list of words.
>     return ' '.join(sentence_array).split(' ')
>
> def key_and_word_count(line):
>     # Parse each line once and return (key, number of words).
>     record = json.loads(line)
>     return (record['key'], len(concatenate_paragraphs(record['paragraphs'])))
>
> logFile = 'foo.json'
> conf = SparkConf()
> conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")
> sc = SparkContext(conf=conf)
> logData = sc.textFile(logFile).cache()
> num_lines = logData.count()
> print('Number of lines: %d' % num_lines)
>
> # Each line is assumed to hold one complete JSON object of the form
> # {"key": ..., "paragraphs": [sentence1, sentence2, ...]}
> tm = logData.map(key_and_word_count)
> tm = tm.reduceByKey(add)
> op = tm.collect()
> for key, num_words in op:
>     print('state: %s, num_words: %d' % (key, num_words))
>
> On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark User
> List] <[hidden email]> wrote:
>
>> I don't actually have any data.  I'm writing a course that teaches
>> students how to do this sort of thing and am interested in looking at a
>> variety of real life examples of people doing things like that.  I'd love
>> to see some working code implementing the "obvious work-around" you
>> mention...do you have any to share?  It's an approach that makes a lot of
>> sense, and as I said, I'd love to not have to re-invent the wheel if
>> someone else has already written that code.  Thanks!
>>
>> Diana
>>
>>
>> On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <[hidden email]> wrote:
>>
>>> There was a previous discussion about this here:
>>>
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
>>>
>>> How big are the XML or JSON files you're looking to deal with?
>>>
>>> It may not be practical to deserialize the entire document at once. In
>>> that case an obvious work-around would be to have some kind of
>>> pre-processing step that separates XML nodes/JSON objects with newlines so
>>> that you *can* analyze the data with Spark in a "line-oriented format".
>>> Your preprocessor wouldn't have to parse/deserialize the massive document;
>>> it would just have to track open/closed tags/braces to know when to insert
>>> a newline.
>>>
>>> Then you'd just open the line-delimited result and deserialize the
>>> individual objects/nodes with map().
>>>
>>> Nick
>>>
>>>
>>> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <[hidden email]> wrote:
>>>
>>>> Has anyone got a working example of a Spark application that analyzes
>>>> data in a non-line-oriented format, such as XML or JSON?  I'd like to do
>>>> this without re-inventing the wheel...anyone care to share?  Thanks!
>>>>
>>>> Diana
>>>>
>>>
>>>
>>
>>
>

Re: example of non-line oriented input data?

Posted by Krakna H <sh...@gmail.com>.
Katrina,

Not sure if this is what you had in mind, but here's some simple pyspark
code that I recently wrote to deal with JSON files.

from pyspark import SparkContext, SparkConf
from operator import add
import json


def concatenate_paragraphs(sentence_array):
    # Flatten the list of sentences into a list of words.
    return ' '.join(sentence_array).split(' ')


def key_and_word_count(line):
    # Parse each line once and return (key, number of words).
    record = json.loads(line)
    return (record['key'], len(concatenate_paragraphs(record['paragraphs'])))


logFile = 'foo.json'
conf = SparkConf()
conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")
sc = SparkContext(conf=conf)
logData = sc.textFile(logFile).cache()
num_lines = logData.count()
print('Number of lines: %d' % num_lines)

# Each line is assumed to hold one complete JSON object of the form
# {"key": ..., "paragraphs": [sentence1, sentence2, ...]}
tm = logData.map(key_and_word_count)
tm = tm.reduceByKey(add)
op = tm.collect()
for key, num_words in op:
    print('state: %s, num_words: %d' % (key, num_words))





On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark User
List] <ml...@n3.nabble.com> wrote:

> I don't actually have any data.  I'm writing a course that teaches
> students how to do this sort of thing and am interested in looking at a
> variety of real life examples of people doing things like that.  I'd love
> to see some working code implementing the "obvious work-around" you
> mention...do you have any to share?  It's an approach that makes a lot of
> sense, and as I said, I'd love to not have to re-invent the wheel if
> someone else has already written that code.  Thanks!
>
> Diana
>
>
> On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <[hidden email]> wrote:
>
>> There was a previous discussion about this here:
>>
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
>>
>> How big are the XML or JSON files you're looking to deal with?
>>
>> It may not be practical to deserialize the entire document at once. In
>> that case an obvious work-around would be to have some kind of
>> pre-processing step that separates XML nodes/JSON objects with newlines so
>> that you *can* analyze the data with Spark in a "line-oriented format".
>> Your preprocessor wouldn't have to parse/deserialize the massive document;
>> it would just have to track open/closed tags/braces to know when to insert
>> a newline.
>>
>> Then you'd just open the line-delimited result and deserialize the
>> individual objects/nodes with map().
>>
>> Nick
>>
>>
>> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <[hidden email]> wrote:
>>
>>> Has anyone got a working example of a Spark application that analyzes
>>> data in a non-line-oriented format, such as XML or JSON?  I'd like to do
>>> this without re-inventing the wheel...anyone care to share?  Thanks!
>>>
>>> Diana
>>>
>>
>>
>
>
>




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/example-of-non-line-oriented-input-data-tp2750p2754.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: example of non-line oriented input data?

Posted by Nicholas Chammas <ni...@gmail.com>.
Hmm, so I lucked out with my data source in that it comes to me as
line-delimited JSON, so I didn't have to write code to massage it into that
format.

If you are prepared to make several assumptions about your data (let's say
it's JSON), it should be straightforward to write some kind of
pre-processor that splits it out into lines just by counting and matching
open and closed braces. You'll have to assume, for example, that your JSON
is well formed and that values themselves don't contain braces, but that
may be okay for your purposes. If you need something more robust, there's this
post <http://stackoverflow.com/a/7795029/877069> on "lazily" reading JSON
objects from a file stream.
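
A minimal sketch of the brace-counting pre-processor described above, in
Python. It relies on exactly the assumptions already stated: the JSON is well
formed and no string value contains a brace. The function name
split_json_objects is made up for illustration, and this version takes the
whole document as one string rather than reading from a stream.

```python
import json


def split_json_objects(text):
    """Yield each top-level {...} object in `text` as a one-line string.

    Assumes well-formed JSON and that no string value contains '{' or '}'.
    Nothing is parsed here; only the brace nesting depth is tracked.
    """
    depth = 0
    start = None
    for i, ch in enumerate(text):
        if ch == "{":
            if depth == 0:
                start = i  # beginning of a new top-level object
            depth += 1
        elif ch == "}":
            depth -= 1
            if depth == 0 and start is not None:
                chunk = text[start:i + 1]
                # Valid JSON strings cannot hold raw newlines, so replacing
                # them with spaces does not change the object's meaning.
                yield chunk.replace("\r", " ").replace("\n", " ")
                start = None


doc = '[\n  {"a": 1,\n   "b": {"c": 2}},\n  {"a": 3}\n]'
lines = list(split_json_objects(doc))
for line in lines:
    print(line)
```

Each yielded string can be written out as one line of a new file, which Spark
can then read as ordinary line-oriented text.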

And if all of this is too much, just stick with deserializing the entire
document at once and take it from there.

Nick


On Mon, Mar 17, 2014 at 11:56 AM, Diana Carroll <dc...@cloudera.com> wrote:

> I don't actually have any data.  I'm writing a course that teaches
> students how to do this sort of thing and am interested in looking at a
> variety of real life examples of people doing things like that.  I'd love
> to see some working code implementing the "obvious work-around" you
> mention...do you have any to share?  It's an approach that makes a lot of
> sense, and as I said, I'd love to not have to re-invent the wheel if
> someone else has already written that code.  Thanks!
>
> Diana
>
>
> On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <
> nicholas.chammas@gmail.com> wrote:
>
>> There was a previous discussion about this here:
>>
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
>>
>> How big are the XML or JSON files you're looking to deal with?
>>
>> It may not be practical to deserialize the entire document at once. In
>> that case an obvious work-around would be to have some kind of
>> pre-processing step that separates XML nodes/JSON objects with newlines so
>> that you *can* analyze the data with Spark in a "line-oriented format".
>> Your preprocessor wouldn't have to parse/deserialize the massive document;
>> it would just have to track open/closed tags/braces to know when to insert
>> a newline.
>>
>> Then you'd just open the line-delimited result and deserialize the
>> individual objects/nodes with map().
>>
>> Nick
>>
>>
>> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <dc...@cloudera.com> wrote:
>>
>>> Has anyone got a working example of a Spark application that analyzes
>>> data in a non-line-oriented format, such as XML or JSON?  I'd like to do
>>> this without re-inventing the wheel...anyone care to share?  Thanks!
>>>
>>> Diana
>>>
>>
>>
>

Re: example of non-line oriented input data?

Posted by Diana Carroll <dc...@cloudera.com>.
I don't actually have any data.  I'm writing a course that teaches students
how to do this sort of thing and am interested in looking at a variety of
real life examples of people doing things like that.  I'd love to see some
working code implementing the "obvious work-around" you mention...do you
have any to share?  It's an approach that makes a lot of sense, and as I
said, I'd love to not have to re-invent the wheel if someone else has
already written that code.  Thanks!

Diana


On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <
nicholas.chammas@gmail.com> wrote:

> There was a previous discussion about this here:
>
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
>
> How big are the XML or JSON files you're looking to deal with?
>
> It may not be practical to deserialize the entire document at once. In
> that case an obvious work-around would be to have some kind of
> pre-processing step that separates XML nodes/JSON objects with newlines so
> that you *can* analyze the data with Spark in a "line-oriented format".
> Your preprocessor wouldn't have to parse/deserialize the massive document;
> it would just have to track open/closed tags/braces to know when to insert
> a newline.
>
> Then you'd just open the line-delimited result and deserialize the
> individual objects/nodes with map().
>
> Nick
>
>
> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <dc...@cloudera.com> wrote:
>
>> Has anyone got a working example of a Spark application that analyzes
>> data in a non-line-oriented format, such as XML or JSON?  I'd like to do
>> this without re-inventing the wheel...anyone care to share?  Thanks!
>>
>> Diana
>>
>
>

Re: example of non-line oriented input data?

Posted by Nicholas Chammas <ni...@gmail.com>.
There was a previous discussion about this here:

http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html

How big are the XML or JSON files you're looking to deal with?

It may not be practical to deserialize the entire document at once. In that
case an obvious work-around would be to have some kind of pre-processing
step that separates XML nodes/JSON objects with newlines so that you *can*
analyze the data with Spark in a "line-oriented format". Your preprocessor
wouldn't have to parse/deserialize the massive document; it would just have
to track open/closed tags/braces to know when to insert a newline.

Then you'd just open the line-delimited result and deserialize the
individual objects/nodes with map().
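
As a rough illustration of that last step, here is a sketch in Python. The
helper name parse_line and the sample records are invented for the example,
and the built-in map() stands in for the Spark call so the snippet runs
without a cluster. In PySpark the equivalent would be along the lines of
sc.textFile(path).map(parse_line), assuming sc is an existing SparkContext
and path points at the line-delimited file.

```python
import json


def parse_line(line):
    """Deserialize one line of line-delimited JSON into a Python object."""
    return json.loads(line)


# Stand-in for the lines Spark would hand to map(); in PySpark this would be
# something like: records = sc.textFile(path).map(parse_line)
sample_lines = [
    '{"user": "a", "n": 1}',
    '{"user": "b", "n": 2}',
]

records = list(map(parse_line, sample_lines))
total = sum(r["n"] for r in records)
print(records, total)
```

Because each line is a complete object, every record is parsed independently,
which is what lets Spark spread the work across partitions.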

Nick


On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <dc...@cloudera.com> wrote:

> Has anyone got a working example of a Spark application that analyzes data
> in a non-line-oriented format, such as XML or JSON?  I'd like to do this
> without re-inventing the wheel...anyone care to share?  Thanks!
>
> Diana
>