Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/07/02 02:13:13 UTC

[GitHub] [iceberg] openinx commented on a change in pull request #1145: Implement the flink stream writer to accept the row data and emit the complete data files event to downstream

openinx commented on a change in pull request #1145:
URL: https://github.com/apache/iceberg/pull/1145#discussion_r448709770



##########
File path: core/src/main/java/org/apache/iceberg/BaseFile.java
##########
@@ -360,7 +360,7 @@ public ByteBuffer keyMetadata() {
     if (list != null) {
       List<E> copy = Lists.newArrayListWithExpectedSize(list.size());
       copy.addAll(list);
-      return Collections.unmodifiableList(copy);

Review comment:
       Because Flink depends on `com.esotericsoftware.kryo.serializers` to serialize and deserialize the class/object, it will add a few `null` values into fields whose type is a collection; the code is here: https://github.com/EsotericSoftware/kryo/blob/46ef9788fa1d3fb020ce6e8f33f431c9fb54cb35/src/com/esotericsoftware/kryo/serializers/CollectionSerializer.java#L102.
   
   And if we don't make the list modifiable, it will throw the following stack trace:
   
   ```
   java.lang.UnsupportedOperationException
   Serialization trace:
   splitOffsets (org.apache.iceberg.GenericDataFile)
   com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
   Serialization trace:
   splitOffsets (org.apache.iceberg.GenericDataFile)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
   	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
   	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
   	at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness$MockOutput.collect(AbstractStreamOperatorTestHarness.java:693)
   	at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness$MockOutput.collect(AbstractStreamOperatorTestHarness.java:661)
   	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
   	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
   	at org.apache.iceberg.flink.IcebergStreamWriter.emit(IcebergStreamWriter.java:149)
   	at org.apache.iceberg.flink.IcebergStreamWriter.prepareSnapshotPreBarrier(IcebergStreamWriter.java:119)
   	at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.prepareSnapshotPreBarrier(AbstractStreamOperatorTestHarness.java:565)
   	at org.apache.iceberg.flink.TestIcebergStreamWriter.testWritingTable(TestIcebergStreamWriter.java:101)
   Caused by: java.lang.UnsupportedOperationException
   	at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
   	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
   	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
   	... 70 more
   ```
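   
   For anyone who wants to reproduce this outside of the Flink test harness, a minimal sketch along the following lines triggers the same failure. The `DataFileLike` POJO is a hypothetical stand-in for `GenericDataFile`; it assumes Kryo plus Objenesis on the classpath (versions close to what Flink bundles), and the exact configuration calls may differ slightly between Kryo versions.
   
   ```java
   import com.esotericsoftware.kryo.Kryo;
   import org.objenesis.strategy.StdInstantiatorStrategy;
   
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;
   
   public class KryoUnmodifiableListRepro {
   
     // Hypothetical stand-in for GenericDataFile: a POJO with a collection-typed field.
     static class DataFileLike {
       List<Long> splitOffsets;
     }
   
     public static void main(String[] args) {
       Kryo kryo = new Kryo();
       // Mimic a Flink-like setup: no mandatory registration, and instantiate classes
       // without requiring a no-arg constructor (Objenesis), so the copy reaches add().
       kryo.setRegistrationRequired(false);
       kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
   
       DataFileLike file = new DataFileLike();
       file.splitOffsets = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(4L, 8L)));
   
       // Kryo's CollectionSerializer rebuilds the list by calling add() on a fresh
       // instance, so the unmodifiable wrapper makes this throw a KryoException
       // caused by UnsupportedOperationException, the same failure seen in the test.
       DataFileLike copy = kryo.copy(file);
       System.out.println(copy.splitOffsets);
     }
   }
   ```
   
   Returning a plain mutable copy from `BaseFile` (as this change does) keeps Kryo able to rebuild the field, at the cost of handing callers a list they could modify.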



