You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Pete Robbins <ro...@gmail.com> on 2015/09/14 13:16:11 UTC

Unable to acquire memory errors in HiveCompatibilitySuite

I keep hitting errors running the tests on 1.5 such as


- join31 *** FAILED ***
  Failed to execute query using catalyst:
  Error: Job aborted due to stage failure: Task 9 in stage 3653.0 failed 1
times, most recent failure: Lost task 9.0 in stage 3653.0 (TID 123363,
localhost): java.io.IOException: Unable to acquire 4194304 bytes of memory
      at
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)


This is using the command
build/mvn -Pyarn -Phadoop-2.2 -Phive -Phive-thriftserver  test


I don't see these errors in any of the amplab jenkins builds. Do those
builds have any configuration/environment that I may be missing? My build
is running with whatever defaults are in the top level pom.xml, eg -Xmx3G.

I can make these tests pass by setting spark.shuffle.memoryFraction=0.6 in
the HiveCompatibilitySuite rather than the default 0.2 value.

Trying to analyze what is going on with the test it is related to the
number of active tasks, which seems to rise to 32, and so the
ShuffleMemoryManager allows less memory per task even though most of those
tasks do not have any memory allocated to them.

Has anyone seen issues like this before?

Re: Unable to acquire memory errors in HiveCompatibilitySuite

Posted by Pete Robbins <ro...@gmail.com>.
This is the culprit:

https://issues.apache.org/jira/browse/SPARK-8406

"2.  Make `TestHive` use a local mode `SparkContext` with 32 threads to
increase parallelism

    The major reason for this is that, the original parallelism of 2 is too
low to reproduce
the data loss issue.  Also, higher concurrency may potentially caught more
concurrency bugs
during testing phase. (It did help us spotted SPARK-8501.)"

Specific change:

http://git-wip-us.apache.org/repos/asf/spark/blob/0818fdec/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index f901bd8..ea325cc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -49,7 +49,7 @@ import scala.collection.JavaConversions._
 object TestHive
   extends TestHiveContext(
     new SparkContext(
-      System.getProperty("spark.sql.test.master", "local[2]"),
+      System.getProperty("spark.sql.test.master", "local[32]"),
       "TestSQLContext",
       new SparkConf()
         .set("spark.sql.test", "")



Setting that to local[8] to match my cores the HiveCompatibilitySuite
passes (and runs so much faster!) so maybe that should be changed to limit
threads to num cores?

Cheers,

On 15 September 2015 at 08:50, Pete Robbins <ro...@gmail.com> wrote:

> Ok so it looks like the max number of active tasks reaches 30. I'm not
> setting anything as it is a clean environment with clean spark code
> checkout. I'll dig further to see why so many tasks are active.
>
> Cheers,
>
> On 15 September 2015 at 07:22, Reynold Xin <rx...@databricks.com> wrote:
>
>> Yea I think this is where the heuristics is failing -- it uses 8 cores to
>> approximate the number of active tasks, but the tests somehow is using 32
>> (maybe because it explicitly sets it to that, or you set it yourself? I'm
>> not sure which one)
>>
>> On Mon, Sep 14, 2015 at 11:06 PM, Pete Robbins <ro...@gmail.com>
>> wrote:
>>
>>> Reynold, thanks for replying.
>>>
>>> getPageSize parameters: maxMemory=515396075, numCores=0
>>> Calculated values: cores=8, default=4194304
>>>
>>> So am I getting a large page size as I only have 8 cores?
>>>
>>> On 15 September 2015 at 00:40, Reynold Xin <rx...@databricks.com> wrote:
>>>
>>>> Pete - can you do me a favor?
>>>>
>>>>
>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>>
>>>> Print the parameters that are passed into the getPageSize function, and
>>>> check their values.
>>>>
>>>> On Mon, Sep 14, 2015 at 4:32 PM, Reynold Xin <rx...@databricks.com>
>>>> wrote:
>>>>
>>>>> Is this on latest master / branch-1.5?
>>>>>
>>>>> out of the box we reserve only 16% (0.2 * 0.8) of the memory for
>>>>> execution (e.g. aggregate, join) / shuffle sorting. With a 3GB heap, that's
>>>>> 480MB. So each task gets 480MB / 32 = 15MB, and each operator reserves at
>>>>> least one page for execution. If your page size is 4MB, it only takes 3
>>>>> operators to use up its memory.
>>>>>
>>>>> The thing is page size is dynamically determined -- and in your case
>>>>> it should be smaller than 4MB.
>>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>>>
>>>>> Maybe there is a place that in the maven tests that we explicitly set
>>>>> the page size (spark.buffer.pageSize) to 4MB? If yes, we need to find it
>>>>> and just remove it.
>>>>>
>>>>>
>>>>> On Mon, Sep 14, 2015 at 4:16 AM, Pete Robbins <ro...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I keep hitting errors running the tests on 1.5 such as
>>>>>>
>>>>>>
>>>>>> - join31 *** FAILED ***
>>>>>>   Failed to execute query using catalyst:
>>>>>>   Error: Job aborted due to stage failure: Task 9 in stage 3653.0
>>>>>> failed 1 times, most recent failure: Lost task 9.0 in stage 3653.0 (TID
>>>>>> 123363, localhost): java.io.IOException: Unable to acquire 4194304 bytes of
>>>>>> memory
>>>>>>       at
>>>>>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
>>>>>>
>>>>>>
>>>>>> This is using the command
>>>>>> build/mvn -Pyarn -Phadoop-2.2 -Phive -Phive-thriftserver  test
>>>>>>
>>>>>>
>>>>>> I don't see these errors in any of the amplab jenkins builds. Do
>>>>>> those builds have any configuration/environment that I may be missing? My
>>>>>> build is running with whatever defaults are in the top level pom.xml, eg
>>>>>> -Xmx3G.
>>>>>>
>>>>>> I can make these tests pass by setting
>>>>>> spark.shuffle.memoryFraction=0.6 in the HiveCompatibilitySuite rather than
>>>>>> the default 0.2 value.
>>>>>>
>>>>>> Trying to analyze what is going on with the test it is related to the
>>>>>> number of active tasks, which seems to rise to 32, and so the
>>>>>> ShuffleMemoryManager allows less memory per task even though most of those
>>>>>> tasks do not have any memory allocated to them.
>>>>>>
>>>>>> Has anyone seen issues like this before?
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Unable to acquire memory errors in HiveCompatibilitySuite

Posted by Pete Robbins <ro...@gmail.com>.
Ok so it looks like the max number of active tasks reaches 30. I'm not
setting anything as it is a clean environment with clean spark code
checkout. I'll dig further to see why so many tasks are active.

Cheers,

On 15 September 2015 at 07:22, Reynold Xin <rx...@databricks.com> wrote:

> Yea I think this is where the heuristics is failing -- it uses 8 cores to
> approximate the number of active tasks, but the tests somehow is using 32
> (maybe because it explicitly sets it to that, or you set it yourself? I'm
> not sure which one)
>
> On Mon, Sep 14, 2015 at 11:06 PM, Pete Robbins <ro...@gmail.com>
> wrote:
>
>> Reynold, thanks for replying.
>>
>> getPageSize parameters: maxMemory=515396075, numCores=0
>> Calculated values: cores=8, default=4194304
>>
>> So am I getting a large page size as I only have 8 cores?
>>
>> On 15 September 2015 at 00:40, Reynold Xin <rx...@databricks.com> wrote:
>>
>>> Pete - can you do me a favor?
>>>
>>>
>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>
>>> Print the parameters that are passed into the getPageSize function, and
>>> check their values.
>>>
>>> On Mon, Sep 14, 2015 at 4:32 PM, Reynold Xin <rx...@databricks.com>
>>> wrote:
>>>
>>>> Is this on latest master / branch-1.5?
>>>>
>>>> out of the box we reserve only 16% (0.2 * 0.8) of the memory for
>>>> execution (e.g. aggregate, join) / shuffle sorting. With a 3GB heap, that's
>>>> 480MB. So each task gets 480MB / 32 = 15MB, and each operator reserves at
>>>> least one page for execution. If your page size is 4MB, it only takes 3
>>>> operators to use up its memory.
>>>>
>>>> The thing is page size is dynamically determined -- and in your case it
>>>> should be smaller than 4MB.
>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>>
>>>> Maybe there is a place that in the maven tests that we explicitly set
>>>> the page size (spark.buffer.pageSize) to 4MB? If yes, we need to find it
>>>> and just remove it.
>>>>
>>>>
>>>> On Mon, Sep 14, 2015 at 4:16 AM, Pete Robbins <ro...@gmail.com>
>>>> wrote:
>>>>
>>>>> I keep hitting errors running the tests on 1.5 such as
>>>>>
>>>>>
>>>>> - join31 *** FAILED ***
>>>>>   Failed to execute query using catalyst:
>>>>>   Error: Job aborted due to stage failure: Task 9 in stage 3653.0
>>>>> failed 1 times, most recent failure: Lost task 9.0 in stage 3653.0 (TID
>>>>> 123363, localhost): java.io.IOException: Unable to acquire 4194304 bytes of
>>>>> memory
>>>>>       at
>>>>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
>>>>>
>>>>>
>>>>> This is using the command
>>>>> build/mvn -Pyarn -Phadoop-2.2 -Phive -Phive-thriftserver  test
>>>>>
>>>>>
>>>>> I don't see these errors in any of the amplab jenkins builds. Do those
>>>>> builds have any configuration/environment that I may be missing? My build
>>>>> is running with whatever defaults are in the top level pom.xml, eg -Xmx3G.
>>>>>
>>>>> I can make these tests pass by setting
>>>>> spark.shuffle.memoryFraction=0.6 in the HiveCompatibilitySuite rather than
>>>>> the default 0.2 value.
>>>>>
>>>>> Trying to analyze what is going on with the test it is related to the
>>>>> number of active tasks, which seems to rise to 32, and so the
>>>>> ShuffleMemoryManager allows less memory per task even though most of those
>>>>> tasks do not have any memory allocated to them.
>>>>>
>>>>> Has anyone seen issues like this before?
>>>>>
>>>>
>>>>
>>>
>>
>

RE: Unable to acquire memory errors in HiveCompatibilitySuite

Posted by "Cheng, Hao" <ha...@intel.com>.
We actually meet the similiar problem in a real case, see https://issues.apache.org/jira/browse/SPARK-10474

After checking the source code, the external sort memory management strategy seems the root cause of the issue.

Currently, we allocate the 4MB (page size) buffer as initial in the beginning of the sorting, and during the processing of each input record, we possible run into the cycle of spill => de-allocate buffer => try allocate a buffer with size x2. I know this strategy is quite flexible in some cases. However, for example in a data skew case, says 2 tasks with large amount of records runs at a single executor, the keep growing buffer strategy will eventually eat out the pre-set offheap memory threshold, and then exception thrown like what we’ve seen.

I mean can we just take a simple memory management strategy for external sorter, like:
Step 1) Allocate a fixed size  buffer for the current task (maybe: MAX_MEMORY_THRESHOLD/(2 * PARALLEL_TASKS_PER_EXECUTOR))
Step 2) for (record in the input) { if (hasMemoryForRecord(record)) insert(record) else spill(buffer); insert(record); }
Step 3) Deallocate(buffer)

Probably we’d better to move the discussion in jira.
From: Reynold Xin [mailto:rxin@databricks.com]
Sent: Thursday, September 17, 2015 12:28 AM
To: Pete Robbins
Cc: Dev
Subject: Re: Unable to acquire memory errors in HiveCompatibilitySuite

SparkEnv for the driver was created in SparkContext. The default parallelism field is set to the number of slots (max number of active tasks). Maybe we can just use the default parallelism to compute that in local mode.

On Wednesday, September 16, 2015, Pete Robbins <ro...@gmail.com>> wrote:
so forcing the ShuffleMemoryManager to assume 32 cores and therefore calculate a pagesize of 1MB passes the tests.
How can we determine the correct value to use in getPageSize rather than Runtime.getRuntime.availableProcessors()?

On 16 September 2015 at 10:17, Pete Robbins <robbinspg@gmail.com<javascript:_e(%7B%7D,'cvml','robbinspg@gmail.com');>> wrote:
I see what you are saying. Full stack trace:

java.io.IOException: Unable to acquire 4194304 bytes of memory
      at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
      at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:349)
      at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertKVRecord(UnsafeExternalSorter.java:478)
      at org.apache.spark.sql.execution.UnsafeKVExternalSorter.insertKV(UnsafeKVExternalSorter.java:138)
      at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:489)
      at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:379)
      at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
      at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org<http://1.org>$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
      at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
      at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
      at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
      at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
      at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:99)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
      at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
      at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
      at org.apache.spark.scheduler.Task.run(Task.scala:88)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      at java.lang.Thread.run(Thread.java:785)

On 16 September 2015 at 09:30, Reynold Xin <rxin@databricks.com<javascript:_e(%7B%7D,'cvml','rxin@databricks.com');>> wrote:
Can you paste the entire stacktrace of the error? In your original email you only included the last function call.

Maybe I'm missing something here, but I still think the bad heuristics is the issue.

Some operators pre-reserve memory before running anything in order to avoid starvation. For example, imagine we have an aggregate followed by a sort. If the aggregate is very high cardinality, and uses up all the memory and even starts spilling (falling back to sort-based aggregate), there isn't memory available at all for the sort operator to use. To work around this, each operator reserves a page of memory before they process any data.

Page size is computed by Spark using:

the total amount of execution memory / (maximum number of active tasks * 16)

and then rounded to the next power of 2, and cap between 1MB and 64MB.

That is to say, in the worst case, we should be able to reserve at least 8 pages (16 rounded up to the next power of 2).

However, in your case, the max number of active tasks is 32 (set by test env), while the page size is determined using # cores (8 in your case). So it is off by a factor of 4. As a result, with this page size, we can only reserve at least 2 pages. That is to say, if you have more than 3 operators that need page reservation (e.g. an aggregate followed by a join on the group by key followed by a shuffle - which seems to be the case of join31.q), the task can fail to reserve memory before running anything.


There is a 2nd problem (maybe this is the one you were trying to point out?) that is tasks running at the same time can be competing for memory with each other.  Spark allows each task to claim up to 2/N share of memory, where N is the number of active tasks. If a task is launched before others and hogs a lot of memory quickly, the other tasks that are launched after it might not be able to get enough memory allocation, and thus will fail. This is not super ideal, but probably fine because tasks can be retried, and can succeed in retries.


On Wed, Sep 16, 2015 at 1:07 AM, Pete Robbins <robbinspg@gmail.com<javascript:_e(%7B%7D,'cvml','robbinspg@gmail.com');>> wrote:
ok so let me try again ;-)
I don't think that the page size calculation matters apart from hitting the allocation limit earlier if the page size is too large.

If a task is going to need X bytes, it is going to need X bytes. In this case, for at least one of the tasks, X > maxmemory/no_active_tasks at some point during execution. A smaller page size may use the memory more efficiently but would not necessarily avoid this issue.
The next question would be: Is the memory limit per task of max_memory/no_active_tasks reasonable? It seems fair but if this limit is reached currently an exception is thrown, maybe the task could wait for no_active_tasks to decrease?
I think what causes my test issue is that the 32 tasks don't execute as quickly on my 8 core box so more are active at any one time.
I will experiment with the page size calculation to see what effect it has.

