You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/07/13 14:10:15 UTC

[GitHub] [iceberg] naseerscorpio commented on issue #5174: MERGE INTO TABLE is not supported temporarily on Spark3.2.0, Scala2.12, Iceberg0.13.1

naseerscorpio commented on issue #5174:
URL: https://github.com/apache/iceberg/issues/5174#issuecomment-1183275972

   Hi, I'm also facing this issue when using Iceberg with the AWS Glue integration. Here are my classpath and the code to reproduce the issue. Strangely, the same queries work when run from `spark-shell`.
   
   **Classpath**
   
   ```scala
   val sparkVersion   = "3.2.1"
   val icebergVersion = "0.13.2"
   val awsSDKVersion = "2.17.131"
   
   libraryDependencies := Seq(
     "org.apache.spark" %% "spark-core" % sparkVersion,
     "org.apache.spark" %% "spark-sql" % sparkVersion,
     "org.apache.iceberg"     % "iceberg-spark-runtime-3.2_2.12" % icebergVersion,
     "software.amazon.awssdk" % "url-connection-client"          % awsSDKVersion,
     "software.amazon.awssdk" % "bundle"          % awsSDKVersion,
     "org.apache.hadoop" % "hadoop-common" % "3.3.1",
     "org.apache.hadoop"      % "hadoop-aws"  % "3.3.1",
     "com.typesafe" % "config" % "1.3.3",
     "com.github.scopt" %% "scopt" % "3.7.0"
   )
   ```
   
   ```scala
   object IcebergApp extends App {
       val conf = new SparkConf()
         conf.setMaster("local[*]")
         conf.setAppName("iceberg-demo")
         conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         conf.set("spark.kryoserializer.buffer.max", "256m")
         conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
         conf.set("sparl.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
         conf.set("spark.sql.catalog.spark_catalog.warehouse", "s3://some-bucket/test_iceberg/")
         conf.set("spark.sql.catalog.spark_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
         conf.set("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
         conf.set("spark.sql.iceberg.handle-timestamp-without-timezone", "true")
       
         val sparkSession = SparkSession.builder().config(conf).getOrCreate()
       
         import org.apache.spark.sql.DataFrame
       
         def readInputDF() = {
           import org.apache.spark.sql.types._
           val sparkSchema = StructType(
             Array(
               StructField("id",LongType,true),
               StructField("dep",StringType,true),
               StructField("created_ts",TimestampType,true)
             ))
           val input_df =  sparkSession.read
             .schema(sparkSchema)
             .option("inferSchema", "false")
             .option("mode", "PERMISSIVE")
             .format("json")
             .load("employees_part1.json")
       
           input_df.sortWithinPartitions("created_ts")
             .createOrReplaceTempView("input_df")
         }
       
       
         def doMerge() = {
           sparkSession.sql("MERGE INTO test_iceberg.employees t USING (SELECT * FROM input_df) s ON t.id = s.id WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *")
         }
       
       
         private def createTableIfNotExists(): DataFrame = {
           sparkSession.sql(s"""
                               |CREATE OR REPLACE TABLE test_iceberg.employees (
                               |  id bigint,
                               |dep string,
                               |created_ts timestamp
                               |)
                               |USING ICEBERG
                               |PARTITIONED BY (days(created_ts))
                               |LOCATION "s3://some-bucket/test_iceberg/employees/"
                               |TBLPROPERTIES (
                               |  'write.distribution-mode'='hash',
                               |  'write.metadata.delete-after-commit.enabled'='true',
                               |  'write.metadata.previous-versions-max'='9'
                               |)
                               |""".stripMargin)
         }
       
         createTableIfNotExists()
         readInputDF()
         doMerge()
   }
   ```
   
   **Error**
   ```
   Exception in thread "main" java.lang.UnsupportedOperationException: MERGE INTO TABLE is not supported temporarily.
   ```
   
    Could you advise if I am missing something here?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org