Posted to user@spark.apache.org by David Montague <da...@gmail.com> on 2015/08/10 18:49:34 UTC

Problem with take vs. takeSample in PySpark

Hi all,

I am getting some strange behavior with the RDD take function in PySpark
while doing some interactive coding in an IPython notebook.  I am running
PySpark on Spark 1.2.0 in yarn-client mode if that is relevant.

I am using sc.wholeTextFiles and pandas to load a collection of .csv files
into an RDD of pandas dataframes. I create an RDD called train_rdd in which
each row is a (label, pandas DataFrame) pair:

import pandas as pd
from StringIO import StringIO

rdd = sc.wholeTextFiles(data_path, 1000)
train_rdd = rdd.map(lambda x: (x[0], pd.read_csv(StringIO(x[1]))))

To prototype the next processing steps, I am trying to use take to select
one of the dataframes and apply the desired modifications locally before
writing the Spark code that applies them to all of the dataframes in parallel.

However, when I try to use take like this:

label, df = train_rdd.take(1)[0]

I get a spark.driver.maxResultSize error (stack trace included at the end
of this message). Each of these dataframes is only about 100 MB in size, so
it should easily fit on the driver without going over the maxResultSize
limit of 1024 MB.
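
(I know the limit itself can be raised; as far as I understand,
spark.driver.maxResultSize can be set when the SparkContext is created, as
in the sketch below, where the "4g" value is arbitrary. But that only hides
the symptom, which is why I am asking about take itself.)

from pyspark import SparkConf, SparkContext

# Sketch only: raise the driver-side result size limit at context creation.
# The "4g" value is arbitrary; this treats the symptom, not the cause.
conf = SparkConf().set("spark.driver.maxResultSize", "4g")
sc = SparkContext(conf=conf)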

If I instead use takeSample, though, there is no problem:

label, df = train_rdd.takeSample(False, 1, seed=50)[0]

(Here, I have set the seed so that the element selected is the same one
that take is trying to load (i.e., the first one), just to rule out the
possibility that the specific dataframe fetched by take is simply too
large.)

Does calling take result in a collect operation being performed before the
first item is returned? That would explain why this error is occurring, but
it seems like poor behavior for the take function. Clearly takeSample is
behaving the way I want, but it would be nice if I could get this behavior
from take, or at least without needing to choose an element randomly. I was
able to get the dataframe I wanted above by changing the seed until the
right one came up, but I don't think that is a good approach in general.
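
For what it's worth, the workaround I am leaning toward is to restrict the
job to a single partition instead of relying on take. This is only a sketch
(not something I have tested carefully; it assumes the element I want lives
in partition 0 and that that partition is non-empty):

from itertools import islice

# Sketch: pull at most one element from partition 0 only, so the driver
# never has to aggregate serialized results from many partitions at once.
first_items = train_rdd.mapPartitionsWithIndex(
    lambda idx, it: islice(it, 1) if idx == 0 else iter([])
).collect()
label, df = first_items[0]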

Any insight is appreciated.

Best,
David Montague




---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-38-7eca647cba46> in <module>()
      1 label_s, df_s = train_rdd.takeSample(False, 1, seed=50)[0]
----> 2 label, df = train_rdd.take(1)[0]

/var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041//python/pyspark/rdd.py
in take(self, num)
   1109
   1110             p = range(partsScanned, min(partsScanned +
numPartsToTry, totalParts))
-> 1111             res = self.context.runJob(self, takeUpToNumLeft, p,
True)
   1112
   1113             items += res

/var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041//python/pyspark/context.py
in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    816         # SparkContext#runJob.
    817         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 818         it = self._jvm.PythonRDD.runJob(self._jsc.sc(),
mappedRDD._jrdd, javaPartitions, allowLocal)
    819         return list(mappedRDD._collect_iterator_through_file(it))
    820

/var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539
    540         for temp_arg in temp_args:

/var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total
size of serialized results of 177 tasks (1038.0 MB) is bigger than
spark.driver.maxResultSize (1024.0 MB)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Re: Problem with take vs. takeSample in PySpark

Posted by Davies Liu <da...@databricks.com>.
I tested this in master (the 1.5 release) and it worked as expected (with
spark.driver.maxResultSize changed to 10m):

>>> len(sc.range(10).map(lambda i: '*' * (1<<23) ).take(1))
1
>>> len(sc.range(10).map(lambda i: '*' * (1<<24) ).take(1))
15/08/10 10:45:55 ERROR TaskSetManager: Total size of serialized
results of 1 tasks (16.1 MB) is bigger than spark.driver.maxResultSize
(10.0 MB)
>>> len(sc.range(10).map(lambda i: '*' * (1<<23) ).take(2))
15/08/10 10:46:04 ERROR TaskSetManager: Total size of serialized
results of 1 tasks (16.1 MB) is bigger than spark.driver.maxResultSize
(10.0 MB)

Could you reproduce this in 1.2?

We haven't changed take() much since 1.2 (I am unable to build the 1.2
branch right now because of dependency changes).
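
Something close to an equivalent check on a 1.2 install would be the
following (just a sketch: sc.range is not available there, so it uses
parallelize, and the 10m limit has to go on the conf before the context is
created):

from pyspark import SparkConf, SparkContext

# Sketch of the same check on a 1.2 cluster: shrink the driver result-size
# limit to 10m, then pull a single ~8 MB element with take(1).
conf = SparkConf().set("spark.driver.maxResultSize", "10m")
sc = SparkContext(conf=conf)
print(len(sc.parallelize(range(10), 10).map(lambda i: '*' * (1 << 23)).take(1)))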

