Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/09/21 04:05:26 UTC

[GitHub] [iceberg] tuziling commented on issue #2028: The data generated by Actions cannot be serialized

tuziling commented on issue #2028:
URL: https://github.com/apache/iceberg/issues/2028#issuecomment-923603971


   After writing data to Iceberg, I deleted some rows using RowKind.DELETE, and then read the table with Flink in streaming mode, which failed. The log output is as follows:
   ```
   Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
   	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
   	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
   	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1719)
   	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
   	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
   	at com.kingsoft.data.read.StreamingRead.main(StreamingRead.java:59)
   Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
   	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
   	at org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:186)
   	at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
   	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
   	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
   	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
   	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
   	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
   	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
   	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
   	at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:892)
   	at akka.dispatch.OnComplete.internal(Future.scala:264)
   	at akka.dispatch.OnComplete.internal(Future.scala:261)
   	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
   	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
   	at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:60)
   	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
   	at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
   	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
   	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
   	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
   	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
   	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
   	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
   	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
   	at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
   	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
   	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
   	at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:60)
   	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
   	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
   	at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
   	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
   	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
   	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
   	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
   	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
   	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
   Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
   	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
   	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
   	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
   	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
   	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
   	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
   	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
   	at jdk.internal.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
   	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
   	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
   	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
   	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
   	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
   	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
   	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
   	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
   	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
   	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
   	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
   	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
   	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
   	at akka.actor.Actor.aroundReceive(Actor.scala:517)
   	at akka.actor.Actor.aroundReceive$(Actor.scala:515)
   	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
   	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
   	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
   	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
   	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
   	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
   	... 4 more
   Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
   Serialization trace:
   nullValueCounts (org.apache.iceberg.GenericDataFile)
   file (org.apache.iceberg.BaseFileScanTask)
   fileScanTask (org.apache.iceberg.BaseFileScanTask$SplitScanTask)
   tasks (org.apache.iceberg.BaseCombinedScanTask)
   task (org.apache.iceberg.flink.source.FlinkInputSplit)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
   	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378)
   	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
   	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
   	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
   	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:205)
   	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
   	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
   	at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
   	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
   	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
   	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:146)
   	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
   	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
   	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
   	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
   	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
   	at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: java.lang.UnsupportedOperationException
   	at java.base/java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
   	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:144)
   	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
   	... 34 more
   ```
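   The last `Caused by` pinpoints the root cause: Kryo's default `MapSerializer` rebuilds a map during deserialization by calling `put()` on it, but the `nullValueCounts` map inside `GenericDataFile` is an unmodifiable view, which rejects `put()`. A minimal JDK-only sketch of the same failure (no Iceberg classes involved; the variable names just mirror the serialization trace):

   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;

   public class UnmodifiableMapPut {
       public static void main(String[] args) {
           Map<Integer, Long> nullValueCounts = new HashMap<>();
           nullValueCounts.put(1, 0L);

           // An unmodifiable view, like the metrics maps a data file carries:
           Map<Integer, Long> readOnly = Collections.unmodifiableMap(nullValueCounts);

           try {
               // Kryo's MapSerializer.read() repopulates the target map via put(),
               // which an unmodifiable map rejects -- the same exception as above.
               readOnly.put(2, 0L);
           } catch (UnsupportedOperationException e) {
               System.out.println("put() threw UnsupportedOperationException");
           }
       }
   }
   ```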
   The reading code is as follows:
   ```java
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.table.data.RowData;
   import org.apache.hadoop.conf.Configuration;
   import org.apache.iceberg.catalog.TableIdentifier;
   import org.apache.iceberg.flink.CatalogLoader;
   import org.apache.iceberg.flink.TableLoader;
   import org.apache.iceberg.flink.source.FlinkSource;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   import java.util.HashMap;
   import java.util.Map;
   
   public class StreamingRead {
       private static final Logger log = LoggerFactory.getLogger(StreamingRead.class);

       public static void main(String[] args) throws Exception {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   
           String catalogName = "songfuhao_catalog";
           Map<String,String> map = new HashMap<>();
           map.put("warehouse","s3://songfuhao-bucket/songfuhao");
           map.put("catalog-impl","org.apache.iceberg.aws.glue.GlueCatalog");
           map.put("io-impl","org.apache.iceberg.aws.s3.S3FileIO");
           map.put("lock-impl","org.apache.iceberg.aws.glue.DynamoLockManager");
           map.put("lock.table","myGlueLockTable");
   
           Configuration hadoopConf = new Configuration();
           String impl = "org.apache.iceberg.aws.glue.GlueCatalog";
           CatalogLoader tzlCatalog = CatalogLoader.custom(catalogName, map, hadoopConf, impl);
           TableIdentifier identifier = TableIdentifier.of("songfh", "sfuhao_test");
           /*Catalog catalog = tzlCatalog.loadCatalog();
           Table table = catalog.loadTable(identifier);
           TableOperations operations = ((BaseTable) table).operations();
           TableMetadata metadata = operations.current();
           operations.commit(metadata, metadata.upgradeToFormatVersion(2));*/
           TableLoader tableLoader = TableLoader.fromCatalog(tzlCatalog, identifier);
   
   
           DataStream<RowData> stream = FlinkSource.forRowData()
                   .env(env)
                   .tableLoader(tableLoader)
                   .streaming(true)
                   .build();
   
           stream.print();
   
           env.execute(StreamingRead.class.getSimpleName());
       }
   }
   ```
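   For what it's worth, the failure seems specific to Kryo's put()-based map handling: plain Java serialization round-trips an unmodifiable map intact, because the `Collections.unmodifiableMap` wrapper serializes its backing map as a whole instead of replaying `put()` calls. This JDK-only sketch (no Iceberg or Flink classes) shows the difference, which is presumably why workarounds that fall back to Java serialization for these classes avoid the crash:

   ```java
   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.ObjectInputStream;
   import java.io.ObjectOutputStream;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;

   public class UnmodifiableMapJavaSer {
       @SuppressWarnings("unchecked")
       public static void main(String[] args) throws Exception {
           Map<Integer, Long> counts = new HashMap<>();
           counts.put(1, 3L);
           Map<Integer, Long> readOnly = Collections.unmodifiableMap(counts);

           // Round-trip through built-in java.io serialization.
           ByteArrayOutputStream bytes = new ByteArrayOutputStream();
           try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
               out.writeObject(readOnly);
           }
           Map<Integer, Long> copy;
           try (ObjectInputStream in = new ObjectInputStream(
                   new ByteArrayInputStream(bytes.toByteArray()))) {
               copy = (Map<Integer, Long>) in.readObject();
           }

           System.out.println(copy.equals(readOnly)); // prints true
       }
   }
   ```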
   Do you know how to modify this so that the data in Iceberg can be read normally?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

