Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/08/03 02:51:32 UTC
[GitHub] [iceberg] jessiedanwang opened a new issue, #5424: MERGE INTO TABLE is not supported temporarily
jessiedanwang opened a new issue, #5424:
URL: https://github.com/apache/iceberg/issues/5424
### Query engine
Spark 3.1.2 on EMR 6.5
### Question
We are running a Spark 3.1.2 application on EMR 6.5 in account BBB, writing data to S3 and the Glue catalog in account AAA. I would appreciate any ideas about what could be wrong here.
Here is what we did:
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .config("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") // Iceberg related configs
  .config("spark.sql.autoBroadcastJoinThreshold", "-1")
  .config("spark.sql.adaptive.autoBroadcastJoinThreshold", "-1")
  .config(s"spark.sql.catalog.iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog")
  .config(s"spark.sql.catalog.iceberg_catalog.warehouse", warehousePath)
  .config(s"spark.sql.catalog.iceberg_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
  .config(s"spark.sql.catalog.iceberg_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
  .config(s"spark.sql.catalog.iceberg_catalog.client.factory", "org.apache.iceberg.aws.AssumeRoleAwsClientFactory")
  .config(s"spark.sql.catalog.iceberg_catalog.client.assume-role.arn", s"arn:aws:iam::AAA:role/RoleToAssume")
  .config(s"spark.sql.catalog.iceberg_catalog.client.assume-role.region", "us-east-2")
  .config("spark.hadoop.hive.metastore.glue.catalogid", "AAA")
  .enableHiveSupport()
  .getOrCreate()

df.writeTo("ns_name.table_name")
  .tableProperty("table_type", "iceberg")
  .tableProperty("format-version", "2")
  //.tableProperty("engine.hive.enabled", "true")
  .tableProperty("write.distribution-mode", "hash")
  .tableProperty("write.spark.fanout.enabled", "true")
  .tableProperty("write.parquet.compression-codec", "snappy")
  .tableProperty("write.avro.compression-codec", "snappy")
  .tableProperty("write.metadata.delete-after-commit.enabled", "true")
  .tableProperty("write.metadata.previous-versions-max", "3")
  .tableProperty("write.target-file-size-bytes", s"$GIGABYTE")
  .create()

df2.createOrReplaceGlobalTempView("temp_view")

// MERGE INTO requires an ON <condition> between USING and the WHEN clauses
val query: String = s"""MERGE INTO ns_name.table_name t
  |USING (SELECT * FROM global_temp.temp_view) s
  |ON s.id = t.id
  |WHEN MATCHED THEN UPDATE SET *
  |WHEN NOT MATCHED THEN INSERT *
  |""".stripMargin
spark.sql(query)
```
We got the following error when running the MERGE INTO:
java.lang.UnsupportedOperationException: MERGE INTO TABLE is not supported temporarily.
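For reference, Spark's MERGE INTO grammar expects `ON <merge condition>` between the `USING` source and the first `WHEN` clause. Below is a minimal Python sketch of assembling such a statement with a hypothetical helper, `build_merge_sql` (not a Spark or Iceberg API). Its optional `catalog` argument prefixes the target with a catalog name such as the `iceberg_catalog` configured above; whether this table should be addressed through that catalog is an assumption here, since the issue both writes to and merges into the unprefixed `ns_name.table_name`.

```python
def build_merge_sql(target, source, on, clauses, catalog=None):
    """Assemble a MERGE INTO statement (hypothetical helper, illustration only).

    target  -- table identifier, e.g. "ns_name.table_name"
    source  -- source query text, wrapped in parentheses and aliased as s
    on      -- merge condition; MERGE INTO requires an ON clause
    clauses -- ordered WHEN ... clauses
    catalog -- optional catalog name prefixed to the target identifier
    """
    if not on or not on.strip():
        raise ValueError("MERGE INTO requires an ON condition")
    if not clauses:
        raise ValueError("at least one WHEN clause is required")
    qualified = f"{catalog}.{target}" if catalog else target
    lines = [
        f"MERGE INTO {qualified} t",
        f"USING ({source}) s",
        f"ON {on}",
        *clauses,
    ]
    return "\n".join(lines)


sql = build_merge_sql(
    target="ns_name.table_name",
    source="SELECT * FROM global_temp.temp_view",
    on="s.id = t.id",
    clauses=["WHEN MATCHED THEN UPDATE SET *", "WHEN NOT MATCHED THEN INSERT *"],
    catalog="iceberg_catalog",  # assumption: target lives in the Iceberg catalog
)
print(sql)
```

The aliases `t` and `s` are hard-coded to match the query in the issue; the resulting string would be passed to `spark.sql(...)` as before.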
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org
[GitHub] [iceberg] suisenkotoba commented on issue #5424: MERGE INTO TABLE is not supported temporarily
Posted by "suisenkotoba (via GitHub)" <gi...@apache.org>.
suisenkotoba commented on issue #5424:
URL: https://github.com/apache/iceberg/issues/5424#issuecomment-1451638558
Has anyone had any luck with this? I hit the same issue and could not find what caused it.
I am not sure whether something is wrong with my Spark configuration. I use Spark 3.3 on Dataproc ([image version 2.1](https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.1)) with Iceberg 1.1.0. The Dataproc cluster already has a Dataproc Metastore attached.
This is my Python code:
```python
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import LongType, BooleanType, StructField, StructType
conf = (
SparkConf()
.setAppName('test_iceberg')
.set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
.set('spark.sql.catalog.spark_catalog', 'org.apache.iceberg.spark.SparkSessionCatalog')
.set('spark.sql.catalog.spark_catalog.type', 'hive')
.set(f'spark.sql.catalog.dev', 'org.apache.iceberg.spark.SparkCatalog')
.set(f'spark.sql.catalog.dev.type', 'hive')
)
spark = SparkSession.builder.enableHiveSupport().config(conf=conf).getOrCreate()
df = spark.read.option("mergeSchema", "true").parquet('gs://poc-kafka/mis').sort("timestamp")
# Create iceberg table
schema = df.select("value.after.*").schema
additional_fields = [StructField('start_at', LongType(), True),
StructField('end_at', LongType(), True),
StructField('is_current', BooleanType(), True)]
schema = StructType(schema.fields + additional_fields)
empty_df = spark.createDataFrame([], schema)
spark.sql("CREATE DATABASE IF NOT EXISTS dev.dummy")
empty_df.createOrReplaceTempView("empty")
spark.sql("CREATE TABLE IF NOT EXISTS dev.dummy.fruit USING iceberg "
"TBLPROPERTIES ('format-version'='2') "
"AS (SELECT * FROM empty) ")
# Merge table
df.createOrReplaceTempView("changelog")
sql = "MERGE INTO dev.dummy.fruit d " \
"USING (SELECT value.after.*, value.op op, " \
" value.source.ts_ms start_at, True is_current, " \
" NULL end_at " \
" FROM changelog) s " \
"ON d.id = s.id " \
"WHEN MATCHED THEN UPDATE SET d.is_current = False, d.end_at = s.start_at " \
"WHEN NOT MATCHED THEN INSERT * "
spark.sql(sql)
```
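One way to sanity-check a session is to compare its settings against the Iceberg entries used in this thread. A minimal sketch, assuming the settings are collected into a plain dict (for example with `dict(spark.sparkContext.getConf().getAll())`); `missing_iceberg_settings` is a hypothetical helper and only checks the two kinds of entries that appear here: the SQL extensions class and the catalog implementation.

```python
ICEBERG_EXTENSIONS = "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
ICEBERG_CATALOGS = (
    "org.apache.iceberg.spark.SparkCatalog",
    "org.apache.iceberg.spark.SparkSessionCatalog",
)


def missing_iceberg_settings(conf, catalog):
    """Return human-readable problems found in a dict of Spark settings (hypothetical check)."""
    problems = []
    # spark.sql.extensions may hold a comma-separated list of extension classes.
    extensions = [e.strip() for e in conf.get("spark.sql.extensions", "").split(",")]
    if ICEBERG_EXTENSIONS not in extensions:
        problems.append("spark.sql.extensions does not include IcebergSparkSessionExtensions")
    impl = conf.get(f"spark.sql.catalog.{catalog}")
    if impl not in ICEBERG_CATALOGS:
        problems.append(f"spark.sql.catalog.{catalog} is not an Iceberg catalog: {impl!r}")
    return problems


# The settings from the SparkConf above, written out as a dict.
conf = {
    "spark.sql.extensions": ICEBERG_EXTENSIONS,
    "spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkSessionCatalog",
    "spark.sql.catalog.spark_catalog.type": "hive",
    "spark.sql.catalog.dev": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.dev.type": "hive",
}
print(missing_iceberg_settings(conf, "dev"))
```

An empty list only means the settings are present; it does not show that the Iceberg runtime jar on the classpath matches the cluster's Spark and Scala versions.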
This is the traceback I got:
```
/usr/lib/spark/python/pyspark/sql/session.py in sql(self, sqlQuery, **kwargs)
1032 sqlQuery = formatter.format(sqlQuery, **kwargs)
1033 try:
-> 1034 return DataFrame(self._jsparkSession.sql(sqlQuery), self)
1035 finally:
1036 if len(kwargs) > 0:
/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py in __call__(self, *args)
1319
1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
1322 answer, self.gateway_client, self.target_id, self.name)
1323
/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
188 def deco(*a: Any, **kw: Any) -> Any:
189 try:
--> 190 return f(*a, **kw)
191 except Py4JJavaError as e:
192 converted = convert_exception(e.java_exception)
/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o67.sql.
: java.lang.UnsupportedOperationException: MERGE INTO TABLE is not supported temporarily.
at org.apache.spark.sql.errors.QueryExecutionErrors$.ddlUnsupportedTemporarilyError(QueryExecutionErrors.scala:891)
at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:821)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:459)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:145)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:145)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:138)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:158)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:158)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:204)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:249)
at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:218)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
```
If it helps, these are the Dataproc cluster properties I set when creating the cluster:
`--properties='^#^dataproc:pip.packages=google-cloud-secret-manager==2.15.0,numpy==1.24.1#spark:spark.jars=https://jdbc.postgresql.org/download/postgresql-42.5.1.jar,https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark-runtime-3.3_2.13/1.1.0/iceberg-spark-runtime-3.3_2.13-1.1.0.jar#core:fs.defaultFS=gs://de-dev-dataproc-fs'`
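The runtime jar in these properties encodes its build targets in the file name: `iceberg-spark-runtime-3.3_2.13-1.1.0.jar` is built for Spark 3.3 and Scala 2.13, Iceberg 1.1.0. Below is a minimal sketch, assuming that `iceberg-spark-runtime-<spark>_<scala>-<iceberg>.jar` naming pattern, which parses the name so it can be compared with the cluster's own versions. `parse_runtime_jar` and `mismatches` are hypothetical helpers, and the cluster versions in the example call are placeholders, not facts about this Dataproc image.

```python
import re
from typing import NamedTuple


class RuntimeJar(NamedTuple):
    spark: str    # Spark minor version the runtime targets, e.g. "3.3"
    scala: str    # Scala binary version, e.g. "2.13"
    iceberg: str  # Iceberg release, e.g. "1.1.0"


# Assumed naming pattern: iceberg-spark-runtime-<spark>_<scala>-<iceberg>.jar
_JAR_RE = re.compile(
    r"iceberg-spark-runtime-(?P<spark>\d+\.\d+)_(?P<scala>\d+\.\d+)-(?P<iceberg>[\w.\-]+?)\.jar$"
)


def parse_runtime_jar(path):
    """Extract versions from an Iceberg Spark runtime jar path or URL."""
    name = path.rsplit("/", 1)[-1]
    m = _JAR_RE.search(name)
    if m is None:
        raise ValueError(f"not an iceberg-spark-runtime jar: {name}")
    return RuntimeJar(m["spark"], m["scala"], m["iceberg"])


def _same_line(full, prefix):
    """True if a full version such as '3.3.0' belongs to the line '3.3'."""
    return full == prefix or full.startswith(prefix + ".")


def mismatches(jar, spark_version, scala_version):
    """List differences between the jar's targets and the cluster's versions."""
    issues = []
    if not _same_line(spark_version, jar.spark):
        issues.append(f"jar targets Spark {jar.spark}, cluster runs {spark_version}")
    if not _same_line(scala_version, jar.scala):
        issues.append(f"jar targets Scala {jar.scala}, cluster uses {scala_version}")
    return issues


url = ("https://search.maven.org/remotecontent?filepath=org/apache/iceberg/"
       "iceberg-spark-runtime-3.3_2.13/1.1.0/iceberg-spark-runtime-3.3_2.13-1.1.0.jar")
jar = parse_runtime_jar(url)
# Placeholder cluster versions; substitute the real ones.
print(jar, mismatches(jar, "3.3.0", "2.12.15"))
```

The real Spark and Scala versions can be read from the output of `spark-submit --version` on the cluster rather than taken from the placeholders above.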
[GitHub] [iceberg] rakesh-tmdc commented on issue #5424: MERGE INTO TABLE is not supported temporarily
Posted by GitBox <gi...@apache.org>.
rakesh-tmdc commented on issue #5424:
URL: https://github.com/apache/iceberg/issues/5424#issuecomment-1220688298
Hi, I am also facing a similar issue with MERGE INTO. I am using Spark 3.3.0 with Iceberg 0.14.0 and get the following error with the complete MERGE statement:
`Caused by: org.apache.spark.sql.AnalysisException: unresolved operator 'ReplaceIcebergData RelationV2[__metadata#404, uuid#405, ts#406, user_id#407, segment_id#408, activity#409, activity_occurrence#410, repeated_at#411] default.segment_table;`
The MERGE INTO statement:
`MERGE INTO default.segment_table old USING (SELECT * FROM segment_final) new ON old.uuid = new.uuid AND old.repeated_at IS NULL WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *`
However, when I run each clause on its own (only `WHEN MATCHED THEN UPDATE SET *` or only `WHEN NOT MATCHED THEN INSERT *`), it works fine.
[GitHub] [iceberg] github-actions[bot] commented on issue #5424: MERGE INTO TABLE is not supported temporarily
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #5424:
URL: https://github.com/apache/iceberg/issues/5424#issuecomment-1435413448
This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.
[GitHub] [iceberg] owencwl commented on issue #5424: MERGE INTO TABLE is not supported temporarily
Posted by "owencwl (via GitHub)" <gi...@apache.org>.
owencwl commented on issue #5424:
URL: https://github.com/apache/iceberg/issues/5424#issuecomment-1549324534
I am also facing a similar issue with MERGE INTO, on Spark 3.2 + Iceberg 0.13.1.
Spark SQL:
```sql
merge into `spark_catalog`.`default`.`merge_test` as t
using (select 1 as id, '123456' as `data`, 'flink1.14.5' as `category`, cast('2016-08-31' as date) as ts) as s
on t.id = s.id
when matched then delete
when not matched then insert *;
```
Spark error:
```
Error in query: Project [id#50L, data#51, category#52, ts#53]
+- RowFilterAndDataMaskingMarker
   +- RelationV2[id#50L, data#51, category#52, ts#53] spark_catalog.default.merge_test
is not an Iceberg table
```
Is the temporary table (aliased `s`) the reason?
Scala source code (`org.apache.spark.sql.catalyst.analysis.RewriteMergeIntoTable`):
```scala
EliminateSubqueryAliases(aliasedTable) match {
  case r: DataSourceV2Relation => xxxxxxxx
  case p =>
    throw new AnalysisException(s"$p is not an Iceberg table")
```
Why does this problem arise?
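The printed plan in this error names `spark_catalog.default.merge_test`, i.e. the MERGE target, not the source `s`. In the quoted snippet, `EliminateSubqueryAliases` removes `SubqueryAlias` nodes only, so if any other node (here `Project` and `RowFilterAndDataMaskingMarker`) still wraps the relation, the `case r: DataSourceV2Relation` branch does not match and `case p` throws with the target's plan. Where the marker node comes from is not shown in the thread; presumably another rule or plugin in that session adds it (an assumption). Below is a simplified Python model of that pattern match, with a hypothetical `Node` class standing in for Catalyst plan nodes; it is not Spark's or Iceberg's code.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Node:
    """Hypothetical stand-in for a single-child Catalyst logical plan node."""
    kind: str                       # e.g. "Project", "SubqueryAlias", "DataSourceV2Relation"
    child: Optional["Node"] = None

    def describe(self):
        return self.kind if self.child is None else f"{self.kind} -> {self.child.describe()}"


def eliminate_subquery_aliases(node):
    """Drop every SubqueryAlias node and keep all others (modeled on EliminateSubqueryAliases)."""
    if node is None:
        return None
    child = eliminate_subquery_aliases(node.child)
    return child if node.kind == "SubqueryAlias" else Node(node.kind, child)


def check_merge_target(aliased_table):
    """Mimic the quoted match: only a bare v2 relation is accepted as the target."""
    plan = eliminate_subquery_aliases(aliased_table)
    if plan.kind == "DataSourceV2Relation":
        return plan
    raise ValueError(f"{plan.describe()} is not an Iceberg table")


# Target aliased as t with nothing else around the relation: accepted.
plain = Node("SubqueryAlias", Node("DataSourceV2Relation"))
# Target whose relation is wrapped, as in the error above: rejected.
wrapped = Node("SubqueryAlias",
               Node("Project", Node("RowFilterAndDataMaskingMarker", Node("DataSourceV2Relation"))))

print(check_merge_target(plain).kind)
try:
    check_merge_target(wrapped)
except ValueError as err:
    print(err)
```

In this model the source alias plays no part in the failure; only the shape of the target's plan after alias removal matters.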