Cheers,


On 16 September 2015 at 06:53, Reynold Xin <rxin@databricks.com<javascript:_e(%7B%7D,'cvml','rxin@databricks.com');>> wrote:
It is exactly the issue here, isn't it?

We are using memory / N, where N should be the maximum number of active tasks. In the current master, we use the number of cores to approximate the number of tasks -- but it turned out to be a bad approximation in tests because it is set to 32 to increase concurrency.


On Tue, Sep 15, 2015 at 10:47 PM, Pete Robbins <robbinspg@gmail.com<javascript:_e(%7B%7D,'cvml','robbinspg@gmail.com');>> wrote:
Oops... I meant to say "The page size calculation is NOT the issue here"

On 16 September 2015 at 06:46, Pete Robbins <robbinspg@gmail.com<javascript:_e(%7B%7D,'cvml','robbinspg@gmail.com');>> wrote:
The page size calculation is the issue here as there is plenty of free memory, although there is maybe a fair bit of wasted space in some pages. It is that when we have a lot of tasks each is only allowed to reach 1/n of the available memory and several of the tasks bump in to that limit. With tasks 4 times the number of cores there will be some contention and so they remain active for longer.

So I think this is a test case issue configuring the number of executors too high.

On 15 September 2015 at 18:54, Reynold Xin <rxin@databricks.com<javascript:_e(%7B%7D,'cvml','rxin@databricks.com');>> wrote:
Maybe we can change the heuristics in memory calculation to use SparkContext.defaultParallelism if it is local mode.


On Tue, Sep 15, 2015 at 10:28 AM, Pete Robbins <robbinspg@gmail.com<javascript:_e(%7B%7D,'cvml','robbinspg@gmail.com');>> wrote:
Yes and at least there is an override by setting  spark.sql.test.master to local[8] , in fact local[16] worked on my 8 core box.

I'm happy to use this as a workaround but the 32 hard-coded will fail running build/tests on a clean checkout if you only have 8 cores.

On 15 September 2015 at 17:40, Marcelo Vanzin <vanzin@cloudera.com<javascript:_e(%7B%7D,'cvml','vanzin@cloudera.com');>> wrote:
That test explicitly sets the number of executor cores to 32.

