Posted to user@spark.apache.org by lllll <li...@gmail.com> on 2014/07/28 21:58:54 UTC

zip two RDD in pyspark

I have a file in S3 and I want to pair each of its lines with an index. Here
is my code:

>>> input_data = sc.textFile('s3n://myinput', minPartitions=6).cache()
>>> N = input_data.count()
>>> index = sc.parallelize(range(N), 6)
>>> index.zip(input_data).collect()

...
14/07/28 19:49:31 INFO DAGScheduler: Completed ResultTask(18, 4)
14/07/28 19:49:31 INFO DAGScheduler: Stage 18 (collect at <stdin>:1)
finished in 0.031 s
14/07/28 19:49:31 INFO SparkContext: Job finished: collect at <stdin>:1,
took 0.039999707 s
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/root/spark/python/pyspark/rdd.py", line 584, in collect
    return list(self._collect_iterator_through_file(bytesInJava))
  File "/root/spark/python/pyspark/rdd.py", line 592, in
_collect_iterator_through_file
    self.ctx._writeToFile(iterator, tempFile.name)
  File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
line 537, in __call__
  File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line
300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.writeToFile.
: java.lang.ClassCastException: java.lang.String cannot be cast to [B
	at
org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:312)
	at
org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:309)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:309)
	at org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:342)
	at org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:337)
	at org.apache.spark.api.python.PythonRDD.writeToFile(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
	at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)
	at java.lang.Thread.run(Thread.java:744)

As I see it, the job itself completes, but I don't understand why 'String
cannot be cast to [B' happens. Zipping two ParallelCollectionRDDs works
fine, but here one side is a MappedRDD from textFile, so I'm not sure
what's going on.

Also, why does PySpark not have zipWithIndex()?

Thanks for any help. 



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/zip-two-RDD-in-pyspark-tp10806.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: zip two RDD in pyspark

Posted by Nick Pentreath <ni...@gmail.com>.
parallelize uses the default Serializer (PickleSerializer) while textFile
uses UTF8Serializer.

You can get around this with index.zip(input_data._reserialize()) (or
index.zip(input_data.map(lambda x: x))).

(But if you just do that, you can still run into the issue of an unequal
number of partitions):

index.zip(input_data._reserialize()).count()

Py4JJavaError: An error occurred while calling o60.collect.

: java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers
of partitions

at org.apache.spark.rdd.ZippedRDD.getPartitions(ZippedRDD.scala:55)
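
For reference, here is a minimal sketch of that workaround, reusing the
placeholder path and partition count from this thread; note that zip() also
requires the same number of elements in every partition, so even with the
serializers aligned this can still fail on a text file:

    input_data = sc.textFile('s3n://myinput', minPartitions=6).cache()
    N = input_data.count()

    # A no-op map() (or the private _reserialize()) switches the text RDD
    # back to the default pickle serializer so it can be zipped with a
    # parallelize()d RDD.
    text_pickled = input_data.map(lambda x: x)

    # Same number of partitions on the index side...
    index = sc.parallelize(range(N), 6)

    # ...but zip() additionally needs identical per-partition counts, which
    # parallelize(range(N), 6) does not guarantee for a text file, so the
    # mapPartitions-based indexing in the reply quoted below is the robust
    # approach.
    pairs = index.zip(text_pickled)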






On Wed, Jul 30, 2014 at 7:53 AM, Davies Liu <da...@databricks.com> wrote:

> On Mon, Jul 28, 2014 at 12:58 PM, lllll <li...@gmail.com> wrote:
> > I have a file in S3 and I want to pair each of its lines with an index.
> > Here is my code:
> >
> >>>> input_data = sc.textFile('s3n://myinput', minPartitions=6).cache()
> >>>> N = input_data.count()
> >>>> index = sc.parallelize(range(N), 6)
> >>>> index.zip(input_data).collect()
>
> I think you cannot do zipWithIndex() this way, because the number of lines
> in each partition of input_data will be different from the number in index.
> You need to get the exact number of lines in each partition first, and then
> generate the correct indices. This is easy to do with mapPartitions():
>
> >>> nums = input_data.mapPartitions(lambda it: [sum(1 for i in it)]).collect()
> >>> starts = [sum(nums[:i]) for i in range(len(nums))]
> >>> zipped = input_data.mapPartitionsWithIndex(lambda i, it: ((starts[i] + j, x) for j, x in enumerate(it)))
>
> >
> > ...
> > 14/07/28 19:49:31 INFO DAGScheduler: Completed ResultTask(18, 4)
> > 14/07/28 19:49:31 INFO DAGScheduler: Stage 18 (collect at <stdin>:1)
> > finished in 0.031 s
> > 14/07/28 19:49:31 INFO SparkContext: Job finished: collect at <stdin>:1,
> > took 0.039999707 s
> > Traceback (most recent call last):
> >   File "<stdin>", line 1, in <module>
> >   File "/root/spark/python/pyspark/rdd.py", line 584, in collect
> >     return list(self._collect_iterator_through_file(bytesInJava))
> >   File "/root/spark/python/pyspark/rdd.py", line 592, in
> > _collect_iterator_through_file
> >     self.ctx._writeToFile(iterator, tempFile.name)
> >   File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
> > line 537, in __call__
> >   File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line
> > 300, in get_return_value
> > py4j.protocol.Py4JJavaError: An error occurred while calling
> > z:org.apache.spark.api.python.PythonRDD.writeToFile.
> > : java.lang.ClassCastException: java.lang.String cannot be cast to [B
> >         at
> >
> org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:312)
> >         at
> >
> org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:309)
> >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >         at
> >
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:309)
> >         at
> org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:342)
> >         at
> org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:337)
> >         at
> org.apache.spark.api.python.PythonRDD.writeToFile(PythonRDD.scala)
> >         at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
> >         at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >         at java.lang.reflect.Method.invoke(Method.java:606)
> >         at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
> >         at
> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
> >         at py4j.Gateway.invoke(Gateway.java:259)
> >         at
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> >         at py4j.commands.CallCommand.execute(CallCommand.java:79)
> >         at py4j.GatewayConnection.run(GatewayConnection.java:207)
> >         at java.lang.Thread.run(Thread.java:744)
>
> > As I see it, the job itself completes, but I don't understand why 'String
> > cannot be cast to [B' happens. Zipping two ParallelCollectionRDDs works
> > fine, but here one side is a MappedRDD from textFile, so I'm not sure
> > what's going on.
>
> Could you provide a script and dataset to reproduce this error? Maybe
> there are some corner cases during serialization.
>
>
> > Also, why does PySpark not have zipWithIndex()?
>
> PySpark still has far fewer features than the Scala API; hopefully it will
> catch up over the next couple of releases.
>
> >
> > Thanks for any help.
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/zip-two-RDD-in-pyspark-tp10806.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>

Re: zip two RDD in pyspark

Posted by Davies Liu <da...@databricks.com>.
On Mon, Jul 28, 2014 at 12:58 PM, lllll <li...@gmail.com> wrote:
> I have a file in S3 and I want to pair each of its lines with an index.
> Here is my code:
>
>>>> input_data = sc.textFile('s3n://myinput', minPartitions=6).cache()
>>>> N = input_data.count()
>>>> index = sc.parallelize(range(N), 6)
>>>> index.zip(input_data).collect()

I think you cannot do zipWithIndex() this way, because the number of lines
in each partition of input_data will be different from the number in index.
You need to get the exact number of lines in each partition first, and then
generate the correct indices. This is easy to do with mapPartitions():

>>> nums = input_data.mapPartitions(lambda it: [sum(1 for i in it)]).collect()
>>> starts = [sum(nums[:i]) for i in range(len(nums))]
>>> zipped = input_data.mapPartitionsWithIndex(lambda i, it: ((starts[i] + j, x) for j, x in enumerate(it)))
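
For completeness, a minimal sketch of how this pattern could be wrapped into a
reusable helper; the name zip_with_index is purely illustrative (it is not an
existing PySpark API), and it returns (index, line) pairs to match the
index.zip(input_data) ordering above:

    def zip_with_index(rdd):
        # Count the elements in each partition, in partition order.
        nums = rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()
        # starts[i] is the global index of the first element in partition i.
        starts = [sum(nums[:i]) for i in range(len(nums))]
        # Number elements locally, offset by the partition's start index.
        return rdd.mapPartitionsWithIndex(
            lambda i, it: ((starts[i] + j, x) for j, x in enumerate(it)))

    # Hypothetical usage:
    # zip_with_index(sc.parallelize(['a', 'b', 'c'], 2)).collect()
    # -> [(0, 'a'), (1, 'b'), (2, 'c')]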

>
> ...
> 14/07/28 19:49:31 INFO DAGScheduler: Completed ResultTask(18, 4)
> 14/07/28 19:49:31 INFO DAGScheduler: Stage 18 (collect at <stdin>:1)
> finished in 0.031 s
> 14/07/28 19:49:31 INFO SparkContext: Job finished: collect at <stdin>:1,
> took 0.039999707 s
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "/root/spark/python/pyspark/rdd.py", line 584, in collect
>     return list(self._collect_iterator_through_file(bytesInJava))
>   File "/root/spark/python/pyspark/rdd.py", line 592, in
> _collect_iterator_through_file
>     self.ctx._writeToFile(iterator, tempFile.name)
>   File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
> line 537, in __call__
>   File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line
> 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.writeToFile.
> : java.lang.ClassCastException: java.lang.String cannot be cast to [B
>         at
> org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:312)
>         at
> org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:309)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:309)
>         at org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:342)
>         at org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:337)
>         at org.apache.spark.api.python.PythonRDD.writeToFile(PythonRDD.scala)
>         at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>         at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>         at py4j.Gateway.invoke(Gateway.java:259)
>         at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>         at py4j.commands.CallCommand.execute(CallCommand.java:79)
>         at py4j.GatewayConnection.run(GatewayConnection.java:207)
>         at java.lang.Thread.run(Thread.java:744)

> As I see it, the job itself completes, but I don't understand why 'String
> cannot be cast to [B' happens. Zipping two ParallelCollectionRDDs works
> fine, but here one side is a MappedRDD from textFile, so I'm not sure
> what's going on.

> Could you provide a script and dataset to reproduce this error? Maybe
there are some corner cases during serialization.


> Also, why does PySpark not have zipWithIndex()?

PySpark still has far fewer features than the Scala API; hopefully it will
catch up over the next couple of releases.

>
> Thanks for any help.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/zip-two-RDD-in-pyspark-tp10806.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.