You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Kane <ka...@gmail.com> on 2014/03/22 06:45:55 UTC

distinct on huge dataset

I have a huge 2.5TB file. When i run:
val t = sc.textFile("/user/hdfs/dump.csv")
t.distinct.count

It fails right away with a lot of:

Loss was due to java.lang.ArrayIndexOutOfBoundsException
java.lang.ArrayIndexOutOfBoundsException: 1
        at $line59.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:18)
        at $line59.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:16)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at
org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
        at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
        at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:94)
        at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
        at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
        at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
        at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
        at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
        at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:46)
        at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:45)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
        at
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:45)
        at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/distinct-on-huge-dataset-tp3025.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: distinct on huge dataset

Posted by Kane <ka...@gmail.com>.
Yes, that helped, at least it was able to advance a bit further.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/distinct-on-huge-dataset-tp3025p3038.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: distinct on huge dataset

Posted by Mayur Rustagi <ma...@gmail.com>.
Preferably increase the ulimit on your machines. Spark needs to access a lot of small files hence hard to control file handlers. 


—
Sent from Mailbox

On Fri, Apr 18, 2014 at 3:59 AM, Ryan Compton <co...@gmail.com>
wrote:

> Btw, I've got System.setProperty("spark.shuffle.consolidate.files",
> "true") and use ext3 (CentOS...)
> On Thu, Apr 17, 2014 at 3:20 PM, Ryan Compton <co...@gmail.com> wrote:
>> Does this continue in newer versions? (I'm on 0.8.0 now)
>>
>> When I use .distinct() on moderately large datasets (224GB, 8.5B rows,
>> I'm guessing about 500M are distinct) my jobs fail with:
>>
>> 14/04/17 15:04:02 INFO cluster.ClusterTaskSetManager: Loss was due to
>> java.io.FileNotFoundException
>> java.io.FileNotFoundException:
>> /tmp/spark-local-20140417145643-a055/3c/shuffle_1_218_1157 (Too many
>> open files)
>>
>> ulimit -n tells me I can open 32000 files. Here's a plot of lsof on a
>> worker node during a failed .distinct():
>> http://i.imgur.com/wyBHmzz.png , you can see tasks fail when Spark
>> tries to open 32000 files.
>>
>> I never ran into this in 0.7.3. Is there a parameter I can set to tell
>> Spark to use less than 32000 files?
>>
>> On Mon, Mar 24, 2014 at 10:23 AM, Aaron Davidson <il...@gmail.com> wrote:
>>> Look up setting ulimit, though note the distinction between soft and hard
>>> limits, and that updating your hard limit may require changing
>>> /etc/security/limits.confand restarting each worker.
>>>
>>>
>>> On Mon, Mar 24, 2014 at 1:39 AM, Kane <ka...@gmail.com> wrote:
>>>>
>>>> Got a bit further, i think out of memory error was caused by setting
>>>> spark.spill to false. Now i have this error, is there an easy way to
>>>> increase file limit for spark, cluster-wide?:
>>>>
>>>> java.io.FileNotFoundException:
>>>>
>>>> /tmp/spark-local-20140324074221-b8f1/01/temp_1ab674f9-4556-4239-9f21-688dfc9f17d2
>>>> (Too many open files)
>>>>         at java.io.FileOutputStream.openAppend(Native Method)
>>>>         at java.io.FileOutputStream.<init>(FileOutputStream.java:192)
>>>>         at
>>>>
>>>> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:113)
>>>>         at
>>>>
>>>> org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:174)
>>>>         at
>>>>
>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:191)
>>>>         at
>>>>
>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insert(ExternalAppendOnlyMap.scala:141)
>>>>         at
>>>> org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:59)
>>>>         at
>>>>
>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
>>>>         at
>>>>
>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:94)
>>>>         at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
>>>>         at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
>>>>         at
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>>>>         at
>>>>
>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
>>>>         at
>>>>
>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:53)
>>>>         at
>>>>
>>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
>>>>         at
>>>>
>>>> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
>>>>         at
>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>>>>         at
>>>>
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>>>         at
>>>>
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>>>         at java.lang.Thread.run(Thread.java:662)
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/distinct-on-huge-dataset-tp3025p3084.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>>

Re: distinct on huge dataset

Posted by Ryan Compton <co...@gmail.com>.
Btw, I've got System.setProperty("spark.shuffle.consolidate.files",
"true") and use ext3 (CentOS...)

