Posted to user@spark.apache.org by Chengi Liu <ch...@gmail.com> on 2014/02/24 22:08:24 UTC

ETL on pyspark

Hi,
  I am a newbie here, trying to do ETL on Spark. I have a few questions.

I have a CSV file with a header.
1) How do I parse this file, given that it has a header?
2) I was trying to follow the tutorial here:
http://ampcamp.berkeley.edu/3/exercises/data-exploration-using-spark.html

3) I am trying to do a frequency count:
     rows_key_value = rows.map(lambda x:(x[1],1)).reduceByKey(lambda x,y:x+y,1).collect()


After waiting for a few minutes I see this error:
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:2271)
    at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:178)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:50)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:223)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
14/02/24 13:06:45 INFO TaskSetManager: Starting task 13.0:0 as TID 331 on executor 2: node07 (PROCESS_LOCAL)
14/02/24 13:06:45 INFO TaskSetManager: Serialized task 13.0:0 as 3809 bytes in 0 ms


How do I fix this?
Thanks
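
On question 1, one possible approach is to skip the first line of the first partition and parse the remaining lines with Python's csv module. This is only a sketch under stated assumptions: the header is the very first line of a single input file, and no field contains an embedded newline. The function name and the path in the usage comment are hypothetical.

```python
import csv


def parse_without_header(partition_index, lines):
    """Parse CSV lines from one partition, skipping the header.

    Intended for rdd.mapPartitionsWithIndex. Assumes the header is the
    first line of partition 0 (hypothetical single-file input).
    """
    lines = iter(lines)
    if partition_index == 0:
        next(lines, None)  # discard the header line, if there is one
    # csv.reader copes with quoted fields such as "b,c"
    for row in csv.reader(lines):
        yield row


# Usage inside a pyspark shell, where `sc` already exists
# (the path is a placeholder):
#   rows = sc.textFile("hdfs:///path/to/file.csv") \
#            .mapPartitionsWithIndex(parse_without_header)
#   rows.take(2)
```

Each element of the resulting RDD is a list of strings, so x[1] in the frequency count above would refer to the second column.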

RE: ETL on pyspark

Posted by Adrian Mocanu <am...@verticalscope.com>.
Matei, one more follow up,
What if you write the stream data to your file by iterating through each RDD, as below? I know foreach is not idempotent. Can this write some tuples or RDDs twice?

stream.foreachRDD(rdd=>rdd.foreach({
     tuple=>appendToFileTuple(tuple)
    }))

Thanks a lot!
-A
From: Matei Zaharia [mailto:matei.zaharia@gmail.com]
Sent: February-25-14 3:02 PM
To: user@spark.apache.org
Cc: user@spark.incubator.apache.org
Subject: Re: ETL on pyspark

It will only move a file to the final directory when it's successfully finished writing it, so the file shouldn't have any duplicates. Old attempts will just be deleted.

Matei

On Feb 25, 2014, at 9:19 AM, Adrian Mocanu <am...@verticalscope.com> wrote:


Hi Matei
If Spark crashes while writing the file, after recovery from the failure does it continue where it left off or will there be duplicates in the file?

-A
From: Matei Zaharia [mailto:matei.zaharia@gmail.com]
Sent: February-24-14 4:20 PM
To: user@spark.incubator.apache.org
Subject: Re: ETL on pyspark

collect() means to bring all the data back to the master node, and there might just be too much of it for that. How big is your file? If you can't bring it back to the master node try saveAsTextFile to write it out to a filesystem (in parallel).

Matei



Re: ETL on pyspark

Posted by Matei Zaharia <ma...@gmail.com>.
It will only move a file to the final directory when it’s successfully finished writing it, so the file shouldn’t have any duplicates. Old attempts will just be deleted.

Matei
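
The write-then-move behaviour described above can be illustrated outside Spark. The following is a minimal sketch in plain Python, not Spark's actual implementation: the output is written to a temporary file in the same directory and moved to its final name only once writing has finished, so a crash part-way through leaves no partial file under the final name. The function name write_then_move is hypothetical.

```python
import os
import tempfile


def write_then_move(lines, final_path):
    """Write lines to a temporary file, then move it into place."""
    directory = os.path.dirname(os.path.abspath(final_path))
    # the temporary file lives next to the target so the move stays
    # on one filesystem
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as tmp:
            for line in lines:
                tmp.write(line + "\n")
        # only reached if every line was written successfully
        os.replace(tmp_path, final_path)
    except BaseException:
        # a failed attempt is deleted; the final path is never touched
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```

A retry after a failure simply starts a fresh temporary file, which is why the final file does not accumulate duplicates in this pattern.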

On Feb 25, 2014, at 9:19 AM, Adrian Mocanu <am...@verticalscope.com> wrote:

> Hi Matei
> If Spark crashes while writing the file, after recovery from the failure does it continue where it left off or will there be duplicates in the file?
>  
> -A
> From: Matei Zaharia [mailto:matei.zaharia@gmail.com] 
> Sent: February-24-14 4:20 PM
> To: user@spark.incubator.apache.org
> Subject: Re: ETL on pyspark
>  
> collect() means to bring all the data back to the master node, and there might just be too much of it for that. How big is your file? If you can’t bring it back to the master node try saveAsTextFile to write it out to a filesystem (in parallel).
>  
> Matei
>  


RE: ETL on pyspark

Posted by Adrian Mocanu <am...@verticalscope.com>.
Hi Matei
If Spark crashes while writing the file, after recovery from the failure does it continue where it left off or will there be duplicates in the file?

-A
From: Matei Zaharia [mailto:matei.zaharia@gmail.com]
Sent: February-24-14 4:20 PM
To: user@spark.incubator.apache.org
Subject: Re: ETL on pyspark

collect() means to bring all the data back to the master node, and there might just be too much of it for that. How big is your file? If you can't bring it back to the master node try saveAsTextFile to write it out to a filesystem (in parallel).

Matei




Re: ETL on pyspark

Posted by Matei Zaharia <ma...@gmail.com>.
Yeah, so the problem is that countByValue returns *all* values and their counts to your machine. If you just want the top 10, try this:

# do a distributed count using reduceByKey
counts = data.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)

# reverse the (key, count) pairs into (count, key) and then sort in descending order
# (indexing the pair, rather than unpacking it in the lambda signature, also works on Python 3)
sorted_counts = counts.map(lambda kv: (kv[1], kv[0])).sortByKey(False)

# take the top 10 elements
top = sorted_counts.take(10)

Matei
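
As a variation on the snippet above, takeOrdered with a key function can return the most frequent entries without swapping the pairs or sorting the whole RDD. This is a minimal sketch, assuming data is an RDD of hashable values as in the thread; the function name top_n_by_count is hypothetical.

```python
def top_n_by_count(data, n=10):
    """Return the n most frequent values of an RDD as (value, count) pairs."""
    # distributed count, same as the reduceByKey step above
    counts = data.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)
    # takeOrdered returns the n smallest elements under `key`,
    # so negate the count to get the largest counts first
    return counts.takeOrdered(n, key=lambda kv: -kv[1])


# Usage in a pyspark shell (extract_subs is the RDD from the question):
#   top = top_n_by_count(extract_subs, 10)
```

Only n pairs are returned to the local machine, which is the point of avoiding countByValue here.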

On Feb 24, 2014, at 1:48 PM, Chengi Liu <ch...@gmail.com> wrote:

> Hi,
>    Using pyspark for the first time on realistic dataset ( few hundred GB's) but have been seeing a lot of errors on pyspark shell? This might be because maybe I am not using pyspark correctly?
> 
> But here is what I was trying:
> extract_subs.take(2) 
> //returns [u'867430', u'867429']
> extract_subs_count = sorted(extract_subs.countByValue().items())
> To do the frequency count...
> Somewhere in between.. I see one error..
> 
> 14/02/24 13:43:44 INFO DAGScheduler: Failed to run countByValue at <stdin>:1
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "/home/hadoop/spark/python/pyspark/rdd.py", line 618, in countByValue
>     return self.mapPartitions(countPartition).reduce(mergeMaps)
>   File "/home/hadoop/spark/python/pyspark/rdd.py", line 499, in reduce
>     vals = self.mapPartitions(func).collect()
>   File "/home/hadoop/spark/python/pyspark/rdd.py", line 463, in collect
>     bytesInJava = self._jrdd.collect().iterator()
>   File "/home/hadoop/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
>   File "/home/hadoop/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o163.collect.
> : org.apache.spark.SparkException: Job aborted: Exception while deserializing and fetching task: java.lang.OutOfMemoryError: Java heap space
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
>     at scala.Option.foreach(Option.scala:236)
>     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 
> 
> 
> 
> And then it stucks:
> 14/02/24 13:43:50 INFO BlockManagerMasterActor$BlockManagerInfo: Removed taskresult_1267 on node07:34461 in memory (size: 15.5 MB, free: 279.2 MB)
> 14/02/24 13:43:51 INFO TaskSetManager: Finished TID 1257 in 22267 ms on node07 (progress: 64/66)
> 14/02/24 13:43:51 INFO BlockManagerMasterActor$BlockManagerInfo: Removed taskresult_1257 on node07:34461 in memory (size: 15.7 MB, free: 294.9 MB)
> 
> And then when I press enter..
> on the command prompt:
> extract_subs_count 
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
> NameError: name 'extract_subs_count' is not defined
> 
> 
> 
> So.. what I am wondering is if pyspark ready for realistic datasets or is that I am doing soemthing stupid.
> Thanks
> 
> 
> 
> 
> On Mon, Feb 24, 2014 at 1:22 PM, Chengi Liu <ch...@gmail.com> wrote:
> Its around 10 GB big?
> All I want  is to do a frequency count? And then get top 10 entries based on count?
> How do i do this (again on pyspark(
> Thanks
> 
> 
> On Mon, Feb 24, 2014 at 1:19 PM, Matei Zaharia <ma...@gmail.com> wrote:
> collect() means to bring all the data back to the master node, and there might just be too much of it for that. How big is your file? If you can’t bring it back to the master node try saveAsTextFile to write it out to a filesystem (in parallel).
> 
> Matei
> 


Re: ETL on pyspark

Posted by Chengi Liu <ch...@gmail.com>.
Hi,
   I am using pyspark for the first time on a realistic dataset (a few hundred
GB), but I have been seeing a lot of errors in the pyspark shell. This might be
because I am not using pyspark correctly.

But here is what I was trying, to do the frequency count:
extract_subs.take(2)
# returns [u'867430', u'867429']
extract_subs_count = sorted(extract_subs.countByValue().items())
Somewhere in between, I see this error:

14/02/24 13:43:44 INFO DAGScheduler: Failed to run countByValue at <stdin>:1
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/hadoop/spark/python/pyspark/rdd.py", line 618, in countByValue
    return self.mapPartitions(countPartition).reduce(mergeMaps)
  File "/home/hadoop/spark/python/pyspark/rdd.py", line 499, in reduce
    vals = self.mapPartitions(func).collect()
  File "/home/hadoop/spark/python/pyspark/rdd.py", line 463, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/home/hadoop/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
  File "/home/hadoop/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o163.collect.
: org.apache.spark.SparkException: Job aborted: Exception while deserializing and fetching task: java.lang.OutOfMemoryError: Java heap space
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)




And then it gets stuck:
14/02/24 13:43:50 INFO BlockManagerMasterActor$BlockManagerInfo: Removed taskresult_1267 on node07:34461 in memory (size: 15.5 MB, free: 279.2 MB)
14/02/24 13:43:51 INFO TaskSetManager: Finished TID 1257 in 22267 ms on node07 (progress: 64/66)
14/02/24 13:43:51 INFO BlockManagerMasterActor$BlockManagerInfo: Removed taskresult_1257 on node07:34461 in memory (size: 15.7 MB, free: 294.9 MB)

And then, when I press Enter at the command prompt:
extract_subs_count
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
NameError: name 'extract_subs_count' is not defined



So what I am wondering is whether pyspark is ready for realistic datasets, or
whether I am doing something stupid.
Thanks




On Mon, Feb 24, 2014 at 1:22 PM, Chengi Liu <ch...@gmail.com> wrote:

> Its around 10 GB big?
> All I want  is to do a frequency count? And then get top 10 entries based
> on count?
> How do i do this (again on pyspark(
> Thanks
>
>
> On Mon, Feb 24, 2014 at 1:19 PM, Matei Zaharia <ma...@gmail.com> wrote:
>
>> collect() means to bring all the data back to the master node, and there
>> might just be too much of it for that. How big is your file? If you can't
>> bring it back to the master node try saveAsTextFile to write it out to a
>> filesystem (in parallel).
>>
>> Matei
>>

Re: ETL on pyspark

Posted by Chengi Liu <ch...@gmail.com>.
It's around 10 GB.
All I want is to do a frequency count and then get the top 10 entries based
on count.
How do I do this (again, in pyspark)?
Thanks


On Mon, Feb 24, 2014 at 1:19 PM, Matei Zaharia <ma...@gmail.com> wrote:

> collect() means to bring all the data back to the master node, and there
> might just be too much of it for that. How big is your file? If you can't
> bring it back to the master node try saveAsTextFile to write it out to a
> filesystem (in parallel).
>
> Matei
>

Re: ETL on pyspark

Posted by Matei Zaharia <ma...@gmail.com>.
collect() means to bring all the data back to the master node, and there might just be too much of it for that. How big is your file? If you can’t bring it back to the master node try saveAsTextFile to write it out to a filesystem (in parallel).

Matei
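
The suggestion above could look roughly like the following: the same frequency count as in the question, but with the result written out by the workers rather than collected. This is a sketch under assumptions, not a tested recipe for this cluster: rows is taken to be an RDD of parsed records as in the question, and the function name, the tab-separated output format and the output path are hypothetical.

```python
def count_and_save(rows, output_dir):
    """Count occurrences of the second field and write them out in parallel."""
    # same distributed count as in the question, minus collect()
    counts = rows.map(lambda x: (x[1], 1)).reduceByKey(lambda a, b: a + b)
    # format each (key, count) pair as one tab-separated line of text
    lines = counts.map(lambda kv: "%s\t%d" % (kv[0], kv[1]))
    # each partition writes its own part file under output_dir,
    # so nothing is pulled back to the master node
    lines.saveAsTextFile(output_dir)


# Usage in a pyspark shell (the path is a placeholder and must not
# already exist):
#   count_and_save(rows, "hdfs:///path/to/frequency_counts")
```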
