Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/09/09 22:23:29 UTC

[GitHub] [iceberg] mohitgargk opened a new issue, #5739: unresolved operator 'ReplaceData when mergeInto involves not null join keys

mohitgargk opened a new issue, #5739:
URL: https://github.com/apache/iceberg/issues/5739

   Spark 3.1.2 + Iceberg 0.14.1 runs this MERGE successfully.
   Spark 3.2.0 + Iceberg 0.14.1 fails with the following error:
   
   
   **Command**
   ```
   ./bin/spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.14.1 \
     --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
     --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
     --conf spark.sql.catalog.spark_catalog.type=hive \
     --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
     --conf spark.sql.catalog.local.type=hadoop \
     --conf spark.sql.catalog.local.warehouse=$PWD/warehouse
   ```
   
   **Exception**
   
   > org.apache.spark.sql.AnalysisException: unresolved operator 'ReplaceData RelationV2[id#219, firstname#220, lastname#221, age#222, date_id#223] local.db.target;
   > 'MergeIntoIcebergTable (cast(id#219 as bigint) = id#224L), [deleteaction(Some(((operation_type#230 = DELETE) AND (isnull(date_id#223) OR (arrival_time#229 > date_id#223))))), updateaction(Some((((operation_type#230 = UPSERT) OR (operation_type#230 = APPEND)) AND (isnull(date_id#223) OR (arrival_time#229 > date_id#223)))), assignment(id#219, ansi_cast(id#224L as string)), assignment(firstname#220, firstname#225), assignment(lastname#221, lastname#226), assignment(age#222, age#227), assignment(date_id#223, date_id#228))], [insertaction(Some(NOT (operation_type#230 = DELETE)), assignment(id#219, ansi_cast(id#224L as string)), assignment(firstname#220, firstname#225), assignment(lastname#221, lastname#226), assignment(age#222, age#227), assignment(date_id#223, date_id#228))]
   > :- SubqueryAlias target
   > :  +- SubqueryAlias local.db.target
   > :     +- RelationV2[id#219, firstname#220, lastname#221, age#222, date_id#223] local.db.target
   > :- SubqueryAlias source
   > :  +- SubqueryAlias local.db.source
   > :     +- RelationV2[id#224L, firstname#225, lastname#226, age#227, date_id#228, arrival_time#229, operation_type#230] local.db.source
   > +- 'ReplaceData
   >    +- MergeRows[id#219, firstname#220, lastname#221, age#222, date_id#223, _file#233]
   >       +- Join FullOuter, (cast(id#219 as bigint) = id#244L), leftHint=(strategy=no_broadcast_hash)
   >          :- NoStatsUnaryNode
   >          :  +- Project [id#219, firstname#220, lastname#221, age#222, date_id#223, _file#233, true AS __row_from_target#236, monotonically_increasing_id() AS __row_id#237L]
   >          :     +- RelationV2[id#219, firstname#220, lastname#221, age#222, date_id#223, _file#233] local.db.target
   >          +- Project [id#244L, firstname#245, lastname#246, age#247, date_id#248, arrival_time#249, operation_type#250, true AS __row_from_source#238]
   >             +- SubqueryAlias source
   >                +- SubqueryAlias local.db.source
   >                   +- RelationV2[id#244L, firstname#245, lastname#246, age#247, date_id#248, arrival_time#249, operation_type#250] local.db.source
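
   The plan above shows the NOT NULL target key wrapped in an implicit cast (`cast(id#219 as bigint)`) because `target.id` is a string while `source.id` is a bigint. As an untested workaround sketch (not confirmed anywhere in this thread), the cast can be made explicit on the source side, using the same tables as the script below, so the merge condition leaves the NOT NULL target column untouched; whether this avoids the resolver failure is unverified:

   ```scala
   // Untested sketch: align the key types on the source side so that no
   // implicit cast is placed on the NOT NULL target column in the ON clause.
   spark.sql("""MERGE INTO local.db.target AS target
   USING (SELECT CAST(id AS string) AS id, firstname, lastname, age, date_id,
                 arrival_time, operation_type
          FROM local.db.source) AS source
   ON target.id = source.id
     WHEN MATCHED AND source.operation_type = 'DELETE' AND (target.date_id IS NULL OR source.arrival_time > target.date_id)
       THEN DELETE
     WHEN MATCHED AND (source.operation_type = 'UPSERT' OR source.operation_type = 'APPEND') AND (target.date_id IS NULL OR source.arrival_time > target.date_id)
       THEN UPDATE SET target.id = source.id, target.firstname = source.firstname, target.lastname = source.lastname, target.age = source.age, target.date_id = source.date_id
     WHEN NOT MATCHED AND source.operation_type != 'DELETE'
       THEN INSERT (id, firstname, lastname, age, date_id) VALUES (source.id, source.firstname, source.lastname, source.age, source.date_id)""")
   ```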
   
   **spark-shell code**
   
   ```scala
   import org.apache.spark.sql.Column;
   import org.apache.spark.sql.DataFrame;
   import org.apache.spark.sql.Row;
   import org.apache.spark.sql.types.StructField;
   import org.apache.spark.sql.types.StructType;
   import org.apache.iceberg.{PartitionSpec, Schema, Table}
   import org.apache.iceberg.catalog.TableIdentifier
   import org.apache.iceberg.hadoop.HadoopCatalog
   import org.apache.iceberg.types.Types
   import org.apache.iceberg.TableProperties
   import spark.implicits._
   import java.sql.Timestamp
   
   // Utility: returns a copy of `df` whose schema marks column `cn` with the
   // requested nullability; the rows are round-tripped through the RDD so that
   // Spark picks up the rebuilt schema.
   def setNullableStateOfColumn(df: DataFrame, cn: String, nullable: Boolean): DataFrame = {
     val newSchema = StructType(df.schema.map {
       case StructField(c, t, _, m) if c.equals(cn) => StructField(c, t, nullable = nullable, m)
       case y: StructField => y
     })
     df.sqlContext.createDataFrame(df.rdd, newSchema)
   }
   
   // 1. Prepare target data
   spark.sql("drop table local.db.target")
   spark.sql("CREATE TABLE local.db.target (id string not null, firstname string, lastname string, age int, date_id timestamp) USING iceberg");
   
   val namespace = "db"
   val target = "target"
   val targetName = TableIdentifier.of(namespace, target)
   val catalog = new HadoopCatalog(spark.sparkContext.hadoopConfiguration, "file:///Users/mohit.garg/spark_iceberg/spark-3.2.0-bin-hadoop3.2/warehouse")
   val targetTable = catalog.loadTable(targetName)
   targetTable.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
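   // Equivalent SQL sketch (assumption, not part of the original run):
   // spark.sql("ALTER TABLE local.db.target SET TBLPROPERTIES ('format-version'='2')")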
   
   val ts = Timestamp.valueOf("2022-01-01 00:00:00");
   val range = (1 to 10).toList
   val targetData = range.map(id => (id.toString, "", "", 0, ts) )
   val targetDf = spark.createDataFrame(targetData).toDF("id", "firstname", "lastname", "age", "date_id")
   
   val newTargetDf = setNullableStateOfColumn(targetDf, "id", false)
   newTargetDf.createOrReplaceTempView("targetDf")
   
   spark.sql("INSERT INTO local.db.target SELECT * from targetDf")
   spark.sql("select * from local.db.target").show
   
   // 2. Prepare source data
   spark.sql("drop table local.db.source")
   spark.sql("CREATE TABLE local.db.source (id bigint not null, firstname string, lastname string, age int, date_id timestamp, arrival_time timestamp, operation_type string) USING iceberg");
   
   val source = "source"
   val sourceName = TableIdentifier.of(namespace, source)
   val sourceTable = catalog.loadTable(sourceName)
   sourceTable.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
   
   val sourceData = Seq(
     (1, "mohit", "garg", 1, Timestamp.valueOf("2022-01-01 00:00:01"), Timestamp.valueOf("2022-01-01 00:00:01"), "UPSERT"),
     (2, "adam", "hancock", 1, Timestamp.valueOf("2022-01-01 00:00:01"), Timestamp.valueOf("2022-01-01 00:00:01"), "APPEND"),
     (3, "pradeep", "venkat", 1, Timestamp.valueOf("2022-01-01 00:00:01"), Timestamp.valueOf("2022-01-01 00:00:01"), "DELETE"))
   val sourceDf = spark.createDataFrame(sourceData).toDF("id", "firstname", "lastname", "age", "date_id", "arrival_time", "operation_type")
   // Register the frame whose id column is marked NOT NULL.
   val newSourceDf = setNullableStateOfColumn(sourceDf, "id", false)
   newSourceDf.createOrReplaceTempView("sourceDf")
   
   spark.sql("INSERT INTO local.db.source SELECT * from sourceDf")
   spark.sql("select * from local.db.source").show
   
   // 3. MergeInto
   
   spark.sql("""MERGE INTO local.db.target as target
   USING local.db.source as source ON target.id = source.id 
     WHEN MATCHED AND source.operation_type = 'DELETE' AND (target.date_id IS NULL OR source.arrival_time > target.date_id) 
       THEN DELETE 
   
     WHEN MATCHED AND (source.operation_type = 'UPSERT' OR source.operation_type = 'APPEND') AND (target.date_id IS NULL OR source.arrival_time > target.date_id) 
       THEN UPDATE SET target.id = source.id, target.firstname = source.firstname, target.lastname = source.lastname, target.age = source.age, target.date_id = source.date_id 
   
     WHEN NOT MATCHED AND source.operation_type != 'DELETE' 
       THEN INSERT (`id`, `firstname`, `lastname`, `age`, `date_id`) VALUES (source.`id`, source.`firstname`, source.`lastname`, source.`age`, source.`date_id`)""")
   
   spark.sql("select * from local.db.target").show
   ```
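
   Since the failure is tied to NOT NULL join keys (per the title), an untested narrowing sketch, using a hypothetical copy of the target table, is to drop the constraint and repeat the same MERGE:

   ```scala
   // Untested sketch; `local.db.target_nullable` is a hypothetical table name.
   // If the same MERGE resolves when the join key is nullable, the NOT NULL
   // constraint is confirmed as the trigger.
   spark.sql("""CREATE TABLE local.db.target_nullable
     (id string, firstname string, lastname string, age int, date_id timestamp)
     USING iceberg TBLPROPERTIES ('format-version'='2')""")
   spark.sql("INSERT INTO local.db.target_nullable SELECT * FROM local.db.target")
   // Re-run the MERGE above with local.db.target_nullable as the target.
   ```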



[GitHub] [iceberg] aokolnychyi commented on issue #5739: unresolved operator 'ReplaceData when mergeInto involves not null join keys

aokolnychyi commented on issue #5739:
URL: https://github.com/apache/iceberg/issues/5739#issuecomment-1258758152

   Sorry, I was off. Let me take a look.



[GitHub] [iceberg] Fokko commented on issue #5739: unresolved operator 'ReplaceData when mergeInto involves not null join keys

Fokko commented on issue #5739:
URL: https://github.com/apache/iceberg/issues/5739#issuecomment-1242906872

   @aokolnychyi would you have any time to dig into this?



[GitHub] [iceberg] Fokko commented on issue #5739: unresolved operator 'ReplaceData when mergeInto involves not null join keys

Fokko commented on issue #5739:
URL: https://github.com/apache/iceberg/issues/5739#issuecomment-1243003082

   From Slack: @singhpk234 has a PR that fixes this for copy-on-write (COW) tables: https://github.com/apache/iceberg/pull/5679. The merge-on-read (MOR) path is still being worked on.
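
   For context (not stated in the thread): which of the two paths a MERGE takes is controlled by the `write.merge.mode` Iceberg table property, either `copy-on-write` (the default, rewrites affected data files) or `merge-on-read` (writes delete files, format v2 only). A small sketch for inspecting or switching it:

   ```scala
   // Sketch using the standard Iceberg write property; applies to the tables
   // from the repro scripts in this thread.
   spark.sql("ALTER TABLE local.db.target SET TBLPROPERTIES ('write.merge.mode'='copy-on-write')")
   spark.sql("SHOW TBLPROPERTIES local.db.target").show(truncate = false)
   ```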



[GitHub] [iceberg] Fokko commented on issue #5739: unresolved operator 'ReplaceData when mergeInto involves not null join keys

Fokko commented on issue #5739:
URL: https://github.com/apache/iceberg/issues/5739#issuecomment-1242903069

   I can confirm this bug. I created a slightly modified script to reproduce it:
   ```
   ./bin/spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.14.1 \
   --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
   --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog \
   --conf spark.sql.catalog.spark_catalog.type=hadoop \
   --conf spark.sql.catalog.spark_catalog.warehouse=$PWD/warehouse
   ```
   
   ```scala
   import org.apache.spark.sql.Column;
   import org.apache.spark.sql.DataFrame;
   import org.apache.spark.sql.Row;
   import org.apache.spark.sql.types.StructField;
   import org.apache.spark.sql.types.StructType;
   import org.apache.iceberg.{PartitionSpec, Schema, Table}
   import org.apache.iceberg.catalog.TableIdentifier
   import org.apache.iceberg.hadoop.HadoopCatalog
   import org.apache.iceberg.types.Types
   import org.apache.iceberg.TableProperties
   import spark.implicits._
   import java.sql.Timestamp
   
   // Utility: returns a copy of `df` whose schema marks column `cn` with the
   // requested nullability; the rows are round-tripped through the RDD so that
   // Spark picks up the rebuilt schema.
   def setNullableStateOfColumn(df: DataFrame, cn: String, nullable: Boolean): DataFrame = {
     val newSchema = StructType(df.schema.map {
       case StructField(c, t, _, m) if c.equals(cn) => StructField(c, t, nullable = nullable, m)
       case y: StructField => y
     })
     df.sqlContext.createDataFrame(df.rdd, newSchema)
   }
   
   // 1. Prepare target data
   spark.sql("DROP TABLE IF EXISTS target")
   spark.sql("CREATE TABLE target (id string not null, firstname string, lastname string, age int, date_id timestamp) USING iceberg");
   spark.sql("SELECT * FROM target").show
   
   val namespace = "default"
   val target = "target"
   val targetName = TableIdentifier.of(namespace, target)
   val catalog = new HadoopCatalog(spark.sparkContext.hadoopConfiguration, "/Users/fokkodriesprong/Desktop/spark-3.2.0-bin-hadoop3.2/warehouse")
   val targetTable = catalog.loadTable(targetName)
   targetTable.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
   
   // spark.sql("ALTER TABLE local.db.target SET('format-version'='2')").show()
   
   val ts = Timestamp.valueOf("2022-01-01 00:00:00");
   val range = (1 to 10).toList
   val targetData = range.map(id => (id.toString, "", "", 0, ts) )
   val targetDf = spark.createDataFrame(targetData).toDF("id", "firstname", "lastname", "age", "date_id")
   
   val newTargetDf = setNullableStateOfColumn(targetDf, "id", false)
   newTargetDf.createOrReplaceTempView("targetDf")
   
   spark.sql("INSERT INTO target SELECT * FROM targetDf")
   spark.sql("SELECT * FROM target").show
   
   // 2. Prepare source data
   spark.sql("DROP TABLE IF EXISTS source")
   spark.sql("CREATE TABLE source (id bigint not null, firstname string, lastname string, age int, date_id timestamp, arrival_time timestamp, operation_type string) USING iceberg");
   
   val source = "source"
   val sourceName = TableIdentifier.of(namespace, source)
   val sourceTable = catalog.loadTable(sourceName)
   sourceTable.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
   
   val sourceData = Seq(
     (1, "mohit", "garg", 1, Timestamp.valueOf("2022-01-01 00:00:01"), Timestamp.valueOf("2022-01-01 00:00:01"), "UPSERT"),
     (2, "adam", "hancock", 1, Timestamp.valueOf("2022-01-01 00:00:01"), Timestamp.valueOf("2022-01-01 00:00:01"), "APPEND"),
     (3, "pradeep", "venkat", 1, Timestamp.valueOf("2022-01-01 00:00:01"), Timestamp.valueOf("2022-01-01 00:00:01"), "DELETE"))
   val sourceDf = spark.createDataFrame(sourceData).toDF("id", "firstname", "lastname", "age", "date_id", "arrival_time", "operation_type")
   // Register the frame whose id column is marked NOT NULL.
   val newSourceDf = setNullableStateOfColumn(sourceDf, "id", false)
   newSourceDf.createOrReplaceTempView("sourceDf")
   
   spark.sql("INSERT INTO source SELECT * FROM sourceDf")
   spark.sql("SELECT * FROM source").show
   
   // 3. MergeInto
   
   spark.sql("""MERGE INTO target
   USING source ON target.id = source.id 
     WHEN MATCHED AND source.operation_type = 'DELETE' AND (target.date_id IS NULL OR source.arrival_time > target.date_id) 
       THEN DELETE 
   
     WHEN MATCHED AND (source.operation_type = 'UPSERT' OR source.operation_type = 'APPEND') AND (target.date_id IS NULL OR source.arrival_time > target.date_id) 
       THEN UPDATE SET target.id = source.id, target.firstname = source.firstname, target.lastname = source.lastname, target.age = source.age, target.date_id = source.date_id 
   
     WHEN NOT MATCHED AND source.operation_type != 'DELETE' 
       THEN INSERT (`id`, `firstname`, `lastname`, `age`, `date_id`) VALUES (source.`id`, source.`firstname`, source.`lastname`, source.`age`, source.`date_id`)""")
   
   spark.sql("select * from target").show
   ```

