You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/07/13 14:10:15 UTC
[GitHub] [iceberg] naseerscorpio commented on issue #5174: MERGE INTO TABLE is not supported temporarily on Spark3.2.0, Scala2.12, Iceberg0.13.1
naseerscorpio commented on issue #5174:
URL: https://github.com/apache/iceberg/issues/5174#issuecomment-1183275972
Hi, I'm also facing this issue when using iceberg with AWS Glue integration. Here is my classpath and code to reproduce the issue. Strangely, same queries work when run from `spark-shell`
**Classpath**
```scala
val sparkVersion = "3.2.1"
val icebergVersion = "0.13.2"
val awsSDKVersion = "2.17.131"
libraryDependencies := Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.iceberg" % "iceberg-spark-runtime-3.2_2.12" % icebergVersion,
"software.amazon.awssdk" % "url-connection-client" % awsSDKVersion,
"software.amazon.awssdk" % "bundle" % awsSDKVersion,
"org.apache.hadoop" % "hadoop-common" % "3.3.1",
"org.apache.hadoop" % "hadoop-aws" % "3.3.1",
"com.typesafe" % "config" % "1.3.3",
"com.github.scopt" %% "scopt" % "3.7.0"
)
```
```scala
object IcebergApp extends App {
val conf = new SparkConf()
conf.setMaster("local[*]")
conf.setAppName("iceberg-demo")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryoserializer.buffer.max", "256m")
conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
conf.set("sparl.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
conf.set("spark.sql.catalog.spark_catalog.warehouse", "s3://some-bucket/test_iceberg/")
conf.set("spark.sql.catalog.spark_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
conf.set("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
conf.set("spark.sql.iceberg.handle-timestamp-without-timezone", "true")
val sparkSession = SparkSession.builder().config(conf).getOrCreate()
import org.apache.spark.sql.DataFrame
def readInputDF() = {
import org.apache.spark.sql.types._
val sparkSchema = StructType(
Array(
StructField("id",LongType,true),
StructField("dep",StringType,true),
StructField("created_ts",TimestampType,true)
))
val input_df = sparkSession.read
.schema(sparkSchema)
.option("inferSchema", "false")
.option("mode", "PERMISSIVE")
.format("json")
.load("employees_part1.json")
input_df.sortWithinPartitions("created_ts")
.createOrReplaceTempView("input_df")
}
def doMerge() = {
sparkSession.sql("MERGE INTO test_iceberg.employees t USING (SELECT * FROM input_df) s ON t.id = s.id WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *")
}
private def createTableIfNotExists(): DataFrame = {
sparkSession.sql(s"""
|CREATE OR REPLACE TABLE test_iceberg.employees (
| id bigint,
|dep string,
|created_ts timestamp
|)
|USING ICEBERG
|PARTITIONED BY (days(created_ts))
|LOCATION "s3://some-bucket/test_iceberg/employees/"
|TBLPROPERTIES (
| 'write.distribution-mode'='hash',
| 'write.metadata.delete-after-commit.enabled'='true',
| 'write.metadata.previous-versions-max'='9'
|)
|""".stripMargin)
}
createTableIfNotExists()
readInputDF()
doMerge()
}
```
**Error**
```
Exception in thread "main" java.lang.UnsupportedOperationException: MERGE INTO TABLE is not supported temporarily.
```
Could you advise if am missing something here ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org