Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/08/03 02:51:32 UTC

[GitHub] [iceberg] jessiedanwang opened a new issue, #5424: MERGE INTO TABLE is not supported temporarily

jessiedanwang opened a new issue, #5424:
URL: https://github.com/apache/iceberg/issues/5424

   ### Query engine
   
   Spark 3.1.2 on EMR 6.5
   
   ### Question
   
   We are running a Spark 3.1.2 application on EMR 6.5 in account BBB, writing data to S3 and the Glue catalog in account AAA. I would appreciate any ideas about what could be wrong here.
   
   Here is what we did:
   spark = SparkSession
           .builder()
           .config("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")
           .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") // Iceberg related configs
           .config("spark.sql.autoBroadcastJoinThreshold", "-1")
           .config("spark.sql.adaptive.autoBroadcastJoinThreshold", "-1")
           .config(s"spark.sql.catalog.iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog")
           .config(s"spark.sql.catalog.iceberg_catalog.warehouse", warehousePath)
           .config(s"spark.sql.catalog.iceberg_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
           .config(s"spark.sql.catalog.iceberg_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
           .config(s"spark.sql.catalog.iceberg_catalog.client.factory", "org.apache.iceberg.aws.AssumeRoleAwsClientFactory")
           .config(s"spark.sql.catalog.iceberg_catalog.client.assume-role.arn", s"arn:aws:iam::AAA:role/RoleToAssume")
           .config(s"spark.sql.catalog.iceberg_catalog.client.assume-role.region", "us-east-2")
           .config("spark.hadoop.hive.metastore.glue.catalogid", AAA)
           .enableHiveSupport()
           .getOrCreate()
   
   df.writeTo("ns_name.table_name")
         .tableProperty("table_type", "iceberg")
         .tableProperty("format-version", "2")
         //.tableProperty("engine.hive.enabled", "true")
         .tableProperty("write.distribution-mode", "hash")
         .tableProperty("write.spark.fanout.enabled", "true")
         .tableProperty("write.parquet.compression-codec", "snappy")
         .tableProperty("write.avro.compression-codec", "snappy")
         .tableProperty("write.metadata.delete-after-commit.enabled", "true")
         .tableProperty("write.metadata.previous-versions-max", "3")
         .tableProperty("write.target-file-size-bytes", s"$GIGABYTE")
         .create()
   
   df2.createOrReplaceGlobalTempView("temp_view")
   val query: String = s"""MERGE INTO ns_name.table_name t
                          |USING (SELECT * FROM global_temp.temp_view) s
                          |ON s.id = t.id
                          |WHEN MATCHED THEN UPDATE SET *
                          |WHEN NOT MATCHED THEN INSERT *
                          |""".stripMargin
   spark.sql(query)
   
   We got the following error when running MERGE INTO:
   java.lang.UnsupportedOperationException: MERGE INTO TABLE is not supported temporarily.
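
This exception comes from Spark's own planner, which suggests that nothing rewrote the MERGE plan before planning. Two things worth ruling out are that the Iceberg SQL extensions were not applied to the session, and that the target table did not resolve through the Iceberg catalog (the statement above uses `ns_name.table_name` without the `iceberg_catalog` prefix). The sketch below is a hypothetical helper, not part of Spark or Iceberg: `missing_iceberg_settings` and its parameters are names invented here, and treating the unqualified table name as a possible cause is an assumption, not a confirmed diagnosis.

```python
# Class name taken from the spark.sql.extensions setting in the report above.
ICEBERG_EXTENSIONS = (
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
)


def missing_iceberg_settings(conf, catalog, table):
    """Return a list of likely configuration problems (hypothetical helper).

    conf    -- plain dict of Spark settings (key -> value)
    catalog -- name of the Iceberg catalog, e.g. "iceberg_catalog"
    table   -- table identifier as written in the MERGE statement
    """
    problems = []

    # spark.sql.extensions may hold a comma-separated list of classes.
    extensions = [
        e.strip() for e in conf.get("spark.sql.extensions", "").split(",")
    ]
    if ICEBERG_EXTENSIONS not in extensions:
        problems.append("spark.sql.extensions lacks " + ICEBERG_EXTENSIONS)

    # The catalog itself has to be registered under spark.sql.catalog.<name>.
    catalog_key = "spark.sql.catalog." + catalog
    if catalog_key not in conf:
        problems.append(catalog_key + " is not set")

    # Assumption: an identifier without the catalog prefix resolves against
    # the session's default catalog rather than the Iceberg one.
    is_default = conf.get("spark.sql.defaultCatalog") == catalog
    if not table.startswith(catalog + ".") and not is_default:
        problems.append(table + " is not qualified with " + catalog)

    return problems
```

With a live PySpark session, the dict could be built with `dict(spark.sparkContext.getConf().getAll())`, although settings applied after the context was created may not show up there.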


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] suisenkotoba commented on issue #5424: MERGE INTO TABLE is not supported temporarily

Posted by "suisenkotoba (via GitHub)" <gi...@apache.org>.
suisenkotoba commented on issue #5424:
URL: https://github.com/apache/iceberg/issues/5424#issuecomment-1451638558

   Has anyone had any luck with this? I am facing the same issue and could not find the cause.
   I am not sure whether something is wrong with my Spark configuration. I use Spark 3.3 on Dataproc ([image version 2.1](https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.1)) with Iceberg 1.1.0. The Dataproc cluster already has a Dataproc Metastore attached.
   This is my Python code:
   ```python
   from pyspark.sql import SparkSession
   from pyspark import SparkConf
   from pyspark.sql.types import LongType, BooleanType, StructField, StructType
   
   
   conf = (
           SparkConf()
           .setAppName('test_iceberg')
           .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
           .set('spark.sql.catalog.spark_catalog', 'org.apache.iceberg.spark.SparkSessionCatalog')
           .set('spark.sql.catalog.spark_catalog.type', 'hive')
           .set('spark.sql.catalog.dev', 'org.apache.iceberg.spark.SparkCatalog')
           .set('spark.sql.catalog.dev.type', 'hive')
       )
   spark = SparkSession.builder.enableHiveSupport().config(conf=conf).getOrCreate()
   
   df = spark.read.option("mergeSchema", "true").parquet('gs://poc-kafka/mis').sort("timestamp")
   
   # Create iceberg table 
   schema = df.select("value.after.*").schema
   additional_fields = [StructField('start_at', LongType(), True), 
                  StructField('end_at', LongType(), True), 
                  StructField('is_current', BooleanType(), True)]
   schema = StructType(schema.fields + additional_fields)
   empty_df = spark.createDataFrame([], schema)
   spark.sql("CREATE DATABASE IF NOT EXISTS dev.dummy")
   empty_df.createOrReplaceTempView("empty")
   spark.sql("CREATE TABLE IF NOT EXISTS dev.dummy.fruit USING iceberg  "
            "TBLPROPERTIES ('format-version'='2') "
            "AS (SELECT * FROM empty) ")
   
   # Merge table
   df.createOrReplaceTempView("changelog")
   sql = "MERGE INTO dev.dummy.fruit d " \
         "USING (SELECT value.after.*, value.op op, " \
         "            value.source.ts_ms start_at, True is_current, " \
         "            NULL end_at " \
         "        FROM changelog) s " \
         "ON d.id = s.id " \
         "WHEN MATCHED THEN UPDATE SET d.is_current = False, d.end_at = s.start_at " \
         "WHEN NOT MATCHED THEN INSERT * "
   spark.sql(sql)
   
   ```
   
   This is the traceback I got:
   ```
   /usr/lib/spark/python/pyspark/sql/session.py in sql(self, sqlQuery, **kwargs)
      1032             sqlQuery = formatter.format(sqlQuery, **kwargs)
      1033         try:
   -> 1034             return DataFrame(self._jsparkSession.sql(sqlQuery), self)
      1035         finally:
      1036             if len(kwargs) > 0:
   
   /usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py in __call__(self, *args)
      1319 
      1320         answer = self.gateway_client.send_command(command)
   -> 1321         return_value = get_return_value(
      1322             answer, self.gateway_client, self.target_id, self.name)
      1323 
   
   /usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
       188     def deco(*a: Any, **kw: Any) -> Any:
       189         try:
   --> 190             return f(*a, **kw)
       191         except Py4JJavaError as e:
       192             converted = convert_exception(e.java_exception)
   
   /usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
       324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
       325             if answer[1] == REFERENCE_TYPE:
   --> 326                 raise Py4JJavaError(
       327                     "An error occurred while calling {0}{1}{2}.\n".
       328                     format(target_id, ".", name), value)
   
   Py4JJavaError: An error occurred while calling o67.sql.
   : java.lang.UnsupportedOperationException: MERGE INTO TABLE is not supported temporarily.
       at org.apache.spark.sql.errors.QueryExecutionErrors$.ddlUnsupportedTemporarilyError(QueryExecutionErrors.scala:891)
       at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:821)
       at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
       at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
       at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
       at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
       at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
       at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
       at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
       at scala.collection.Iterator.foreach(Iterator.scala:943)
       at scala.collection.Iterator.foreach$(Iterator.scala:943)
       at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
       at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
       at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
       at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
       at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
       at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
       at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
       at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
       at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:459)
       at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:145)
       at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
       at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
       at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
       at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
       at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
       at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:145)
       at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:138)
       at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:158)
       at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
       at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
       at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
       at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
       at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
       at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:158)
       at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
       at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:204)
       at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:249)
       at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:218)
       at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:103)
       at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
       at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
       at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
       at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
       at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
       at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
       at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
       at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
       at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
       at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
       at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
       at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
       at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
       at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
       at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
       at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
       at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
       at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
       at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
       at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
       at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
       at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
       at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
       at java.base/java.lang.reflect.Method.invoke(Method.java:566)
       at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
       at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
       at py4j.Gateway.invoke(Gateway.java:282)
       at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
       at py4j.commands.CallCommand.execute(CallCommand.java:79)
       at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
       at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
       at java.base/java.lang.Thread.run(Thread.java:829)
   ```
   If it helps, these are the Dataproc cluster properties I included when creating the cluster:
   `--properties='^#^dataproc:pip.packages=google-cloud-secret-manager==2.15.0,numpy==1.24.1#spark:spark.jars=https://jdbc.postgresql.org/download/postgresql-42.5.1.jar,https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark-runtime-3.3_2.13/1.1.0/iceberg-spark-runtime-3.3_2.13-1.1.0.jar#core:fs.defaultFS=gs://de-dev-dataproc-fs'`
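
The runtime jar named in these properties carries a Scala suffix (`_2.13`). If the cluster's Spark build uses a different Scala binary version, the Iceberg extensions might fail to load, so the match is worth ruling out. The sketch below is a minimal illustration of that check. `jar_scala_version` and `scala_matches` are hypothetical helper names, and the cluster version used in the example (`2.12.18`) is an assumed value that should be read from the actual cluster.

```python
import re


def jar_scala_version(jar_name):
    """Extract the Scala binary version from an artifact name.

    For "iceberg-spark-runtime-3.3_2.13-1.1.0.jar" this returns "2.13".
    Returns None when the name has no "_<major>.<minor>-" suffix.
    """
    match = re.search(r"_(\d+\.\d+)-", jar_name)
    return match.group(1) if match else None


def scala_matches(jar_name, cluster_scala):
    """Compare the jar's Scala suffix with the cluster's Scala version.

    cluster_scala may be a full version such as "2.12.18"; only the
    major.minor part is compared.
    """
    binary_version = ".".join(cluster_scala.split(".")[:2])
    return jar_scala_version(jar_name) == binary_version


jar = "iceberg-spark-runtime-3.3_2.13-1.1.0.jar"
# "2.12.18" is an assumed cluster version, used only for illustration.
print(scala_matches(jar, "2.12.18"))
```

In a PySpark session, the cluster's Scala version can probably be read through the JVM gateway with `spark.sparkContext._jvm.scala.util.Properties.versionNumberString()`, which relies on a private attribute and may change between releases.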




[GitHub] [iceberg] rakesh-tmdc commented on issue #5424: MERGE INTO TABLE is not supported temporarily

Posted by GitBox <gi...@apache.org>.
rakesh-tmdc commented on issue #5424:
URL: https://github.com/apache/iceberg/issues/5424#issuecomment-1220688298

   Hi, I am also facing a similar issue with the MERGE INTO functionality. I am using Spark 3.3.0 with Iceberg 0.14.0 and get the error below when the statement contains both WHEN clauses:
   
   `Caused by: org.apache.spark.sql.AnalysisException: unresolved operator 'ReplaceIcebergData RelationV2[__metadata#404, uuid#405, ts#406, user_id#407, segment_id#408, activity#409, activity_occurrence#410, repeated_at#411] default.segment_table; 
   `
   The MERGE INTO statement is:
   `MERGE INTO default.segment_table old USING (SELECT * FROM segment_final) new ON old.uuid = new.uuid AND old.repeated_at IS NULL WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * `
   
   However, when I try a single merge clause separately, either `WHEN MATCHED THEN UPDATE SET *` or `WHEN NOT MATCHED THEN INSERT *`, it works fine.




[GitHub] [iceberg] github-actions[bot] commented on issue #5424: MERGE INTO TABLE is not supported temporarily

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #5424:
URL: https://github.com/apache/iceberg/issues/5424#issuecomment-1435413448

   This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.




[GitHub] [iceberg] owencwl commented on issue #5424: MERGE INTO TABLE is not supported temporarily

Posted by "owencwl (via GitHub)" <gi...@apache.org>.
owencwl commented on issue #5424:
URL: https://github.com/apache/iceberg/issues/5424#issuecomment-1549324534

   I am also facing a similar issue with the MERGE INTO functionality, using Spark 3.2 with Iceberg 0.13.1:
   
   spark-sql:
   merge into `spark_catalog`.`default`.`merge_test` as t using (select 1 as id,'123456' as `data`,'flink1.14.5' as `category`,cast('2016-08-31' as date) as ts) as s  on t.id=s.id
   when matched then delete
   when not matched then insert *;
   
   
   spark-error:
   Error in query: Project [id#50L, data#51, category#52, ts#53]
   +- RowFilterAndDataMaskingMarker
      +- RelationV2[id#50L, data#51, category#52, ts#53] spark_catalog.default.merge_test
    is not an Iceberg table
   
   Is the temporary table (named s) the cause?
   
   scala-source-code:
   org.apache.spark.sql.catalyst.analysis.RewriteMergeIntoTable
   
        EliminateSubqueryAliases(aliasedTable) match {
          case r: DataSourceV2Relation => xxxxxxxx
          case p =>
            throw new AnalysisException(s"$p is not an Iceberg table")
   
   Why does this problem arise?
   
   