On Thu, Apr 17, 2014 at 3:20 PM, Ryan Compton <co...@gmail.com> wrote:
> Does this continue in newer versions? (I'm on 0.8.0 now)
>
> When I use .distinct() on moderately large datasets (224GB, 8.5B rows,
> I'm guessing about 500M are distinct) my jobs fail with:
>
> 14/04/17 15:04:02 INFO cluster.ClusterTaskSetManager: Loss was due to
> java.io.FileNotFoundException
> java.io.FileNotFoundException:
> /tmp/spark-local-20140417145643-a055/3c/shuffle_1_218_1157 (Too many
> open files)
>
> ulimit -n tells me I can open 32000 files. Here's a plot of lsof on a
> worker node during a failed .distinct():
> http://i.imgur.com/wyBHmzz.png , you can see tasks fail when Spark
> tries to open 32000 files.
>
> I never ran into this in 0.7.3. Is there a parameter I can set to tell
> Spark to use less than 32000 files?
>
> On Mon, Mar 24, 2014 at 10:23 AM, Aaron Davidson <il...@gmail.com> wrote:
>> Look up setting ulimit, though note the distinction between soft and hard
>> limits, and that updating your hard limit may require changing
>> /etc/security/limits.confand restarting each worker.
>>
>>
>> On Mon, Mar 24, 2014 at 1:39 AM, Kane <ka...@gmail.com> wrote:
>>>
>>> Got a bit further, i think out of memory error was caused by setting
>>> spark.spill to false. Now i have this error, is there an easy way to
>>> increase file limit for spark, cluster-wide?:
>>>
>>> java.io.FileNotFoundException:
>>>
>>> /tmp/spark-local-20140324074221-b8f1/01/temp_1ab674f9-4556-4239-9f21-688dfc9f17d2
>>> (Too many open files)
>>>         at java.io.FileOutputStream.openAppend(Native Method)
>>>         at java.io.FileOutputStream.<init>(FileOutputStream.java:192)
>>>         at
>>>
>>> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:113)
>>>         at
>>>
>>> org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:174)
>>>         at
>>>
>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:191)
>>>         at
>>>
>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insert(ExternalAppendOnlyMap.scala:141)
>>>         at
>>> org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:59)
>>>         at
>>>
>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
>>>         at
>>>
>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:94)
>>>         at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
>>>         at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
>>>         at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>>>         at
>>>
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
>>>         at
>>>
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
>>>         at org.apache.spark.scheduler.Task.run(Task.scala:53)
>>>         at
>>>
>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
>>>         at
>>>
>>> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>>>         at
>>>
>>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>>         at
>>>
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>>         at java.lang.Thread.run(Thread.java:662)
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/distinct-on-huge-dataset-tp3025p3084.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>

Re: distinct on huge dataset

Posted by Ryan Compton <co...@gmail.com>.
Does this continue in newer versions? (I'm on 0.8.0 now)

When I use .distinct() on moderately large datasets (224GB, 8.5B rows,
I'm guessing about 500M are distinct) my jobs fail with:

14/04/17 15:04:02 INFO cluster.ClusterTaskSetManager: Loss was due to
java.io.FileNotFoundException
java.io.FileNotFoundException:
/tmp/spark-local-20140417145643-a055/3c/shuffle_1_218_1157 (Too many
open files)

ulimit -n tells me I can open 32000 files. Here's a plot of lsof on a
worker node during a failed .distinct():
http://i.imgur.com/wyBHmzz.png , you can see tasks fail when Spark
tries to open 32000 files.

I never ran into this in 0.7.3. Is there a parameter I can set to tell
Spark to use less than 32000 files?

On Mon, Mar 24, 2014 at 10:23 AM, Aaron Davidson <il...@gmail.com> wrote:
> Look up setting ulimit, though note the distinction between soft and hard
> limits, and that updating your hard limit may require changing
> /etc/security/limits.confand restarting each worker.
>
>
> On Mon, Mar 24, 2014 at 1:39 AM, Kane <ka...@gmail.com> wrote:
>>
>> Got a bit further, i think out of memory error was caused by setting
>> spark.spill to false. Now i have this error, is there an easy way to
>> increase file limit for spark, cluster-wide?:
>>
>> java.io.FileNotFoundException:
>>
>> /tmp/spark-local-20140324074221-b8f1/01/temp_1ab674f9-4556-4239-9f21-688dfc9f17d2
>> (Too many open files)
>>         at java.io.FileOutputStream.openAppend(Native Method)
>>         at java.io.FileOutputStream.<init>(FileOutputStream.java:192)
>>         at
>>
>> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:113)
>>         at
>>
>> org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:174)
>>         at
>>
>> org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:191)
>>         at
>>
>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insert(ExternalAppendOnlyMap.scala:141)
>>         at
>> org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:59)
>>         at
>>
>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
>>         at
>>
>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:94)
>>         at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
>>         at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
>>         at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>>         at
>>
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
>>         at
>>
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:53)
>>         at
>>
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
>>         at
>>
>> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
>>         at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>>         at
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>         at
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>         at java.lang.Thread.run(Thread.java:662)
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/distinct-on-huge-dataset-tp3025p3084.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>

