Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/12/30 21:12:55 UTC
[GitHub] [iceberg] andersonad opened a new issue #3829: Spark Stored Procedure expire_snapshots fails with AWS S3 FileIO implementation.
andersonad opened a new issue #3829:
URL: https://github.com/apache/iceberg/issues/3829
When I run the following code to test the `expire_snapshots` procedure in PySpark, I get a "Failed to get file system" error for an S3 path, despite setting `spark.sql.catalog.{catalog_name}.io-impl` to S3FileIO. Is there a way to work around this error so that `expire_snapshots` can run on Iceberg tables stored in AWS S3?
Note that I do not get this error when using `rewrite_manifests`.
```
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import time

spark_packages = [
    'org.apache.iceberg:iceberg-spark3-runtime:0.12.1',
    'software.amazon.awssdk:bundle:2.16.43',
    'software.amazon.awssdk:url-connection-client:2.16.43',
]

catalog_name = 'iceberg_dev'

spark = SparkSession \
    .builder \
    .config('spark.jars.packages', ','.join(spark_packages)) \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
    .config(f'spark.sql.catalog.{catalog_name}.warehouse', f's3://your_bucket_here/{catalog_name}') \
    .config(f'spark.sql.catalog.{catalog_name}.catalog-impl', 'org.apache.iceberg.aws.glue.GlueCatalog') \
    .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO') \
    .config(f'spark.sql.catalog.{catalog_name}.lock-impl', 'org.apache.iceberg.aws.glue.DynamoLockManager') \
    .config(f'spark.sql.catalog.{catalog_name}.lock.table', f'{catalog_name}') \
    .getOrCreate()

subName = 'product.testtable'
tableName = f"{catalog_name}.{subName}"

spark.sql(f"DROP TABLE IF EXISTS {tableName}")
spark.sql(f"CREATE TABLE {tableName} (id bigint NOT NULL, data string) USING iceberg")
spark.sql(f"INSERT INTO TABLE {tableName} VALUES (1, 'a')")
time.sleep(3)
spark.sql(f"INSERT INTO TABLE {tableName} VALUES (2, 'b')")

expiretime = str(spark.sql(f"SELECT * FROM {tableName}.snapshots").tail(1)[0]['committed_at'])
spark.sql(f"CALL {catalog_name}.system.expire_snapshots('{subName}', TIMESTAMP '{expiretime}')").show()
```
The error is as follows:
```Py4JJavaError: An error occurred while calling o40.sql.
: org.apache.iceberg.exceptions.RuntimeIOException: Failed to get file system for path: s3://iceberg_dev/product.db/testtable/metadata/00002-3942827a-4fa3-4b6b-8993-353f34bae6f5.metadata.json
at org.apache.iceberg.hadoop.Util.getFs(Util.java:50)
at org.apache.iceberg.hadoop.HadoopInputFile.fromPath(HadoopInputFile.java:75)
at org.apache.iceberg.hadoop.HadoopInputFile.fromLocation(HadoopInputFile.java:54)
at org.apache.iceberg.hadoop.HadoopFileIO.newInputFile(HadoopFileIO.java:59)
at org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:233)
at org.apache.iceberg.StaticTableOperations.<init>(StaticTableOperations.java:40)
at org.apache.iceberg.hadoop.HadoopTables.newTableOps(HadoopTables.java:195)
at org.apache.iceberg.hadoop.HadoopTables.loadMetadataTable(HadoopTables.java:121)
at org.apache.iceberg.hadoop.HadoopTables.load(HadoopTables.java:82)
at org.apache.iceberg.spark.SparkCatalog.load(SparkCatalog.java:451)
at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:116)
at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:79)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:271)
at scala.Option.map(Option.scala:230)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:248)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:232)
at org.apache.iceberg.actions.BaseSparkAction.loadMetadataTable(BaseSparkAction.java:148)
at org.apache.iceberg.actions.BaseSparkAction.buildValidDataFileDF(BaseSparkAction.java:95)
at org.apache.iceberg.actions.ExpireSnapshotsAction.buildValidFileDF(ExpireSnapshotsAction.java:226)
at org.apache.iceberg.actions.ExpireSnapshotsAction.expire(ExpireSnapshotsAction.java:185)
at org.apache.iceberg.actions.ExpireSnapshotsAction.execute(ExpireSnapshotsAction.java:217)
at org.apache.iceberg.spark.procedures.ExpireSnapshotsProcedure.lambda$call$0(ExpireSnapshotsProcedure.java:97)
at org.apache.iceberg.spark.procedures.BaseProcedure.execute(BaseProcedure.java:75)
at org.apache.iceberg.spark.procedures.BaseProcedure.modifyIcebergTable(BaseProcedure.java:64)
at org.apache.iceberg.spark.procedures.ExpireSnapshotsProcedure.call(ExpireSnapshotsProcedure.java:84)
at org.apache.spark.sql.execution.datasources.v2.CallExec.run(CallExec.scala:33)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:45)
at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:610)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:605)
at jdk.internal.reflect.GeneratedMethodAccessor32.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3281)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3301)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
at org.apache.iceberg.hadoop.Util.getFs(Util.java:48)
... 53 more
```
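The `Caused by` line points at the root cause: the expire action in this Iceberg version loads metadata tables through Hadoop (`HadoopFileIO` / `HadoopTables` in the trace), so Hadoop must be able to resolve the `s3` scheme even though the catalog itself is configured with `S3FileIO`. One possible workaround (a sketch, not verified against this setup) is to map the `s3` scheme to `S3AFileSystem` via the standard Hadoop property `fs.s3.impl`; this assumes `hadoop-aws` and its AWS SDK dependency are on the classpath, and the small `apply_workaround` helper below is purely illustrative:

```python
# Possible workaround sketch: Hadoop resolves a scheme like "s3" by looking up
# the property fs.<scheme>.impl. Pointing it at S3AFileSystem lets the Hadoop
# code path in the stack trace open s3:// URIs. The "spark.hadoop." prefix
# forwards the property into the Hadoop Configuration Spark creates.
# NOTE: this requires hadoop-aws (and its AWS SDK) on the classpath, and is
# an untested assumption for this particular failure.
workaround_confs = {
    'spark.hadoop.fs.s3.impl': 'org.apache.hadoop.fs.s3a.S3AFileSystem',
}

def apply_workaround(builder, confs=workaround_confs):
    """Apply each key/value to a SparkSession.Builder-style object and
    return the builder so the call can be chained before .getOrCreate()."""
    for key, value in confs.items():
        builder = builder.config(key, value)
    return builder
```

In the reproduction above, this would be chained into the session setup, e.g. `spark = apply_workaround(SparkSession.builder.config(...)).getOrCreate()`.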
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org
[GitHub] [iceberg] andersonad commented on issue #3829: Spark sql Stored Procedure expire_snapshots fails with AWS S3 FileIO implementation.
Posted by GitBox <gi...@apache.org>.
andersonad commented on issue #3829:
URL: https://github.com/apache/iceberg/issues/3829#issuecomment-1004230103
Tested this on master with the 0.13.0 snapshot and it works correctly. Closing this issue, sorry.
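For anyone hitting the same error, moving to the 0.13 runtime means updating the package list in the reproduction above. Note that 0.13 renamed the Spark runtime artifact to encode the Spark and Scala versions, so the exact coordinate must match your Spark build (the `3.2_2.12` coordinate below is one example and should be checked against the release notes):

```python
# Iceberg 0.13.x Spark runtime coordinates include the Spark and Scala
# versions (e.g. Spark 3.2 / Scala 2.12 here); pick the one matching your
# cluster. The old iceberg-spark3-runtime:0.12.1 coordinate is replaced.
spark_packages = [
    'org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.0',
    'software.amazon.awssdk:bundle:2.16.43',
    'software.amazon.awssdk:url-connection-client:2.16.43',
]
```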
[GitHub] [iceberg] andersonad closed issue #3829: Spark sql Stored Procedure expire_snapshots fails with AWS S3 FileIO implementation.
Posted by GitBox <gi...@apache.org>.
andersonad closed issue #3829:
URL: https://github.com/apache/iceberg/issues/3829