object TestHive
  extends TestHiveContext(
    new SparkContext(
      System.getProperty("spark.sql.test.master", "local[32]"),

On Mon, Sep 14, 2015 at 11:22 PM, Reynold Xin <rxin@databricks.com<javascript:_e(%7B%7D,'cvml','rxin@databricks.com');>> wrote:
> Yea I think this is where the heuristics is failing -- it uses 8 cores to
> approximate the number of active tasks, but the tests somehow is using 32
> (maybe because it explicitly sets it to that, or you set it yourself? I'm
> not sure which one)
>
> On Mon, Sep 14, 2015 at 11:06 PM, Pete Robbins <robbinspg@gmail.com<javascript:_e(%7B%7D,'cvml','robbinspg@gmail.com');>> wrote:
>>
>> Reynold, thanks for replying.
>>
>> getPageSize parameters: maxMemory=515396075, numCores=0
>> Calculated values: cores=8, default=4194304
>>
>> So am I getting a large page size as I only have 8 cores?
>>
>> On 15 September 2015 at 00:40, Reynold Xin <rxin@databricks.com<javascript:_e(%7B%7D,'cvml','rxin@databricks.com');>> wrote:
>>>
>>> Pete - can you do me a favor?
>>>
>>>
>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>
>>> Print the parameters that are passed into the getPageSize function, and
>>> check their values.
>>>
>>> On Mon, Sep 14, 2015 at 4:32 PM, Reynold Xin <rxin@databricks.com<javascript:_e(%7B%7D,'cvml','rxin@databricks.com');>> wrote:
>>>>
>>>> Is this on latest master / branch-1.5?
>>>>
>>>> out of the box we reserve only 16% (0.2 * 0.8) of the memory for
>>>> execution (e.g. aggregate, join) / shuffle sorting. With a 3GB heap, that's
>>>> 480MB. So each task gets 480MB / 32 = 15MB, and each operator reserves at
>>>> least one page for execution. If your page size is 4MB, it only takes 3
>>>> operators to use up its memory.
>>>>
>>>> The thing is page size is dynamically determined -- and in your case it
>>>> should be smaller than 4MB.
>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>>
>>>> Maybe there is a place that in the maven tests that we explicitly set
>>>> the page size (spark.buffer.pageSize) to 4MB? If yes, we need to find it and
>>>> just remove it.
>>>>
>>>>
>>>> On Mon, Sep 14, 2015 at 4:16 AM, Pete Robbins <robbinspg@gmail.com<javascript:_e(%7B%7D,'cvml','robbinspg@gmail.com');>>
>>>> wrote:
>>>>>
>>>>> I keep hitting errors running the tests on 1.5 such as
>>>>>
>>>>>
>>>>> - join31 *** FAILED ***
>>>>>   Failed to execute query using catalyst:
>>>>>   Error: Job aborted due to stage failure: Task 9 in stage 3653.0
>>>>> failed 1 times, most recent failure: Lost task 9.0 in stage 3653.0 (TID
>>>>> 123363, localhost): java.io.IOException: Unable to acquire 4194304 bytes of
>>>>> memory
>>>>>       at
>>>>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
>>>>>
>>>>>
>>>>> This is using the command
>>>>> build/mvn -Pyarn -Phadoop-2.2 -Phive -Phive-thriftserver  test
>>>>>
>>>>>
>>>>> I don't see these errors in any of the amplab jenkins builds. Do those
>>>>> builds have any configuration/environment that I may be missing? My build is
>>>>> running with whatever defaults are in the top level pom.xml, eg -Xmx3G.
>>>>>
>>>>> I can make these tests pass by setting spark.shuffle.memoryFraction=0.6
>>>>> in the HiveCompatibilitySuite rather than the default 0.2 value.
>>>>>
>>>>> Trying to analyze what is going on with the test it is related to the
>>>>> number of active tasks, which seems to rise to 32, and so the
>>>>> ShuffleMemoryManager allows less memory per task even though most of those
>>>>> tasks do not have any memory allocated to them.
>>>>>
>>>>> Has anyone seen issues like this before?
>>>>
>>>>
>>>
>>
>


--
Marcelo










Re: Unable to acquire memory errors in HiveCompatibilitySuite

Posted by Reynold Xin <rx...@databricks.com>.
SparkEnv for the driver was created in SparkContext. The default
parallelism field is set to the number of slots (max number of active
tasks). Maybe we can just use the default parallelism to compute that in
local mode.

On Wednesday, September 16, 2015, Pete Robbins <ro...@gmail.com> wrote:

> so forcing the ShuffleMemoryManager to assume 32 cores and therefore
> calculate a pagesize of 1MB passes the tests.
>
> How can we determine the correct value to use in getPageSize rather than
> Runtime.getRuntime.availableProcessors()?
>
> On 16 September 2015 at 10:17, Pete Robbins <robbinspg@gmail.com
> <javascript:_e(%7B%7D,'cvml','robbinspg@gmail.com');>> wrote:
>
>> I see what you are saying. Full stack trace:
>>
>> java.io.IOException: Unable to acquire 4194304 bytes of memory
>>       at
>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
>>       at
>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:349)
>>       at
>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertKVRecord(UnsafeExternalSorter.java:478)
>>       at
>> org.apache.spark.sql.execution.UnsafeKVExternalSorter.insertKV(UnsafeKVExternalSorter.java:138)
>>       at
>> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:489)
>>       at
>> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:379)
>>       at
>> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
>>       at
>> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$
>> 1.org
>> $apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
>>       at
>> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
>>       at
>> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
>>       at
>> org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
>>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>       at
>> org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
>>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>       at
>> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:99)
>>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>       at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>       at
>> org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
>>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>       at
>> org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
>>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>       at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>       at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>       at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>       at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>       at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>       at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>>       at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>       at java.lang.Thread.run(Thread.java:785)
>>
>> On 16 September 2015 at 09:30, Reynold Xin <rxin@databricks.com
>> <javascript:_e(%7B%7D,'cvml','rxin@databricks.com');>> wrote:
>>
>>> Can you paste the entire stacktrace of the error? In your original email
>>> you only included the last function call.
>>>
>>> Maybe I'm missing something here, but I still think the bad heuristics
>>> is the issue.
>>>
>>> Some operators pre-reserve memory before running anything in order to
>>> avoid starvation. For example, imagine we have an aggregate followed by a
>>> sort. If the aggregate is very high cardinality, and uses up all the memory
>>> and even starts spilling (falling back to sort-based aggregate), there
>>> isn't memory available at all for the sort operator to use. To work around
>>> this, each operator reserves a page of memory before they process any data.
>>>
>>> Page size is computed by Spark using:
>>>
>>> the total amount of execution memory / (maximum number of active tasks *
>>> 16)
>>>
>>> and then rounded to the next power of 2, and cap between 1MB and 64MB.
>>>
>>> That is to say, in the worst case, we should be able to reserve at least
>>> 8 pages (16 rounded up to the next power of 2).
>>>
>>> However, in your case, the max number of active tasks is 32 (set by test
>>> env), while the page size is determined using # cores (8 in your case). So
>>> it is off by a factor of 4. As a result, with this page size, we can only
>>> reserve at least 2 pages. That is to say, if you have more than 3 operators
>>> that need page reservation (e.g. an aggregate followed by a join on the
>>> group by key followed by a shuffle - which seems to be the case of
>>> join31.q), the task can fail to reserve memory before running anything.
>>>
>>>
>>> There is a 2nd problem (maybe this is the one you were trying to point
>>> out?) that is tasks running at the same time can be competing for memory
>>> with each other.  Spark allows each task to claim up to 2/N share of
>>> memory, where N is the number of active tasks. If a task is launched before
>>> others and hogs a lot of memory quickly, the other tasks that are launched
>>> after it might not be able to get enough memory allocation, and thus will
>>> fail. This is not super ideal, but probably fine because tasks can be
>>> retried, and can succeed in retries.
>>>
>>>
>>> On Wed, Sep 16, 2015 at 1:07 AM, Pete Robbins <robbinspg@gmail.com
>>> <javascript:_e(%7B%7D,'cvml','robbinspg@gmail.com');>> wrote:
>>>
>>>> ok so let me try again ;-)
>>>>
>>>> I don't think that the page size calculation matters apart from hitting
>>>> the allocation limit earlier if the page size is too large.
>>>>
>>>> If a task is going to need X bytes, it is going to need X bytes. In
>>>> this case, for at least one of the tasks, X > maxmemory/no_active_tasks at
>>>> some point during execution. A smaller page size may use the memory more
>>>> efficiently but would not necessarily avoid this issue.
>>>>
>>>> The next question would be: Is the memory limit per task of
>>>> max_memory/no_active_tasks reasonable? It seems fair but if this limit is
>>>> reached currently an exception is thrown, maybe the task could wait for
>>>> no_active_tasks to decrease?
>>>>
>>>> I think what causes my test issue is that the 32 tasks don't execute as
>>>> quickly on my 8 core box so more are active at any one time.
>>>>
>>>> I will experiment with the page size calculation to see what effect it
>>>> has.
>>>>
>>>> Cheers,
>>>>
>>>>
>>>>
>>>> On 16 September 2015 at 06:53, Reynold Xin <rxin@databricks.com
>>>> <javascript:_e(%7B%7D,'cvml','rxin@databricks.com');>> wrote:
>>>>
>>>>> It is exactly the issue here, isn't it?
>>>>>
>>>>> We are using memory / N, where N should be the maximum number of
>>>>> active tasks. In the current master, we use the number of cores to
>>>>> approximate the number of tasks -- but it turned out to be a bad
>>>>> approximation in tests because it is set to 32 to increase concurrency.
>>>>>
>>>>>
>>>>> On Tue, Sep 15, 2015 at 10:47 PM, Pete Robbins <robbinspg@gmail.com
>>>>> <javascript:_e(%7B%7D,'cvml','robbinspg@gmail.com');>> wrote:
>>>>>
>>>>>> Oops... I meant to say "The page size calculation is NOT the issue
>>>>>> here"
>>>>>>
>>>>>> On 16 September 2015 at 06:46, Pete Robbins <robbinspg@gmail.com
>>>>>> <javascript:_e(%7B%7D,'cvml','robbinspg@gmail.com');>> wrote:
>>>>>>
>>>>>>> The page size calculation is the issue here as there is plenty of
>>>>>>> free memory, although there is maybe a fair bit of wasted space in some
>>>>>>> pages. It is that when we have a lot of tasks each is only allowed to reach
>>>>>>> 1/n of the available memory and several of the tasks bump in to that limit.
>>>>>>> With tasks 4 times the number of cores there will be some contention and so
>>>>>>> they remain active for longer.
>>>>>>>
>>>>>>> So I think this is a test case issue configuring the number of
>>>>>>> executors too high.
>>>>>>>
>>>>>>> On 15 September 2015 at 18:54, Reynold Xin <rxin@databricks.com
>>>>>>> <javascript:_e(%7B%7D,'cvml','rxin@databricks.com');>> wrote:
>>>>>>>
>>>>>>>> Maybe we can change the heuristics in memory calculation to use
>>>>>>>> SparkContext.defaultParallelism if it is local mode.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Sep 15, 2015 at 10:28 AM, Pete Robbins <robbinspg@gmail.com
>>>>>>>> <javascript:_e(%7B%7D,'cvml','robbinspg@gmail.com');>> wrote:
>>>>>>>>
>>>>>>>>> Yes and at least there is an override by setting
>>>>>>>>> spark.sql.test.master to local[8] , in fact local[16] worked on my 8 core
>>>>>>>>> box.
>>>>>>>>>
>>>>>>>>> I'm happy to use this as a workaround but the 32 hard-coded will
>>>>>>>>> fail running build/tests on a clean checkout if you only have 8 cores.
>>>>>>>>>
>>>>>>>>> On 15 September 2015 at 17:40, Marcelo Vanzin <vanzin@cloudera.com
>>>>>>>>> <javascript:_e(%7B%7D,'cvml','vanzin@cloudera.com');>> wrote:
>>>>>>>>>
>>>>>>>>>> That test explicitly sets the number of executor cores to 32.
>>>>>>>>>>
>>>>>>>>>> object TestHive
>>>>>>>>>>   extends TestHiveContext(
>>>>>>>>>>     new SparkContext(
>>>>>>>>>>       System.getProperty("spark.sql.test.master", "local[32]"),
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Sep 14, 2015 at 11:22 PM, Reynold Xin <
>>>>>>>>>> rxin@databricks.com
>>>>>>>>>> <javascript:_e(%7B%7D,'cvml','rxin@databricks.com');>> wrote:
>>>>>>>>>> > Yea I think this is where the heuristics is failing -- it uses
>>>>>>>>>> 8 cores to
>>>>>>>>>> > approximate the number of active tasks, but the tests somehow
>>>>>>>>>> is using 32
>>>>>>>>>> > (maybe because it explicitly sets it to that, or you set it
>>>>>>>>>> yourself? I'm
>>>>>>>>>> > not sure which one)
>>>>>>>>>> >
>>>>>>>>>> > On Mon, Sep 14, 2015 at 11:06 PM, Pete Robbins <
>>>>>>>>>> robbinspg@gmail.com
>>>>>>>>>> <javascript:_e(%7B%7D,'cvml','robbinspg@gmail.com');>> wrote:
>>>>>>>>>> >>
>>>>>>>>>> >> Reynold, thanks for replying.
>>>>>>>>>> >>
>>>>>>>>>> >> getPageSize parameters: maxMemory=515396075, numCores=0
>>>>>>>>>> >> Calculated values: cores=8, default=4194304
>>>>>>>>>> >>
>>>>>>>>>> >> So am I getting a large page size as I only have 8 cores?
>>>>>>>>>> >>
>>>>>>>>>> >> On 15 September 2015 at 00:40, Reynold Xin <
>>>>>>>>>> rxin@databricks.com
>>>>>>>>>> <javascript:_e(%7B%7D,'cvml','rxin@databricks.com');>> wrote:
>>>>>>>>>> >>>
>>>>>>>>>> >>> Pete - can you do me a favor?
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>>>>>>>> >>>
>>>>>>>>>> >>> Print the parameters that are passed into the getPageSize
>>>>>>>>>> function, and
>>>>>>>>>> >>> check their values.
>>>>>>>>>> >>>
>>>>>>>>>> >>> On Mon, Sep 14, 2015 at 4:32 PM, Reynold Xin <
>>>>>>>>>> rxin@databricks.com
>>>>>>>>>> <javascript:_e(%7B%7D,'cvml','rxin@databricks.com');>> wrote:
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> Is this on latest master / branch-1.5?
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> out of the box we reserve only 16% (0.2 * 0.8) of the memory
>>>>>>>>>> for
>>>>>>>>>> >>>> execution (e.g. aggregate, join) / shuffle sorting. With a
>>>>>>>>>> 3GB heap, that's
>>>>>>>>>> >>>> 480MB. So each task gets 480MB / 32 = 15MB, and each
>>>>>>>>>> operator reserves at
>>>>>>>>>> >>>> least one page for execution. If your page size is 4MB, it
>>>>>>>>>> only takes 3
>>>>>>>>>> >>>> operators to use up its memory.
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> The thing is page size is dynamically determined -- and in
>>>>>>>>>> your case it
>>>>>>>>>> >>>> should be smaller than 4MB.
>>>>>>>>>> >>>>
>>>>>>>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> Maybe there is a place that in the maven tests that we
>>>>>>>>>> explicitly set
>>>>>>>>>> >>>> the page size (spark.buffer.pageSize) to 4MB? If yes, we
>>>>>>>>>> need to find it and
>>>>>>>>>> >>>> just remove it.
>>>>>>>>>> >>>>
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> On Mon, Sep 14, 2015 at 4:16 AM, Pete Robbins <
>>>>>>>>>> robbinspg@gmail.com
>>>>>>>>>> <javascript:_e(%7B%7D,'cvml','robbinspg@gmail.com');>>
>>>>>>>>>> >>>> wrote:
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>> I keep hitting errors running the tests on 1.5 such as
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>> - join31 *** FAILED ***
>>>>>>>>>> >>>>>   Failed to execute query using catalyst:
>>>>>>>>>> >>>>>   Error: Job aborted due to stage failure: Task 9 in stage
>>>>>>>>>> 3653.0
>>>>>>>>>> >>>>> failed 1 times, most recent failure: Lost task 9.0 in stage
>>>>>>>>>> 3653.0 (TID
>>>>>>>>>> >>>>> 123363, localhost): java.io.IOException: Unable to acquire
>>>>>>>>>> 4194304 bytes of
>>>>>>>>>> >>>>> memory
>>>>>>>>>> >>>>>       at
>>>>>>>>>> >>>>>
>>>>>>>>>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>> This is using the command
>>>>>>>>>> >>>>> build/mvn -Pyarn -Phadoop-2.2 -Phive -Phive-thriftserver
>>>>>>>>>> test
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>> I don't see these errors in any of the amplab jenkins
>>>>>>>>>> builds. Do those
>>>>>>>>>> >>>>> builds have any configuration/environment that I may be
>>>>>>>>>> missing? My build is
>>>>>>>>>> >>>>> running with whatever defaults are in the top level
>>>>>>>>>> pom.xml, eg -Xmx3G.
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>> I can make these tests pass by setting
>>>>>>>>>> spark.shuffle.memoryFraction=0.6
>>>>>>>>>> >>>>> in the HiveCompatibilitySuite rather than the default 0.2
>>>>>>>>>> value.
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>> Trying to analyze what is going on with the test it is
>>>>>>>>>> related to the
>>>>>>>>>> >>>>> number of active tasks, which seems to rise to 32, and so
>>>>>>>>>> the
>>>>>>>>>> >>>>> ShuffleMemoryManager allows less memory per task even
>>>>>>>>>> though most of those
>>>>>>>>>> >>>>> tasks do not have any memory allocated to them.
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>> Has anyone seen issues like this before?
>>>>>>>>>> >>>>
>>>>>>>>>> >>>>
>>>>>>>>>> >>>
>>>>>>>>>> >>
>>>>>>>>>> >
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Marcelo
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Unable to acquire memory errors in HiveCompatibilitySuite

Posted by Pete Robbins <ro...@gmail.com>.
so forcing the ShuffleMemoryManager to assume 32 cores and therefore
calculate a pagesize of 1MB passes the tests.

How can we determine the correct value to use in getPageSize rather than
Runtime.getRuntime.availableProcessors()?

On 16 September 2015 at 10:17, Pete Robbins <ro...@gmail.com> wrote:

> I see what you are saying. Full stack trace:
>
> java.io.IOException: Unable to acquire 4194304 bytes of memory
>       at
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
>       at
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:349)
>       at
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertKVRecord(UnsafeExternalSorter.java:478)
>       at
> org.apache.spark.sql.execution.UnsafeKVExternalSorter.insertKV(UnsafeKVExternalSorter.java:138)
>       at
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:489)
>       at
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:379)
>       at
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
>       at
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$
> 1.org
> $apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
>       at
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
>       at
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
>       at
> org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at
> org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at
> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:99)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at
> org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at
> org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>       at org.apache.spark.scheduler.Task.run(Task.scala:88)
>       at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>       at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>       at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>       at java.lang.Thread.run(Thread.java:785)
>
> On 16 September 2015 at 09:30, Reynold Xin <rx...@databricks.com> wrote:
>
>> Can you paste the entire stacktrace of the error? In your original email
>> you only included the last function call.
>>
>> Maybe I'm missing something here, but I still think the bad heuristics is
>> the issue.
>>
>> Some operators pre-reserve memory before running anything in order to
>> avoid starvation. For example, imagine we have an aggregate followed by a
>> sort. If the aggregate is very high cardinality, and uses up all the memory
>> and even starts spilling (falling back to sort-based aggregate), there
>> isn't memory available at all for the sort operator to use. To work around
>> this, each operator reserves a page of memory before they process any data.
>>
>> Page size is computed by Spark using:
>>
>> the total amount of execution memory / (maximum number of active tasks *
>> 16)
>>
>> and then rounded to the next power of 2, and cap between 1MB and 64MB.
>>
>> That is to say, in the worst case, we should be able to reserve at least
>> 8 pages (16 rounded up to the next power of 2).
>>
>> However, in your case, the max number of active tasks is 32 (set by test
>> env), while the page size is determined using # cores (8 in your case). So
>> it is off by a factor of 4. As a result, with this page size, we can only
>> reserve at least 2 pages. That is to say, if you have more than 3 operators
>> that need page reservation (e.g. an aggregate followed by a join on the
>> group by key followed by a shuffle - which seems to be the case of
>> join31.q), the task can fail to reserve memory before running anything.
>>
>>
>> There is a 2nd problem (maybe this is the one you were trying to point
>> out?) that is tasks running at the same time can be competing for memory
>> with each other.  Spark allows each task to claim up to 2/N share of
>> memory, where N is the number of active tasks. If a task is launched before
>> others and hogs a lot of memory quickly, the other tasks that are launched
>> after it might not be able to get enough memory allocation, and thus will
>> fail. This is not super ideal, but probably fine because tasks can be
>> retried, and can succeed in retries.
>>
>>
>> On Wed, Sep 16, 2015 at 1:07 AM, Pete Robbins <ro...@gmail.com>
>> wrote:
>>
>>> ok so let me try again ;-)
>>>
>>> I don't think that the page size calculation matters apart from hitting
>>> the allocation limit earlier if the page size is too large.
>>>
>>> If a task is going to need X bytes, it is going to need X bytes. In this
>>> case, for at least one of the tasks, X > maxmemory/no_active_tasks at some
>>> point during execution. A smaller page size may use the memory more
>>> efficiently but would not necessarily avoid this issue.
>>>
>>> The next question would be: Is the memory limit per task of
>>> max_memory/no_active_tasks reasonable? It seems fair but if this limit is
>>> reached currently an exception is thrown, maybe the task could wait for
>>> no_active_tasks to decrease?
>>>
>>> I think what causes my test issue is that the 32 tasks don't execute as
>>> quickly on my 8 core box so more are active at any one time.
>>>
>>> I will experiment with the page size calculation to see what effect it
>>> has.
>>>
>>> Cheers,
>>>
>>>
>>>
>>> On 16 September 2015 at 06:53, Reynold Xin <rx...@databricks.com> wrote:
>>>
>>>> It is exactly the issue here, isn't it?
>>>>
>>>> We are using memory / N, where N should be the maximum number of active
>>>> tasks. In the current master, we use the number of cores to approximate the
>>>> number of tasks -- but it turned out to be a bad approximation in tests
>>>> because it is set to 32 to increase concurrency.
>>>>
>>>>
>>>> On Tue, Sep 15, 2015 at 10:47 PM, Pete Robbins <ro...@gmail.com>
>>>> wrote:
>>>>
>>>>> Oops... I meant to say "The page size calculation is NOT the issue
>>>>> here"
>>>>>
>>>>> On 16 September 2015 at 06:46, Pete Robbins <ro...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> The page size calculation is the issue here as there is plenty of
>>>>>> free memory, although there is maybe a fair bit of wasted space in some
>>>>>> pages. It is that when we have a lot of tasks each is only allowed to reach
>>>>>> 1/n of the available memory and several of the tasks bump in to that limit.
>>>>>> With tasks 4 times the number of cores there will be some contention and so
>>>>>> they remain active for longer.
>>>>>>
>>>>>> So I think this is a test case issue configuring the number of
>>>>>> executors too high.
>>>>>>
>>>>>> On 15 September 2015 at 18:54, Reynold Xin <rx...@databricks.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Maybe we can change the heuristics in memory calculation to use
>>>>>>> SparkContext.defaultParallelism if it is local mode.
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Sep 15, 2015 at 10:28 AM, Pete Robbins <ro...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Yes and at least there is an override by setting
>>>>>>>> spark.sql.test.master to local[8] , in fact local[16] worked on my 8 core
>>>>>>>> box.
>>>>>>>>
>>>>>>>> I'm happy to use this as a workaround but the 32 hard-coded will
>>>>>>>> fail running build/tests on a clean checkout if you only have 8 cores.
>>>>>>>>
>>>>>>>> On 15 September 2015 at 17:40, Marcelo Vanzin <va...@cloudera.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> That test explicitly sets the number of executor cores to 32.
>>>>>>>>>
>>>>>>>>> object TestHive
>>>>>>>>>   extends TestHiveContext(
>>>>>>>>>     new SparkContext(
>>>>>>>>>       System.getProperty("spark.sql.test.master", "local[32]"),
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Sep 14, 2015 at 11:22 PM, Reynold Xin <rx...@databricks.com>
>>>>>>>>> wrote:
>>>>>>>>> > Yea I think this is where the heuristics is failing -- it uses 8
>>>>>>>>> cores to
>>>>>>>>> > approximate the number of active tasks, but the tests somehow is
>>>>>>>>> using 32
>>>>>>>>> > (maybe because it explicitly sets it to that, or you set it
>>>>>>>>> yourself? I'm
>>>>>>>>> > not sure which one)
>>>>>>>>> >
>>>>>>>>> > On Mon, Sep 14, 2015 at 11:06 PM, Pete Robbins <
>>>>>>>>> robbinspg@gmail.com> wrote:
>>>>>>>>> >>
>>>>>>>>> >> Reynold, thanks for replying.
>>>>>>>>> >>
>>>>>>>>> >> getPageSize parameters: maxMemory=515396075, numCores=0
>>>>>>>>> >> Calculated values: cores=8, default=4194304
>>>>>>>>> >>
>>>>>>>>> >> So am I getting a large page size as I only have 8 cores?
>>>>>>>>> >>
>>>>>>>>> >> On 15 September 2015 at 00:40, Reynold Xin <rx...@databricks.com>
>>>>>>>>> wrote:
>>>>>>>>> >>>
>>>>>>>>> >>> Pete - can you do me a favor?
>>>>>>>>> >>>
>>>>>>>>> >>>
>>>>>>>>> >>>
>>>>>>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>>>>>>> >>>
>>>>>>>>> >>> Print the parameters that are passed into the getPageSize
>>>>>>>>> function, and
>>>>>>>>> >>> check their values.
>>>>>>>>> >>>
>>>>>>>>> >>> On Mon, Sep 14, 2015 at 4:32 PM, Reynold Xin <
>>>>>>>>> rxin@databricks.com> wrote:
>>>>>>>>> >>>>
>>>>>>>>> >>>> Is this on latest master / branch-1.5?
>>>>>>>>> >>>>
>>>>>>>>> >>>> out of the box we reserve only 16% (0.2 * 0.8) of the memory
>>>>>>>>> for
>>>>>>>>> >>>> execution (e.g. aggregate, join) / shuffle sorting. With a
>>>>>>>>> 3GB heap, that's
>>>>>>>>> >>>> 480MB. So each task gets 480MB / 32 = 15MB, and each operator
>>>>>>>>> reserves at
>>>>>>>>> >>>> least one page for execution. If your page size is 4MB, it
>>>>>>>>> only takes 3
>>>>>>>>> >>>> operators to use up its memory.
>>>>>>>>> >>>>
>>>>>>>>> >>>> The thing is page size is dynamically determined -- and in
>>>>>>>>> your case it
>>>>>>>>> >>>> should be smaller than 4MB.
>>>>>>>>> >>>>
>>>>>>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>>>>>>> >>>>
>>>>>>>>> >>>> Maybe there is a place that in the maven tests that we
>>>>>>>>> explicitly set
>>>>>>>>> >>>> the page size (spark.buffer.pageSize) to 4MB? If yes, we need
>>>>>>>>> to find it and
>>>>>>>>> >>>> just remove it.
>>>>>>>>> >>>>
>>>>>>>>> >>>>
>>>>>>>>> >>>> On Mon, Sep 14, 2015 at 4:16 AM, Pete Robbins <
>>>>>>>>> robbinspg@gmail.com>
>>>>>>>>> >>>> wrote:
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> I keep hitting errors running the tests on 1.5 such as
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> - join31 *** FAILED ***
>>>>>>>>> >>>>>   Failed to execute query using catalyst:
>>>>>>>>> >>>>>   Error: Job aborted due to stage failure: Task 9 in stage
>>>>>>>>> 3653.0
>>>>>>>>> >>>>> failed 1 times, most recent failure: Lost task 9.0 in stage
>>>>>>>>> 3653.0 (TID
>>>>>>>>> >>>>> 123363, localhost): java.io.IOException: Unable to acquire
>>>>>>>>> 4194304 bytes of
>>>>>>>>> >>>>> memory
>>>>>>>>> >>>>>       at
>>>>>>>>> >>>>>
>>>>>>>>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> This is using the command
>>>>>>>>> >>>>> build/mvn -Pyarn -Phadoop-2.2 -Phive -Phive-thriftserver
>>>>>>>>> test
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> I don't see these errors in any of the amplab jenkins
>>>>>>>>> builds. Do those
>>>>>>>>> >>>>> builds have any configuration/environment that I may be
>>>>>>>>> missing? My build is
>>>>>>>>> >>>>> running with whatever defaults are in the top level pom.xml,
>>>>>>>>> eg -Xmx3G.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> I can make these tests pass by setting
>>>>>>>>> spark.shuffle.memoryFraction=0.6
>>>>>>>>> >>>>> in the HiveCompatibilitySuite rather than the default 0.2
>>>>>>>>> value.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Trying to analyze what is going on with the test it is
>>>>>>>>> related to the
>>>>>>>>> >>>>> number of active tasks, which seems to rise to 32, and so the
>>>>>>>>> >>>>> ShuffleMemoryManager allows less memory per task even though
>>>>>>>>> most of those
>>>>>>>>> >>>>> tasks do not have any memory allocated to them.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Has anyone seen issues like this before?
>>>>>>>>> >>>>
>>>>>>>>> >>>>
>>>>>>>>> >>>
>>>>>>>>> >>
>>>>>>>>> >
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Marcelo
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Unable to acquire memory errors in HiveCompatibilitySuite

Posted by Pete Robbins <ro...@gmail.com>.
I see what you are saying. Full stack trace:

java.io.IOException: Unable to acquire 4194304 bytes of memory
      at
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
      at
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:349)
      at
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertKVRecord(UnsafeExternalSorter.java:478)
      at
org.apache.spark.sql.execution.UnsafeKVExternalSorter.insertKV(UnsafeKVExternalSorter.java:138)
      at
org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:489)
      at
