Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/02/05 11:47:52 UTC

[GitHub] [iceberg] liubo1022126 opened a new issue #2219: RewriteDataFiles error when row level delete by equal exists

liubo1022126 opened a new issue #2219:
URL: https://github.com/apache/iceberg/issues/2219


   I am testing Flink CDC writes and the Flink rewriteDataFiles action on Iceberg 0.11. When I only write appends (e.g. the message +I,1,aaa,20210128), everything works. After I write a row-level delete by id, rewriteDataFiles fails with the error below, and a DataStream streaming read fails the same way.
   
   ```
   Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
   Serialization trace:
   nullValueCounts (org.apache.iceberg.GenericDataFile)
   file (org.apache.iceberg.BaseFileScanTask)
   fileScanTask (org.apache.iceberg.BaseFileScanTask$SplitScanTask)
   tasks (org.apache.iceberg.BaseCombinedScanTask)
   task (org.apache.iceberg.flink.source.FlinkInputSplit)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
   	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378)
   	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
   	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
   	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
   	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:205)
   	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
   	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
   	at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
   	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
   	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
   	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:146)
   	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
   	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
   	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
   	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
   	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
   	at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.UnsupportedOperationException
   	at java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
   	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:144)
   	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
   	... 34 more
   ```
   
   The Flink CDC write code:
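   ```java
   import java.util.Map;
   
   import org.apache.flink.api.common.functions.FlatMapFunction;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.table.api.TableSchema;
   import org.apache.flink.types.Row;
   import org.apache.flink.types.RowKind;
   import org.apache.flink.util.Collector;
   import org.apache.iceberg.flink.FlinkSchemaUtil;
   import org.apache.iceberg.flink.TableLoader;
   import org.apache.iceberg.flink.sink.FlinkSink;
   // Guava collections: Iceberg's relocated copy is assumed; the original imports were not shown.
   import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
   import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
   
   public class IcebergSinkSample11 {
       // Settings.TableBuilder is the reporter's own helper; it is not shown in the issue.
       private static Settings.TableBuilder tableBuilder = new Settings.TableBuilder();
       private static TableLoader tableLoader = tableBuilder.tableLoader();
       public static TableSchema tableSchema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(tableBuilder.load().getSchema()));
   
       // Maps the CDC operation prefix of each input line to a Flink RowKind.
       private static final Map<String, RowKind> ROW_KIND_MAP = ImmutableMap.of(
               "+I", RowKind.INSERT,
               "-D", RowKind.DELETE,
               "-U", RowKind.UPDATE_BEFORE,
               "+U", RowKind.UPDATE_AFTER);
   
       private static StreamExecutionEnvironment env;
   
       public static void main(String[] args) throws Exception {
           env = StreamExecutionEnvironment.getExecutionEnvironment()
                   .enableCheckpointing(60000)
                   .setParallelism(1)
                   .setMaxParallelism(1);
           env.getConfig().enableSysoutLogging();
   
           DataStream<String> dataStream = env.socketTextStream("localhost", 9999);   // feed input with: nc -lk 9999
   
           // Parse "op,id,..." lines and emit rows with the matching RowKind.
           DataStream<Row> rowStream = dataStream
                   .flatMap(new FlatMapFunction<String, Row>() {
                       @Override
                       public void flatMap(String rowContent, Collector<Row> collector) throws Exception {
                           try {
                               String[] rowElements = rowContent.split(",");
                               String ops = rowElements[0];
                               Integer id = Integer.parseInt(rowElements[1]);
   
                               if ("+I".equals(ops)) {
                                   collector.collect(row("+I", id, rowElements[2], rowElements[3]));
                               } else if ("-D".equals(ops)) {
                                   // Delete input is "-D,id,pt": only the key and partition are given.
                                   collector.collect(row("-D", id, "", rowElements[2]));
                               } else if ("-U".equals(ops) || "+U".equals(ops)) {
                                   // Updates are written as a delete of the key followed by an insert.
                                   collector.collect(row("-D", id, rowElements[2], rowElements[3]));
                                   collector.collect(row("+I", id, rowElements[2], rowElements[3]));
                               } else {
                                   throw new IllegalArgumentException("cdc type unknown: " + ops);
                               }
                           } catch (NumberFormatException e) {
                               e.printStackTrace();
                           }
                       }
                   });
   
           /*
            * +I,1,aaa,20201228
            * -D,1,20201228
            * +I,1,bbb,20201228
            * -U,1,ccc,20201228
            */
           runV2(rowStream);
       }
   
       private static void runV2(DataStream<Row> rowStream) throws Exception {
           // Key by id (field 0) so every change for one id reaches the same writer.
           DataStream<Row> rowDataStream = rowStream.keyBy(row -> Row.of(row.getField(0)));
   
           // Deletes are matched on the equality field "id".
           FlinkSink.forRow(rowDataStream, tableSchema)
                   .tableLoader(tableLoader)
                   .tableSchema(tableSchema)
                   .writeParallelism(1)
                   .equalityFieldColumns(ImmutableList.of("id"))
                   .build();
   
           env.execute("Test Iceberg DataStream");
       }
   
       private static Row row(String rowKind, int id, String data, String pt) {
           RowKind kind = ROW_KIND_MAP.get(rowKind);
           if (kind == null) {
               throw new IllegalArgumentException("Unknown row kind: " + rowKind);
           }
   
           return Row.ofKind(kind, id, data, pt);
       }
   }
   ```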
   
   The Flink DataStream streaming read code:
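   ```java
   import org.apache.flink.api.common.functions.MapFunction;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.table.data.RowData;
   import org.apache.flink.table.data.conversion.DataStructureConverter;
   import org.apache.flink.table.data.conversion.DataStructureConverters;
   import org.apache.flink.table.types.logical.RowType;
   import org.apache.flink.table.types.utils.TypeConversions;
   import org.apache.flink.types.Row;
   import org.apache.iceberg.Table;
   import org.apache.iceberg.flink.FlinkSchemaUtil;
   import org.apache.iceberg.flink.TableLoader;
   import org.apache.iceberg.flink.source.FlinkSource;
   
   public class IcebergSourceSample11 {
       // Settings.TableBuilder is the reporter's own helper; it is not shown in the issue.
       private static Settings.TableBuilder tableBuilder = new Settings.TableBuilder();
       private static TableLoader tableLoader = tableBuilder.tableLoader();
       private static Table table = tableBuilder.load().getTable();
   
       public static void main(String[] args) throws Exception {
           // Converts Flink's internal RowData into external Row objects for printing.
           RowType rowType = FlinkSchemaUtil.convert(table.schema());
           DataStructureConverter<Object, Object> converter = DataStructureConverters.getConverter(
                   TypeConversions.fromLogicalToDataType(rowType));
   
           StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
           // streaming(true): keep monitoring the table and read newly committed snapshots.
           DataStream<RowData> stream = FlinkSource.forRowData()
                   .env(env)
                   .tableLoader(tableLoader)
                   .streaming(true)
                   .build();
   
           stream.map(new MapFunction<RowData, Row>() {
               @Override
               public Row map(RowData rowData) throws Exception {
                   return (Row) converter.toExternal(rowData);
               }
           }).print();
   
           env.execute("Test Iceberg Read");
       }
   }
   ```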


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] liubo1022126 closed issue #2219: RewriteDataFiles & DataStream streaming read error when row level delete by equal

Posted by GitBox <gi...@apache.org>.
liubo1022126 closed issue #2219:
URL: https://github.com/apache/iceberg/issues/2219


   



[GitHub] [iceberg] liubo1022126 commented on issue #2219: RewriteDataFiles & DataStream streaming read error when row level delete by equal

Posted by GitBox <gi...@apache.org>.
liubo1022126 commented on issue #2219:
URL: https://github.com/apache/iceberg/issues/2219#issuecomment-774836369


   > I think it may be a Flink serialization problem. Variables used in Flink operators (such as map and flatMap) need to be serializable, and some variables do not meet Flink's serialization requirements.
   > 
   > https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html
   
   



[GitHub] [iceberg] liubo1022126 removed a comment on issue #2219: RewriteDataFiles & DataStream streaming read error when row level delete by equal

Posted by GitBox <gi...@apache.org>.
liubo1022126 removed a comment on issue #2219:
URL: https://github.com/apache/iceberg/issues/2219#issuecomment-774836369


   
   



[GitHub] [iceberg] liubo1022126 edited a comment on issue #2219: RewriteDataFiles & DataStream streaming read error when row level delete by equal

Posted by GitBox <gi...@apache.org>.
liubo1022126 edited a comment on issue #2219:
URL: https://github.com/apache/iceberg/issues/2219#issuecomment-775921861


   > I think it may be a Flink serialization problem. Variables used in Flink operators (such as map and flatMap) need to be serializable, and some variables do not meet Flink's serialization requirements.
   > 
   > https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html
   
   Thanks, zhangjun0x01.
   
   Yes, something seems to be wrong with Kryo serialization: these objects go through Flink's default serializer (Kryo). https://github.com/EsotericSoftware/kryo/issues/693 may be related, but I am not sure.
   
   As a test, if I declare the following fields as transient, the error goes away, but I am not sure whether the next operator in the Flink DAG still needs them.
   
   **Fields declared transient** (in https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseFile.java):
   
   ```java
   private transient Map<Integer, Long> columnSizes = null;
   private transient Map<Integer, Long> valueCounts = null;
   private transient Map<Integer, Long> nullValueCounts = null;
   private transient Map<Integer, Long> nanValueCounts = null;
   private transient Map<Integer, ByteBuffer> lowerBounds = null;
   private transient Map<Integer, ByteBuffer> upperBounds = null;
   ```
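   
   The trade-off raised above can be illustrated with plain Java serialization. This is a minimal sketch with a hypothetical `FileStats` class (not Iceberg's `BaseFile`): a `transient` field is skipped on write and comes back as `null` on read. Kryo's `FieldSerializer`, which appears in the stack trace, also skips transient fields by default. That would explain why the error disappears, but it also suggests the receiving operator would no longer see those metrics.
   
   ```java
   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.ObjectInputStream;
   import java.io.ObjectOutputStream;
   import java.io.Serializable;
   import java.util.HashMap;
   import java.util.Map;
   
   public class TransientFieldDemo {
   
       // Hypothetical stand-in for a data-file object that carries column metrics.
       static class FileStats implements Serializable {
           private static final long serialVersionUID = 1L;
   
           String path;
           transient Map<Integer, Long> nullValueCounts; // skipped when serialized
   
           FileStats(String path, Map<Integer, Long> nullValueCounts) {
               this.path = path;
               this.nullValueCounts = nullValueCounts;
           }
       }
   
       // Serialize to bytes and read back, standing in for a record shipped between operators.
       static FileStats roundTrip(FileStats in) throws IOException, ClassNotFoundException {
           ByteArrayOutputStream bytes = new ByteArrayOutputStream();
           try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
               out.writeObject(in);
           }
           try (ObjectInputStream objIn = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
               return (FileStats) objIn.readObject();
           }
       }
   
       public static void main(String[] args) throws Exception {
           Map<Integer, Long> counts = new HashMap<>();
           counts.put(1, 0L);
   
           // "data/file-a.parquet" is a made-up path for illustration.
           FileStats copy = roundTrip(new FileStats("data/file-a.parquet", counts));
           System.out.println(copy.path);            // the regular field survives
           System.out.println(copy.nullValueCounts); // null: the metrics were dropped
       }
   }
   ```
   
   Marking the fields transient therefore avoids rebuilding the maps, at the cost of losing their contents on the other side.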
   



[GitHub] [iceberg] liudi1184 commented on issue #2219: RewriteDataFiles & DataStream streaming read error when row level delete by equal

Posted by GitBox <gi...@apache.org>.
liudi1184 commented on issue #2219:
URL: https://github.com/apache/iceberg/issues/2219#issuecomment-781793924


   I also tried your approach, but I think it may cause problems. Tracing the source code, we found that these fields are serialized as UnmodifiableMap instances. On deserialization, Kryo's MapSerializer then tries to put elements into the unmodifiable map, which throws the UnsupportedOperationException.
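   
   The failure described above can be reproduced without Kryo or Flink. In this sketch, `readInto` is a hypothetical stand-in, not a Kryo API: like Kryo's `MapSerializer` in the trace, it obtains an empty instance of the recorded map class and refills it with `put()`. That works for a `HashMap` but throws `UnsupportedOperationException` when the recorded class is an unmodifiable wrapper.
   
   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.function.Supplier;
   
   public class UnmodifiableMapReadDemo {
   
       // Hypothetical stand-in for a generic map deserializer: create an empty
       // instance of the map's recorded class, then put() every entry into it.
       static <K, V> Map<K, V> readInto(Supplier<Map<K, V>> newInstance, Map<K, V> entries) {
           Map<K, V> target = newInstance.get();
           for (Map.Entry<K, V> e : entries.entrySet()) {
               target.put(e.getKey(), e.getValue()); // throws if target is unmodifiable
           }
           return target;
       }
   
       public static void main(String[] args) {
           Map<Integer, Long> nullValueCounts = new HashMap<>();
           nullValueCounts.put(1, 0L);
   
           // A mutable HashMap can be rebuilt entry by entry.
           System.out.println("HashMap: " + readInto(HashMap::new, nullValueCounts));
   
           // An unmodifiable wrapper rejects put(), as in the stack trace.
           try {
               readInto(() -> Collections.unmodifiableMap(new HashMap<Integer, Long>()), nullValueCounts);
           } catch (UnsupportedOperationException e) {
               System.out.println("UnmodifiableMap: " + e.getClass().getSimpleName());
           }
       }
   }
   ```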



[GitHub] [iceberg] liubo1022126 commented on issue #2219: RewriteDataFiles & DataStream streaming read error when row level delete by equal

Posted by GitBox <gi...@apache.org>.
liubo1022126 commented on issue #2219:
URL: https://github.com/apache/iceberg/issues/2219#issuecomment-851778653


   Fixed by https://github.com/apache/iceberg/pull/2343
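   
   The contents of PR #2343 and PR #2258 are not shown in this thread. As a hedged sketch of one general pattern that avoids this class of failure (not necessarily what either PR does), a class can keep a plain `HashMap` in its serialized field and hand out an unmodifiable view only from its accessor. A field-by-field serializer such as Kryo's `FieldSerializer` then never has to rebuild an unmodifiable wrapper. `MetricsHolder` is a hypothetical name.
   
   ```java
   import java.io.Serializable;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;
   
   // Hypothetical class: the stored field is always a plain HashMap, so a
   // field-by-field serializer only ever needs to rebuild a mutable map.
   public class MetricsHolder implements Serializable {
       private static final long serialVersionUID = 1L;
   
       private HashMap<Integer, Long> nullValueCounts;
   
       public MetricsHolder(Map<Integer, Long> nullValueCounts) {
           // Defensive copy; null stays null because metrics may be absent.
           this.nullValueCounts = nullValueCounts == null ? null : new HashMap<>(nullValueCounts);
       }
   
       // Callers get a read-only view that is created on access and never
       // stored in a field, so the wrapper itself is never serialized.
       public Map<Integer, Long> nullValueCounts() {
           return nullValueCounts == null ? null : Collections.unmodifiableMap(nullValueCounts);
       }
   }
   ```
   
   Callers still cannot mutate the metrics through the accessor, while the serialized form stays an ordinary `HashMap`.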



[GitHub] [iceberg] zhangjun0x01 commented on issue #2219: RewriteDataFiles & DataStream streaming read error when row level delete by equal

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on issue #2219:
URL: https://github.com/apache/iceberg/issues/2219#issuecomment-782579099


   I submitted a PR (#2258) to fix this.



[GitHub] [iceberg] liubo1022126 closed issue #2219: RewriteDataFiles & DataStream streaming read error when row level delete by equal

Posted by GitBox <gi...@apache.org>.
liubo1022126 closed issue #2219:
URL: https://github.com/apache/iceberg/issues/2219


   




[GitHub] [iceberg] zhangjun0x01 commented on issue #2219: RewriteDataFiles & DataStream streaming read error when row level delete by equal

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on issue #2219:
URL: https://github.com/apache/iceberg/issues/2219#issuecomment-774654657


   I think it may be a Flink serialization problem. Variables used in Flink operators (such as map and flatMap) need to be serializable, and some variables do not meet Flink's serialization requirements.
   
   https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html

