Posted to issues@spark.apache.org by "Yongqin Xiao (JIRA)" <ji...@apache.org> on 2018/01/22 22:26:00 UTC

[jira] [Updated] (SPARK-23183) Failure caused by TaskContext is missing in the thread spawned by Custom RDD

     [ https://issues.apache.org/jira/browse/SPARK-23183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yongqin Xiao updated SPARK-23183:
---------------------------------
    Description: 
This is related to the already-resolved issue https://issues.apache.org/jira/browse/SPARK-18406, which was fixed by "explicitly pass the TID value to the {{unlock}} method".

That fix resolved the use cases known at the time.

However, a new use case now fails: a custom RDD followed by an aggregator, where the custom RDD consumes the upstream iterator from a thread it spawns. The stack trace is:
{noformat}
18/01/22 13:19:51 WARN TaskSetManager: Lost task 75.0 in stage 2.0 (TID 124, psrhcdh58c2c001.informatica.com, executor 3): java.lang.RuntimeException: DTM execution failed with error: com.informatica.powercenter.sdk.dtm.DTMException: java.lang.NullPointerException
[com.informatica.powercenter.sdk.dtm.DTMException: java.lang.NullPointerException
 at com.informatica.products.infatransform.spark.edtm.DataExchangeRunnable$DataChannelRunnable.pushInputDataToDTM(DataExchangeRunnable.java:554)
 at com.informatica.products.infatransform.spark.edtm.DataExchangeRunnable$DataChannelRunnable.run(DataExchangeRunnable.java:424)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
 at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:419)   <--- NPE thrown here
 at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:79)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
 at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31)
 at com.informatica.products.infatransform.spark.edtm.DataExchangeRunnable$DataChannelRunnable.pushInputDataToDTM(DataExchangeRunnable.java:486)
 ... 2 more
]
 at com.informatica.products.infatransform.spark.edtm.DTMIteratorForBulkOutput.finishUp(DTMIteratorForBulkOutput.java:94)
 at com.informatica.products.infatransform.spark.edtm.DTMIteratorForBulkOutput.hasNext(DTMIteratorForBulkOutput.java:53)
 at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
 at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
 at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:968)
 at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:959)
 at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:899)
 at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:959)
 at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:705)
 at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
 at org.apache.spark.scheduler.Task.run(Task.scala:99)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:748)
Caused by: com.informatica.products.infatransform.spark.boot.SparkDTMException: com.informatica.powercenter.sdk.dtm.DTMException: java.lang.NullPointerException
[com.informatica.powercenter.sdk.dtm.DTMException: java.lang.NullPointerException
 at com.informatica.products.infatransform.spark.edtm.DataExchangeRunnable$DataChannelRunnable.pushInputDataToDTM(DataExchangeRunnable.java:554)
 at com.informatica.products.infatransform.spark.edtm.DataExchangeRunnable$DataChannelRunnable.run(DataExchangeRunnable.java:424)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
 at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:419)
 at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:79)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
 at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31)
 at com.informatica.products.infatransform.spark.edtm.DataExchangeRunnable$DataChannelRunnable.pushInputDataToDTM(DataExchangeRunnable.java:486)
 ... 2 more
]
 at com.informatica.products.infatransform.spark.edtm.DataExchangeRunnable.run(DataExchangeRunnable.java:220)
 ... 1 more
{noformat}

The code at TungstenAggregationIterator.scala:419 (as it appears from Java) is:
 TaskMetrics metrics = TaskContext$.MODULE$.get().taskMetrics();

Here {{TaskContext.get()}} returns null because it is called on the spawned thread rather than the task thread, hence the NullPointerException.

It's likely there is other Spark code like this, where a {{TaskContext}} is expected to be set on the current thread.
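The null dereference follows from how the context is stored: Spark keeps the current {{TaskContext}} in a per-thread holder (a {{ThreadLocal}}), so a value set on the executor's task thread is invisible to any thread that user code spawns. A minimal standalone sketch of that behavior (plain Java, no Spark dependency; {{CONTEXT}} is a hypothetical stand-in for Spark's internal holder):

```java
// Why a thread spawned inside a task sees no TaskContext: the holder is a
// ThreadLocal, and plain ThreadLocal values are not inherited by child threads.
public class ThreadLocalDemo {
    // Hypothetical stand-in for Spark's internal TaskContext holder.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    public static void main(String[] args) throws InterruptedException {
        // The executor's task thread sets the context before running user code.
        CONTEXT.set("context-for-TID-124");

        final String[] seenInChild = new String[1];
        Thread child = new Thread(() -> seenInChild[0] = CONTEXT.get());
        child.start();
        child.join();

        System.out.println("task thread sees:    " + CONTEXT.get());   // context-for-TID-124
        System.out.println("spawned thread sees: " + seenInChild[0]);  // null
    }
}
```

Any Spark code that the spawned thread drives through the iterator chain, such as {{TungstenAggregationIterator.next}}, then dereferences that null and fails.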

So we have a simple request: make {{TaskContext.setTaskContext}} public instead of protected, so that user code spawning threads can propagate the context itself.
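With a public setter, the workaround is mechanical: capture the context on the task thread before starting the child, install it at the top of the child's run method, and clear it afterwards. A hedged sketch of that pattern with a generic holder (the names below are illustrative, not Spark's API; with the requested change, the set/remove calls would map onto {{TaskContext.setTaskContext}} and its corresponding unset):

```java
import java.util.concurrent.atomic.AtomicReference;

public class ContextPropagation {
    // Hypothetical per-thread context holder standing in for TaskContext.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Wrap a Runnable so the spawned thread runs with the caller's context.
    static Runnable withCallerContext(Runnable body) {
        String captured = CONTEXT.get(); // captured on the task thread
        return () -> {
            CONTEXT.set(captured);       // would be TaskContext.setTaskContext(...)
            try {
                body.run();
            } finally {
                CONTEXT.remove();        // avoid leaking into reused threads
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        CONTEXT.set("context-for-TID-124");
        AtomicReference<String> seen = new AtomicReference<>();
        Thread child = new Thread(withCallerContext(() -> seen.set(CONTEXT.get())));
        child.start();
        child.join();
        System.out.println("spawned thread sees: " + seen.get()); // context-for-TID-124
    }
}
```

The {{finally}} block matters when threads are pooled: without it, a stale context from a previous task would remain attached to the reused thread.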


> Failure caused by TaskContext is missing in the thread spawned by Custom RDD 
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-23183
>                 URL: https://issues.apache.org/jira/browse/SPARK-23183
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0, 2.2.1
>         Environment: This issue is only reproducible when user code spawns a new thread where the RDD iterator is used. It can be reproduced in any spark version.
>            Reporter: Yongqin Xiao
>            Priority: Critical
>              Labels: easyfix
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org