Re: distinct on huge dataset

Posted by Aaron Davidson <il...@gmail.com>.
Look up setting ulimit, though note the distinction between soft and hard
limits, and that updating your hard limit may require
changing /etc/security/limits.confand restarting each worker.


On Mon, Mar 24, 2014 at 1:39 AM, Kane <ka...@gmail.com> wrote:

> Got a bit further, i think out of memory error was caused by setting
> spark.spill to false. Now i have this error, is there an easy way to
> increase file limit for spark, cluster-wide?:
>
> java.io.FileNotFoundException:
>
> /tmp/spark-local-20140324074221-b8f1/01/temp_1ab674f9-4556-4239-9f21-688dfc9f17d2
> (Too many open files)
>         at java.io.FileOutputStream.openAppend(Native Method)
>         at java.io.FileOutputStream.<init>(FileOutputStream.java:192)
>         at
>
> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:113)
>         at
>
> org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:174)
>         at
>
> org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:191)
>         at
>
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insert(ExternalAppendOnlyMap.scala:141)
>         at
> org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:59)
>         at
>
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
>         at
>
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:94)
>         at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
>         at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
>         at org.apache.spark.scheduler.Task.run(Task.scala:53)
>         at
>
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
>         at
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>         at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>         at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>         at java.lang.Thread.run(Thread.java:662)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/distinct-on-huge-dataset-tp3025p3084.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>

Re: distinct on huge dataset

Posted by Kane <ka...@gmail.com>.
Got a bit further, i think out of memory error was caused by setting
spark.spill to false. Now i have this error, is there an easy way to
increase file limit for spark, cluster-wide?:

java.io.FileNotFoundException:
/tmp/spark-local-20140324074221-b8f1/01/temp_1ab674f9-4556-4239-9f21-688dfc9f17d2
(Too many open files)
        at java.io.FileOutputStream.openAppend(Native Method)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:192)
        at
org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:113)
        at
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:174)
        at
org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:191)
        at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insert(ExternalAppendOnlyMap.scala:141)
        at
org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:59)
        at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
        at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:94)
        at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
        at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
        at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
        at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
        at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
        at
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
        at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/distinct-on-huge-dataset-tp3025p3084.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: distinct on huge dataset

Posted by Aaron Davidson <il...@gmail.com>.
Ah, interesting. count() without distinct is streaming and does not require
that a single partition fits in memory, for instance. That said, the
behavior may change if you increase the number of partitions in your input
RDD by using RDD.repartition()


On Sun, Mar 23, 2014 at 11:47 AM, Kane <ka...@gmail.com> wrote:

> Yes, there was an error in data, after fixing it - count fails with Out of
> Memory Error.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/distinct-on-huge-dataset-tp3025p3051.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>

Re: distinct on huge dataset

Posted by Kane <ka...@gmail.com>.
Yes, there was an error in data, after fixing it - count fails with Out of
Memory Error.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/distinct-on-huge-dataset-tp3025p3051.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: distinct on huge dataset

Posted by Aaron Davidson <il...@gmail.com>.
Andrew, this should be fixed in 0.9.1, assuming it is the same hash
collision error we found there.

Kane, is it possible your bigger data is corrupt, such that that any
operations on it fail?


On Sat, Mar 22, 2014 at 10:39 PM, Andrew Ash <an...@andrewash.com> wrote:

> FWIW I've seen correctness errors with spark.shuffle.spill on 0.9.0 and
> have it disabled now. The specific error behavior was that a join would
> consistently return one count of rows with spill enabled and another count
> with it disabled.
>
> Sent from my mobile phone
> On Mar 22, 2014 1:52 PM, "Kane" <ka...@gmail.com> wrote:
>
>> But i was wrong - map also fails on big file and setting
>> spark.shuffle.spill
>> doesn't help. Map fails with the same error.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/distinct-on-huge-dataset-tp3025p3039.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>

Re: distinct on huge dataset

