Posted to issues@spark.apache.org by "Sandeep Katta (Jira)" <ji...@apache.org> on 2021/04/15 17:48:00 UTC

[jira] [Created] (SPARK-35096) foreachBatch throws ArrayIndexOutOfBoundsException if schema is case insensitive

Sandeep Katta created SPARK-35096:
-------------------------------------

             Summary: foreachBatch throws ArrayIndexOutOfBoundsException if schema is case insensitive
                 Key: SPARK-35096
                 URL: https://issues.apache.org/jira/browse/SPARK-35096
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.0.0
            Reporter: Sandeep Katta


The code below works fine before Spark 3; running it on Spark 3 throws
java.lang.ArrayIndexOutOfBoundsException:
{code:java}
// Imports needed to run this snippet outside spark-shell
// (spark-shell pre-imports spark.implicits._).
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{LongType, StructType}
import spark.implicits._

val inputPath = "/Users/xyz/data/testcaseInsensitivity"
val output_path = "/Users/xyz/output"

spark.range(10).write.format("parquet").save(inputPath)

def process_row(microBatch: DataFrame, batchId: Long): Unit = {
  val df = microBatch.select($"ID".alias("other")) // Throws on Spark 3: "ID" differs in case from the schema's "id"
  df.write.format("parquet").mode("append").save(output_path)
}

val schema = new StructType().add("id", LongType)

val stream_df = spark.readStream.schema(schema).format("parquet").load(inputPath)
stream_df.writeStream.trigger(Trigger.Once).foreachBatch(process_row _)
  .start().awaitTermination()
{code}
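A workaround that appears to avoid the failure (my assumption based on the case mismatch above, not confirmed elsewhere in this report) is to reference the column with the same case as the declared schema, i.e. {{id}} instead of {{ID}}:
{code:java}
// Hypothetical workaround: select the column using the exact case from the
// declared schema ("id"), so no case-insensitive matching is needed later.
def process_row_fixed(microBatch: DataFrame, batchId: Long): Unit = {
  val df = microBatch.select($"id".alias("other")) // matches schema case
  df.write.format("parquet").mode("append").save(output_path)
}
{code}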
Stack Trace:
{code:java}
Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
  at org.apache.spark.sql.types.StructType.apply(StructType.scala:414)
  at org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$4.$anonfun$applyOrElse$3(objects.scala:216)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.immutable.List.map(List.scala:298)
  at org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$4.applyOrElse(objects.scala:215)
  at org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$4.applyOrElse(objects.scala:203)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:298)
  at org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$.apply(objects.scala:203)
  at org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$.apply(objects.scala:121)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:149)
  at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
  at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
  at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:146)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:138)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:138)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:116)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:116)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:82)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:82)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:79)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$4(QueryExecution.scala:197)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:381)
  at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$writePlans(QueryExecution.scala:197)
  at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:207)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:95)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:944)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:396)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:380)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:269)
  at process_row(<console>:32)
  at $anonfun$res4$1(<console>:30)
  at $anonfun$res4$1$adapted(<console>:30)
  at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:36)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:573)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:571)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:571)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
  at org.apache.spark.sql.execution.streaming.OneTimeExecutor.execute(TriggerExecutor.scala:39)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
  ... 1 more
{code}
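The trace suggests (my reading of it, not verified against the Spark source) that the optimizer's ObjectSerializerPruning rule compares field names case-sensitively, even though the analyzer resolved {{ID}} against {{id}} case-insensitively. A minimal sketch of how such a mismatch can produce the reported ArrayIndexOutOfBoundsException at StructType.apply; the variable names are illustrative only:
{code:java}
import org.apache.spark.sql.types.{LongType, StructType}

// Schema as declared by the streaming reader: lowercase "id".
val fileSchema = new StructType().add("id", LongType)

// Field name as written in the query: uppercase "ID".
val requested = Seq("ID")

// A case-sensitive name filter keeps no fields, yielding an empty struct...
val pruned = StructType(fileSchema.filter(f => requested.contains(f.name)))

// ...so any positional lookup into it fails, matching the reported
// java.lang.ArrayIndexOutOfBoundsException: 0 at StructType.apply.
pruned(0)
{code}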


