Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/12/30 21:12:55 UTC

[GitHub] [iceberg] andersonad opened a new issue #3829: Spark Stored Procedure expire_snapshots fails with AWS S3 FileIO implementation.

andersonad opened a new issue #3829:
URL: https://github.com/apache/iceberg/issues/3829


   When I run the following code to test the `expire_snapshots` procedure in PySpark, I get a "Failed to get file system" error for an S3 path, despite setting `spark.sql.catalog.{catalog_name}.io-impl` to S3FileIO. Is there a way around this error so that `expire_snapshots` can be run on Iceberg tables stored in AWS S3?
   
   Note that I do not get this error when using `rewrite_manifests` (a working call is sketched after the repro below).
   
   ```
   from pyspark.sql import SparkSession
   from pyspark.sql.types import StructType, StructField, StringType, IntegerType
   import time
   
    # Iceberg Spark 3 runtime plus the AWS SDK v2 modules required by S3FileIO
    spark_packages = [
        'org.apache.iceberg:iceberg-spark3-runtime:0.12.1',
        'software.amazon.awssdk:bundle:2.16.43',
        'software.amazon.awssdk:url-connection-client:2.16.43',
    ]
   
   catalog_name = 'iceberg_dev'
   
    # Glue-backed Iceberg catalog with S3FileIO and a DynamoDB-based lock table
    spark = SparkSession \
       .builder \
       .config('spark.jars.packages', ','.join(spark_packages)) \
       .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
       .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
       .config(f'spark.sql.catalog.{catalog_name}.warehouse', f's3://your_bucket_here/{catalog_name}') \
       .config(f'spark.sql.catalog.{catalog_name}.catalog-impl', 'org.apache.iceberg.aws.glue.GlueCatalog') \
       .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO') \
       .config(f'spark.sql.catalog.{catalog_name}.lock-impl', 'org.apache.iceberg.aws.glue.DynamoLockManager') \
       .config(f'spark.sql.catalog.{catalog_name}.lock.table', f'{catalog_name}') \
       .getOrCreate()
   
   subName = 'product.testtable'
   tableName = f"{catalog_name}.{subName}"
   spark.sql(f"DROP TABLE IF EXISTS {tableName}")
   spark.sql(f"CREATE TABLE {tableName} (id bigint NOT NULL, data string) USING iceberg")
   spark.sql(f"INSERT INTO TABLE {tableName} VALUES (1, 'a')")
   time.sleep(3)
   spark.sql(f"INSERT INTO TABLE {tableName} VALUES (2, 'b')")
    # Expire snapshots older than the most recent commit's timestamp
    expiretime = str(spark.sql(f"SELECT * FROM {tableName}.snapshots").tail(1)[0]['committed_at'])
    spark.sql(f"CALL {catalog_name}.system.expire_snapshots('{subName}', TIMESTAMP '{expiretime}')").show()
   ```
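   
   For comparison, a minimal sketch of the `rewrite_manifests` call that succeeds in the same session, using the `catalog_name` and `subName` values defined above:
   
   ```
   # Sketch: this procedure runs cleanly with the same catalog configuration
   spark.sql(f"CALL {catalog_name}.system.rewrite_manifests('{subName}')").show()
   ```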
   
   The error is as follows:
   ```Py4JJavaError: An error occurred while calling o40.sql.
   : org.apache.iceberg.exceptions.RuntimeIOException: Failed to get file system for path: s3://iceberg_dev/product.db/testtable/metadata/00002-3942827a-4fa3-4b6b-8993-353f34bae6f5.metadata.json
   	at org.apache.iceberg.hadoop.Util.getFs(Util.java:50)
   	at org.apache.iceberg.hadoop.HadoopInputFile.fromPath(HadoopInputFile.java:75)
   	at org.apache.iceberg.hadoop.HadoopInputFile.fromLocation(HadoopInputFile.java:54)
   	at org.apache.iceberg.hadoop.HadoopFileIO.newInputFile(HadoopFileIO.java:59)
   	at org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:233)
   	at org.apache.iceberg.StaticTableOperations.<init>(StaticTableOperations.java:40)
   	at org.apache.iceberg.hadoop.HadoopTables.newTableOps(HadoopTables.java:195)
   	at org.apache.iceberg.hadoop.HadoopTables.loadMetadataTable(HadoopTables.java:121)
   	at org.apache.iceberg.hadoop.HadoopTables.load(HadoopTables.java:82)
   	at org.apache.iceberg.spark.SparkCatalog.load(SparkCatalog.java:451)
   	at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:116)
   	at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:79)
   	at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:271)
   	at scala.Option.map(Option.scala:230)
   	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:248)
   	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:232)
   	at org.apache.iceberg.actions.BaseSparkAction.loadMetadataTable(BaseSparkAction.java:148)
   	at org.apache.iceberg.actions.BaseSparkAction.buildValidDataFileDF(BaseSparkAction.java:95)
   	at org.apache.iceberg.actions.ExpireSnapshotsAction.buildValidFileDF(ExpireSnapshotsAction.java:226)
   	at org.apache.iceberg.actions.ExpireSnapshotsAction.expire(ExpireSnapshotsAction.java:185)
   	at org.apache.iceberg.actions.ExpireSnapshotsAction.execute(ExpireSnapshotsAction.java:217)
   	at org.apache.iceberg.spark.procedures.ExpireSnapshotsProcedure.lambda$call$0(ExpireSnapshotsProcedure.java:97)
   	at org.apache.iceberg.spark.procedures.BaseProcedure.execute(BaseProcedure.java:75)
   	at org.apache.iceberg.spark.procedures.BaseProcedure.modifyIcebergTable(BaseProcedure.java:64)
   	at org.apache.iceberg.spark.procedures.ExpireSnapshotsProcedure.call(ExpireSnapshotsProcedure.java:84)
   	at org.apache.spark.sql.execution.datasources.v2.CallExec.run(CallExec.scala:33)
   	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
   	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
   	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:45)
   	at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
   	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
   	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
   	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
   	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
   	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
   	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
   	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
   	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
   	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
   	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:610)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
   	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:605)
   	at jdk.internal.reflect.GeneratedMethodAccessor32.invoke(Unknown Source)
   	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
   	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
   	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
   	at py4j.Gateway.invoke(Gateway.java:282)
   	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
   	at py4j.commands.CallCommand.execute(CallCommand.java:79)
   	at py4j.GatewayConnection.run(GatewayConnection.java:238)
   	at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
   	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3281)
   	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3301)
   	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
   	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
   	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
   	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
   	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
   	at org.apache.iceberg.hadoop.Util.getFs(Util.java:48)
   	... 53 more
   ```
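   
   The `Caused by` line at the bottom shows the real failure: when the procedure loads the table's metadata, `SparkCatalog` falls through to `HadoopTables`/`HadoopFileIO` rather than the configured `S3FileIO`, and the Hadoop configuration has no filesystem registered for the bare `s3` scheme. One possible workaround before upgrading (an assumption on my part, not verified in this thread) is to map the `s3` scheme to Hadoop's S3A implementation so the fallback path can resolve it; this needs `org.apache.hadoop:hadoop-aws` (matching your Hadoop version) on the classpath:
   
   ```
   # Hypothetical workaround sketch (untested here): register Hadoop's S3A
   # filesystem for the bare "s3" scheme so the HadoopFileIO fallback can
   # open s3:// paths. Added to the same session builder shown above.
   spark = SparkSession \
       .builder \
       .config('spark.hadoop.fs.s3.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem') \
       .getOrCreate()
   ```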
   
   



[GitHub] [iceberg] andersonad commented on issue #3829: Spark sql Stored Procedure expire_snapshots fails with AWS S3 FileIO implementation.

Posted by GitBox <gi...@apache.org>.
andersonad commented on issue #3829:
URL: https://github.com/apache/iceberg/issues/3829#issuecomment-1004230103


   Tested this on master with the 0.13.0 snapshot and it works correctly. Closing this issue, sorry.
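   
   For readers hitting this on 0.12.x: per the above, the 0.13.0 line resolves it. A sketch of the updated dependency list follows; note that 0.13.x splits the Spark runtime artifact by Spark and Scala version, so the 3.2/2.12 coordinate shown is an assumption to adjust for your build:
   
   ```
   # Sketch (assumed coordinates): Iceberg 0.13.x publishes one runtime jar
   # per Spark/Scala version; Spark 3.2 / Scala 2.12 shown here.
   spark_packages = [
       'org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.0',
       'software.amazon.awssdk:bundle:2.16.43',
       'software.amazon.awssdk:url-connection-client:2.16.43',
   ]
   ```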




[GitHub] [iceberg] andersonad closed issue #3829: Spark sql Stored Procedure expire_snapshots fails with AWS S3 FileIO implementation.

Posted by GitBox <gi...@apache.org>.
andersonad closed issue #3829:
URL: https://github.com/apache/iceberg/issues/3829


   