org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:379)
      at
org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
      at
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$
1.org
$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
      at
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
      at
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
      at
org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
      at
org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
      at
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:99)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
      at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
      at
org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
      at
org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
      at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
      at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
      at org.apache.spark.scheduler.Task.run(Task.scala:88)
      at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
      at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
      at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      at java.lang.Thread.run(Thread.java:785)

On 16 September 2015 at 09:30, Reynold Xin <rx...@databricks.com> wrote:

> Can you paste the entire stacktrace of the error? In your original email
> you only included the last function call.
>
> Maybe I'm missing something here, but I still think the bad heuristics is
> the issue.
>
> Some operators pre-reserve memory before running anything in order to
> avoid starvation. For example, imagine we have an aggregate followed by a
> sort. If the aggregate is very high cardinality, and uses up all the memory
> and even starts spilling (falling back to sort-based aggregate), there
> isn't memory available at all for the sort operator to use. To work around
> this, each operator reserves a page of memory before they process any data.
>
> Page size is computed by Spark using:
>
> the total amount of execution memory / (maximum number of active tasks *
> 16)
>
> and then rounded to the next power of 2, and cap between 1MB and 64MB.
>
> That is to say, in the worst case, we should be able to reserve at least 8
> pages (16 rounded up to the next power of 2).
>
> However, in your case, the max number of active tasks is 32 (set by test
> env), while the page size is determined using # cores (8 in your case). So
> it is off by a factor of 4. As a result, with this page size, we can only
> reserve at least 2 pages. That is to say, if you have more than 3 operators
> that need page reservation (e.g. an aggregate followed by a join on the
> group by key followed by a shuffle - which seems to be the case of
> join31.q), the task can fail to reserve memory before running anything.
>
>
> There is a 2nd problem (maybe this is the one you were trying to point
> out?) that is tasks running at the same time can be competing for memory
> with each other.  Spark allows each task to claim up to 2/N share of
> memory, where N is the number of active tasks. If a task is launched before
> others and hogs a lot of memory quickly, the other tasks that are launched
> after it might not be able to get enough memory allocation, and thus will
> fail. This is not super ideal, but probably fine because tasks can be
> retried, and can succeed in retries.
>
>
> On Wed, Sep 16, 2015 at 1:07 AM, Pete Robbins <ro...@gmail.com> wrote:
>
>> ok so let me try again ;-)
>>
>> I don't think that the page size calculation matters apart from hitting
>> the allocation limit earlier if the page size is too large.
>>
>> If a task is going to need X bytes, it is going to need X bytes. In this
>> case, for at least one of the tasks, X > maxmemory/no_active_tasks at some
>> point during execution. A smaller page size may use the memory more
>> efficiently but would not necessarily avoid this issue.
>>
>> The next question would be: Is the memory limit per task of
>> max_memory/no_active_tasks reasonable? It seems fair but if this limit is
>> reached currently an exception is thrown, maybe the task could wait for
>> no_active_tasks to decrease?
>>
>> I think what causes my test issue is that the 32 tasks don't execute as
>> quickly on my 8 core box so more are active at any one time.
>>
>> I will experiment with the page size calculation to see what effect it
>> has.
>>
>> Cheers,
>>
>>
>>
>> On 16 September 2015 at 06:53, Reynold Xin <rx...@databricks.com> wrote:
>>
>>> It is exactly the issue here, isn't it?
>>>
>>> We are using memory / N, where N should be the maximum number of active
>>> tasks. In the current master, we use the number of cores to approximate the
>>> number of tasks -- but it turned out to be a bad approximation in tests
>>> because it is set to 32 to increase concurrency.
>>>
>>>
>>> On Tue, Sep 15, 2015 at 10:47 PM, Pete Robbins <ro...@gmail.com>
>>> wrote:
>>>
>>>> Oops... I meant to say "The page size calculation is NOT the issue here"
>>>>
>>>> On 16 September 2015 at 06:46, Pete Robbins <ro...@gmail.com>
>>>> wrote:
>>>>
>>>>> The page size calculation is the issue here as there is plenty of free
>>>>> memory, although there is maybe a fair bit of wasted space in some pages.
>>>>> It is that when we have a lot of tasks each is only allowed to reach 1/n of
>>>>> the available memory and several of the tasks bump in to that limit. With
>>>>> tasks 4 times the number of cores there will be some contention and so they
>>>>> remain active for longer.
>>>>>
>>>>> So I think this is a test case issue configuring the number of
>>>>> executors too high.
>>>>>
>>>>> On 15 September 2015 at 18:54, Reynold Xin <rx...@databricks.com>
>>>>> wrote:
>>>>>
>>>>>> Maybe we can change the heuristics in memory calculation to use
>>>>>> SparkContext.defaultParallelism if it is local mode.
>>>>>>
>>>>>>
>>>>>> On Tue, Sep 15, 2015 at 10:28 AM, Pete Robbins <ro...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Yes and at least there is an override by setting
>>>>>>> spark.sql.test.master to local[8] , in fact local[16] worked on my 8 core
>>>>>>> box.
>>>>>>>
>>>>>>> I'm happy to use this as a workaround but the 32 hard-coded will
>>>>>>> fail running build/tests on a clean checkout if you only have 8 cores.
>>>>>>>
>>>>>>> On 15 September 2015 at 17:40, Marcelo Vanzin <va...@cloudera.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> That test explicitly sets the number of executor cores to 32.
>>>>>>>>
>>>>>>>> object TestHive
>>>>>>>>   extends TestHiveContext(
>>>>>>>>     new SparkContext(
>>>>>>>>       System.getProperty("spark.sql.test.master", "local[32]"),
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Sep 14, 2015 at 11:22 PM, Reynold Xin <rx...@databricks.com>
>>>>>>>> wrote:
>>>>>>>> > Yea I think this is where the heuristics is failing -- it uses 8
>>>>>>>> cores to
>>>>>>>> > approximate the number of active tasks, but the tests somehow is
>>>>>>>> using 32
>>>>>>>> > (maybe because it explicitly sets it to that, or you set it
>>>>>>>> yourself? I'm
>>>>>>>> > not sure which one)
>>>>>>>> >
>>>>>>>> > On Mon, Sep 14, 2015 at 11:06 PM, Pete Robbins <
>>>>>>>> robbinspg@gmail.com> wrote:
>>>>>>>> >>
>>>>>>>> >> Reynold, thanks for replying.
>>>>>>>> >>
>>>>>>>> >> getPageSize parameters: maxMemory=515396075, numCores=0
>>>>>>>> >> Calculated values: cores=8, default=4194304
>>>>>>>> >>
>>>>>>>> >> So am I getting a large page size as I only have 8 cores?
>>>>>>>> >>
>>>>>>>> >> On 15 September 2015 at 00:40, Reynold Xin <rx...@databricks.com>
>>>>>>>> wrote:
>>>>>>>> >>>
>>>>>>>> >>> Pete - can you do me a favor?
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>>>>>> >>>
>>>>>>>> >>> Print the parameters that are passed into the getPageSize
>>>>>>>> function, and
>>>>>>>> >>> check their values.
>>>>>>>> >>>
>>>>>>>> >>> On Mon, Sep 14, 2015 at 4:32 PM, Reynold Xin <
>>>>>>>> rxin@databricks.com> wrote:
>>>>>>>> >>>>
>>>>>>>> >>>> Is this on latest master / branch-1.5?
>>>>>>>> >>>>
>>>>>>>> >>>> out of the box we reserve only 16% (0.2 * 0.8) of the memory
>>>>>>>> for
>>>>>>>> >>>> execution (e.g. aggregate, join) / shuffle sorting. With a 3GB
>>>>>>>> heap, that's
>>>>>>>> >>>> 480MB. So each task gets 480MB / 32 = 15MB, and each operator
>>>>>>>> reserves at
>>>>>>>> >>>> least one page for execution. If your page size is 4MB, it
>>>>>>>> only takes 3
>>>>>>>> >>>> operators to use up its memory.
>>>>>>>> >>>>
>>>>>>>> >>>> The thing is page size is dynamically determined -- and in
>>>>>>>> your case it
>>>>>>>> >>>> should be smaller than 4MB.
>>>>>>>> >>>>
>>>>>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>>>>>> >>>>
>>>>>>>> >>>> Maybe there is a place that in the maven tests that we
>>>>>>>> explicitly set
>>>>>>>> >>>> the page size (spark.buffer.pageSize) to 4MB? If yes, we need
>>>>>>>> to find it and
>>>>>>>> >>>> just remove it.
>>>>>>>> >>>>
>>>>>>>> >>>>
>>>>>>>> >>>> On Mon, Sep 14, 2015 at 4:16 AM, Pete Robbins <
>>>>>>>> robbinspg@gmail.com>
>>>>>>>> >>>> wrote:
>>>>>>>> >>>>>
>>>>>>>> >>>>> I keep hitting errors running the tests on 1.5 such as
>>>>>>>> >>>>>
>>>>>>>> >>>>>
>>>>>>>> >>>>> - join31 *** FAILED ***
>>>>>>>> >>>>>   Failed to execute query using catalyst:
>>>>>>>> >>>>>   Error: Job aborted due to stage failure: Task 9 in stage
>>>>>>>> 3653.0
>>>>>>>> >>>>> failed 1 times, most recent failure: Lost task 9.0 in stage
>>>>>>>> 3653.0 (TID
>>>>>>>> >>>>> 123363, localhost): java.io.IOException: Unable to acquire
>>>>>>>> 4194304 bytes of
>>>>>>>> >>>>> memory
>>>>>>>> >>>>>       at
>>>>>>>> >>>>>
>>>>>>>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
>>>>>>>> >>>>>
>>>>>>>> >>>>>
>>>>>>>> >>>>> This is using the command
>>>>>>>> >>>>> build/mvn -Pyarn -Phadoop-2.2 -Phive -Phive-thriftserver  test
>>>>>>>> >>>>>
>>>>>>>> >>>>>
>>>>>>>> >>>>> I don't see these errors in any of the amplab jenkins builds.
>>>>>>>> Do those
>>>>>>>> >>>>> builds have any configuration/environment that I may be
>>>>>>>> missing? My build is
>>>>>>>> >>>>> running with whatever defaults are in the top level pom.xml,
>>>>>>>> eg -Xmx3G.
>>>>>>>> >>>>>
>>>>>>>> >>>>> I can make these tests pass by setting
>>>>>>>> spark.shuffle.memoryFraction=0.6
>>>>>>>> >>>>> in the HiveCompatibilitySuite rather than the default 0.2
>>>>>>>> value.
>>>>>>>> >>>>>
>>>>>>>> >>>>> Trying to analyze what is going on with the test it is
>>>>>>>> related to the
>>>>>>>> >>>>> number of active tasks, which seems to rise to 32, and so the
>>>>>>>> >>>>> ShuffleMemoryManager allows less memory per task even though
>>>>>>>> most of those
>>>>>>>> >>>>> tasks do not have any memory allocated to them.
>>>>>>>> >>>>>
>>>>>>>> >>>>> Has anyone seen issues like this before?
>>>>>>>> >>>>
>>>>>>>> >>>>
>>>>>>>> >>>
>>>>>>>> >>
>>>>>>>> >
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Marcelo
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Unable to acquire memory errors in HiveCompatibilitySuite

Posted by Reynold Xin <rx...@databricks.com>.
Can you paste the entire stacktrace of the error? In your original email
you only included the last function call.

Maybe I'm missing something here, but I still think the bad heuristics is
the issue.

Some operators pre-reserve memory before running anything in order to avoid
starvation. For example, imagine we have an aggregate followed by a sort.
If the aggregate is very high cardinality, and uses up all the memory and
even starts spilling (falling back to sort-based aggregate), there isn't
memory available at all for the sort operator to use. To work around this,
each operator reserves a page of memory before they process any data.

Page size is computed by Spark using:

the total amount of execution memory / (maximum number of active tasks * 16)

and then rounded to the next power of 2, and cap between 1MB and 64MB.

That is to say, in the worst case, we should be able to reserve at least 8
pages (16 rounded up to the next power of 2).

However, in your case, the max number of active tasks is 32 (set by test
env), while the page size is determined using # cores (8 in your case). So
it is off by a factor of 4. As a result, with this page size, we can only
reserve at least 2 pages. That is to say, if you have more than 3 operators
that need page reservation (e.g. an aggregate followed by a join on the
group by key followed by a shuffle - which seems to be the case of
join31.q), the task can fail to reserve memory before running anything.


There is a 2nd problem (maybe this is the one you were trying to point
out?) that is tasks running at the same time can be competing for memory
with each other.  Spark allows each task to claim up to 2/N share of
memory, where N is the number of active tasks. If a task is launched before
others and hogs a lot of memory quickly, the other tasks that are launched
after it might not be able to get enough memory allocation, and thus will
fail. This is not super ideal, but probably fine because tasks can be
retried, and can succeed in retries.


On Wed, Sep 16, 2015 at 1:07 AM, Pete Robbins <ro...@gmail.com> wrote:

> ok so let me try again ;-)
>
> I don't think that the page size calculation matters apart from hitting
> the allocation limit earlier if the page size is too large.
>
> If a task is going to need X bytes, it is going to need X bytes. In this
> case, for at least one of the tasks, X > maxmemory/no_active_tasks at some
> point during execution. A smaller page size may use the memory more
> efficiently but would not necessarily avoid this issue.
>
> The next question would be: Is the memory limit per task of
> max_memory/no_active_tasks reasonable? It seems fair but if this limit is
> reached currently an exception is thrown, maybe the task could wait for
> no_active_tasks to decrease?
>
> I think what causes my test issue is that the 32 tasks don't execute as
> quickly on my 8 core box so more are active at any one time.
>
> I will experiment with the page size calculation to see what effect it has.
>
> Cheers,
>
>
>
> On 16 September 2015 at 06:53, Reynold Xin <rx...@databricks.com> wrote:
>
>> It is exactly the issue here, isn't it?
>>
>> We are using memory / N, where N should be the maximum number of active
>> tasks. In the current master, we use the number of cores to approximate the
>> number of tasks -- but it turned out to be a bad approximation in tests
>> because it is set to 32 to increase concurrency.
>>
>>
>> On Tue, Sep 15, 2015 at 10:47 PM, Pete Robbins <ro...@gmail.com>
>> wrote:
>>
>>> Oops... I meant to say "The page size calculation is NOT the issue here"
>>>
>>> On 16 September 2015 at 06:46, Pete Robbins <ro...@gmail.com> wrote:
>>>
>>>> The page size calculation is the issue here as there is plenty of free
>>>> memory, although there is maybe a fair bit of wasted space in some pages.
>>>> It is that when we have a lot of tasks each is only allowed to reach 1/n of
>>>> the available memory and several of the tasks bump in to that limit. With
>>>> tasks 4 times the number of cores there will be some contention and so they
>>>> remain active for longer.
>>>>
>>>> So I think this is a test case issue configuring the number of
>>>> executors too high.
>>>>
>>>> On 15 September 2015 at 18:54, Reynold Xin <rx...@databricks.com> wrote:
>>>>
>>>>> Maybe we can change the heuristics in memory calculation to use
>>>>> SparkContext.defaultParallelism if it is local mode.
>>>>>
>>>>>
>>>>> On Tue, Sep 15, 2015 at 10:28 AM, Pete Robbins <ro...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Yes and at least there is an override by setting
>>>>>> spark.sql.test.master to local[8] , in fact local[16] worked on my 8 core
>>>>>> box.
>>>>>>
>>>>>> I'm happy to use this as a workaround but the 32 hard-coded will fail
>>>>>> running build/tests on a clean checkout if you only have 8 cores.
>>>>>>
>>>>>> On 15 September 2015 at 17:40, Marcelo Vanzin <va...@cloudera.com>
>>>>>> wrote:
>>>>>>
>>>>>>> That test explicitly sets the number of executor cores to 32.
>>>>>>>
>>>>>>> object TestHive
>>>>>>>   extends TestHiveContext(
>>>>>>>     new SparkContext(
>>>>>>>       System.getProperty("spark.sql.test.master", "local[32]"),
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Sep 14, 2015 at 11:22 PM, Reynold Xin <rx...@databricks.com>
>>>>>>> wrote:
>>>>>>> > Yea I think this is where the heuristics is failing -- it uses 8
>>>>>>> cores to
>>>>>>> > approximate the number of active tasks, but the tests somehow is
>>>>>>> using 32
>>>>>>> > (maybe because it explicitly sets it to that, or you set it
>>>>>>> yourself? I'm
>>>>>>> > not sure which one)
>>>>>>> >
>>>>>>> > On Mon, Sep 14, 2015 at 11:06 PM, Pete Robbins <
>>>>>>> robbinspg@gmail.com> wrote:
>>>>>>> >>
>>>>>>> >> Reynold, thanks for replying.
>>>>>>> >>
>>>>>>> >> getPageSize parameters: maxMemory=515396075, numCores=0
>>>>>>> >> Calculated values: cores=8, default=4194304
>>>>>>> >>
>>>>>>> >> So am I getting a large page size as I only have 8 cores?
>>>>>>> >>
>>>>>>> >> On 15 September 2015 at 00:40, Reynold Xin <rx...@databricks.com>
>>>>>>> wrote:
>>>>>>> >>>
>>>>>>> >>> Pete - can you do me a favor?
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>>>>> >>>
>>>>>>> >>> Print the parameters that are passed into the getPageSize
>>>>>>> function, and
>>>>>>> >>> check their values.
>>>>>>> >>>
>>>>>>> >>> On Mon, Sep 14, 2015 at 4:32 PM, Reynold Xin <
>>>>>>> rxin@databricks.com> wrote:
>>>>>>> >>>>
>>>>>>> >>>> Is this on latest master / branch-1.5?
>>>>>>> >>>>
>>>>>>> >>>> out of the box we reserve only 16% (0.2 * 0.8) of the memory for
>>>>>>> >>>> execution (e.g. aggregate, join) / shuffle sorting. With a 3GB
>>>>>>> heap, that's
>>>>>>> >>>> 480MB. So each task gets 480MB / 32 = 15MB, and each operator
>>>>>>> reserves at
>>>>>>> >>>> least one page for execution. If your page size is 4MB, it only
>>>>>>> takes 3
>>>>>>> >>>> operators to use up its memory.
>>>>>>> >>>>
>>>>>>> >>>> The thing is page size is dynamically determined -- and in your
>>>>>>> case it
>>>>>>> >>>> should be smaller than 4MB.
>>>>>>> >>>>
>>>>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>>>>> >>>>
>>>>>>> >>>> Maybe there is a place that in the maven tests that we
>>>>>>> explicitly set
>>>>>>> >>>> the page size (spark.buffer.pageSize) to 4MB? If yes, we need
>>>>>>> to find it and
>>>>>>> >>>> just remove it.
>>>>>>> >>>>
>>>>>>> >>>>
>>>>>>> >>>> On Mon, Sep 14, 2015 at 4:16 AM, Pete Robbins <
>>>>>>> robbinspg@gmail.com>
>>>>>>> >>>> wrote:
>>>>>>> >>>>>
>>>>>>> >>>>> I keep hitting errors running the tests on 1.5 such as
>>>>>>> >>>>>
>>>>>>> >>>>>
>>>>>>> >>>>> - join31 *** FAILED ***
>>>>>>> >>>>>   Failed to execute query using catalyst:
>>>>>>> >>>>>   Error: Job aborted due to stage failure: Task 9 in stage
>>>>>>> 3653.0
>>>>>>> >>>>> failed 1 times, most recent failure: Lost task 9.0 in stage
>>>>>>> 3653.0 (TID
>>>>>>> >>>>> 123363, localhost): java.io.IOException: Unable to acquire
>>>>>>> 4194304 bytes of
>>>>>>> >>>>> memory
>>>>>>> >>>>>       at
>>>>>>> >>>>>
>>>>>>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
>>>>>>> >>>>>
>>>>>>> >>>>>
>>>>>>> >>>>> This is using the command
>>>>>>> >>>>> build/mvn -Pyarn -Phadoop-2.2 -Phive -Phive-thriftserver  test
>>>>>>> >>>>>
>>>>>>> >>>>>
>>>>>>> >>>>> I don't see these errors in any of the amplab jenkins builds.
>>>>>>> Do those
>>>>>>> >>>>> builds have any configuration/environment that I may be
>>>>>>> missing? My build is
>>>>>>> >>>>> running with whatever defaults are in the top level pom.xml,
>>>>>>> eg -Xmx3G.
>>>>>>> >>>>>
>>>>>>> >>>>> I can make these tests pass by setting
>>>>>>> spark.shuffle.memoryFraction=0.6
>>>>>>> >>>>> in the HiveCompatibilitySuite rather than the default 0.2
>>>>>>> value.
>>>>>>> >>>>>
>>>>>>> >>>>> Trying to analyze what is going on with the test it is related
>>>>>>> to the
>>>>>>> >>>>> number of active tasks, which seems to rise to 32, and so the
>>>>>>> >>>>> ShuffleMemoryManager allows less memory per task even though
>>>>>>> most of those
>>>>>>> >>>>> tasks do not have any memory allocated to them.
>>>>>>> >>>>>
>>>>>>> >>>>> Has anyone seen issues like this before?
>>>>>>> >>>>
>>>>>>> >>>>
>>>>>>> >>>
>>>>>>> >>
>>>>>>> >
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Marcelo
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Unable to acquire memory errors in HiveCompatibilitySuite

Posted by Pete Robbins <ro...@gmail.com>.
ok so let me try again ;-)

I don't think that the page size calculation matters apart from hitting the
allocation limit earlier if the page size is too large.

If a task is going to need X bytes, it is going to need X bytes. In this
case, for at least one of the tasks, X > maxmemory/no_active_tasks at some
point during execution. A smaller page size may use the memory more
efficiently but would not necessarily avoid this issue.

The next question would be: Is the memory limit per task of
max_memory/no_active_tasks reasonable? It seems fair but if this limit is
reached currently an exception is thrown, maybe the task could wait for
no_active_tasks to decrease?

I think what causes my test issue is that the 32 tasks don't execute as
quickly on my 8 core box so more are active at any one time.

I will experiment with the page size calculation to see what effect it has.

Cheers,



On 16 September 2015 at 06:53, Reynold Xin <rx...@databricks.com> wrote:

> It is exactly the issue here, isn't it?
>
> We are using memory / N, where N should be the maximum number of active
> tasks. In the current master, we use the number of cores to approximate the
> number of tasks -- but it turned out to be a bad approximation in tests
> because it is set to 32 to increase concurrency.
>
>
> On Tue, Sep 15, 2015 at 10:47 PM, Pete Robbins <ro...@gmail.com>
> wrote:
>
>> Oops... I meant to say "The page size calculation is NOT the issue here"
>>
>> On 16 September 2015 at 06:46, Pete Robbins <ro...@gmail.com> wrote:
>>
>>> The page size calculation is the issue here as there is plenty of free
>>> memory, although there is maybe a fair bit of wasted space in some pages.
>>> It is that when we have a lot of tasks each is only allowed to reach 1/n of
>>> the available memory and several of the tasks bump in to that limit. With
>>> tasks 4 times the number of cores there will be some contention and so they
>>> remain active for longer.
>>>
>>> So I think this is a test case issue configuring the number of executors
>>> too high.
>>>
>>> On 15 September 2015 at 18:54, Reynold Xin <rx...@databricks.com> wrote:
>>>
>>>> Maybe we can change the heuristics in memory calculation to use
>>>> SparkContext.defaultParallelism if it is local mode.
>>>>
>>>>
>>>> On Tue, Sep 15, 2015 at 10:28 AM, Pete Robbins <ro...@gmail.com>
>>>> wrote:
>>>>
>>>>> Yes and at least there is an override by setting
>>>>> spark.sql.test.master to local[8] , in fact local[16] worked on my 8 core
>>>>> box.
>>>>>
>>>>> I'm happy to use this as a workaround but the 32 hard-coded will fail
>>>>> running build/tests on a clean checkout if you only have 8 cores.
>>>>>
>>>>> On 15 September 2015 at 17:40, Marcelo Vanzin <va...@cloudera.com>
>>>>> wrote:
>>>>>
>>>>>> That test explicitly sets the number of executor cores to 32.
>>>>>>
>>>>>> object TestHive
>>>>>>   extends TestHiveContext(
>>>>>>     new SparkContext(
>>>>>>       System.getProperty("spark.sql.test.master", "local[32]"),
>>>>>>
>>>>>>
>>>>>> On Mon, Sep 14, 2015 at 11:22 PM, Reynold Xin <rx...@databricks.com>
>>>>>> wrote:
>>>>>> > Yea I think this is where the heuristics is failing -- it uses 8
>>>>>> cores to
>>>>>> > approximate the number of active tasks, but the tests somehow is
>>>>>> using 32
>>>>>> > (maybe because it explicitly sets it to that, or you set it
>>>>>> yourself? I'm
>>>>>> > not sure which one)
>>>>>> >
>>>>>> > On Mon, Sep 14, 2015 at 11:06 PM, Pete Robbins <ro...@gmail.com>
>>>>>> wrote:
>>>>>> >>
>>>>>> >> Reynold, thanks for replying.
>>>>>> >>
>>>>>> >> getPageSize parameters: maxMemory=515396075, numCores=0
>>>>>> >> Calculated values: cores=8, default=4194304
>>>>>> >>
>>>>>> >> So am I getting a large page size as I only have 8 cores?
>>>>>> >>
>>>>>> >> On 15 September 2015 at 00:40, Reynold Xin <rx...@databricks.com>
>>>>>> wrote:
>>>>>> >>>
>>>>>> >>> Pete - can you do me a favor?
>>>>>> >>>
>>>>>> >>>
>>>>>> >>>
>>>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>>>> >>>
>>>>>> >>> Print the parameters that are passed into the getPageSize
>>>>>> function, and
>>>>>> >>> check their values.
>>>>>> >>>
>>>>>> >>> On Mon, Sep 14, 2015 at 4:32 PM, Reynold Xin <rx...@databricks.com>
>>>>>> wrote:
>>>>>> >>>>
>>>>>> >>>> Is this on latest master / branch-1.5?
>>>>>> >>>>
>>>>>> >>>> out of the box we reserve only 16% (0.2 * 0.8) of the memory for
>>>>>> >>>> execution (e.g. aggregate, join) / shuffle sorting. With a 3GB
>>>>>> heap, that's
>>>>>> >>>> 480MB. So each task gets 480MB / 32 = 15MB, and each operator
>>>>>> reserves at
>>>>>> >>>> least one page for execution. If your page size is 4MB, it only
>>>>>> takes 3
>>>>>> >>>> operators to use up its memory.
>>>>>> >>>>
>>>>>> >>>> The thing is page size is dynamically determined -- and in your
>>>>>> case it
>>>>>> >>>> should be smaller than 4MB.
>>>>>> >>>>
>>>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>>>> >>>>
>>>>>> >>>> Maybe there is a place that in the maven tests that we
>>>>>> explicitly set
>>>>>> >>>> the page size (spark.buffer.pageSize) to 4MB? If yes, we need to
>>>>>> find it and
>>>>>> >>>> just remove it.
>>>>>> >>>>
>>>>>> >>>>
>>>>>> >>>> On Mon, Sep 14, 2015 at 4:16 AM, Pete Robbins <
>>>>>> robbinspg@gmail.com>
>>>>>> >>>> wrote:
>>>>>> >>>>>
>>>>>> >>>>> I keep hitting errors running the tests on 1.5 such as
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>> >>>>> - join31 *** FAILED ***
>>>>>> >>>>>   Failed to execute query using catalyst:
>>>>>> >>>>>   Error: Job aborted due to stage failure: Task 9 in stage
>>>>>> 3653.0
>>>>>> >>>>> failed 1 times, most recent failure: Lost task 9.0 in stage
>>>>>> 3653.0 (TID
>>>>>> >>>>> 123363, localhost): java.io.IOException: Unable to acquire
>>>>>> 4194304 bytes of
>>>>>> >>>>> memory
>>>>>> >>>>>       at
>>>>>> >>>>>
>>>>>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>> >>>>> This is using the command
>>>>>> >>>>> build/mvn -Pyarn -Phadoop-2.2 -Phive -Phive-thriftserver  test
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>> >>>>> I don't see these errors in any of the amplab jenkins builds.
>>>>>> Do those
>>>>>> >>>>> builds have any configuration/environment that I may be
>>>>>> missing? My build is
>>>>>> >>>>> running with whatever defaults are in the top level pom.xml, eg
>>>>>> -Xmx3G.
>>>>>> >>>>>
>>>>>> >>>>> I can make these tests pass by setting
>>>>>> spark.shuffle.memoryFraction=0.6
>>>>>> >>>>> in the HiveCompatibilitySuite rather than the default 0.2 value.
>>>>>> >>>>>
>>>>>> >>>>> Trying to analyze what is going on with the test it is related
>>>>>> to the
>>>>>> >>>>> number of active tasks, which seems to rise to 32, and so the
>>>>>> >>>>> ShuffleMemoryManager allows less memory per task even though
>>>>>> most of those
>>>>>> >>>>> tasks do not have any memory allocated to them.
>>>>>> >>>>>
>>>>>> >>>>> Has anyone seen issues like this before?
>>>>>> >>>>
>>>>>> >>>>
>>>>>> >>>
>>>>>> >>
>>>>>> >
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Marcelo
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Unable to acquire memory errors in HiveCompatibilitySuite

Posted by Reynold Xin <rx...@databricks.com>.
It is exactly the issue here, isn't it?

We are using memory / N, where N should be the maximum number of active
tasks. In the current master, we use the number of cores to approximate the
number of tasks -- but it turned out to be a bad approximation in tests
because it is set to 32 to increase concurrency.


On Tue, Sep 15, 2015 at 10:47 PM, Pete Robbins <ro...@gmail.com> wrote:

> Oops... I meant to say "The page size calculation is NOT the issue here"
>
> On 16 September 2015 at 06:46, Pete Robbins <ro...@gmail.com> wrote:
>
>> The page size calculation is the issue here as there is plenty of free
>> memory, although there is maybe a fair bit of wasted space in some pages.
>> It is that when we have a lot of tasks each is only allowed to reach 1/n of
>> the available memory and several of the tasks bump in to that limit. With
>> tasks 4 times the number of cores there will be some contention and so they
>> remain active for longer.
>>
>> So I think this is a test case issue configuring the number of executors
>> too high.
>>
>> On 15 September 2015 at 18:54, Reynold Xin <rx...@databricks.com> wrote:
>>
>>> Maybe we can change the heuristics in memory calculation to use
>>> SparkContext.defaultParallelism if it is local mode.
>>>
>>>
>>> On Tue, Sep 15, 2015 at 10:28 AM, Pete Robbins <ro...@gmail.com>
>>> wrote:
>>>
>>>> Yes and at least there is an override by setting  spark.sql.test.master
>>>> to local[8] , in fact local[16] worked on my 8 core box.
>>>>
>>>> I'm happy to use this as a workaround but the 32 hard-coded will fail
>>>> running build/tests on a clean checkout if you only have 8 cores.
>>>>
>>>> On 15 September 2015 at 17:40, Marcelo Vanzin <va...@cloudera.com>
>>>> wrote:
>>>>
>>>>> That test explicitly sets the number of executor cores to 32.
>>>>>
>>>>> object TestHive
>>>>>   extends TestHiveContext(
>>>>>     new SparkContext(
>>>>>       System.getProperty("spark.sql.test.master", "local[32]"),
>>>>>
>>>>>
>>>>> On Mon, Sep 14, 2015 at 11:22 PM, Reynold Xin <rx...@databricks.com>
>>>>> wrote:
>>>>> > Yea I think this is where the heuristics is failing -- it uses 8
>>>>> cores to
>>>>> > approximate the number of active tasks, but the tests somehow is
>>>>> using 32
>>>>> > (maybe because it explicitly sets it to that, or you set it
>>>>> yourself? I'm
>>>>> > not sure which one)
>>>>> >
>>>>> > On Mon, Sep 14, 2015 at 11:06 PM, Pete Robbins <ro...@gmail.com>
>>>>> wrote:
>>>>> >>
>>>>> >> Reynold, thanks for replying.
>>>>> >>
>>>>> >> getPageSize parameters: maxMemory=515396075, numCores=0
>>>>> >> Calculated values: cores=8, default=4194304
>>>>> >>
>>>>> >> So am I getting a large page size as I only have 8 cores?
>>>>> >>
>>>>> >> On 15 September 2015 at 00:40, Reynold Xin <rx...@databricks.com>
>>>>> wrote:
>>>>> >>>
>>>>> >>> Pete - can you do me a favor?
>>>>> >>>
>>>>> >>>
>>>>> >>>
>>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>>> >>>
>>>>> >>> Print the parameters that are passed into the getPageSize
>>>>> function, and
>>>>> >>> check their values.
>>>>> >>>
>>>>> >>> On Mon, Sep 14, 2015 at 4:32 PM, Reynold Xin <rx...@databricks.com>
>>>>> wrote:
>>>>> >>>>
>>>>> >>>> Is this on latest master / branch-1.5?
>>>>> >>>>
>>>>> >>>> out of the box we reserve only 16% (0.2 * 0.8) of the memory for
>>>>> >>>> execution (e.g. aggregate, join) / shuffle sorting. With a 3GB
>>>>> heap, that's
>>>>> >>>> 480MB. So each task gets 480MB / 32 = 15MB, and each operator
>>>>> reserves at
>>>>> >>>> least one page for execution. If your page size is 4MB, it only
>>>>> takes 3
>>>>> >>>> operators to use up its memory.
>>>>> >>>>
>>>>> >>>> The thing is page size is dynamically determined -- and in your
>>>>> case it
>>>>> >>>> should be smaller than 4MB.
>>>>> >>>>
>>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>>> >>>>
>>>>> >>>> Maybe there is a place that in the maven tests that we explicitly
>>>>> set
>>>>> >>>> the page size (spark.buffer.pageSize) to 4MB? If yes, we need to
>>>>> find it and
>>>>> >>>> just remove it.
>>>>> >>>>
>>>>> >>>>
>>>>> >>>> On Mon, Sep 14, 2015 at 4:16 AM, Pete Robbins <
>>>>> robbinspg@gmail.com>
>>>>> >>>> wrote:
>>>>> >>>>>
>>>>> >>>>> I keep hitting errors running the tests on 1.5 such as
>>>>> >>>>>
>>>>> >>>>>
>>>>> >>>>> - join31 *** FAILED ***
>>>>> >>>>>   Failed to execute query using catalyst:
>>>>> >>>>>   Error: Job aborted due to stage failure: Task 9 in stage 3653.0
>>>>> >>>>> failed 1 times, most recent failure: Lost task 9.0 in stage
>>>>> 3653.0 (TID
>>>>> >>>>> 123363, localhost): java.io.IOException: Unable to acquire
>>>>> 4194304 bytes of
>>>>> >>>>> memory
>>>>> >>>>>       at
>>>>> >>>>>
>>>>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
>>>>> >>>>>
>>>>> >>>>>
>>>>> >>>>> This is using the command
>>>>> >>>>> build/mvn -Pyarn -Phadoop-2.2 -Phive -Phive-thriftserver  test
>>>>> >>>>>
>>>>> >>>>>
>>>>> >>>>> I don't see these errors in any of the amplab jenkins builds. Do
>>>>> those
>>>>> >>>>> builds have any configuration/environment that I may be missing?
>>>>> My build is
>>>>> >>>>> running with whatever defaults are in the top level pom.xml, eg
>>>>> -Xmx3G.
>>>>> >>>>>
>>>>> >>>>> I can make these tests pass by setting
>>>>> spark.shuffle.memoryFraction=0.6
>>>>> >>>>> in the HiveCompatibilitySuite rather than the default 0.2 value.
>>>>> >>>>>
>>>>> >>>>> Trying to analyze what is going on with the test it is related
>>>>> to the
>>>>> >>>>> number of active tasks, which seems to rise to 32, and so the
>>>>> >>>>> ShuffleMemoryManager allows less memory per task even though
>>>>> most of those
>>>>> >>>>> tasks do not have any memory allocated to them.
>>>>> >>>>>
>>>>> >>>>> Has anyone seen issues like this before?
>>>>> >>>>
>>>>> >>>>
>>>>> >>>
>>>>> >>
>>>>> >
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Marcelo
>>>>>
>>>>
>>>>
>>>
>>
>

Re: Unable to acquire memory errors in HiveCompatibilitySuite

Posted by Pete Robbins <ro...@gmail.com>.
Oops... I meant to say "The page size calculation is NOT the issue here"

On 16 September 2015 at 06:46, Pete Robbins <ro...@gmail.com> wrote:

> The page size calculation is the issue here as there is plenty of free
> memory, although there is maybe a fair bit of wasted space in some pages.
> It is that when we have a lot of tasks each is only allowed to reach 1/n of
> the available memory and several of the tasks bump in to that limit. With
> tasks 4 times the number of cores there will be some contention and so they
> remain active for longer.
>
> So I think this is a test case issue configuring the number of executors
> too high.
>
> On 15 September 2015 at 18:54, Reynold Xin <rx...@databricks.com> wrote:
>
>> Maybe we can change the heuristics in memory calculation to use
>> SparkContext.defaultParallelism if it is local mode.
>>
>>
>> On Tue, Sep 15, 2015 at 10:28 AM, Pete Robbins <ro...@gmail.com>
>> wrote:
>>
>>> Yes and at least there is an override by setting  spark.sql.test.master
>>> to local[8] , in fact local[16] worked on my 8 core box.
>>>
>>> I'm happy to use this as a workaround but the 32 hard-coded will fail
>>> running build/tests on a clean checkout if you only have 8 cores.
>>>
>>> On 15 September 2015 at 17:40, Marcelo Vanzin <va...@cloudera.com>
>>> wrote:
>>>
>>>> That test explicitly sets the number of executor cores to 32.
>>>>
>>>> object TestHive
>>>>   extends TestHiveContext(
>>>>     new SparkContext(
>>>>       System.getProperty("spark.sql.test.master", "local[32]"),
>>>>
>>>>
>>>> On Mon, Sep 14, 2015 at 11:22 PM, Reynold Xin <rx...@databricks.com>
>>>> wrote:
>>>> > Yea I think this is where the heuristics is failing -- it uses 8
>>>> cores to
>>>> > approximate the number of active tasks, but the tests somehow is
>>>> using 32
>>>> > (maybe because it explicitly sets it to that, or you set it yourself?
>>>> I'm
>>>> > not sure which one)
>>>> >
>>>> > On Mon, Sep 14, 2015 at 11:06 PM, Pete Robbins <ro...@gmail.com>
>>>> wrote:
>>>> >>
>>>> >> Reynold, thanks for replying.
>>>> >>
>>>> >> getPageSize parameters: maxMemory=515396075, numCores=0
>>>> >> Calculated values: cores=8, default=4194304
>>>> >>
>>>> >> So am I getting a large page size as I only have 8 cores?
>>>> >>
>>>> >> On 15 September 2015 at 00:40, Reynold Xin <rx...@databricks.com>
>>>> wrote:
>>>> >>>
>>>> >>> Pete - can you do me a favor?
>>>> >>>
>>>> >>>
>>>> >>>
>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>> >>>
>>>> >>> Print the parameters that are passed into the getPageSize function,
>>>> and
>>>> >>> check their values.
>>>> >>>
>>>> >>> On Mon, Sep 14, 2015 at 4:32 PM, Reynold Xin <rx...@databricks.com>
>>>> wrote:
>>>> >>>>
>>>> >>>> Is this on latest master / branch-1.5?
>>>> >>>>
>>>> >>>> out of the box we reserve only 16% (0.2 * 0.8) of the memory for
>>>> >>>> execution (e.g. aggregate, join) / shuffle sorting. With a 3GB
>>>> heap, that's
>>>> >>>> 480MB. So each task gets 480MB / 32 = 15MB, and each operator
>>>> reserves at
>>>> >>>> least one page for execution. If your page size is 4MB, it only
>>>> takes 3
>>>> >>>> operators to use up its memory.
>>>> >>>>
>>>> >>>> The thing is page size is dynamically determined -- and in your
>>>> case it
>>>> >>>> should be smaller than 4MB.
>>>> >>>>
>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>> >>>>
>>>> >>>> Maybe there is a place that in the maven tests that we explicitly
>>>> set
>>>> >>>> the page size (spark.buffer.pageSize) to 4MB? If yes, we need to
>>>> find it and
>>>> >>>> just remove it.
>>>> >>>>
>>>> >>>>
>>>> >>>> On Mon, Sep 14, 2015 at 4:16 AM, Pete Robbins <robbinspg@gmail.com
>>>> >
>>>> >>>> wrote:
>>>> >>>>>
>>>> >>>>> I keep hitting errors running the tests on 1.5 such as
>>>> >>>>>
>>>> >>>>>
>>>> >>>>> - join31 *** FAILED ***
>>>> >>>>>   Failed to execute query using catalyst:
>>>> >>>>>   Error: Job aborted due to stage failure: Task 9 in stage 3653.0
>>>> >>>>> failed 1 times, most recent failure: Lost task 9.0 in stage
>>>> 3653.0 (TID
>>>> >>>>> 123363, localhost): java.io.IOException: Unable to acquire
>>>> 4194304 bytes of
>>>> >>>>> memory
>>>> >>>>>       at
>>>> >>>>>
>>>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
>>>> >>>>>
>>>> >>>>>
>>>> >>>>> This is using the command
>>>> >>>>> build/mvn -Pyarn -Phadoop-2.2 -Phive -Phive-thriftserver  test
>>>> >>>>>
>>>> >>>>>
>>>> >>>>> I don't see these errors in any of the amplab jenkins builds. Do
>>>> those
>>>> >>>>> builds have any configuration/environment that I may be missing?
>>>> My build is
>>>> >>>>> running with whatever defaults are in the top level pom.xml, eg
>>>> -Xmx3G.
>>>> >>>>>
>>>> >>>>> I can make these tests pass by setting
>>>> spark.shuffle.memoryFraction=0.6
>>>> >>>>> in the HiveCompatibilitySuite rather than the default 0.2 value.
>>>> >>>>>
>>>> >>>>> Trying to analyze what is going on with the test it is related to
>>>> the
>>>> >>>>> number of active tasks, which seems to rise to 32, and so the
>>>> >>>>> ShuffleMemoryManager allows less memory per task even though most
>>>> of those
>>>> >>>>> tasks do not have any memory allocated to them.
>>>> >>>>>
>>>> >>>>> Has anyone seen issues like this before?
>>>> >>>>
>>>> >>>>
>>>> >>>
>>>> >>
>>>> >
>>>>
>>>>
>>>>
>>>> --
>>>> Marcelo
>>>>
>>>
>>>
>>
>

Re: Unable to acquire memory errors in HiveCompatibilitySuite

Posted by Pete Robbins <ro...@gmail.com>.
The page size calculation is the issue here as there is plenty of free
memory, although there is maybe a fair bit of wasted space in some pages.
It is that when we have a lot of tasks each is only allowed to reach 1/n of
the available memory and several of the tasks bump in to that limit. With
tasks 4 times the number of cores there will be some contention and so they
remain active for longer.

So I think this is a test case issue configuring the number of executors
too high.

On 15 September 2015 at 18:54, Reynold Xin <rx...@databricks.com> wrote:

> Maybe we can change the heuristics in memory calculation to use
> SparkContext.defaultParallelism if it is local mode.
>
>
> On Tue, Sep 15, 2015 at 10:28 AM, Pete Robbins <ro...@gmail.com>
> wrote:
>
>> Yes and at least there is an override by setting  spark.sql.test.master
>> to local[8] , in fact local[16] worked on my 8 core box.
>>
>> I'm happy to use this as a workaround but the 32 hard-coded will fail
>> running build/tests on a clean checkout if you only have 8 cores.
>>
>> On 15 September 2015 at 17:40, Marcelo Vanzin <va...@cloudera.com>
>> wrote:
>>
>>> That test explicitly sets the number of executor cores to 32.
>>>
>>> object TestHive
>>>   extends TestHiveContext(
>>>     new SparkContext(
>>>       System.getProperty("spark.sql.test.master", "local[32]"),
>>>
>>>
>>> On Mon, Sep 14, 2015 at 11:22 PM, Reynold Xin <rx...@databricks.com>
>>> wrote:
>>> > Yea I think this is where the heuristics is failing -- it uses 8 cores
>>> to
>>> > approximate the number of active tasks, but the tests somehow is using
>>> 32
>>> > (maybe because it explicitly sets it to that, or you set it yourself?
>>> I'm
>>> > not sure which one)
>>> >
>>> > On Mon, Sep 14, 2015 at 11:06 PM, Pete Robbins <ro...@gmail.com>
>>> wrote:
>>> >>
>>> >> Reynold, thanks for replying.
>>> >>
>>> >> getPageSize parameters: maxMemory=515396075, numCores=0
>>> >> Calculated values: cores=8, default=4194304
>>> >>
>>> >> So am I getting a large page size as I only have 8 cores?
>>> >>
>>> >> On 15 September 2015 at 00:40, Reynold Xin <rx...@databricks.com>
>>> wrote:
>>> >>>
>>> >>> Pete - can you do me a favor?
>>> >>>
>>> >>>
>>> >>>
>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>> >>>
>>> >>> Print the parameters that are passed into the getPageSize function,
>>> and
>>> >>> check their values.
>>> >>>
>>> >>> On Mon, Sep 14, 2015 at 4:32 PM, Reynold Xin <rx...@databricks.com>
>>> wrote:
>>> >>>>
>>> >>>> Is this on latest master / branch-1.5?
>>> >>>>
>>> >>>> out of the box we reserve only 16% (0.2 * 0.8) of the memory for
>>> >>>> execution (e.g. aggregate, join) / shuffle sorting. With a 3GB
>>> heap, that's
>>> >>>> 480MB. So each task gets 480MB / 32 = 15MB, and each operator
>>> reserves at
>>> >>>> least one page for execution. If your page size is 4MB, it only
>>> takes 3
>>> >>>> operators to use up its memory.
>>> >>>>
>>> >>>> The thing is page size is dynamically determined -- and in your
>>> case it
>>> >>>> should be smaller than 4MB.
>>> >>>>
>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>> >>>>
>>> >>>> Maybe there is a place that in the maven tests that we explicitly
>>> set
>>> >>>> the page size (spark.buffer.pageSize) to 4MB? If yes, we need to
>>> find it and
>>> >>>> just remove it.
>>> >>>>
>>> >>>>
>>> >>>> On Mon, Sep 14, 2015 at 4:16 AM, Pete Robbins <ro...@gmail.com>
>>> >>>> wrote:
>>> >>>>>
>>> >>>>> I keep hitting errors running the tests on 1.5 such as
>>> >>>>>
>>> >>>>>
>>> >>>>> - join31 *** FAILED ***
>>> >>>>>   Failed to execute query using catalyst:
>>> >>>>>   Error: Job aborted due to stage failure: Task 9 in stage 3653.0
>>> >>>>> failed 1 times, most recent failure: Lost task 9.0 in stage 3653.0
>>> (TID
>>> >>>>> 123363, localhost): java.io.IOException: Unable to acquire 4194304
>>> bytes of
>>> >>>>> memory
>>> >>>>>       at
>>> >>>>>
>>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
>>> >>>>>
>>> >>>>>
>>> >>>>> This is using the command
>>> >>>>> build/mvn -Pyarn -Phadoop-2.2 -Phive -Phive-thriftserver  test
>>> >>>>>
>>> >>>>>
>>> >>>>> I don't see these errors in any of the amplab jenkins builds. Do
>>> those
>>> >>>>> builds have any configuration/environment that I may be missing?
>>> My build is
>>> >>>>> running with whatever defaults are in the top level pom.xml, eg
>>> -Xmx3G.
>>> >>>>>
>>> >>>>> I can make these tests pass by setting
>>> spark.shuffle.memoryFraction=0.6
>>> >>>>> in the HiveCompatibilitySuite rather than the default 0.2 value.
>>> >>>>>
>>> >>>>> Trying to analyze what is going on with the test it is related to
>>> the
>>> >>>>> number of active tasks, which seems to rise to 32, and so the
>>> >>>>> ShuffleMemoryManager allows less memory per task even though most
>>> of those
>>> >>>>> tasks do not have any memory allocated to them.
>>> >>>>>
>>> >>>>> Has anyone seen issues like this before?
>>> >>>>
>>> >>>>
>>> >>>
>>> >>
>>> >
>>>
>>>
>>>
>>> --
>>> Marcelo
>>>
>>
>>
>

Re: Unable to acquire memory errors in HiveCompatibilitySuite

Posted by Reynold Xin <rx...@databricks.com>.
Maybe we can change the heuristics in memory calculation to use
SparkContext.defaultParallelism if it is local mode.


On Tue, Sep 15, 2015 at 10:28 AM, Pete Robbins <ro...@gmail.com> wrote:

> Yes and at least there is an override by setting  spark.sql.test.master to
> local[8] , in fact local[16] worked on my 8 core box.
>
> I'm happy to use this as a workaround but the 32 hard-coded will fail
> running build/tests on a clean checkout if you only have 8 cores.
>
> On 15 September 2015 at 17:40, Marcelo Vanzin <va...@cloudera.com> wrote:
>
>> That test explicitly sets the number of executor cores to 32.
>>
>> object TestHive
>>   extends TestHiveContext(
>>     new SparkContext(
>>       System.getProperty("spark.sql.test.master", "local[32]"),
>>
>>
>> On Mon, Sep 14, 2015 at 11:22 PM, Reynold Xin <rx...@databricks.com>
>> wrote:
>> > Yea I think this is where the heuristics is failing -- it uses 8 cores
>> to
>> > approximate the number of active tasks, but the tests somehow is using
>> 32
>> > (maybe because it explicitly sets it to that, or you set it yourself?
>> I'm
>> > not sure which one)
>> >
>> > On Mon, Sep 14, 2015 at 11:06 PM, Pete Robbins <ro...@gmail.com>
>> wrote:
>> >>
>> >> Reynold, thanks for replying.
>> >>
>> >> getPageSize parameters: maxMemory=515396075, numCores=0
>> >> Calculated values: cores=8, default=4194304
>> >>
>> >> So am I getting a large page size as I only have 8 cores?
>> >>
>> >> On 15 September 2015 at 00:40, Reynold Xin <rx...@databricks.com>
>> wrote:
>> >>>
>> >>> Pete - can you do me a favor?
>> >>>
>> >>>
>> >>>
>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>> >>>
>> >>> Print the parameters that are passed into the getPageSize function,
>> and
>> >>> check their values.
>> >>>
>> >>> On Mon, Sep 14, 2015 at 4:32 PM, Reynold Xin <rx...@databricks.com>
>> wrote:
>> >>>>
>> >>>> Is this on latest master / branch-1.5?
>> >>>>
>> >>>> out of the box we reserve only 16% (0.2 * 0.8) of the memory for
>> >>>> execution (e.g. aggregate, join) / shuffle sorting. With a 3GB heap,
>> that's
>> >>>> 480MB. So each task gets 480MB / 32 = 15MB, and each operator
>> reserves at
>> >>>> least one page for execution. If your page size is 4MB, it only
>> takes 3
>> >>>> operators to use up its memory.
>> >>>>
>> >>>> The thing is page size is dynamically determined -- and in your case
>> it
>> >>>> should be smaller than 4MB.
>> >>>>
>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>> >>>>
>> >>>> Maybe there is a place that in the maven tests that we explicitly set
>> >>>> the page size (spark.buffer.pageSize) to 4MB? If yes, we need to
>> find it and
>> >>>> just remove it.
>> >>>>
>> >>>>
>> >>>> On Mon, Sep 14, 2015 at 4:16 AM, Pete Robbins <ro...@gmail.com>
>> >>>> wrote:
>> >>>>>
>> >>>>> I keep hitting errors running the tests on 1.5 such as
>> >>>>>
>> >>>>>
>> >>>>> - join31 *** FAILED ***
>> >>>>>   Failed to execute query using catalyst:
>> >>>>>   Error: Job aborted due to stage failure: Task 9 in stage 3653.0
>> >>>>> failed 1 times, most recent failure: Lost task 9.0 in stage 3653.0
>> (TID
>> >>>>> 123363, localhost): java.io.IOException: Unable to acquire 4194304
>> bytes of
>> >>>>> memory
>> >>>>>       at
>> >>>>>
>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
>> >>>>>
>> >>>>>
>> >>>>> This is using the command
>> >>>>> build/mvn -Pyarn -Phadoop-2.2 -Phive -Phive-thriftserver  test
>> >>>>>
>> >>>>>
>> >>>>> I don't see these errors in any of the amplab jenkins builds. Do
>> those
>> >>>>> builds have any configuration/environment that I may be missing? My
>> build is
>> >>>>> running with whatever defaults are in the top level pom.xml, eg
>> -Xmx3G.
>> >>>>>
>> >>>>> I can make these tests pass by setting
>> spark.shuffle.memoryFraction=0.6
>> >>>>> in the HiveCompatibilitySuite rather than the default 0.2 value.
>> >>>>>
>> >>>>> Trying to analyze what is going on with the test it is related to
>> the
>> >>>>> number of active tasks, which seems to rise to 32, and so the
>> >>>>> ShuffleMemoryManager allows less memory per task even though most
>> of those
>> >>>>> tasks do not have any memory allocated to them.
>> >>>>>
>> >>>>> Has anyone seen issues like this before?
>> >>>>
>> >>>>
>> >>>
>> >>
>> >
>>
>>
>>
>> --
>> Marcelo
>>
>
>

Re: Unable to acquire memory errors in HiveCompatibilitySuite

Posted by Pete Robbins <ro...@gmail.com>.
Yes and at least there is an override by setting  spark.sql.test.master to
local[8] , in fact local[16] worked on my 8 core box.

I'm happy to use this as a workaround but the 32 hard-coded will fail
running build/tests on a clean checkout if you only have 8 cores.

On 15 September 2015 at 17:40, Marcelo Vanzin <va...@cloudera.com> wrote:

> That test explicitly sets the number of executor cores to 32.
>
> object TestHive
>   extends TestHiveContext(
>     new SparkContext(
>       System.getProperty("spark.sql.test.master", "local[32]"),
>
>
> On Mon, Sep 14, 2015 at 11:22 PM, Reynold Xin <rx...@databricks.com> wrote:
> > Yea I think this is where the heuristics is failing -- it uses 8 cores to
> > approximate the number of active tasks, but the tests somehow is using 32
> > (maybe because it explicitly sets it to that, or you set it yourself? I'm
> > not sure which one)
> >
> > On Mon, Sep 14, 2015 at 11:06 PM, Pete Robbins <ro...@gmail.com>
> wrote:
> >>
> >> Reynold, thanks for replying.
> >>
> >> getPageSize parameters: maxMemory=515396075, numCores=0
> >> Calculated values: cores=8, default=4194304
> >>
> >> So am I getting a large page size as I only have 8 cores?
> >>
> >> On 15 September 2015 at 00:40, Reynold Xin <rx...@databricks.com> wrote:
> >>>
> >>> Pete - can you do me a favor?
> >>>
> >>>
> >>>
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
> >>>
> >>> Print the parameters that are passed into the getPageSize function, and
> >>> check their values.
> >>>
> >>> On Mon, Sep 14, 2015 at 4:32 PM, Reynold Xin <rx...@databricks.com>
> wrote:
> >>>>
> >>>> Is this on latest master / branch-1.5?
> >>>>
> >>>> out of the box we reserve only 16% (0.2 * 0.8) of the memory for
> >>>> execution (e.g. aggregate, join) / shuffle sorting. With a 3GB heap,
> that's
> >>>> 480MB. So each task gets 480MB / 32 = 15MB, and each operator
> reserves at
> >>>> least one page for execution. If your page size is 4MB, it only takes
> 3
> >>>> operators to use up its memory.
> >>>>
> >>>> The thing is page size is dynamically determined -- and in your case
> it
> >>>> should be smaller than 4MB.
> >>>>
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
> >>>>
> >>>> Maybe there is a place that in the maven tests that we explicitly set
> >>>> the page size (spark.buffer.pageSize) to 4MB? If yes, we need to find
> it and
> >>>> just remove it.
> >>>>
> >>>>
> >>>> On Mon, Sep 14, 2015 at 4:16 AM, Pete Robbins <ro...@gmail.com>
> >>>> wrote:
> >>>>>
> >>>>> I keep hitting errors running the tests on 1.5 such as
> >>>>>
> >>>>>
> >>>>> - join31 *** FAILED ***
> >>>>>   Failed to execute query using catalyst:
> >>>>>   Error: Job aborted due to stage failure: Task 9 in stage 3653.0
> >>>>> failed 1 times, most recent failure: Lost task 9.0 in stage 3653.0
> (TID
> >>>>> 123363, localhost): java.io.IOException: Unable to acquire 4194304
> bytes of
> >>>>> memory
> >>>>>       at
> >>>>>
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
> >>>>>
> >>>>>
> >>>>> This is using the command
> >>>>> build/mvn -Pyarn -Phadoop-2.2 -Phive -Phive-thriftserver  test
> >>>>>
> >>>>>
> >>>>> I don't see these errors in any of the amplab jenkins builds. Do
> those
> >>>>> builds have any configuration/environment that I may be missing? My
> build is
> >>>>> running with whatever defaults are in the top level pom.xml, eg
> -Xmx3G.
> >>>>>
> >>>>> I can make these tests pass by setting
> spark.shuffle.memoryFraction=0.6
> >>>>> in the HiveCompatibilitySuite rather than the default 0.2 value.
> >>>>>
> >>>>> Trying to analyze what is going on with the test it is related to the
> >>>>> number of active tasks, which seems to rise to 32, and so the
> >>>>> ShuffleMemoryManager allows less memory per task even though most of
> those
> >>>>> tasks do not have any memory allocated to them.
> >>>>>
> >>>>> Has anyone seen issues like this before?
> >>>>
> >>>>
> >>>
> >>
> >
>
>
>
> --
> Marcelo
>

Re: Unable to acquire memory errors in HiveCompatibilitySuite

Posted by Marcelo Vanzin <va...@cloudera.com>.
That test explicitly sets the number of executor cores to 32.

object TestHive
  extends TestHiveContext(
    new SparkContext(
      System.getProperty("spark.sql.test.master", "local[32]"),


On Mon, Sep 14, 2015 at 11:22 PM, Reynold Xin <rx...@databricks.com> wrote:
> Yea I think this is where the heuristics is failing -- it uses 8 cores to
> approximate the number of active tasks, but the tests somehow is using 32
> (maybe because it explicitly sets it to that, or you set it yourself? I'm
> not sure which one)
>
> On Mon, Sep 14, 2015 at 11:06 PM, Pete Robbins <ro...@gmail.com> wrote:
>>
>> Reynold, thanks for replying.
>>
>> getPageSize parameters: maxMemory=515396075, numCores=0
>> Calculated values: cores=8, default=4194304
>>
>> So am I getting a large page size as I only have 8 cores?
>>
>> On 15 September 2015 at 00:40, Reynold Xin <rx...@databricks.com> wrote:
>>>
>>> Pete - can you do me a favor?
>>>
>>>
>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>
>>> Print the parameters that are passed into the getPageSize function, and
>>> check their values.
>>>
>>> On Mon, Sep 14, 2015 at 4:32 PM, Reynold Xin <rx...@databricks.com> wrote:
>>>>
>>>> Is this on latest master / branch-1.5?
>>>>
>>>> out of the box we reserve only 16% (0.2 * 0.8) of the memory for
>>>> execution (e.g. aggregate, join) / shuffle sorting. With a 3GB heap, that's
>>>> 480MB. So each task gets 480MB / 32 = 15MB, and each operator reserves at
>>>> least one page for execution. If your page size is 4MB, it only takes 3
>>>> operators to use up its memory.
>>>>
>>>> The thing is page size is dynamically determined -- and in your case it
>>>> should be smaller than 4MB.
>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>>
>>>> Maybe there is a place that in the maven tests that we explicitly set
>>>> the page size (spark.buffer.pageSize) to 4MB? If yes, we need to find it and
>>>> just remove it.
>>>>
>>>>
>>>> On Mon, Sep 14, 2015 at 4:16 AM, Pete Robbins <ro...@gmail.com>
>>>> wrote:
>>>>>
>>>>> I keep hitting errors running the tests on 1.5 such as
>>>>>
>>>>>
>>>>> - join31 *** FAILED ***
>>>>>   Failed to execute query using catalyst:
>>>>>   Error: Job aborted due to stage failure: Task 9 in stage 3653.0
>>>>> failed 1 times, most recent failure: Lost task 9.0 in stage 3653.0 (TID
>>>>> 123363, localhost): java.io.IOException: Unable to acquire 4194304 bytes of
>>>>> memory
>>>>>       at
>>>>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
>>>>>
>>>>>
>>>>> This is using the command
>>>>> build/mvn -Pyarn -Phadoop-2.2 -Phive -Phive-thriftserver  test
>>>>>
>>>>>
>>>>> I don't see these errors in any of the amplab jenkins builds. Do those
>>>>> builds have any configuration/environment that I may be missing? My build is
>>>>> running with whatever defaults are in the top level pom.xml, eg -Xmx3G.
>>>>>
>>>>> I can make these tests pass by setting spark.shuffle.memoryFraction=0.6
>>>>> in the HiveCompatibilitySuite rather than the default 0.2 value.
>>>>>
>>>>> Trying to analyze what is going on with the test it is related to the
>>>>> number of active tasks, which seems to rise to 32, and so the
>>>>> ShuffleMemoryManager allows less memory per task even though most of those
>>>>> tasks do not have any memory allocated to them.
>>>>>
>>>>> Has anyone seen issues like this before?
>>>>
>>>>
>>>
>>
>



-- 
Marcelo

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org


Re: Unable to acquire memory errors in HiveCompatibilitySuite

Posted by Reynold Xin <rx...@databricks.com>.
Yea I think this is where the heuristics is failing -- it uses 8 cores to
approximate the number of active tasks, but the tests somehow is using 32
(maybe because it explicitly sets it to that, or you set it yourself? I'm
not sure which one)

On Mon, Sep 14, 2015 at 11:06 PM, Pete Robbins <ro...@gmail.com> wrote:

> Reynold, thanks for replying.
>
> getPageSize parameters: maxMemory=515396075, numCores=0
> Calculated values: cores=8, default=4194304
>
> So am I getting a large page size as I only have 8 cores?
>
> On 15 September 2015 at 00:40, Reynold Xin <rx...@databricks.com> wrote:
>
>> Pete - can you do me a favor?
>>
>>
>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>
>> Print the parameters that are passed into the getPageSize function, and
>> check their values.
>>
>> On Mon, Sep 14, 2015 at 4:32 PM, Reynold Xin <rx...@databricks.com> wrote:
>>
>>> Is this on latest master / branch-1.5?
>>>
>>> out of the box we reserve only 16% (0.2 * 0.8) of the memory for
>>> execution (e.g. aggregate, join) / shuffle sorting. With a 3GB heap, that's
>>> 480MB. So each task gets 480MB / 32 = 15MB, and each operator reserves at
>>> least one page for execution. If your page size is 4MB, it only takes 3
>>> operators to use up its memory.
>>>
>>> The thing is page size is dynamically determined -- and in your case it
>>> should be smaller than 4MB.
>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>
>>> Maybe there is a place that in the maven tests that we explicitly set
>>> the page size (spark.buffer.pageSize) to 4MB? If yes, we need to find it
>>> and just remove it.
>>>
>>>
>>> On Mon, Sep 14, 2015 at 4:16 AM, Pete Robbins <ro...@gmail.com>
>>> wrote:
>>>
>>>> I keep hitting errors running the tests on 1.5 such as
>>>>
>>>>
>>>> - join31 *** FAILED ***
>>>>   Failed to execute query using catalyst:
>>>>   Error: Job aborted due to stage failure: Task 9 in stage 3653.0
>>>> failed 1 times, most recent failure: Lost task 9.0 in stage 3653.0 (TID
>>>> 123363, localhost): java.io.IOException: Unable to acquire 4194304 bytes of
>>>> memory
>>>>       at
>>>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
>>>>
>>>>
>>>> This is using the command
>>>> build/mvn -Pyarn -Phadoop-2.2 -Phive -Phive-thriftserver  test
>>>>
>>>>
>>>> I don't see these errors in any of the amplab jenkins builds. Do those
>>>> builds have any configuration/environment that I may be missing? My build
>>>> is running with whatever defaults are in the top level pom.xml, eg -Xmx3G.
>>>>
>>>> I can make these tests pass by setting spark.shuffle.memoryFraction=0.6
>>>> in the HiveCompatibilitySuite rather than the default 0.2 value.
>>>>
>>>> Trying to analyze what is going on with the test it is related to the
>>>> number of active tasks, which seems to rise to 32, and so the
>>>> ShuffleMemoryManager allows less memory per task even though most of those
>>>> tasks do not have any memory allocated to them.
>>>>
>>>> Has anyone seen issues like this before?
>>>>
>>>
>>>
>>
>

Re: Unable to acquire memory errors in HiveCompatibilitySuite

Posted by Pete Robbins <ro...@gmail.com>.
Reynold, thanks for replying.

getPageSize parameters: maxMemory=515396075, numCores=0
Calculated values: cores=8, default=4194304

So am I getting a large page size as I only have 8 cores?

On 15 September 2015 at 00:40, Reynold Xin <rx...@databricks.com> wrote:

> Pete - can you do me a favor?
>
>
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>
> Print the parameters that are passed into the getPageSize function, and
> check their values.
>
> On Mon, Sep 14, 2015 at 4:32 PM, Reynold Xin <rx...@databricks.com> wrote:
>
>> Is this on latest master / branch-1.5?
>>
>> out of the box we reserve only 16% (0.2 * 0.8) of the memory for
>> execution (e.g. aggregate, join) / shuffle sorting. With a 3GB heap, that's
>> 480MB. So each task gets 480MB / 32 = 15MB, and each operator reserves at
>> least one page for execution. If your page size is 4MB, it only takes 3
>> operators to use up its memory.
>>
>> The thing is page size is dynamically determined -- and in your case it
>> should be smaller than 4MB.
>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>
>> Maybe there is a place that in the maven tests that we explicitly set the
>> page size (spark.buffer.pageSize) to 4MB? If yes, we need to find it and
>> just remove it.
>>
>>
>> On Mon, Sep 14, 2015 at 4:16 AM, Pete Robbins <ro...@gmail.com>
>> wrote:
>>
>>> I keep hitting errors running the tests on 1.5 such as
>>>
>>>
>>> - join31 *** FAILED ***
>>>   Failed to execute query using catalyst:
>>>   Error: Job aborted due to stage failure: Task 9 in stage 3653.0 failed
>>> 1 times, most recent failure: Lost task 9.0 in stage 3653.0 (TID 123363,
>>> localhost): java.io.IOException: Unable to acquire 4194304 bytes of memory
>>>       at
>>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
>>>
>>>
>>> This is using the command
>>> build/mvn -Pyarn -Phadoop-2.2 -Phive -Phive-thriftserver  test
>>>
>>>
>>> I don't see these errors in any of the amplab jenkins builds. Do those
>>> builds have any configuration/environment that I may be missing? My build
>>> is running with whatever defaults are in the top level pom.xml, eg -Xmx3G.
>>>
>>> I can make these tests pass by setting spark.shuffle.memoryFraction=0.6
>>> in the HiveCompatibilitySuite rather than the default 0.2 value.
>>>
>>> Trying to analyze what is going on with the test it is related to the
>>> number of active tasks, which seems to rise to 32, and so the
>>> ShuffleMemoryManager allows less memory per task even though most of those
>>> tasks do not have any memory allocated to them.
>>>
>>> Has anyone seen issues like this before?
>>>
>>
>>
>

Re: Unable to acquire memory errors in HiveCompatibilitySuite

Posted by Reynold Xin <rx...@databricks.com>.
Pete - can you do me a favor?

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174

Print the parameters that are passed into the getPageSize function, and
check their values.

On Mon, Sep 14, 2015 at 4:32 PM, Reynold Xin <rx...@databricks.com> wrote:

> Is this on latest master / branch-1.5?
>
> out of the box we reserve only 16% (0.2 * 0.8) of the memory for execution
> (e.g. aggregate, join) / shuffle sorting. With a 3GB heap, that's 480MB. So
> each task gets 480MB / 32 = 15MB, and each operator reserves at least one
> page for execution. If your page size is 4MB, it only takes 3 operators to
> use up its memory.
>
> The thing is page size is dynamically determined -- and in your case it
> should be smaller than 4MB.
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>
> Maybe there is a place that in the maven tests that we explicitly set the
> page size (spark.buffer.pageSize) to 4MB? If yes, we need to find it and
> just remove it.
>
>
> On Mon, Sep 14, 2015 at 4:16 AM, Pete Robbins <ro...@gmail.com> wrote:
>
>> I keep hitting errors running the tests on 1.5 such as
>>
>>
>> - join31 *** FAILED ***
>>   Failed to execute query using catalyst:
>>   Error: Job aborted due to stage failure: Task 9 in stage 3653.0 failed
>> 1 times, most recent failure: Lost task 9.0 in stage 3653.0 (TID 123363,
>> localhost): java.io.IOException: Unable to acquire 4194304 bytes of memory
>>       at
>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
>>
>>
>> This is using the command
>> build/mvn -Pyarn -Phadoop-2.2 -Phive -Phive-thriftserver  test
>>
>>
>> I don't see these errors in any of the amplab jenkins builds. Do those
>> builds have any configuration/environment that I may be missing? My build
>> is running with whatever defaults are in the top level pom.xml, eg -Xmx3G.
>>
>> I can make these tests pass by setting spark.shuffle.memoryFraction=0.6
>> in the HiveCompatibilitySuite rather than the default 0.2 value.
>>
>> Trying to analyze what is going on with the test it is related to the
>> number of active tasks, which seems to rise to 32, and so the
>> ShuffleMemoryManager allows less memory per task even though most of those
>> tasks do not have any memory allocated to them.
>>
>> Has anyone seen issues like this before?
>>
>
>

Re: Unable to acquire memory errors in HiveCompatibilitySuite

Posted by Reynold Xin <rx...@databricks.com>.
Is this on latest master / branch-1.5?

out of the box we reserve only 16% (0.2 * 0.8) of the memory for execution
(e.g. aggregate, join) / shuffle sorting. With a 3GB heap, that's 480MB. So
each task gets 480MB / 32 = 15MB, and each operator reserves at least one
page for execution. If your page size is 4MB, it only takes 3 operators to
use up its memory.

The thing is page size is dynamically determined -- and in your case it
should be smaller than 4MB.
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174

Maybe there is a place that in the maven tests that we explicitly set the
page size (spark.buffer.pageSize) to 4MB? If yes, we need to find it and
just remove it.


On Mon, Sep 14, 2015 at 4:16 AM, Pete Robbins <ro...@gmail.com> wrote:

> I keep hitting errors running the tests on 1.5 such as
>
>
> - join31 *** FAILED ***
>   Failed to execute query using catalyst:
>   Error: Job aborted due to stage failure: Task 9 in stage 3653.0 failed 1
> times, most recent failure: Lost task 9.0 in stage 3653.0 (TID 123363,
> localhost): java.io.IOException: Unable to acquire 4194304 bytes of memory
>       at
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
>
>
> This is using the command
> build/mvn -Pyarn -Phadoop-2.2 -Phive -Phive-thriftserver  test
>
>
> I don't see these errors in any of the amplab jenkins builds. Do those
> builds have any configuration/environment that I may be missing? My build
> is running with whatever defaults are in the top level pom.xml, eg -Xmx3G.
>
> I can make these tests pass by setting spark.shuffle.memoryFraction=0.6 in
> the HiveCompatibilitySuite rather than the default 0.2 value.
>
> Trying to analyze what is going on with the test it is related to the
> number of active tasks, which seems to rise to 32, and so the
> ShuffleMemoryManager allows less memory per task even though most of those
> tasks do not have any memory allocated to them.
>
> Has anyone seen issues like this before?
>