Posted to issues@spark.apache.org by "Sebastian Daberdaku (Jira)" <ji...@apache.org> on 2023/10/19 10:05:00 UTC

[jira] [Commented] (SPARK-45289) ClassCastException when reading Delta table on AWS S3

    [ https://issues.apache.org/jira/browse/SPARK-45289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17777170#comment-17777170 ] 

Sebastian Daberdaku commented on SPARK-45289:
---------------------------------------------

Hello [~tanawatpan], you need to use the latest delta-spark release, 3.0.0, which came out just yesterday and adds support for Spark 3.5.0.
https://github.com/delta-io/delta/releases/tag/v3.0.0
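
For reference, below is a minimal sketch of the reproduction command from the ticket with only the Delta coordinate bumped to the new release. It assumes the Maven artifact was renamed from delta-core to delta-spark in 3.0.0 (as described in the release notes); every other flag is taken unchanged from the report.
{code:java}
# Same invocation as in the report; only the Delta package coordinate changes:
#   io.delta:delta-core_2.12:2.4.0  ->  io.delta:delta-spark_2.12:3.0.0
docker run --rm -it apache/spark:3.5.0-scala2.12-java11-ubuntu /opt/spark/bin/spark-shell \
--packages 'org.apache.hadoop:hadoop-aws:3.3.4,io.delta:delta-spark_2.12:3.0.0' \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.hadoop.aws.region=$AWS_REGION" \
--conf "spark.hadoop.fs.s3a.access.key=$AWS_ACCESS_KEY_ID" \
--conf "spark.hadoop.fs.s3a.secret.key=$AWS_SECRET_ACCESS_KEY" \
--conf "spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem" \
--conf "spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem" \
--conf "spark.hadoop.fs.s3a.path.style.access=true" \
--conf "spark.hadoop.fs.s3a.connection.ssl.enabled=true" \
--conf "spark.jars.ivy=/tmp/ivy/cache"{code}
With the 3.x artifact built against Spark 3.5, the spark.read.format("delta").load(...) call from the report should no longer hit the FileStatusWithMetadata cast.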

> ClassCastException when reading Delta table on AWS S3
> -----------------------------------------------------
>
>                 Key: SPARK-45289
>                 URL: https://issues.apache.org/jira/browse/SPARK-45289
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 3.5.0
>         Environment: Spark version: 3.5.0
> Deployment mode: spark-shell
> OS: Ubuntu (Docker image)
> Java/JVM version: OpenJDK 11
> Packages: hadoop-aws:3.3.4, delta-core_2.12:2.4.0
>            Reporter: Tanawat Panmongkol
>            Priority: Major
>
> When attempting to read a Delta table from S3 using Spark 3.5.0, a {{ClassCastException}} occurs involving {{org.apache.hadoop.fs.s3a.S3AFileStatus}} and {{org.apache.spark.sql.execution.datasources.FileStatusWithMetadata}}. The error appears to be related to the new feature SPARK-43039.
> *Steps to Reproduce:*
> {code:java}
> export AWS_ACCESS_KEY_ID='<ACCESS_KEY>'
> export AWS_SECRET_ACCESS_KEY='<SECRET_KEY>'
> export AWS_REGION='<REGION>'
> docker run --rm -it apache/spark:3.5.0-scala2.12-java11-ubuntu /opt/spark/bin/spark-shell \
> --packages 'org.apache.hadoop:hadoop-aws:3.3.4,io.delta:delta-core_2.12:2.4.0' \
> --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
> --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
> --conf "spark.hadoop.aws.region=$AWS_REGION" \
> --conf "spark.hadoop.fs.s3a.access.key=$AWS_ACCESS_KEY_ID" \
> --conf "spark.hadoop.fs.s3a.secret.key=$AWS_SECRET_ACCESS_KEY" \
> --conf "spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem" \
> --conf "spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem" \
> --conf "spark.hadoop.fs.s3a.path.style.access=true" \
> --conf "spark.hadoop.fs.s3a.connection.ssl.enabled=true" \
> --conf "spark.jars.ivy=/tmp/ivy/cache"{code}
> {code:java}
> scala> spark.read.format("delta").load("s3://<bucket_name>/<delta_table_name>/").show() {code}
> *Logs:*
> {code:java}
> java.lang.ClassCastException: class org.apache.hadoop.fs.s3a.S3AFileStatus cannot be cast to class org.apache.spark.sql.execution.datasources.FileStatusWithMetadata (org.apache.hadoop.fs.s3a.S3AFileStatus is in unnamed module of loader scala.reflect.internal.util.ScalaClassLoader$URLClassLoader @4552f905; org.apache.spark.sql.execution.datasources.FileStatusWithMetadata is in unnamed module of loader 'app')
>   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
>   at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>   at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:286)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>   at org.apache.spark.sql.execution.FileSourceScanLike.$anonfun$setFilesNumAndSizeMetric$2(DataSourceScanExec.scala:466)
>   at org.apache.spark.sql.execution.FileSourceScanLike.$anonfun$setFilesNumAndSizeMetric$2$adapted(DataSourceScanExec.scala:466)
>   at scala.collection.immutable.List.map(List.scala:293)
>   at org.apache.spark.sql.execution.FileSourceScanLike.setFilesNumAndSizeMetric(DataSourceScanExec.scala:466)
>   at org.apache.spark.sql.execution.FileSourceScanLike.selectedPartitions(DataSourceScanExec.scala:257)
>   at org.apache.spark.sql.execution.FileSourceScanLike.selectedPartitions$(DataSourceScanExec.scala:251)
>   at org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions$lzycompute(DataSourceScanExec.scala:506)
>   at org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions(DataSourceScanExec.scala:506)
>   at org.apache.spark.sql.execution.FileSourceScanLike.dynamicallySelectedPartitions(DataSourceScanExec.scala:286)
>   at org.apache.spark.sql.execution.FileSourceScanLike.dynamicallySelectedPartitions$(DataSourceScanExec.scala:267)
>   at org.apache.spark.sql.execution.FileSourceScanExec.dynamicallySelectedPartitions$lzycompute(DataSourceScanExec.scala:506)
>   at org.apache.spark.sql.execution.FileSourceScanExec.dynamicallySelectedPartitions(DataSourceScanExec.scala:506)
>   at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:553)
>   at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:537)
>   at org.apache.spark.sql.execution.FileSourceScanExec.doExecute(DataSourceScanExec.scala:575)
>   at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
>   at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
>   at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:527)
>   at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:455)
>   at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:454)
>   at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:498)
>   at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:51)
>   at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:242)
>   at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:51)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:751)
>   at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
>   at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
>   at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:364)
>   at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:445)
>   at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4344)
>   at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3585)
>   at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4334)
>   at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
>   at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4332)
>   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
>   at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
>   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4332)
>   at org.apache.spark.sql.Dataset.collect(Dataset.scala:3585)
>   at org.apache.spark.sql.delta.Snapshot.protocolAndMetadataReconstruction(Snapshot.scala:215)
>   at org.apache.spark.sql.delta.Snapshot.x$1$lzycompute(Snapshot.scala:134)
>   at org.apache.spark.sql.delta.Snapshot.x$1(Snapshot.scala:129)
>   at org.apache.spark.sql.delta.Snapshot._metadata$lzycompute(Snapshot.scala:129)
>   at org.apache.spark.sql.delta.Snapshot._metadata(Snapshot.scala:129)
>   at org.apache.spark.sql.delta.Snapshot.metadata(Snapshot.scala:197)
>   at org.apache.spark.sql.delta.Snapshot.toString(Snapshot.scala:390)
>   at java.base/java.lang.String.valueOf(Unknown Source)
>   at java.base/java.lang.StringBuilder.append(Unknown Source)
>   at org.apache.spark.sql.delta.Snapshot.$anonfun$new$1(Snapshot.scala:393)
>   at org.apache.spark.sql.delta.Snapshot.$anonfun$logInfo$1(Snapshot.scala:370)
>   at org.apache.spark.internal.Logging.logInfo(Logging.scala:60)
>   at org.apache.spark.internal.Logging.logInfo$(Logging.scala:59)
>   at org.apache.spark.sql.delta.Snapshot.logInfo(Snapshot.scala:370)
>   at org.apache.spark.sql.delta.Snapshot.<init>(Snapshot.scala:393)
>   at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$createSnapshot$4(SnapshotManagement.scala:356)
>   at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotFromGivenOrEquivalentLogSegment(SnapshotManagement.scala:480)
>   at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotFromGivenOrEquivalentLogSegment$(SnapshotManagement.scala:468)
>   at org.apache.spark.sql.delta.DeltaLog.createSnapshotFromGivenOrEquivalentLogSegment(DeltaLog.scala:74)
>   at org.apache.spark.sql.delta.SnapshotManagement.createSnapshot(SnapshotManagement.scala:349)
>   at org.apache.spark.sql.delta.SnapshotManagement.createSnapshot$(SnapshotManagement.scala:343)
>   at org.apache.spark.sql.delta.DeltaLog.createSnapshot(DeltaLog.scala:74)
>   at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$createSnapshotAtInitInternal$1(SnapshotManagement.scala:304)
>   at scala.Option.map(Option.scala:230)
>   at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotAtInitInternal(SnapshotManagement.scala:301)
>   at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotAtInitInternal$(SnapshotManagement.scala:298)
>   at org.apache.spark.sql.delta.DeltaLog.createSnapshotAtInitInternal(DeltaLog.scala:74)
>   at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$getSnapshotAtInit$1(SnapshotManagement.scala:293)
>   at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
>   at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
>   at org.apache.spark.sql.delta.DeltaLog.recordFrameProfile(DeltaLog.scala:74)
>   at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit(SnapshotManagement.scala:288)
>   at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit$(SnapshotManagement.scala:287)
>   at org.apache.spark.sql.delta.DeltaLog.getSnapshotAtInit(DeltaLog.scala:74)
>   at org.apache.spark.sql.delta.SnapshotManagement.$init$(SnapshotManagement.scala:57)
>   at org.apache.spark.sql.delta.DeltaLog.<init>(DeltaLog.scala:80)
>   at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$4(DeltaLog.scala:790)
>   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
>   at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$3(DeltaLog.scala:785)
>   at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
>   at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
>   at org.apache.spark.sql.delta.DeltaLog$.recordFrameProfile(DeltaLog.scala:595)
>   at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:133)
>   at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
>   at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
>   at org.apache.spark.sql.delta.DeltaLog$.recordOperation(DeltaLog.scala:595)
>   at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:132)
>   at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:122)
>   at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:112)
>   at org.apache.spark.sql.delta.DeltaLog$.recordDeltaOperation(DeltaLog.scala:595)
>   at org.apache.spark.sql.delta.DeltaLog$.createDeltaLog$1(DeltaLog.scala:784)
>   at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$5(DeltaLog.scala:802)
>   at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
>   at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
>   at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
>   at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
>   at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
>   at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
>   at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
>   at org.apache.spark.sql.delta.DeltaLog$.getDeltaLogFromCache$1(DeltaLog.scala:801)
>   at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:811)
>   at org.apache.spark.sql.delta.DeltaLog$.forTable(DeltaLog.scala:658)
>   at org.apache.spark.sql.delta.catalog.DeltaTableV2.deltaLog$lzycompute(DeltaTableV2.scala:85)
>   at org.apache.spark.sql.delta.catalog.DeltaTableV2.deltaLog(DeltaTableV2.scala:84)
>   at org.apache.spark.sql.delta.catalog.DeltaTableV2.$anonfun$snapshot$3(DeltaTableV2.scala:122)
>   at scala.Option.getOrElse(Option.scala:189)
>   at org.apache.spark.sql.delta.catalog.DeltaTableV2.snapshot$lzycompute(DeltaTableV2.scala:122)
>   at org.apache.spark.sql.delta.catalog.DeltaTableV2.snapshot(DeltaTableV2.scala:103)
>   at org.apache.spark.sql.delta.catalog.DeltaTableV2.toBaseRelation(DeltaTableV2.scala:178)
>   at org.apache.spark.sql.delta.sources.DeltaDataSource.$anonfun$createRelation$5(DeltaDataSource.scala:230)
>   at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
>   at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
>   at org.apache.spark.sql.delta.sources.DeltaDataSource.recordFrameProfile(DeltaDataSource.scala:49)
>   at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:188)
>   at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
>   at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
>   at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
>   at scala.Option.getOrElse(Option.scala:189)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
>   ... 47 elided
> {code}
> The issue does not occur with the following configurations:
>  # _Spark 3.3.3, hadoop-aws:3.3.2, delta-core_2.12:1.2.1_
>  # _Spark 3.4.1, hadoop-aws:3.3.4, delta-core_2.12:2.4.0_ (sketched below)
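>
> For comparison, here is a minimal sketch of the second known-good combination, assuming the matching apache/spark:3.4.1-scala2.12-java11-ubuntu image tag is published; only the image tag and the --packages coordinates differ from the reproduction.
> {code:java}
> # Known-good combination 2: Spark 3.4.1, hadoop-aws:3.3.4, delta-core_2.12:2.4.0.
> # S3A credentials and the Delta catalog/extension flags are shown here; the other
> # fs.s3a.* and spark.jars.ivy flags from the reproduction carry over unchanged.
> docker run --rm -it apache/spark:3.4.1-scala2.12-java11-ubuntu /opt/spark/bin/spark-shell \
> --packages 'org.apache.hadoop:hadoop-aws:3.3.4,io.delta:delta-core_2.12:2.4.0' \
> --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
> --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
> --conf "spark.hadoop.fs.s3a.access.key=$AWS_ACCESS_KEY_ID" \
> --conf "spark.hadoop.fs.s3a.secret.key=$AWS_SECRET_ACCESS_KEY"{code}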


