Posted to dev@spark.apache.org by "fightfate@163.com" <fi...@163.com> on 2015/02/12 06:37:16 UTC

Re: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets

Hi,

We still have not found an adequate solution for this issue. Any analysis guidance or hints would be appreciated.

Thanks,
Sun.



fightfate@163.com
 
From: fightfate@163.com
Date: 2015-02-09 11:56
To: user; dev
Subject: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets
Hi,
The problem still exists. Could any experts take a look at this?

Thanks,
Sun.



fightfate@163.com
 
From: fightfate@163.com
Date: 2015-02-06 17:54
To: user; dev
Subject: Sort Shuffle performance issues about using AppendOnlyMap for large data sets
Hi all,
We recently ran into performance issues when using Spark 1.2.0 to read data from HBase and compute summary data.
Our scenario is: read large data sets from HBase (100 GB+), form an HBase RDD, transform it to a SchemaRDD,
group by and aggregate the data into a much smaller summary data set, and load the result into HBase (Phoenix).
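Roughly, the shape of the job looks like the sketch below (the table name, the Event schema, and the toEvent parser are simplified placeholders, not our actual code):

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    val sc = new SparkContext(new SparkConf().setAppName("hbase-summary"))

    // Scan an HBase table as an RDD of (rowkey, Result) pairs.
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "events")          // placeholder table name
    val hbaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])

    // Placeholder record type and parser standing in for the real row mapping.
    case class Event(rowKey: String, amount: Long)
    def toEvent(r: Result): Event = Event(Bytes.toString(r.getRow), 1L)

    // Spark 1.2.x: implicit conversion from an RDD of case classes to a SchemaRDD.
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD
    val events = hbaseRDD.map { case (_, result) => toEvent(result) }
    events.registerTempTable("events")

    // Group-by / aggregation that produces the (much smaller) summary data set;
    // writing the result back to HBase via Phoenix is omitted here.
    val summary = sqlContext.sql(
      "SELECT rowKey, SUM(amount) AS total FROM events GROUP BY rowKey")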

Our main problem is that aggregating the large data sets into summary data sets takes far too long (over an hour), which
seems much worse than it should be. We captured the attached dump file and the following stack traces from jstack:

From the stack traces and the dump file we can see that processing large data sets causes the AppendOnlyMap to grow frequently,
leading to a huge number of map entries. We looked at the source of org.apache.spark.util.collection.AppendOnlyMap and found that
the map is initialized with a capacity of 64, which seems too small for our use case.
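To illustrate what we think is happening (this is only a simplified sketch, not Spark's actual AppendOnlyMap code): an open-addressed map that starts at 64 slots has to rehash every existing entry each time it doubles, which matches the growTable() frames in the traces below.

    // Simplified sketch of a capacity-doubling, append-only hash map.
    // This is NOT Spark's implementation; it only shows why each doubling
    // step has to rehash all existing entries.
    class GrowableMap[K, V](initialCapacity: Int = 64) {
      private var capacity = initialCapacity
      private var data = new Array[Any](2 * capacity)   // keys and values interleaved
      private var curSize = 0

      def update(key: K, value: V): Unit = {
        var pos = (key.hashCode & 0x7fffffff) % capacity
        while (true) {
          val curKey = data(2 * pos)
          if (curKey == null) {                          // empty slot: insert here
            data(2 * pos) = key
            data(2 * pos + 1) = value
            curSize += 1
            if (curSize > 0.7 * capacity) growTable()
            return
          } else if (curKey == key) {                    // existing key: overwrite value
            data(2 * pos + 1) = value
            return
          } else {                                       // collision: linear probe
            pos = (pos + 1) % capacity
          }
        }
      }

      // Doubling rehashes every entry already in the map; a map that grows from
      // 64 slots to millions of entries goes through many of these steps.
      private def growTable(): Unit = {
        val oldData = data
        val oldCapacity = capacity
        capacity *= 2
        data = new Array[Any](2 * capacity)
        curSize = 0
        var i = 0
        while (i < oldCapacity) {
          if (oldData(2 * i) != null) {
            update(oldData(2 * i).asInstanceOf[K], oldData(2 * i + 1).asInstanceOf[V])
          }
          i += 1
        }
      }
    }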

So the question is: has anyone run into this before, and how was it resolved? I could not find any JIRA issues for this problem;
if someone has seen one, please let us know.

More specifically: is there any way for a user to configure the map capacity in Spark? If so, please
tell us how to achieve that.

Best regards,
Sun.

   Thread 22432: (state = IN_JAVA)
- org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87, line=224 (Compiled frame; information may be imprecise)
- org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable() @bci=1, line=38 (Interpreted frame)
- org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22, line=198 (Compiled frame)
- org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=201, line=145 (Compiled frame)
- org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=3, line=32 (Compiled frame)
- org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator) @bci=141, line=205 (Compiled frame)
- org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator) @bci=74, line=58 (Interpreted frame)
- org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=169, line=68 (Interpreted frame)
- org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=2, line=41 (Interpreted frame)
- org.apache.spark.scheduler.Task.run(long) @bci=77, line=56 (Interpreted frame)
- org.apache.spark.executor.Executor$TaskRunner.run() @bci=310, line=196 (Interpreted frame)
- java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=95, line=1145 (Interpreted frame)
- java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615 (Interpreted frame)
- java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)


Thread 22431: (state = IN_JAVA)
- org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87, line=224 (Compiled frame; information may be imprecise)
- org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable() @bci=1, line=38 (Interpreted frame)
- org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22, line=198 (Compiled frame)
- org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=201, line=145 (Compiled frame)
- org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=3, line=32 (Compiled frame)
- org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator) @bci=141, line=205 (Compiled frame)
- org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator) @bci=74, line=58 (Interpreted frame)
- org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=169, line=68 (Interpreted frame)
- org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=2, line=41 (Interpreted frame)
- org.apache.spark.scheduler.Task.run(long) @bci=77, line=56 (Interpreted frame)
- org.apache.spark.executor.Executor$TaskRunner.run() @bci=310, line=196 (Interpreted frame)
- java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=95, line=1145 (Interpreted frame)
- java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615 (Interpreted frame)
- java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)


fightfate@163.com
1 attachment: dump.png (42K)

Re: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets

Posted by "fightfate@163.com" <fi...@163.com>.
Hi there,
Which version are you using? The problem seems to have gone away after we upgraded Spark from 1.2.0 to 1.3.0.

We are not sure which internal changes made the difference.

Best,
Sun.



fightfate@163.com
 
From: Night Wolf
Date: 2015-05-12 22:05
To: fightfate@163.com
CC: Patrick Wendell; user; dev
Subject: Re: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets
Seeing similar issues here. Did you find a solution? One option would be to increase the number of partitions if you're doing lots of object creation.
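For example (just a sketch with made-up data and partition counts):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._              // pair-RDD functions on Spark 1.2/1.3
    import org.apache.spark.sql.SQLContext

    val sc = new SparkContext(new SparkConf().setAppName("partition-demo"))
    val pairs = sc.parallelize(1 to 1000000).map(i => (i % 1000, 1L))   // placeholder data

    // More output partitions means each shuffle task builds a smaller in-memory map.
    val summarised = pairs.reduceByKey(_ + _, 2000)      // explicit partition count

    // The equivalent knob for Spark SQL GROUP BY queries:
    val sqlContext = new SQLContext(sc)
    sqlContext.setConf("spark.sql.shuffle.partitions", "2000")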

On Thu, Feb 12, 2015 at 7:26 PM, fightfate@163.com <fi...@163.com> wrote:
Hi Patrick,

Really glad to get your reply.
Yes, we are doing group-by operations in our job. We understand that growTable activity is common when processing large data sets.

The real question is: is there any way for us to set the initialCapacity ourselves, specifically for our
application? Does Spark provide a configuration option for that?
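As far as we can tell there is no documented setting for the AppendOnlyMap initial capacity itself; the closest documented knobs only raise parallelism so that each per-task map holds less, for example (values are placeholders):

    import org.apache.spark.SparkConf

    // No public configuration seems to touch AppendOnlyMap's initial capacity of 64;
    // these documented settings only raise parallelism so each per-task map is smaller.
    val conf = new SparkConf()
      .setAppName("hbase-summary")
      .set("spark.default.parallelism", "1000")       // default partition count for RDD shuffles
      .set("spark.sql.shuffle.partitions", "1000")    // partition count for Spark SQL GROUP BY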

We realize this may be tricky to get working. We just want to know how it could be resolved, or whether there is some other
avenue we have not covered.

Looking forward to your advice.

Thanks,
Sun.



fightfate@163.com
 
From: Patrick Wendell
Date: 2015-02-12 16:12
To: fightfate@163.com
CC: user; dev
Subject: Re: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets
The map will start with a capacity of 64, but will grow to accommodate
new data. Are you using the groupBy operator in Spark or are you using
Spark SQL's group by? This usually happens if you are grouping or
aggregating in a way that doesn't sufficiently condense the data
created from each input partition.
 
- Patrick
 
On Wed, Feb 11, 2015 at 9:37 PM, fightfate@163.com <fi...@163.com> wrote:
> Hi,
>
> We still have not found an adequate solution for this issue. Any analysis
> guidance or hints would be appreciated.
>
> Thanks,
> Sun.
>
> ________________________________
> fightfate@163.com
>
>
> From: fightfate@163.com
> Date: 2015-02-09 11:56
> To: user; dev
> Subject: Re: Sort Shuffle performance issues about using AppendOnlyMap for
> large data sets
> Hi,
> The problem still exists. Could any experts take a look at this?
>
> Thanks,
> Sun.
>
> ________________________________
> fightfate@163.com
>
>
> From: fightfate@163.com
> Date: 2015-02-06 17:54
> To: user; dev
> Subject: Sort Shuffle performance issues about using AppendOnlyMap for large
> data sets
> Hi all,
> We recently ran into performance issues when using Spark 1.2.0 to read
> data from HBase and compute summary data.
> Our scenario is: read large data sets from HBase (100 GB+), form an
> HBase RDD, transform it to a SchemaRDD,
> group by and aggregate the data into a much smaller summary data set,
> and load the result into HBase (Phoenix).
>
> Our main problem is that aggregating the large data sets into summary
> data sets takes far too long (over an hour), which
> seems much worse than it should be. We captured the attached dump file
> and the following stack traces from jstack:
>
> From the stack traces and the dump file we can see that processing large
> data sets causes the AppendOnlyMap to grow frequently,
> leading to a huge number of map entries. We looked at the source of
> org.apache.spark.util.collection.AppendOnlyMap and found that
> the map is initialized with a capacity of 64, which seems too small
> for our use case.
>
> So the question is: has anyone run into this before, and how was it
> resolved? I could not find any JIRA issues for this problem;
> if someone has seen one, please let us know.
>
> More specifically: is there any way for a user to configure the map
> capacity in Spark? If so, please
> tell us how to achieve that.
>
> Best regards,
> Sun.
>
>    Thread 22432: (state = IN_JAVA)
> - org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87,
> line=224 (Compiled frame; information may be imprecise)
> - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable()
> @bci=1, line=38 (Interpreted frame)
> - org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22,
> line=198 (Compiled frame)
> -
> org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object,
> scala.Function2) @bci=201, line=145 (Compiled frame)
> -
> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object,
> scala.Function2) @bci=3, line=32 (Compiled frame)
> -
> org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator)
> @bci=141, line=205 (Compiled frame)
> -
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator)
> @bci=74, line=58 (Interpreted frame)
> -
> org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext)
> @bci=169, line=68 (Interpreted frame)
> -
> org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext)
> @bci=2, line=41 (Interpreted frame)
> - org.apache.spark.scheduler.Task.run(long) @bci=77, line=56 (Interpreted
> frame)
> - org.apache.spark.executor.Executor$TaskRunner.run() @bci=310, line=196
> (Interpreted frame)
> -
> java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker)
> @bci=95, line=1145 (Interpreted frame)
> - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615
> (Interpreted frame)
> - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
>
>
> Thread 22431: (state = IN_JAVA)
> - org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87,
> line=224 (Compiled frame; information may be imprecise)
> - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable()
> @bci=1, line=38 (Interpreted frame)
> - org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22,
> line=198 (Compiled frame)
> -
> org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object,
> scala.Function2) @bci=201, line=145 (Compiled frame)
> -
> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object,
> scala.Function2) @bci=3, line=32 (Compiled frame)
> -
> org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator)
> @bci=141, line=205 (Compiled frame)
> -
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator)
> @bci=74, line=58 (Interpreted frame)
> -
> org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext)
> @bci=169, line=68 (Interpreted frame)
> -
> org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext)
> @bci=2, line=41 (Interpreted frame)
> - org.apache.spark.scheduler.Task.run(long) @bci=77, line=56 (Interpreted
> frame)
> - org.apache.spark.executor.Executor$TaskRunner.run() @bci=310, line=196
> (Interpreted frame)
> -
> java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker)
> @bci=95, line=1145 (Interpreted frame)
> - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615
> (Interpreted frame)
> - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
>
>
> fightfate@163.com
> 1 attachment: dump.png (42K)


Re: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets

Posted by Night Wolf <ni...@gmail.com>.
Seeing similar issues here. Did you find a solution? One option would be to increase
the number of partitions if you're doing lots of object creation.

On Thu, Feb 12, 2015 at 7:26 PM, fightfate@163.com <fi...@163.com>
wrote:

> Hi Patrick,
>
> Really glad to get your reply.
> Yes, we are doing group-by operations in our job. We understand that
> growTable activity is common when processing large data sets.
>
> The real question is: is there any way for us to set the
> initialCapacity ourselves, specifically for our
> application? Does Spark provide a configuration option for that?
>
> We realize this may be tricky to get working. We just want to know how
> it could be resolved, or whether there is some other avenue
> we have not covered.
>
> Looking forward to your advice.
>
> Thanks,
> Sun.
>
> ------------------------------
> fightfate@163.com
>
>
> *From:* Patrick Wendell <pw...@gmail.com>
> *Date:* 2015-02-12 16:12
> *To:* fightfate@163.com
> *CC:* user <us...@spark.apache.org>; dev <de...@spark.apache.org>
> *Subject:* Re: Re: Sort Shuffle performance issues about using
> AppendOnlyMap for large data sets
> The map will start with a capacity of 64, but will grow to accommodate
> new data. Are you using the groupBy operator in Spark or are you using
> Spark SQL's group by? This usually happens if you are grouping or
> aggregating in a way that doesn't sufficiently condense the data
> created from each input partition.
>
> - Patrick
>
> On Wed, Feb 11, 2015 at 9:37 PM, fightfate@163.com <fi...@163.com>
> wrote:
> > Hi,
> >
> > We still have not found an adequate solution for this issue. Any
> > analysis guidance or hints would be appreciated.
> >
> > Thanks,
> > Sun.
> >
> > ________________________________
> > fightfate@163.com
> >
> >
> > From: fightfate@163.com
> > Date: 2015-02-09 11:56
> > To: user; dev
> > Subject: Re: Sort Shuffle performance issues about using AppendOnlyMap
> for
> > large data sets
> > Hi,
> > The problem still exists. Could any experts take a look at this?
> >
> > Thanks,
> > Sun.
> >
> > ________________________________
> > fightfate@163.com
> >
> >
> > From: fightfate@163.com
> > Date: 2015-02-06 17:54
> > To: user; dev
> > Subject: Sort Shuffle performance issues about using AppendOnlyMap for
> large
> > data sets
> > Hi all,
> > We recently ran into performance issues when using Spark 1.2.0 to read
> > data from HBase and compute summary data.
> > Our scenario is: read large data sets from HBase (100 GB+), form an
> > HBase RDD, transform it to a SchemaRDD,
> > group by and aggregate the data into a much smaller summary data set,
> > and load the result into HBase (Phoenix).
> >
> > Our main problem is that aggregating the large data sets into summary
> > data sets takes far too long (over an hour), which
> > seems much worse than it should be. We captured the attached dump file
> > and the following stack traces from jstack:
> >
> > From the stack traces and the dump file we can see that processing
> > large data sets causes the AppendOnlyMap to grow frequently,
> > leading to a huge number of map entries. We looked at the source of
> > org.apache.spark.util.collection.AppendOnlyMap and found that
> > the map is initialized with a capacity of 64, which seems too small
> > for our use case.
> >
> > So the question is: has anyone run into this before, and how was it
> > resolved? I could not find any JIRA issues for this problem;
> > if someone has seen one, please let us know.
> >
> > More specifically: is there any way for a user to configure the map
> > capacity in Spark? If so, please
> > tell us how to achieve that.
> >
> > Best regards,
> > Sun.
> >
> >    Thread 22432: (state = IN_JAVA)
> > - org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87,
> > line=224 (Compiled frame; information may be imprecise)
> > - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable()
> > @bci=1, line=38 (Interpreted frame)
> > - org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22,
> > line=198 (Compiled frame)
> > -
> >
> org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object,
> > scala.Function2) @bci=201, line=145 (Compiled frame)
> > -
> >
> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object,
> > scala.Function2) @bci=3, line=32 (Compiled frame)
> > -
> >
> org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator)
> > @bci=141, line=205 (Compiled frame)
> > -
> >
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator)
> > @bci=74, line=58 (Interpreted frame)
> > -
> >
> org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext)
> > @bci=169, line=68 (Interpreted frame)
> > -
> >
> org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext)
> > @bci=2, line=41 (Interpreted frame)
> > - org.apache.spark.scheduler.Task.run(long) @bci=77, line=56 (Interpreted
> > frame)
> > - org.apache.spark.executor.Executor$TaskRunner.run() @bci=310, line=196
> > (Interpreted frame)
> > -
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker)
> > @bci=95, line=1145 (Interpreted frame)
> > - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615
> > (Interpreted frame)
> > - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
> >
> >
> > Thread 22431: (state = IN_JAVA)
> > - org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87,
> > line=224 (Compiled frame; information may be imprecise)
> > - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable()
> > @bci=1, line=38 (Interpreted frame)
> > - org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22,
> > line=198 (Compiled frame)
> > -
> >
> org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object,
> > scala.Function2) @bci=201, line=145 (Compiled frame)
> > -
> >
> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object,
> > scala.Function2) @bci=3, line=32 (Compiled frame)
> > -
> >
> org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator)
> > @bci=141, line=205 (Compiled frame)
> > -
> >
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator)
> > @bci=74, line=58 (Interpreted frame)
> > -
> >
> org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext)
> > @bci=169, line=68 (Interpreted frame)
> > -
> >
> org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext)
> > @bci=2, line=41 (Interpreted frame)
> > - org.apache.spark.scheduler.Task.run(long) @bci=77, line=56 (Interpreted
> > frame)
> > - org.apache.spark.executor.Executor$TaskRunner.run() @bci=310, line=196
> > (Interpreted frame)
> > -
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker)
> > @bci=95, line=1145 (Interpreted frame)
> > - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615
> > (Interpreted frame)
> > - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
> >
> >
> > fightfate@163.com
> > 1 attachment: dump.png (42K)
>
>

Re: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets

Posted by "fightfate@163.com" <fi...@163.com>.
Hi Patrick,

Really glad to get your reply.
Yes, we are doing group-by operations in our job. We understand that growTable activity is common when processing large data sets.

The real question is: is there any way for us to set the initialCapacity ourselves, specifically for our
application? Does Spark provide a configuration option for that?

We realize this may be tricky to get working. We just want to know how it could be resolved, or whether there is some other
avenue we have not covered.

Looking forward to your advice.

Thanks,
Sun.



fightfate@163.com
 
From: Patrick Wendell
Date: 2015-02-12 16:12
To: fightfate@163.com
CC: user; dev
Subject: Re: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets
The map will start with a capacity of 64, but will grow to accommodate
new data. Are you using the groupBy operator in Spark or are you using
Spark SQL's group by? This usually happens if you are grouping or
aggregating in a way that doesn't sufficiently condense the data
created from each input partition.
 
- Patrick
 
On Wed, Feb 11, 2015 at 9:37 PM, fightfate@163.com <fi...@163.com> wrote:
> Hi,
>
> We still have not found an adequate solution for this issue. Any analysis
> guidance or hints would be appreciated.
>
> Thanks,
> Sun.
>
> ________________________________
> fightfate@163.com
>
>
> From: fightfate@163.com
> Date: 2015-02-09 11:56
> To: user; dev
> Subject: Re: Sort Shuffle performance issues about using AppendOnlyMap for
> large data sets
> Hi,
> The problem still exists. Could any experts take a look at this?
>
> Thanks,
> Sun.
>
> ________________________________
> fightfate@163.com
>
>
> From: fightfate@163.com
> Date: 2015-02-06 17:54
> To: user; dev
> Subject: Sort Shuffle performance issues about using AppendOnlyMap for large
> data sets
> Hi all,
> We recently ran into performance issues when using Spark 1.2.0 to read
> data from HBase and compute summary data.
> Our scenario is: read large data sets from HBase (100 GB+), form an
> HBase RDD, transform it to a SchemaRDD,
> group by and aggregate the data into a much smaller summary data set,
> and load the result into HBase (Phoenix).
>
> Our main problem is that aggregating the large data sets into summary
> data sets takes far too long (over an hour), which
> seems much worse than it should be. We captured the attached dump file
> and the following stack traces from jstack:
>
> From the stack traces and the dump file we can see that processing large
> data sets causes the AppendOnlyMap to grow frequently,
> leading to a huge number of map entries. We looked at the source of
> org.apache.spark.util.collection.AppendOnlyMap and found that
> the map is initialized with a capacity of 64, which seems too small
> for our use case.
>
> So the question is: has anyone run into this before, and how was it
> resolved? I could not find any JIRA issues for this problem;
> if someone has seen one, please let us know.
>
> More specifically: is there any way for a user to configure the map
> capacity in Spark? If so, please
> tell us how to achieve that.
>
> Best regards,
> Sun.
>
>    Thread 22432: (state = IN_JAVA)
> - org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87,
> line=224 (Compiled frame; information may be imprecise)
> - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable()
> @bci=1, line=38 (Interpreted frame)
> - org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22,
> line=198 (Compiled frame)
> -
> org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object,
> scala.Function2) @bci=201, line=145 (Compiled frame)
> -
> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object,
> scala.Function2) @bci=3, line=32 (Compiled frame)
> -
> org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator)
> @bci=141, line=205 (Compiled frame)
> -
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator)
> @bci=74, line=58 (Interpreted frame)
> -
> org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext)
> @bci=169, line=68 (Interpreted frame)
> -
> org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext)
> @bci=2, line=41 (Interpreted frame)
> - org.apache.spark.scheduler.Task.run(long) @bci=77, line=56 (Interpreted
> frame)
> - org.apache.spark.executor.Executor$TaskRunner.run() @bci=310, line=196
> (Interpreted frame)
> -
> java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker)
> @bci=95, line=1145 (Interpreted frame)
> - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615
> (Interpreted frame)
> - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
>
>
> Thread 22431: (state = IN_JAVA)
> - org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87,
> line=224 (Compiled frame; information may be imprecise)
> - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable()
> @bci=1, line=38 (Interpreted frame)
> - org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22,
> line=198 (Compiled frame)
> -
> org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object,
> scala.Function2) @bci=201, line=145 (Compiled frame)
> -
> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object,
> scala.Function2) @bci=3, line=32 (Compiled frame)
> -
> org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator)
> @bci=141, line=205 (Compiled frame)
> -
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator)
> @bci=74, line=58 (Interpreted frame)
> -
> org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext)
> @bci=169, line=68 (Interpreted frame)
> -
> org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext)
> @bci=2, line=41 (Interpreted frame)
> - org.apache.spark.scheduler.Task.run(long) @bci=77, line=56 (Interpreted
> frame)
> - org.apache.spark.executor.Executor$TaskRunner.run() @bci=310, line=196
> (Interpreted frame)
> -
> java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker)
> @bci=95, line=1145 (Interpreted frame)
> - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615
> (Interpreted frame)
> - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
>
>
> fightfate@163.com
> 1 attachment: dump.png (42K)

Re: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets

Posted by Patrick Wendell <pw...@gmail.com>.
The map will start with a capacity of 64, but will grow to accommodate
new data. Are you using the groupBy operator in Spark or are you using
Spark SQL's group by? This usually happens if you are grouping or
aggregating in a way that doesn't sufficiently condense the data
created from each input partition.

- Patrick
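To make that point concrete, here is a toy sketch (made-up data, not the reporter's job): reduceByKey and aggregateByKey combine on the map side, so only one record per distinct key per partition ever reaches the shuffle's in-memory map; if the keys are nearly unique, nothing condenses and the map keeps growing.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._

    val sc = new SparkContext(new SparkConf().setAppName("combine-demo"))

    // Keys repeat heavily: map-side combining condenses each partition to at most
    // ~1000 entries before the shuffle, so the per-task map stays small.
    val condensed = sc.parallelize(1 to 1000000)
      .map(i => (i % 1000, 1L))
      .reduceByKey(_ + _)

    // Keys are unique: nothing condenses, every record becomes its own map entry,
    // and the map has to keep growing (the growTable() frames in the traces below).
    val notCondensed = sc.parallelize(1 to 1000000)
      .map(i => (i, 1L))
      .reduceByKey(_ + _)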

On Wed, Feb 11, 2015 at 9:37 PM, fightfate@163.com <fi...@163.com> wrote:
> Hi,
>
> We still have not found an adequate solution for this issue. Any analysis
> guidance or hints would be appreciated.
>
> Thanks,
> Sun.
>
> ________________________________
> fightfate@163.com
>
>
> From: fightfate@163.com
> Date: 2015-02-09 11:56
> To: user; dev
> Subject: Re: Sort Shuffle performance issues about using AppendOnlyMap for
> large data sets
> Hi,
> The problem still exists. Could any experts take a look at this?
>
> Thanks,
> Sun.
>
> ________________________________
> fightfate@163.com
>
>
> From: fightfate@163.com
> Date: 2015-02-06 17:54
> To: user; dev
> Subject: Sort Shuffle performance issues about using AppendOnlyMap for large
> data sets
> Hi all,
> We recently ran into performance issues when using Spark 1.2.0 to read
> data from HBase and compute summary data.
> Our scenario is: read large data sets from HBase (100 GB+), form an
> HBase RDD, transform it to a SchemaRDD,
> group by and aggregate the data into a much smaller summary data set,
> and load the result into HBase (Phoenix).
>
> Our main problem is that aggregating the large data sets into summary
> data sets takes far too long (over an hour), which
> seems much worse than it should be. We captured the attached dump file
> and the following stack traces from jstack:
>
> From the stack traces and the dump file we can see that processing large
> data sets causes the AppendOnlyMap to grow frequently,
> leading to a huge number of map entries. We looked at the source of
> org.apache.spark.util.collection.AppendOnlyMap and found that
> the map is initialized with a capacity of 64, which seems too small
> for our use case.
>
> So the question is: has anyone run into this before, and how was it
> resolved? I could not find any JIRA issues for this problem;
> if someone has seen one, please let us know.
>
> More specifically: is there any way for a user to configure the map
> capacity in Spark? If so, please
> tell us how to achieve that.
>
> Best regards,
> Sun.
>
>    Thread 22432: (state = IN_JAVA)
> - org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87,
> line=224 (Compiled frame; information may be imprecise)
> - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable()
> @bci=1, line=38 (Interpreted frame)
> - org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22,
> line=198 (Compiled frame)
> -
> org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object,
> scala.Function2) @bci=201, line=145 (Compiled frame)
> -
> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object,
> scala.Function2) @bci=3, line=32 (Compiled frame)
> -
> org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator)
> @bci=141, line=205 (Compiled frame)
> -
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator)
> @bci=74, line=58 (Interpreted frame)
> -
> org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext)
> @bci=169, line=68 (Interpreted frame)
> -
> org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext)
> @bci=2, line=41 (Interpreted frame)
> - org.apache.spark.scheduler.Task.run(long) @bci=77, line=56 (Interpreted
> frame)
> - org.apache.spark.executor.Executor$TaskRunner.run() @bci=310, line=196
> (Interpreted frame)
> -
> java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker)
> @bci=95, line=1145 (Interpreted frame)
> - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615
> (Interpreted frame)
> - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
>
>
> Thread 22431: (state = IN_JAVA)
> - org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87,
> line=224 (Compiled frame; information may be imprecise)
> - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable()
> @bci=1, line=38 (Interpreted frame)
> - org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22,
> line=198 (Compiled frame)
> -
> org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object,
> scala.Function2) @bci=201, line=145 (Compiled frame)
> -
> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object,
> scala.Function2) @bci=3, line=32 (Compiled frame)
> -
> org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator)
> @bci=141, line=205 (Compiled frame)
> -
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator)
> @bci=74, line=58 (Interpreted frame)
> -
> org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext)
> @bci=169, line=68 (Interpreted frame)
> -
> org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext)
> @bci=2, line=41 (Interpreted frame)
> - org.apache.spark.scheduler.Task.run(long) @bci=77, line=56 (Interpreted
> frame)
> - org.apache.spark.executor.Executor$TaskRunner.run() @bci=310, line=196
> (Interpreted frame)
> -
> java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker)
> @bci=95, line=1145 (Interpreted frame)
> - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615
> (Interpreted frame)
> - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
>
>
> fightfate@163.com
> 1 attachment: dump.png (42K)