Posted by Andrew Ash <an...@andrewash.com>.
FWIW I've seen correctness errors with spark.shuffle.spill on 0.9.0 and
have it disabled now. The specific error behavior was that a join would
consistently return one count of rows with spill enabled and another count
with it disabled.

Sent from my mobile phone
On Mar 22, 2014 1:52 PM, "Kane" <ka...@gmail.com> wrote:

> But i was wrong - map also fails on big file and setting
> spark.shuffle.spill
> doesn't help. Map fails with the same error.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/distinct-on-huge-dataset-tp3025p3039.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>

Re: distinct on huge dataset

Posted by Kane <ka...@gmail.com>.
But i was wrong - map also fails on big file and setting spark.shuffle.spill
doesn't help. Map fails with the same error.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/distinct-on-huge-dataset-tp3025p3039.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: distinct on huge dataset

Posted by Aaron Davidson <il...@gmail.com>.
This could be related to the hash collision bug in ExternalAppendOnlyMap in
0.9.0: https://spark-project.atlassian.net/browse/SPARK-1045

You might try setting spark.shuffle.spill to false and see if that runs any
longer (turning off shuffle spill is dangerous, though, as it may cause
Spark to OOM if your reduce partitions are too large).



On Sat, Mar 22, 2014 at 10:00 AM, Kane <ka...@gmail.com> wrote:

> I mean everything works with the small file. With huge file only count and
> map work, distinct - doesn't
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/distinct-on-huge-dataset-tp3025p3034.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>

Re: distinct on huge dataset

Posted by Kane <ka...@gmail.com>.
I mean everything works with the small file. With huge file only count and
map work, distinct - doesn't



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/distinct-on-huge-dataset-tp3025p3034.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: distinct on huge dataset

Posted by Kane <ka...@gmail.com>.
Yes it works with smaller file, it can count and map, but not distinct.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/distinct-on-huge-dataset-tp3025p3033.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: distinct on huge dataset

Posted by Mayur Rustagi <ma...@gmail.com>.
Does it work on a smaller file?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>



On Sat, Mar 22, 2014 at 4:50 AM, Ryan Compton <co...@gmail.com>wrote:

> Does it work without .distinct() ?
>
> Possibly related issue I ran into:
>
> https://mail-archives.apache.org/mod_mbox/spark-user/201401.mbox/%3CCAMgYSQ-3YNwD=VEB1Ct9JRO_jetJ40RJ5Ce_8exGsrhm7jbVQA@mail.gmail.com%3E
>
> On Sat, Mar 22, 2014 at 12:45 AM, Kane <ka...@gmail.com> wrote:
> > It's 0.9.0
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/distinct-on-huge-dataset-tp3025p3027.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>

Re: distinct on huge dataset

Posted by Ryan Compton <co...@gmail.com>.
Does it work without .distinct() ?

Possibly related issue I ran into:
https://mail-archives.apache.org/mod_mbox/spark-user/201401.mbox/%3CCAMgYSQ-3YNwD=VEB1Ct9JRO_jetJ40RJ5Ce_8exGsrhm7jbVQA@mail.gmail.com%3E

On Sat, Mar 22, 2014 at 12:45 AM, Kane <ka...@gmail.com> wrote:
> It's 0.9.0
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/distinct-on-huge-dataset-tp3025p3027.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: distinct on huge dataset

Posted by Kane <ka...@gmail.com>.
It's 0.9.0



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/distinct-on-huge-dataset-tp3025p3027.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: distinct on huge dataset

Posted by Aaron Davidson <il...@gmail.com>.
Which version of spark are you running?


On Fri, Mar 21, 2014 at 10:45 PM, Kane <ka...@gmail.com> wrote:

> I have a huge 2.5TB file. When i run:
> val t = sc.textFile("/user/hdfs/dump.csv")
> t.distinct.count
>
> It fails right away with a lot of:
>
> Loss was due to java.lang.ArrayIndexOutOfBoundsException
> java.lang.ArrayIndexOutOfBoundsException: 1
>         at $line59.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:18)
>         at $line59.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:16)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at
> org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
>         at
>
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
>         at
>
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:94)
>         at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
>         at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
>         at org.apache.spark.scheduler.Task.run(Task.scala:53)
>         at
>
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
>         at
>
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:46)
>         at
>
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:45)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:396)
>         at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
>         at
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:45)
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>         at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>         at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>         at java.lang.Thread.run(Thread.java:662)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/distinct-on-huge-dataset-tp3025.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>