Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/01/05 05:00:22 UTC

[GitHub] [iceberg] liudi1184 opened a new issue #2028: The data generated by Actions cannot be serialized

liudi1184 opened a new issue #2028:
URL: https://github.com/apache/iceberg/issues/2028


   I want to merge small files with Actions in a Flink job, using the following code:
   
   ```
   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   env.setParallelism(1);
   env.setMaxParallelism(1);
   //registerSerializer();
   HadoopCatalog hadoopCatalog = HadoopCatalogHolder.getHadoopCatalog();
   List<Namespace> namespaces = hadoopCatalog.listNamespaces();
   for (Namespace namespace : namespaces) {
       List<TableIdentifier> tableIdentifiers = hadoopCatalog.listTables(namespace);
       for (TableIdentifier tableIdentifier : tableIdentifiers) {
           logger.info("Start merging {}", tableIdentifier.toString());
           Actions.forTable(env, hadoopCatalog.loadTable(tableIdentifier))
                   .rewriteDataFiles()
                   .maxParallelism(1)
                   .targetSizeInBytes(1024 * 1024 * 128)
                   .execute();
       }
   }
   env.execute("iceberg file merge");
   ```
   
   But the following exception is always thrown:
   
   ```
   Caused by: java.io.IOException: Failed to deserialize an element from the source. If you are using user-defined serialization (Value and Writable types), check the serialization functions.
   Serializer is org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@9ef07cb2
           at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:158) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
           at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
           at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
           at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
   Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
   Serialization trace:
   nullValueCounts (org.apache.iceberg.GenericDeleteFile)
   deletes (org.apache.iceberg.BaseFileScanTask)
   fileScanTask (org.apache.iceberg.BaseFileScanTask$SplitScanTask)
   tasks (org.apache.iceberg.BaseCombinedScanTask)
           at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
           at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
           at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
           at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
           at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
           at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
           at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
           at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
           at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
           at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
           at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
           at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
           at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
           at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
           at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
           at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
           at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
           at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
           at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
           at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:155) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
           at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
           at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
           at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    Caused by: java.lang.UnsupportedOperationException
   ```



[GitHub] [iceberg] liubo1022126 commented on issue #2028: The data generated by Actions cannot be serialized

Posted by GitBox <gi...@apache.org>.
liubo1022126 commented on issue #2028:
URL: https://github.com/apache/iceberg/issues/2028#issuecomment-775924996


   > Also, can you please verify what version of Iceberg you are using?
   
   I ran into this problem as well; we can discuss it here: https://github.com/apache/iceberg/issues/2219



[GitHub] [iceberg] kbendick commented on issue #2028: The data generated by Actions cannot be serialized

Posted by GitBox <gi...@apache.org>.
kbendick commented on issue #2028:
URL: https://github.com/apache/iceberg/issues/2028#issuecomment-756072537


   Do you know whether anybody else has encountered this issue? Also, was any more of the stack trace left off? It seems like there might be something after the `Caused by: java.lang.UnsupportedOperationException` line. As an example, I found this in the Kryo issue tracker (via the Twitter `Chill` issues):
   
   Example stack trace with helpful info after the UnsupportedOperationException: https://github.com/EsotericSoftware/kryo/issues/472
   ```
   com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException: ConfigObject is immutable, you can't call Map.put
   [error] Serialization trace: ...
   ```
   
   In general, the `Caused by` portion of the stack trace is likely to be the most informative. It's not exactly straightforward to tell what's happening here, since the commented-out `registerSerializer()` call in your code could very well be important to this serialization issue.
   
   Serializability issues of this sort are not uncommon with either Kryo or Flink. In Flink, if one does not provide a custom type serializer, it falls back to Kryo. Kryo has limitations on its ability to (de)serialize objects, such as the mentioned limitation on deserializing immutable collections (as well as some issues around fields that are or are not marked `final`, etc.).
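   
   To make that limitation concrete, here is a minimal standalone sketch of the failure mode. It assumes a plain Kryo 4.x dependency on the classpath (Flink embeds its own Kryo configured with the same fallback instantiator strategy); the map contents are made up for illustration. Writing an unmodifiable map succeeds, but reading it back throws the same `UnsupportedOperationException` seen in the trace above:
   
   ```
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;
   
   import com.esotericsoftware.kryo.Kryo;
   import com.esotericsoftware.kryo.io.Input;
   import com.esotericsoftware.kryo.io.Output;
   import org.objenesis.strategy.StdInstantiatorStrategy;
   
   public class KryoUnmodifiableMapRepro {
       public static void main(String[] args) {
           Kryo kryo = new Kryo();
           // Allow instantiating classes that lack a no-arg constructor,
           // the same way Flink's KryoSerializer configures its Kryo.
           kryo.setInstantiatorStrategy(
                   new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
   
           Map<Integer, Long> counts = new HashMap<>();
           counts.put(1, 0L);
           // Same shape as the nullValueCounts map in the serialization trace.
           Map<Integer, Long> unmodifiable = Collections.unmodifiableMap(counts);
   
           // Writing succeeds: Kryo simply walks the entries.
           Output output = new Output(1024, -1);
           kryo.writeClassAndObject(output, unmodifiable);
   
           // Reading fails: MapSerializer instantiates Collections$UnmodifiableMap
           // and then calls put() on it, which throws UnsupportedOperationException.
           Input input = new Input(output.toBytes());
           kryo.readClassAndObject(input);
       }
   }
   ```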
   
   If I had to guess, based on quickly poking around the code, specifically the `org.apache.iceberg.Metrics` class, I would say that the `Metrics` are likely being instantiated with an immutable map for that field, which Kryo cannot handle. If you're looking to take up this issue and don't have a background in Kryo / Flink serialization, this Stack Overflow question might help you get a better understanding of the issue (as well as the above-linked issue from the Kryo repo):
   
   https://stackoverflow.com/questions/14499670/serializing-and-deserializing-object-arrays-in-kryo
   
   I can follow up with more information if need be, but it would be helpful if you provided any further stack trace after the `Caused by: java.lang.UnsupportedOperationException`, as well as any information about types that you register with the Flink runtime (as it appears there is a `registerSerializer()` function that's probably highly relevant here). A hypothetical version of what such a function might need to do is sketched below.
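   
   For illustration only, a `registerSerializer()` along these lines is one plausible shape. It is a sketch assuming the third-party `de.javakaffee:kryo-serializers` artifact is on the classpath; the actual function in your job is not shown in this issue, and nothing here is a confirmed fix:
   
   ```
   import java.util.Collections;
   import java.util.HashMap;
   
   import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   
   public class SerializerRegistration {
       // Hypothetical helper: register a Kryo serializer that can rebuild
       // java.util.Collections$UnmodifiableMap instances on deserialization.
       public static void registerSerializer(StreamExecutionEnvironment env) {
           // The unmodifiable wrapper class is package-private, so obtain the
           // Class object from an instance rather than by name.
           Class<?> unmodifiableMapClass =
                   Collections.unmodifiableMap(new HashMap<>()).getClass();
           // Flink's Kryo fallback then reads the entries into a mutable map
           // and re-applies the unmodifiable wrapper, instead of calling
           // put() on the unmodifiable map itself.
           env.getConfig().addDefaultKryoSerializer(
                   unmodifiableMapClass, UnmodifiableCollectionsSerializer.class);
       }
   }
   ```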



[GitHub] [iceberg] kbendick commented on issue #2028: The data generated by Actions cannot be serialized

Posted by GitBox <gi...@apache.org>.
kbendick commented on issue #2028:
URL: https://github.com/apache/iceberg/issues/2028#issuecomment-756076279


   Also, can you please verify what version of Iceberg you are using?



[GitHub] [iceberg] tuziling commented on issue #2028: The data generated by Actions cannot be serialized

Posted by GitBox <gi...@apache.org>.
tuziling commented on issue #2028:
URL: https://github.com/apache/iceberg/issues/2028#issuecomment-923603971


   After writing the data to Iceberg, I use RowKind.DELETE to delete rows, then use Flink to read from Iceberg in real time, and an error is reported. The log is as follows:
   ```
   Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
   	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
   	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
   	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1719)
   	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
   	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
   	at com.kingsoft.data.read.StreamingRead.main(StreamingRead.java:59)
   Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
   	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
   	at org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:186)
   	at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
   	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
   	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
   	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
   	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
   	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
   	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
   	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
   	at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:892)
   	at akka.dispatch.OnComplete.internal(Future.scala:264)
   	at akka.dispatch.OnComplete.internal(Future.scala:261)
   	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
   	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
   	at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:60)
   	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
   	at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
   	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
   	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
   	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
   	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
   	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
   	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
   	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
   	at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
   	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
   	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
   	at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:60)
   	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
   	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
   	at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
   	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
   	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
   	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
   	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
   	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
   	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
   Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
   	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
   	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
   	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
   	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
   	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
   	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
   	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
   	at jdk.internal.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
   	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
   	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
   	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
   	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
   	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
   	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
   	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
   	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
   	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
   	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
   	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
   	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
   	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
   	at akka.actor.Actor.aroundReceive(Actor.scala:517)
   	at akka.actor.Actor.aroundReceive$(Actor.scala:515)
   	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
   	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
   	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
   	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
   	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
   	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
   	... 4 more
   Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
   Serialization trace:
   nullValueCounts (org.apache.iceberg.GenericDataFile)
   file (org.apache.iceberg.BaseFileScanTask)
   fileScanTask (org.apache.iceberg.BaseFileScanTask$SplitScanTask)
   tasks (org.apache.iceberg.BaseCombinedScanTask)
   task (org.apache.iceberg.flink.source.FlinkInputSplit)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
   	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378)
   	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
   	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
   	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
   	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:205)
   	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
   	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
   	at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
   	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
   	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
   	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:146)
   	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
   	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
   	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
   	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
   	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
   	at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: java.lang.UnsupportedOperationException
   	at java.base/java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
   	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:144)
   	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
   	... 34 more
   ```
   The reading code is as follows:
   ```
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.table.data.RowData;
   import org.apache.hadoop.conf.Configuration;
   import org.apache.iceberg.catalog.TableIdentifier;
   import org.apache.iceberg.flink.CatalogLoader;
   import org.apache.iceberg.flink.TableLoader;
   import org.apache.iceberg.flink.source.FlinkSource;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   import java.util.HashMap;
   import java.util.Map;
   
   public class StreamingRead {
       private static Logger log = LoggerFactory.getLogger(StreamingRead.class);
   
       public static void main(String[] args) throws Exception{
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   
           String catalogName = "songfuhao_catalog";
           Map<String,String> map = new HashMap<>();
           map.put("warehouse","s3://songfuhao-bucket/songfuhao");
           map.put("catalog-impl","org.apache.iceberg.aws.glue.GlueCatalog");
           map.put("io-impl","org.apache.iceberg.aws.s3.S3FileIO");
           map.put("lock-impl","org.apache.iceberg.aws.glue.DynamoLockManager");
           map.put("lock.table","myGlueLockTable");
   
           Configuration hadoopConf = new Configuration();
           String impl = "org.apache.iceberg.aws.glue.GlueCatalog";
           CatalogLoader tzlCatalog = CatalogLoader.custom(catalogName, map, hadoopConf, impl);
           TableIdentifier identifier = TableIdentifier.of("songfh", "sfuhao_test");
           /*Catalog catalog = tzlCatalog.loadCatalog();
           Table table = catalog.loadTable(identifier);
           TableOperations operations = ((BaseTable) table).operations();
           TableMetadata metadata = operations.current();
           operations.commit(metadata, metadata.upgradeToFormatVersion(2));*/
           TableLoader tableLoader = TableLoader.fromCatalog(tzlCatalog, identifier);
   
   
           DataStream<RowData> stream = FlinkSource.forRowData()
                   .env(env)
                   .tableLoader(tableLoader)
                   .streaming(true)
                   .build();
   
           stream.print();
   
           env.execute(StreamingRead.class.getSimpleName());
       }
   }
   ```
   Do you know what to change so that the data in Iceberg can be read normally?
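   
   One thing worth trying here, per the suggestion earlier in this thread, is registering a Kryo serializer for the unmodifiable map class right after creating the environment. Again, this is only a hedged sketch using the third-party `de.javakaffee:kryo-serializers` artifact, not a confirmed fix:
   
   ```
   // Hypothetical addition to main() above, immediately after creating env:
   env.getConfig().addDefaultKryoSerializer(
           java.util.Collections.unmodifiableMap(new java.util.HashMap<>()).getClass(),
           de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer.class);
   ```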

