You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/09/20 14:23:05 UTC

[GitHub] [iceberg] tuziling opened a new issue #3156: Flink reads iceberg in real time and reports errors

tuziling opened a new issue #3156:
URL: https://github.com/apache/iceberg/issues/3156


   Flink reads the data in real time and reports an error. Before querying, Flink was used in real time to delete rows whose primary key values already existed. The relevant code is as follows:
   
   The Flink real-time read code is:
   ```
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.table.data.RowData;
   import org.apache.hadoop.conf.Configuration;
   import org.apache.iceberg.catalog.TableIdentifier;
   import org.apache.iceberg.flink.CatalogLoader;
   import org.apache.iceberg.flink.TableLoader;
   import org.apache.iceberg.flink.source.FlinkSource;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   import java.util.HashMap;
   import java.util.Map;
   
   public class StreamingRead {
       private static Logger log = LoggerFactory.getLogger(StreamingRead.class);
   
       public static void main(String[] args) throws Exception{
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     
           String catalogName = "songfuhao_catalog";
           Map<String,String> map = new HashMap<>();
           map.put("warehouse","s3://songfuhao-bucket/songfuhao");
           map.put("catalog-impl","org.apache.iceberg.aws.glue.GlueCatalog");
           map.put("io-impl","org.apache.iceberg.aws.s3.S3FileIO");
           map.put("lock-impl","org.apache.iceberg.aws.glue.DynamoLockManager");
           map.put("lock.table","myGlueLockTable");
   
           Configuration hadoopConf = new Configuration();
           String impl = "org.apache.iceberg.aws.glue.GlueCatalog";
           CatalogLoader tzlCatalog = CatalogLoader.custom(catalogName, map, hadoopConf, impl);
           TableIdentifier identifier = TableIdentifier.of("songfh", "sfuhao_test");
           /*Catalog catalog = tzlCatalog.loadCatalog();
           Table table = catalog.loadTable(identifier);
           TableOperations operations = ((BaseTable) table).operations();
           TableMetadata metadata = operations.current();
           operations.commit(metadata, metadata.upgradeToFormatVersion(2));*/
           TableLoader tableLoader = TableLoader.fromCatalog(tzlCatalog, identifier);
   
   
           DataStream<RowData> stream = FlinkSource.forRowData()
                   .env(env)
                   .tableLoader(tableLoader)
                   .streaming(true)
              //     .startSnapshotId(5868847536461701834L)
                   .build();
   
   
           stream.print();
   
           env.execute(StreamingRead.class.getSimpleName());
       }
   }
   ```
   The error log is as follows:
   ```
   Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
   	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
   	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
   	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1719)
   	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
   	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
   	at com.kingsoft.data.read.StreamingRead.main(StreamingRead.java:61)
   Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
   	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
   	at org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:186)
   	at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
   	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
   	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
   	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
   	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
   	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
   	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
   	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
   	at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:892)
   	at akka.dispatch.OnComplete.internal(Future.scala:264)
   	at akka.dispatch.OnComplete.internal(Future.scala:261)
   	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
   	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
   	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
   	at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
   	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
   	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
   	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
   	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
   	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
   	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
   	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
   	at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
   	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
   	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
   	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
   	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
   	at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
   	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
   	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
   	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
   	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
   	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
   	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
   Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
   	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
   	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
   	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
   	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
   	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
   	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
   	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
   	at jdk.internal.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
   	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
   	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
   	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
   	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
   	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
   	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
   	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
   	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
   	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
   	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
   	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
   	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
   	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
   	at akka.actor.Actor.aroundReceive(Actor.scala:517)
   	at akka.actor.Actor.aroundReceive$(Actor.scala:515)
   	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
   	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
   	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
   	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
   	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
   	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
   	... 4 more
   Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
   Serialization trace:
   columnSizes (org.apache.iceberg.GenericDeleteFile)
   deletes (org.apache.iceberg.BaseFileScanTask)
   fileScanTask (org.apache.iceberg.BaseFileScanTask$SplitScanTask)
   tasks (org.apache.iceberg.BaseCombinedScanTask)
   task (org.apache.iceberg.flink.source.FlinkInputSplit)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
   	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378)
   	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
   	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378)
   	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
   	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
   	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
   	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:205)
   	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
   	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
   	at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
   	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
   	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
   	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:146)
   	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
   	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
   	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
   	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
   	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
   	at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: java.lang.UnsupportedOperationException
   	at java.base/java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
   	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:144)
   	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
   	... 37 more
   ```
   Does anyone know what the problem is?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org