Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2023/01/04 05:45:00 UTC

[GitHub] [iceberg] yulongz opened a new issue, #6519: Flink: ClassCastException when delete a row in upsert mode using two discontinuous primary keys

yulongz opened a new issue, #6519:
URL: https://github.com/apache/iceberg/issues/6519

   ### Apache Iceberg version
   
   None
   
   ### Query engine
   
   Flink
   
   ### Please describe the bug 🐞
   
   The Iceberg table has a composite primary key made of two non-adjacent columns (`name`, `partition_key`). Both key columns are of type varchar (StringData in Flink); the other columns are int or double.
   ```
   CREATE TABLE IF NOT EXISTS default_catalog.default_database.aaa (
   `name` varchar,
   `class` int,
   `age` int,
   `score` double,
   `partition_key` varchar,
   PRIMARY KEY (`name`,`partition_key`) NOT ENFORCED)
   WITH (
   'write.upsert.enabled'='true',
   'write.metadata.delete-after-commit.enabled'='true',
   'write.metadata.previous-versions-max'='10',
   'commit.manifest.min-count-to-merge'='5',
   'table.dynamic-table-options.enabled'='true',
   'format-version'='2');
   ```
   When I use Flink CDC and send a row of kind `-D` (DELETE) in upsert mode, IcebergStreamWriter throws a ClassCastException.
   `BaseDeltaTaskWriter write(), Row Kind: -D , Row: -D(a1,1,33,77.0,b)`
   The exception log is below:
   ```
   2023-01-03 15:00:35,653 WARN  org.apache.flink.runtime.taskmanager.Task                    
    - Source: TableSourceScan(table=[[default_catalog, default_database, aaa]], fields=[name, class, age, score, partition_key]) 
    -> NotNullEnforcer(fields=[name, partition_key]) 
    -> IcebergStreamWriter (1/1)#0 (5a04dd71dcf69ba92a2cbc1844b359d2) switched from RUNNING to FAILED with failure cause:
    java.lang.ClassCastException: java.lang.Integer cannot be cast to org.apache.flink.table.data.StringData
   	at org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169)
   	at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$1(RowData.java:228)
   	at org.apache.iceberg.flink.data.FlinkParquetWriters$RowDataWriter.get(FlinkParquetWriters.java:469)
   	at org.apache.iceberg.flink.data.FlinkParquetWriters$RowDataWriter.get(FlinkParquetWriters.java:453)
   	at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:566)
   	at org.apache.iceberg.parquet.ParquetWriter.add(ParquetWriter.java:130)
   	at org.apache.iceberg.deletes.EqualityDeleteWriter.delete(EqualityDeleteWriter.java:82)
   	at org.apache.iceberg.io.BaseTaskWriter$RollingEqDeleteWriter.write(BaseTaskWriter.java:368)
   	at org.apache.iceberg.io.BaseTaskWriter$RollingEqDeleteWriter.write(BaseTaskWriter.java:351)
   	at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.write(BaseTaskWriter.java:263)
   	at org.apache.iceberg.io.BaseTaskWriter$BaseEqualityDeltaWriter.delete(BaseTaskWriter.java:173)
   	at org.apache.iceberg.flink.sink.BaseDeltaTaskWriter.write(BaseDeltaTaskWriter.java:105)
   	at org.apache.iceberg.flink.sink.BaseDeltaTaskWriter.write(BaseDeltaTaskWriter.java:43)
   	at org.apache.iceberg.flink.sink.IcebergStreamWriter.processElement(IcebergStreamWriter.java:72)
   	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
   	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
   	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
   	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
   	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
   	at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
   	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
   	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
   	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
   	at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196)
   	at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
   	at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
   	at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:143)
   	at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.emit(RowDataDebeziumDeserializeSchema.java:157)
   	at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:129)
   	at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:118)
   	at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:100)
   	at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:54)
   	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
   	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:351)
   	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
   	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
   	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
   	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
   	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
   	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
   	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
   	at java.lang.Thread.run(Thread.java:748)
   ```
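One plausible reading of the trace above, stated as an assumption rather than confirmed from the Iceberg source: in upsert mode the equality delete writer is set up for the key columns only (`name`, `partition_key`), so it reads position 1 as a string. When the full five-column row is passed instead, position 1 holds `class`, an int. The dependency-free Java sketch below models rows as `Object[]`; `KeyProjectionSketch`, `readString`, and `projectKey` are hypothetical names, not Flink or Iceberg APIs.

```java
class KeyProjectionSketch {
  // Hypothetical stand-in for a positional string getter:
  // it throws ClassCastException if the slot holds a non-string value.
  static String readString(Object[] row, int pos) {
    return (String) row[pos];
  }

  // Copies only the key columns, in key order, into a new compact row.
  static Object[] projectKey(Object[] row, int[] keyPositions) {
    Object[] key = new Object[keyPositions.length];
    for (int i = 0; i < keyPositions.length; i++) {
      key[i] = row[keyPositions[i]];
    }
    return key;
  }

  public static void main(String[] args) {
    // The row from the report: (name, class, age, score, partition_key).
    Object[] fullRow = {"a1", 1, 33, 77.0, "b"};
    // Assumed key positions: name = 0, partition_key = 4.
    int[] keyPositions = {0, 4};

    // A writer built for two string key columns reads positions 0 and 1.
    // On the full row, position 1 is `class` (an Integer), so the cast fails.
    try {
      readString(fullRow, 1);
    } catch (ClassCastException e) {
      System.out.println("full row: position 1 is not a string");
    }

    // After projection, positions 0 and 1 both hold string key values.
    Object[] key = projectKey(fullRow, keyPositions);
    System.out.println(readString(key, 0) + "," + readString(key, 1));
  }
}
```

Because the key columns are not adjacent, the row has to be projected to its key columns before positions 0 and 1 line up with two string slots. That appears to be what `keyProjection.wrap(row)` does in the proposed fix.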
   Solution: in `BaseDeltaTaskWriter.write()`, call `deleteKey` with the projected key for DELETE rows in upsert mode.
   ```
     @Override
     public void write(RowData row) throws IOException {
       RowDataDeltaWriter writer = route(row);
   
       switch (row.getRowKind()) {
         case INSERT:
         case UPDATE_AFTER:
           if (upsert) {
             writer.deleteKey(keyProjection.wrap(row));
           }
           writer.write(row);
           break;
   
          case UPDATE_BEFORE:
            if (upsert) {
              break; // UPDATE_BEFORE is not needed in upsert mode; skip it so the same row is not deleted twice
            }
            writer.delete(row);
            break;
          case DELETE:
            if (upsert) {
              // In upsert mode, delete by the projected key columns instead of the full row
              writer.deleteKey(keyProjection.wrap(row));
            } else {
              writer.delete(row);
            }
            break;
   
         default:
           throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
       }
     }
   ```
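The effect of the proposed switch can be summarized as a small decision table. The sketch below is a simplified model, not Iceberg code: `UpsertDispatchSketch`, `Kind`, and `actions` are hypothetical names, and the returned strings only label which writer calls from the snippet above would run.

```java
class UpsertDispatchSketch {
  // Hypothetical mirror of the four row kinds handled by the switch.
  enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

  // Returns a label for the writer calls the proposed switch would make, in order.
  static String actions(Kind kind, boolean upsert) {
    switch (kind) {
      case INSERT:
      case UPDATE_AFTER:
        // Upsert first removes any previous row with the same key, then writes.
        return upsert ? "deleteKey(key),write(row)" : "write(row)";
      case UPDATE_BEFORE:
        // Upsert skips this kind; otherwise the full row is deleted.
        return upsert ? "skip" : "delete(row)";
      case DELETE:
        // The proposed change: delete by projected key in upsert mode.
        return upsert ? "deleteKey(key)" : "delete(row)";
      default:
        throw new UnsupportedOperationException("Unknown row kind: " + kind);
    }
  }

  public static void main(String[] args) {
    for (Kind kind : Kind.values()) {
      System.out.println(
          kind + ": upsert=" + actions(kind, true) + ", non-upsert=" + actions(kind, false));
    }
  }
}
```

In this model, every delete issued in upsert mode goes through the key projection, and only the non-upsert path passes the full row to `delete`.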


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] yulongz commented on issue #6519: Flink: ClassCastException when delete a row in upsert mode using two discontinuous primary keys

Posted by GitBox <gi...@apache.org>.
yulongz commented on issue #6519:
URL: https://github.com/apache/iceberg/issues/6519#issuecomment-1378287808

   > @yulongz would you like to open a PR to fix this issue?
   
   https://github.com/apache/iceberg/pull/6560




[GitHub] [iceberg] nastra commented on issue #6519: Flink: ClassCastException when delete a row in upsert mode using two discontinuous primary keys

Posted by GitBox <gi...@apache.org>.
nastra commented on issue #6519:
URL: https://github.com/apache/iceberg/issues/6519#issuecomment-1376017800

   @yulongz would you like to open a PR to fix this issue?




[GitHub] [iceberg] github-actions[bot] closed issue #6519: Flink: ClassCastException when delete a row in upsert mode using two discontinuous primary keys

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed issue #6519: Flink:  ClassCastException when delete a row in upsert mode using two discontinuous primary keys
URL: https://github.com/apache/iceberg/issues/6519




[GitHub] [iceberg] github-actions[bot] commented on issue #6519: Flink: ClassCastException when delete a row in upsert mode using two discontinuous primary keys

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #6519:
URL: https://github.com/apache/iceberg/issues/6519#issuecomment-1629904633

   This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.




[GitHub] [iceberg] github-actions[bot] commented on issue #6519: Flink: ClassCastException when delete a row in upsert mode using two discontinuous primary keys

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #6519:
URL: https://github.com/apache/iceberg/issues/6519#issuecomment-1650732375

   This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'

