Posted to issues@iceberg.apache.org by "rfyu (via GitHub)" <gi...@apache.org> on 2023/05/16 10:19:15 UTC

[GitHub] [iceberg] rfyu opened a new issue, #7618: Exceptions when FlinkSQL writes to partitioned tables

rfyu opened a new issue, #7618:
URL: https://github.com/apache/iceberg/issues/7618

   When Flink SQL writes to a partitioned Iceberg table in upsert mode, the following exception is thrown:
   ```
   Exception in thread "main" java.lang.IllegalStateException: In UPSERT mode, partition field '1000: d: identity(2)' should be included in equality fields: '[id]'
   	at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkState(Preconditions.java:821)
   	at org.apache.iceberg.flink.sink.FlinkSink$Builder.appendWriter(FlinkSink.java:455)
   	at org.apache.iceberg.flink.sink.FlinkSink$Builder.chainIcebergOperators(FlinkSink.java:358)
   	at org.apache.iceberg.flink.sink.FlinkSink$Builder.append(FlinkSink.java:374)
   	at org.apache.iceberg.flink.IcebergTableSink$1.consumeDataStream(IcebergTableSink.java:87)
   	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.applySinkProvider(CommonExecSink.java:471)
   	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkTransformation(CommonExecSink.java:203)
   	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:176)
   	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:158)
   	at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
   	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
   	at scala.collection.Iterator.foreach(Iterator.scala:937)
   	at scala.collection.Iterator.foreach$(Iterator.scala:937)
   	at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
   	at scala.collection.IterableLike.foreach(IterableLike.scala:70)
   	at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
   	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
   	at scala.collection.TraversableLike.map(TraversableLike.scala:233)
   	at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
   	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
   	at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84)
   	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:197)
   	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1733)
   	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:825)
   	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:918)
   	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
   	at com.ctrip.ops.muise.flink.template.UpsertDemo.main(UpsertDemo.java:82)
   ```
   The reason is:
   In the Flink Iceberg sink, upsert mode uses the primary key, passed as equalityColumns, to decide whether an incoming record updates an existing row. For partitioned tables, the partition fields must also be taken into account, so in upsert mode equalityColumns must contain every partition field. If a partition field is missing, update records cannot be identified correctly, and the sink rejects the job with the exception above.
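
The rule described above can be sketched in plain Java. This is only an illustration under stated assumptions: `UpsertKeyCheck`, `missingPartitionColumns`, and `validate` are hypothetical names, the check works on column names rather than field IDs, and the message only imitates the one in the stack trace. It is not the code in `FlinkSink`.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the sink's validation; not Iceberg's actual code.
final class UpsertKeyCheck {

  // Returns the partition columns that are absent from the equality fields.
  static List<String> missingPartitionColumns(
      List<String> equalityFields, List<String> partitionColumns) {
    Set<String> equality = new LinkedHashSet<>(equalityFields);
    List<String> missing = new ArrayList<>();
    for (String column : partitionColumns) {
      if (!equality.contains(column)) {
        missing.add(column);
      }
    }
    return missing;
  }

  // Fails fast when an upsert key omits a partition column.
  static void validate(List<String> equalityFields, List<String> partitionColumns) {
    List<String> missing = missingPartitionColumns(equalityFields, partitionColumns);
    if (!missing.isEmpty()) {
      throw new IllegalStateException(
          "In UPSERT mode, partition columns " + missing
              + " should be included in equality fields: " + equalityFields);
    }
  }

  public static void main(String[] args) {
    validate(List.of("id", "d"), List.of("d")); // passes: the key covers the partition column
    try {
      validate(List.of("id"), List.of("d")); // the situation reported in this issue
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

With equality fields `[id]` and partition column `d`, the sketch throws, which mirrors the failure reported here; with `[id, d]` it passes.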
   
   The proposed solution is to merge the primary key fields and the partition fields in the getSinkRuntimeProvider method of IcebergTableSink and use the result as the value of equalityColumns.
   For example:
   ```
   // Illustrative pseudocode: the elided lookups and the provider body are placeholders.
   @Override
   public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
       String[] primaryKeys = ...;  // get primary key fields
       String[] partitionKeys = ...; // get partition key fields
       // Merge both so that the equality fields cover every partition field.
       String[] equalityColumns = ArrayUtils.addAll(primaryKeys, partitionKeys);
       return new SinkRuntimeProvider() {
           public StatefulFunction getStoreFunction() {
               return new IcebergTableSink(equalityColumns, partitionKeys, format, catalog);
           }
       };
   }
   ```
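
One detail of the merge is worth spelling out: a plain array concatenation such as `ArrayUtils.addAll` keeps duplicates, so a partition column that is already part of the primary key would appear twice. A minimal sketch of a duplicate-free merge follows, assuming columns are plain strings; `EqualityColumns` and `merge` are hypothetical names, not part of Iceberg or Flink.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration only.
final class EqualityColumns {

  // Primary-key columns first, then any partition columns not already present.
  static List<String> merge(List<String> primaryKeys, List<String> partitionKeys) {
    Set<String> merged = new LinkedHashSet<>(primaryKeys); // keeps insertion order
    merged.addAll(partitionKeys);                          // skips columns already in the key
    return new ArrayList<>(merged);
  }

  public static void main(String[] args) {
    System.out.println(merge(List.of("id"), List.of("d")));      // prints [id, d]
    System.out.println(merge(List.of("id", "d"), List.of("d"))); // prints [id, d]
  }
}
```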
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on issue #7618: Exceptions when FlinkSQL writes to partitioned tables

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on issue #7618:
URL: https://github.com/apache/iceberg/issues/7618#issuecomment-1631781273

   This is the intended behavior. The check requires users to add the partition columns to the identifier/PK columns.
   https://github.com/apache/iceberg/blob/master/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java#L523
   
   I am not sure we need PR #7620. 
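
On the user side, this means declaring the partition column as part of the primary key in the table definition. The sketch below builds such a Flink SQL DDL statement as a Java string. The table name `sample`, the column types, and the `data` column are assumptions made for illustration; `id` and `d` are taken from the error message, and the `WITH` options are the ones the Iceberg documentation describes for upsert tables.

```java
// Hypothetical example; table name and column types are assumptions.
final class UpsertTableDdl {

  // Builds a DDL statement whose primary key contains the partition column `d`.
  static String createTable() {
    return String.join(
        "\n",
        "CREATE TABLE sample (",
        "  id BIGINT NOT NULL,",
        "  d STRING NOT NULL,",
        "  data STRING,",
        "  PRIMARY KEY (id, d) NOT ENFORCED",
        ") PARTITIONED BY (d)",
        "WITH ('format-version' = '2', 'write.upsert.enabled' = 'true')");
  }

  public static void main(String[] args) {
    // In a Flink job this string would be passed to TableEnvironment.executeSql(...).
    System.out.println(createTable());
  }
}
```

Because `d` appears in both `PRIMARY KEY` and `PARTITIONED BY`, the equality fields derived from the primary key already cover the partition field.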




[GitHub] [iceberg] linyanghao commented on issue #7618: Exceptions when FlinkSQL writes to partitioned tables

Posted by "linyanghao (via GitHub)" <gi...@apache.org>.
linyanghao commented on issue #7618:
URL: https://github.com/apache/iceberg/issues/7618#issuecomment-1582078900

   Currently, it is necessary to include the partition fields in the PK. Otherwise, Iceberg cannot ensure that the PK is unique across partitions. For example, both ('partition1', 'key1') and ('partition2', 'key1') can exist in a table at the same time, which breaks the uniqueness constraint.
   To support a PK that does not include the partition fields, we would need to implement a global index like Hudi's. Given a record's PK, the global index routes the record to the partition that contains it. In this way, we can ensure that the PK is unique.
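
The uniqueness problem can be reproduced with a toy model in which every partition keeps its own key-to-value map, so an upsert only replaces rows inside the partition it is written to. This is a simplified sketch under that assumption; `PartitionScopedUpsert` is a hypothetical class and does not reflect how Iceberg stores data or applies equality deletes.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model: upserts are scoped to a single partition.
final class PartitionScopedUpsert {
  private final Map<String, Map<String, String>> partitions = new HashMap<>();

  // Inserts or replaces the row for `key`, but only within `partition`.
  void upsert(String partition, String key, String value) {
    partitions.computeIfAbsent(partition, p -> new HashMap<>()).put(key, value);
  }

  // Counts how many partitions hold a row with this key.
  long countRows(String key) {
    return partitions.values().stream().filter(rows -> rows.containsKey(key)).count();
  }

  public static void main(String[] args) {
    PartitionScopedUpsert table = new PartitionScopedUpsert();
    table.upsert("partition1", "key1", "v1");
    table.upsert("partition2", "key1", "v2"); // same key, different partition
    System.out.println(table.countRows("key1")); // prints 2
  }
}
```

The second upsert does not replace the first row because it never looks into `partition1`. A global index would close that gap by mapping each key to the partition that currently holds it.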




[GitHub] [iceberg] rfyu commented on issue #7618: Exceptions when FlinkSQL writes to partitioned tables

Posted by "rfyu (via GitHub)" <gi...@apache.org>.
rfyu commented on issue #7618:
URL: https://github.com/apache/iceberg/issues/7618#issuecomment-1582196947

   You're right. Currently, the partition field is not included in the PK, so I created this [PR](https://github.com/apache/iceberg/pull/7620).




Re: [I] Exceptions when FlinkSQL writes to partitioned tables [iceberg]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #7618:
URL: https://github.com/apache/iceberg/issues/7618#issuecomment-1882020269

   This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

