Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2023/01/04 05:45:00 UTC
[GitHub] [iceberg] yulongz opened a new issue, #6519: Flink: ClassCastException when delete a row in upsert mode using two discontinuous primary keys
yulongz opened a new issue, #6519:
URL: https://github.com/apache/iceberg/issues/6519
### Apache Iceberg version
None
### Query engine
Flink
### Please describe the bug 🐞
The Iceberg table has two non-adjacent primary key columns (`name`, `partition_key`), both of type varchar/StringData; the other columns are of type int/double.
```
CREATE TABLE IF NOT EXISTS default_catalog.default_database.aaa (
  `name` varchar,
  `class` int,
  `age` int,
  `score` double,
  `partition_key` varchar,
  PRIMARY KEY (`name`,`partition_key`) NOT ENFORCED)
WITH (
  'write.upsert.enabled'='true',
  'write.metadata.delete-after-commit.enabled'='true',
  'write.metadata.previous-versions-max'='10',
  'commit.manifest.min-count-to-merge'='5',
  'table.dynamic-table-options.enabled'='true',
  'format-version'='2');
```
When I use Flink CDC and send a row of kind '-D' (DELETE) in upsert mode, IcebergStreamWriter throws a ClassCastException.
`BaseDeltaTaskWriter write(), Row Kind: -D , Row: -D(a1,1,33,77.0,b)`
The exception log is as follows:
```
2023-01-03 15:00:35,653 WARN org.apache.flink.runtime.taskmanager.Task
- Source: TableSourceScan(table=[[default_catalog, default_database, aaa]], fields=[name, class, age, score, partition_key])
-> NotNullEnforcer(fields=[name, partition_key])
-> IcebergStreamWriter (1/1)#0 (5a04dd71dcf69ba92a2cbc1844b359d2) switched from RUNNING to FAILED with failure cause:
java.lang.ClassCastException: java.lang.Integer cannot be cast to org.apache.flink.table.data.StringData
at org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169)
at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$1(RowData.java:228)
at org.apache.iceberg.flink.data.FlinkParquetWriters$RowDataWriter.get(FlinkParquetWriters.java:469)
at org.apache.iceberg.flink.data.FlinkParquetWriters$RowDataWriter.get(FlinkParquetWriters.java:453)
at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:566)
at org.apache.iceberg.parquet.ParquetWriter.add(ParquetWriter.java:130)
at org.apache.iceberg.deletes.EqualityDeleteWriter.delete(EqualityDeleteWriter.java:82)
at org.apache.iceberg.io.BaseTaskWriter$RollingEqDeleteWriter.write(BaseTaskWriter.java:368)
at org.apache.iceberg.io.BaseTaskWriter$RollingEqDeleteWriter.write(BaseTaskWriter.java:351)
at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.write(BaseTaskWriter.java:263)
at org.apache.iceberg.io.BaseTaskWriter$BaseEqualityDeltaWriter.delete(BaseTaskWriter.java:173)
at org.apache.iceberg.flink.sink.BaseDeltaTaskWriter.write(BaseDeltaTaskWriter.java:105)
at org.apache.iceberg.flink.sink.BaseDeltaTaskWriter.write(BaseDeltaTaskWriter.java:43)
at org.apache.iceberg.flink.sink.IcebergStreamWriter.processElement(IcebergStreamWriter.java:72)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:143)
at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.emit(RowDataDebeziumDeserializeSchema.java:157)
at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:129)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:118)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:100)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:54)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:351)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
```
Solution: in upsert mode, `BaseDeltaTaskWriter.write()` should call `deleteKey` with the projected key for DELETE rows, instead of calling `delete` with the full row.
```
@Override
public void write(RowData row) throws IOException {
  RowDataDeltaWriter writer = route(row);
  switch (row.getRowKind()) {
    case INSERT:
    case UPDATE_AFTER:
      if (upsert) {
        // In upsert mode, delete any existing row with the same key before writing.
        writer.deleteKey(keyProjection.wrap(row));
      }
      writer.write(row);
      break;
    case UPDATE_BEFORE:
      if (upsert) {
        // UPDATE_BEFORE is not needed for an update in upsert mode;
        // skip it to avoid deleting the same row twice.
        break;
      }
      writer.delete(row);
      break;
    case DELETE:
      if (upsert) {
        // Proposed change: delete by the projected key rather than the full row.
        writer.deleteKey(keyProjection.wrap(row));
      } else {
        writer.delete(row);
      }
      break;
    default:
      throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
  }
}
```
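To illustrate why the key columns' positions might matter here, below is a minimal, dependency-free sketch. It is not Iceberg or Flink code: `KeyProjectionSketch`, `writeKeyOnly`, and `projectKey` are hypothetical names, and the assumption (inferred from the stack trace, not confirmed in this thread) is that the equality-delete writer reads fields by position against a key-only schema (`name`, `partition_key`). Under that assumption, passing the full row makes position 1 resolve to `class` (an Integer) rather than `partition_key`.

```java
public class KeyProjectionSketch {

    // Hypothetical stand-in for a writer whose schema holds only the key
    // columns (name, partition_key). It reads fields by position and
    // expects both to be strings.
    static String writeKeyOnly(Object[] row) {
        String name = (String) row[0];
        String partitionKey = (String) row[1];
        return name + "," + partitionKey;
    }

    // Hypothetical stand-in for a key projection: copies the values at the
    // given key positions out of the full row, in key order.
    static Object[] projectKey(Object[] fullRow, int[] keyPositions) {
        Object[] key = new Object[keyPositions.length];
        for (int i = 0; i < keyPositions.length; i++) {
            key[i] = fullRow[keyPositions[i]];
        }
        return key;
    }

    public static void main(String[] args) {
        // Same values as the failing row in the report: -D(a1,1,33,77.0,b)
        Object[] fullRow = {"a1", 1, 33, 77.0, "b"};
        // name is column 0 and partition_key is column 4 in the table above.
        int[] keyPositions = {0, 4};

        // Projecting first gives the writer the two string key values.
        System.out.println(writeKeyOnly(projectKey(fullRow, keyPositions))); // prints "a1,b"

        // Passing the full row puts the Integer `class` value at position 1,
        // so the cast to String fails.
        try {
            writeKeyOnly(fullRow);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException on the unprojected row");
        }
    }
}
```

If the key columns were the first two columns of the table, the unprojected row would happen to line up with the key-only schema in this sketch, which may explain why the failure shows up only with non-adjacent keys.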
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org
[GitHub] [iceberg] yulongz commented on issue #6519: Flink: ClassCastException when delete a row in upsert mode using two discontinuous primary keys
Posted by GitBox <gi...@apache.org>.
yulongz commented on issue #6519:
URL: https://github.com/apache/iceberg/issues/6519#issuecomment-1378287808
> @yulongz would you like to open a PR to fix this issue?
https://github.com/apache/iceberg/pull/6560
[GitHub] [iceberg] nastra commented on issue #6519: Flink: ClassCastException when delete a row in upsert mode using two discontinuous primary keys
Posted by GitBox <gi...@apache.org>.
nastra commented on issue #6519:
URL: https://github.com/apache/iceberg/issues/6519#issuecomment-1376017800
@yulongz would you like to open a PR to fix this issue?
[GitHub] [iceberg] github-actions[bot] closed issue #6519: Flink: ClassCastException when delete a row in upsert mode using two discontinuous primary keys
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed issue #6519: Flink: ClassCastException when delete a row in upsert mode using two discontinuous primary keys
URL: https://github.com/apache/iceberg/issues/6519
[GitHub] [iceberg] github-actions[bot] commented on issue #6519: Flink: ClassCastException when delete a row in upsert mode using two discontinuous primary keys
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #6519:
URL: https://github.com/apache/iceberg/issues/6519#issuecomment-1629904633
This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.
[GitHub] [iceberg] github-actions[bot] commented on issue #6519: Flink: ClassCastException when delete a row in upsert mode using two discontinuous primary keys
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #6519:
URL: https://github.com/apache/iceberg/issues/6519#issuecomment-1650732375
This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'