Posted to issues@spark.apache.org by "Takeshi Yamamuro (JIRA)" <ji...@apache.org> on 2018/07/03 09:16:00 UTC

[jira] [Commented] (SPARK-24729) Spark - stackoverflow error - org.apache.spark.sql.catalyst.plans.QueryPlan

    [ https://issues.apache.org/jira/browse/SPARK-24729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531071#comment-16531071 ] 

Takeshi Yamamuro commented on SPARK-24729:
------------------------------------------

Can you try running this on v2.3.1?

> Spark - stackoverflow error - org.apache.spark.sql.catalyst.plans.QueryPlan
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-24729
>                 URL: https://issues.apache.org/jira/browse/SPARK-24729
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 2.1.1
>            Reporter: t oo
>            Priority: Major
>
> I ran a Spark (v2.1.1) job that joins two DataFrames (one read from a .txt file on S3, the other from Parquet on S3), merges them (i.e. keeps the latest row per PK: if a PK exists in both the .txt and the Parquet data, the row from the .txt is taken), and writes the result out as new Parquet to S3. I got the error below, but on re-running, the job worked fine. Both the .txt and the Parquet data have 302 columns; the .txt has 191 rows and the Parquet has 156,300 rows. Does anyone know the cause?
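The merge rule described above (exactly one row per PK, with the .txt row winning when both inputs contain that PK) can be pinned down with a plain-Scala model that does not use Spark. `Rec` and `UpsertModel` are hypothetical names introduced only for illustration; the model assumes PKs are unique within each input and ignores the 302-column schema.

```scala
// Hypothetical record type: a primary key plus a payload (the real rows have 302 columns).
final case class Rec(pk: String, value: String)

object UpsertModel {
  // Keep one row per PK: the source (.txt) row if the PK appears there,
  // otherwise the previous (Parquet) row. Assumes PKs are unique per input.
  def upsert(source: Seq[Rec], previous: Seq[Rec]): Seq[Rec] = {
    val sourcePks = source.map(_.pk).toSet
    // A previous row survives only if the source has no row with the same PK.
    val keptPrevious = previous.filterNot(r => sourcePks.contains(r.pk))
    source ++ keptPrevious
  }
}
```

For example, merging source rows for PKs 2 and 3 into previous rows for PKs 1 and 2 yields the source rows for 2 and 3 plus the previous row for 1.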
>  
> {code:java}
>  
> 18/07/02 13:51:56 INFO TaskSetManager: Starting task 0.0 in stage 14.0 (TID 134, 10.160.122.226, executor 0, partition 0, PROCESS_LOCAL, 6337 bytes)
> 18/07/02 13:51:56 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory on 10.160.122.226:38011 (size: 27.2 KB, free: 4.6 GB)
> 18/07/02 13:51:56 INFO TaskSetManager: Finished task 0.0 in stage 14.0 (TID 134) in 295 ms on 10.160.122.226 (executor 0) (1/1)
> 18/07/02 13:51:56 INFO TaskSchedulerImpl: Removed TaskSet 14.0, whose tasks have all completed, from pool
> 18/07/02 13:51:56 INFO DAGScheduler: ResultStage 14 (load at Data.scala:25) finished in 0.295 s
> 18/07/02 13:51:56 INFO DAGScheduler: Job 7 finished: load at Data.scala:25, took 0.310932 s
> 18/07/02 13:51:57 INFO FileSourceStrategy: Pruning directories with:
> 18/07/02 13:51:57 INFO FileSourceStrategy: Post-Scan Filters:
> 18/07/02 13:51:57 INFO FileSourceStrategy: Output Data Schema: struct<row_id: string, created: timestamp, created_by: string, last_upd: timestamp, last_upd_by: string ... 300 more fields>
> 18/07/02 13:51:57 INFO FileSourceStrategy: Pushed Filters:
> 18/07/02 13:51:57 INFO MemoryStore: Block broadcast_19 stored as values in memory (estimated size 387.2 KB, free 911.2 MB)
> 18/07/02 13:51:57 INFO MemoryStore: Block broadcast_19_piece0 stored as bytes in memory (estimated size 33.7 KB, free 911.1 MB)
> 18/07/02 13:51:57 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on 10.160.123.242:38105 (size: 33.7 KB, free: 912.2 MB)
> 18/07/02 13:51:57 INFO SparkContext: Created broadcast 19 from cache at Upsert.scala:25
> 18/07/02 13:51:57 INFO FileSourceScanExec: Planning scan with bin packing, max size: 48443541 bytes, open cost is considered as scanning 4194304 bytes.
> 18/07/02 13:51:57 INFO SparkContext: Starting job: take at Utils.scala:28
> 18/07/02 13:51:57 INFO DAGScheduler: Got job 8 (take at Utils.scala:28) with 1 output partitions
> 18/07/02 13:51:57 INFO DAGScheduler: Final stage: ResultStage 15 (take at Utils.scala:28)
> 18/07/02 13:51:57 INFO DAGScheduler: Parents of final stage: List()
> 18/07/02 13:51:57 INFO DAGScheduler: Missing parents: List()
> 18/07/02 13:51:57 INFO DAGScheduler: Submitting ResultStage 15 (MapPartitionsRDD[65] at take at Utils.scala:28), which has no missing parents
> 18/07/02 13:51:57 INFO MemoryStore: Block broadcast_20 stored as values in memory (estimated size 321.5 KB, free 910.8 MB)
> 18/07/02 13:51:57 INFO MemoryStore: Block broadcast_20_piece0 stored as bytes in memory (estimated size 93.0 KB, free 910.7 MB)
> 18/07/02 13:51:57 INFO BlockManagerInfo: Added broadcast_20_piece0 in memory on 10.160.123.242:38105 (size: 93.0 KB, free: 912.1 MB)
> 18/07/02 13:51:57 INFO SparkContext: Created broadcast 20 from broadcast at DAGScheduler.scala:996
> 18/07/02 13:51:57 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 15 (MapPartitionsRDD[65] at take at Utils.scala:28)
> 18/07/02 13:51:57 INFO TaskSchedulerImpl: Adding task set 15.0 with 1 tasks
> 18/07/02 13:51:57 INFO TaskSetManager: Starting task 0.0 in stage 15.0 (TID 135, 10.160.122.226, executor 0, partition 0, PROCESS_LOCAL, 9035 bytes)
> 18/07/02 13:51:57 INFO BlockManagerInfo: Added broadcast_20_piece0 in memory on 10.160.122.226:38011 (size: 93.0 KB, free: 4.6 GB)
> 18/07/02 13:51:57 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on 10.160.122.226:38011 (size: 33.7 KB, free: 4.6 GB)
> 18/07/02 13:52:05 INFO BlockManagerInfo: Added rdd_61_0 in memory on 10.160.122.226:38011 (size: 38.9 MB, free: 4.5 GB)
> 18/07/02 13:52:09 INFO BlockManagerInfo: Added rdd_63_0 in memory on 10.160.122.226:38011 (size: 38.9 MB, free: 4.5 GB)
> 18/07/02 13:52:09 INFO TaskSetManager: Finished task 0.0 in stage 15.0 (TID 135) in 11751 ms on 10.160.122.226 (executor 0) (1/1)
> 18/07/02 13:52:09 INFO TaskSchedulerImpl: Removed TaskSet 15.0, whose tasks have all completed, from pool
> 18/07/02 13:52:09 INFO DAGScheduler: ResultStage 15 (take at Utils.scala:28) finished in 11.751 s
> 18/07/02 13:52:09 INFO DAGScheduler: Job 8 finished: take at Utils.scala:28, took 11.772561 s
> 18/07/02 13:52:09 INFO CodeGenerator: Code generated in 185.277258 ms
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3459
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3452
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3456
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3455
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3458
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3450
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3460
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3449
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_18_piece0 on 10.160.123.242:38105 in memory (size: 27.2 KB, free: 912.1 MB)
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_18_piece0 on 10.160.122.226:38011 in memory (size: 27.2 KB, free: 4.5 GB)
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3462
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_17_piece0 on 10.160.123.242:38105 in memory (size: 3.7 KB, free: 912.1 MB)
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_17_piece0 on 10.160.122.226:38011 in memory (size: 3.7 KB, free: 4.5 GB)
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3451
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3684
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_15_piece0 on 10.160.123.242:38105 in memory (size: 25.6 KB, free: 912.1 MB)
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_15_piece0 on 10.160.122.226:38011 in memory (size: 25.6 KB, free: 4.5 GB)
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3453
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3457
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_16_piece0 on 10.160.123.242:38105 in memory (size: 4.9 KB, free: 912.2 MB)
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_16_piece0 on 10.160.122.226:38011 in memory (size: 4.9 KB, free: 4.5 GB)
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned shuffle 6
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3461
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_20_piece0 on 10.160.123.242:38105 in memory (size: 93.0 KB, free: 912.2 MB)
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_20_piece0 on 10.160.122.226:38011 in memory (size: 93.0 KB, free: 4.5 GB)
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3454
> 18/07/02 13:52:39 INFO SparkContext: Starting job: run at ThreadPoolExecutor.java:1149
> 18/07/02 13:52:39 INFO DAGScheduler: Got job 9 (run at ThreadPoolExecutor.java:1149) with 4 output partitions
> 18/07/02 13:52:39 INFO DAGScheduler: Final stage: ResultStage 16 (run at ThreadPoolExecutor.java:1149)
> 18/07/02 13:52:39 INFO DAGScheduler: Parents of final stage: List()
> 18/07/02 13:52:39 INFO DAGScheduler: Missing parents: List()
> 18/07/02 13:52:39 INFO DAGScheduler: Submitting ResultStage 16 (MapPartitionsRDD[67] at run at ThreadPoolExecutor.java:1149), which has no missing parents
> 18/07/02 13:52:39 INFO MemoryStore: Block broadcast_21 stored as values in memory (estimated size 321.7 KB, free 911.3 MB)
> 18/07/02 13:52:39 INFO MemoryStore: Block broadcast_21_piece0 stored as bytes in memory (estimated size 93.0 KB, free 911.2 MB)
> 18/07/02 13:52:39 INFO BlockManagerInfo: Added broadcast_21_piece0 in memory on 10.160.123.242:38105 (size: 93.0 KB, free: 912.2 MB)
> 18/07/02 13:52:39 INFO SparkContext: Created broadcast 21 from broadcast at DAGScheduler.scala:996
> 18/07/02 13:52:39 INFO DAGScheduler: Submitting 4 missing tasks from ResultStage 16 (MapPartitionsRDD[67] at run at ThreadPoolExecutor.java:1149)
> 18/07/02 13:52:39 INFO TaskSchedulerImpl: Adding task set 16.0 with 4 tasks
> 18/07/02 13:52:39 INFO TaskSetManager: Starting task 0.0 in stage 16.0 (TID 136, 10.160.122.226, executor 0, partition 0, PROCESS_LOCAL, 9098 bytes)
> 18/07/02 13:52:39 INFO TaskSetManager: Starting task 1.0 in stage 16.0 (TID 137, 10.160.122.226, executor 0, partition 1, PROCESS_LOCAL, 9098 bytes)
> 18/07/02 13:52:39 INFO TaskSetManager: Starting task 2.0 in stage 16.0 (TID 138, 10.160.122.226, executor 0, partition 2, PROCESS_LOCAL, 9098 bytes)
> 18/07/02 13:52:39 INFO TaskSetManager: Starting task 3.0 in stage 16.0 (TID 139, 10.160.122.226, executor 0, partition 3, PROCESS_LOCAL, 9098 bytes)
> 18/07/02 13:52:39 INFO BlockManagerInfo: Added broadcast_21_piece0 in memory on 10.160.122.226:38011 (size: 93.0 KB, free: 4.5 GB)
> 18/07/02 13:52:39 INFO TaskSetManager: Finished task 0.0 in stage 16.0 (TID 136) in 47 ms on 10.160.122.226 (executor 0) (1/4)
> 18/07/02 13:52:46 INFO BlockManagerInfo: Added rdd_61_2 in memory on 10.160.122.226:38011 (size: 38.8 MB, free: 4.5 GB)
> 18/07/02 13:52:46 INFO BlockManagerInfo: Added rdd_61_3 in memory on 10.160.122.226:38011 (size: 38.7 MB, free: 4.4 GB)
> 18/07/02 13:52:46 INFO BlockManagerInfo: Added rdd_61_1 in memory on 10.160.122.226:38011 (size: 38.8 MB, free: 4.4 GB)
> 18/07/02 13:52:49 INFO BlockManagerInfo: Added rdd_63_2 in memory on 10.160.122.226:38011 (size: 38.8 MB, free: 4.3 GB)
> 18/07/02 13:52:49 INFO TaskSetManager: Finished task 2.0 in stage 16.0 (TID 138) in 10368 ms on 10.160.122.226 (executor 0) (2/4)
> 18/07/02 13:52:50 INFO BlockManagerInfo: Added rdd_63_3 in memory on 10.160.122.226:38011 (size: 38.7 MB, free: 4.3 GB)
> 18/07/02 13:52:50 INFO TaskSetManager: Finished task 3.0 in stage 16.0 (TID 139) in 10617 ms on 10.160.122.226 (executor 0) (3/4)
> 18/07/02 13:52:50 INFO BlockManagerInfo: Added rdd_63_1 in memory on 10.160.122.226:38011 (size: 38.8 MB, free: 4.3 GB)
> 18/07/02 13:52:50 INFO TaskSetManager: Finished task 1.0 in stage 16.0 (TID 137) in 10668 ms on 10.160.122.226 (executor 0) (4/4)
> 18/07/02 13:52:50 INFO TaskSchedulerImpl: Removed TaskSet 16.0, whose tasks have all completed, from pool
> 18/07/02 13:52:50 INFO DAGScheduler: ResultStage 16 (run at ThreadPoolExecutor.java:1149) finished in 10.669 s
> 18/07/02 13:52:50 INFO DAGScheduler: Job 9 finished: run at ThreadPoolExecutor.java:1149, took 10.684407 s
> 18/07/02 13:52:50 INFO CodeGenerator: Code generated in 7.746892 ms
> 18/07/02 13:52:50 INFO MemoryStore: Block broadcast_22 stored as values in memory (estimated size 19.0 MB, free 892.2 MB)
> 18/07/02 13:52:50 INFO MemoryStore: Block broadcast_22_piece0 stored as bytes in memory (estimated size 3.2 MB, free 889.0 MB)
> 18/07/02 13:52:50 INFO BlockManagerInfo: Added broadcast_22_piece0 in memory on 10.160.123.242:38105 (size: 3.2 MB, free: 909.0 MB)
> 18/07/02 13:52:50 INFO SparkContext: Created broadcast 22 from run at ThreadPoolExecutor.java:1149
> 18/07/02 13:52:50 INFO BlockManagerInfo: Removed broadcast_21_piece0 on 10.160.123.242:38105 in memory (size: 93.0 KB, free: 909.0 MB)
> 18/07/02 13:52:50 INFO BlockManagerInfo: Removed broadcast_21_piece0 on 10.160.122.226:38011 in memory (size: 93.0 KB, free: 4.3 GB)
> Exception in thread "main" java.lang.StackOverflowError
>  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$sameResult$1.apply(QueryPlan.scala:373)
>  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$sameResult$1.apply(QueryPlan.scala:373)
>  at scala.runtime.Tuple2Zipped$$anonfun$forall$extension$1.apply(Tuple2Zipped.scala:101)
>  at scala.runtime.Tuple2Zipped$$anonfun$forall$extension$1.apply(Tuple2Zipped.scala:101)
>  at scala.runtime.Tuple2Zipped$$anonfun$exists$extension$1.apply(Tuple2Zipped.scala:92)
>  at scala.runtime.Tuple2Zipped$$anonfun$exists$extension$1.apply(Tuple2Zipped.scala:90)
>  at scala.collection.immutable.List.foreach(List.scala:381)
>  at scala.runtime.Tuple2Zipped$.exists$extension(Tuple2Zipped.scala:90)
>  at scala.runtime.Tuple2Zipped$.forall$extension(Tuple2Zipped.scala:101)
>  at org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult(QueryPlan.scala:373)
>  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$sameResult$1.apply(QueryPlan.scala:373)
>  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$sameResult$1.apply(QueryPlan.scala:373)
>  at scala.runtime.Tuple2Zipped$$anonfun$forall$extension$1.apply(Tuple2Zipped.scala:101)
>  at scala.runtime.Tuple2Zipped$$anonfun$forall$extension$1.apply(Tuple2Zipped.scala:101)
>  at scala.runtime.Tuple2Zipped$$anonfun$exists$extension$1.apply(Tuple2Zipped.scala:92)
>  at scala.runtime.Tuple2Zipped$$anonfun$exists$extension$1.apply(Tuple2Zipped.scala:90)
>  at scala.collection.immutable.List.foreach(List.scala:381)
>  at scala.runtime.Tuple2Zipped$.exists$extension(Tuple2Zipped.scala:90)
>  at scala.runtime.Tuple2Zipped$.forall$extension(Tuple2Zipped.scala:101)
>  at org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult(QueryPlan.scala:373)
>  
> {code}
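The trace shows `QueryPlan.sameResult` re-entering itself through `Tuple2Zipped.forall` (QueryPlan.scala:373): it compares two plan trees node by node and recurses into each pair of children, so each level of the plan adds stack frames on the driver (Spark 2.x calls `sameResult`, for example, when looking up cached data). One plausible reading, not confirmed in this report, is that the plan being compared was deep enough to exhaust the driver thread's stack. The toy model below mirrors only the shape of that recursion; `Node`, `PlanCompare`, `chain` and `withStack` are hypothetical and are not Spark classes. `withStack` runs work on a thread that requests a larger stack, the same idea behind raising the JVM `-Xss` setting for the driver; the JVM may treat the requested size as a hint only.

```scala
// Toy stand-in for a query-plan tree; NOT Spark's QueryPlan.
final case class Node(name: String, children: List[Node])

object PlanCompare {
  // Same shape as the recursion in the trace: compare this node, then recurse
  // into each pair of children. Stack depth grows with tree depth.
  def sameResult(l: Node, r: Node): Boolean =
    l.name == r.name &&
      l.children.size == r.children.size &&
      l.children.zip(r.children).forall { case (a, b) => sameResult(a, b) }

  // Builds a linear ("deep") tree, like a long chain of unary operators.
  def chain(depth: Int): Node =
    (1 to depth).foldLeft(Node("leaf", Nil))((child, i) => Node(s"op$i", List(child)))

  // Runs `body` on a fresh thread that requests `stackBytes` of stack
  // and rethrows anything the body threw (including StackOverflowError).
  def withStack[T](stackBytes: Long)(body: => T): T = {
    var result: Option[Either[Throwable, T]] = None
    val task: Runnable = () => {
      val r: Either[Throwable, T] =
        try Right(body) catch { case e: Throwable => Left(e) }
      result = Some(r)
    }
    val t = new Thread(null, task, "big-stack", stackBytes)
    t.start()
    t.join() // join() makes the worker's write to `result` visible here
    result.get match {
      case Right(v) => v
      case Left(e)  => throw e
    }
  }
}
```

Comparing two sufficiently deep `chain(n)` trees on a default-sized stack ends in `StackOverflowError`, just as in the trace; the depth at which that happens depends on the JVM and its stack settings.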
>  
> Code that was run:
>  
> {code:java}
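> import org.apache.log4j.Logger
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.functions.{coalesce, col, lit}
> import org.apache.spark.sql.types.{DecimalType, StructType}
>
> // sparkSession, extract.Data.readHivePartition, stringToStructTypeMapping,
> // addMultipleColToDF and isDfEmpty are defined elsewhere in the job (not shown here).
> object Upsert {
>   val logger = Logger.getLogger(getClass.getName)
>
>   def finalDf(srcDf: DataFrame, partitionPath: Option[String], hiveSchema: StructType,
>               pkList: List[String], srcSchema: StructType): DataFrame = {
>     logger.info("=====Joining the source file and previous hive partition=====")
>     // Source columns get a "_" prefix so they do not clash with Hive columns in the join.
>     val srcCols = srcSchema.map(f => col("_" + f.name))
>     // Target schema: every decimal column is widened to DecimalType(31, 8).
>     val finalColsType = srcSchema.map { f =>
>       if (f.dataType.simpleString.contains("decimal")) (f.name, DecimalType(31, 8))
>       else (f.name, f.dataType)
>     }
>     val finalCols = finalColsType.map(_._1)
>     val srcPkList = pkList.map("_" + _)
>
>     val hivedf = extract.Data.readHivePartition(sparkSession, partitionPath, hiveSchema).cache()
>     val hiveCols = hivedf.dtypes.toList.map(n => (n._1, stringToStructTypeMapping(n._2)))
>     // Add source columns missing from the previous partition, then align column order.
>     val addedCols = finalColsType.toList.diff(hiveCols)
>     val hivedfNew = addMultipleColToDF(hivedf, addedCols).select(finalCols.map(col(_)): _*).cache()
>
>     // PK match where a NULL key equals another NULL key (via the "null" sentinel).
>     val commonDataFilterCond = srcPkList
>       .zip(pkList)
>       .map { case (c1, c2) => coalesce(col(c1), lit("null")) === coalesce(col(c2), lit("null")) }
>       .reduce(_ && _)
>
>     if (isDfEmpty(hivedfNew)) srcDf
>     else {
>       val srcRename = srcDf.toDF(srcSchema.map("_" + _.name): _*)
>       val joinData = srcRename.join(hivedfNew, commonDataFilterCond, "inner")
>       // Source rows whose PK also exists in the previous partition.
>       val commonData = joinData.select(srcCols: _*)
>       // Source rows with new PKs.
>       val currentData = srcRename.except(commonData).cache()
>       // Previous-partition rows whose PK is not in the source.
>       val prevData = hivedfNew.except(joinData.select(finalCols.map(col(_)): _*))
>       // union (unionAll is deprecated since Spark 2.0) combines columns by position.
>       currentData.union(prevData).union(commonData).toDF(finalCols: _*)
>     }
>   }
> }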
> {code}
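The column-reconciliation step in the code above (widening decimals, then using `diff` to find the (name, type) pairs the previous partition lacks) can be modelled without Spark. This is a sketch under the assumption that types are plain strings; `SchemaAlign`, `targetSchema` and `addedCols` are hypothetical names, and the real code compares Spark `DataType` values produced by the reporter's `stringToStructTypeMapping`.

```scala
object SchemaAlign {
  // Simplified schema: (column name, type name). Spark's DataType objects
  // are replaced by plain strings for illustration.
  type Schema = List[(String, String)]

  // Mirrors finalColsType: every decimal column is widened to decimal(31,8).
  def targetSchema(src: Schema): Schema =
    src.map { case (name, tpe) =>
      if (tpe.contains("decimal")) (name, "decimal(31,8)") else (name, tpe)
    }

  // Mirrors addedCols: (name, type) pairs in the target that the previous
  // partition lacks. A type change alone also shows up here.
  def addedCols(target: Schema, hive: Schema): Schema = target.diff(hive)
}
```

Because `diff` compares whole (name, type) pairs, a column that already exists in the previous partition under a different type (e.g. `decimal(10,2)` instead of `decimal(31,8)`) also appears in `addedCols`; what `addMultipleColToDF` does in that case is not shown in the report.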
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
