You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Sandeep Katta (Jira)" <ji...@apache.org> on 2021/04/15 17:48:00 UTC
[jira] [Created] (SPARK-35096) foreachBatch throws
ArrayIndexOutOfBoundsException if schema is case Insensitive
Sandeep Katta created SPARK-35096:
-------------------------------------
Summary: foreachBatch throws ArrayIndexOutOfBoundsException if schema is case Insensitive
Key: SPARK-35096
URL: https://issues.apache.org/jira/browse/SPARK-35096
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 3.0.0
Reporter: Sandeep Katta
Below code works fine before spark3, running on spark3 throwsÂ
java.lang.ArrayIndexOutOfBoundsException
{code:java}
val inputPath = "/Users/xyz/data/testcaseInsensitivity"
val output_path = "/Users/xyz/output"
spark.range(10).write.format("parquet").save(inputPath)
def process_row(microBatch: DataFrame, batchId: Long): Unit = {
val df = microBatch.select($"ID".alias("other")) // Doesn't work
df.write.format("parquet").mode("append").save(output_path)
}
val schema = new StructType().add("id", LongType)
val stream_df = spark.readStream.schema(schema).format("parquet").load(inputPath)
stream_df.writeStream.trigger(Trigger.Once).foreachBatch(process_row _)
.start().awaitTermination()
{code}
Stack Trace:
{code:java}
Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
at org.apache.spark.sql.types.StructType.apply(StructType.scala:414)
at org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$4.$anonfun$applyOrElse$3(objects.scala:216)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.immutable.List.map(List.scala:298)
at org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$4.applyOrElse(objects.scala:215)
at org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$4.applyOrElse(objects.scala:203)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:298)
at org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$.apply(objects.scala:203)
at org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$.apply(objects.scala:121)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:149)
at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:146)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:138)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:138)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:116)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:116)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:82)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:82)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$4(QueryExecution.scala:197)
at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:381)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$writePlans(QueryExecution.scala:197)
at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:207)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:95)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:944)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:396)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:380)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:269)
at process_row(<console>:32)
at $anonfun$res4$1(<console>:30)
at $anonfun$res4$1$adapted(<console>:30)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:36)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:573)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:571)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:571)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
at org.apache.spark.sql.execution.streaming.OneTimeExecutor.execute(TriggerExecutor.scala:39)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
... 1 more
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org