Posted to issues@flink.apache.org by "谢波 (Jira)" <ji...@apache.org> on 2020/12/09 03:04:00 UTC

[jira] [Created] (FLINK-20542) flink-1.11.2 - stream join stream savepoint fail

谢波 created FLINK-20542:
--------------------------

             Summary: flink-1.11.2 - stream join stream savepoint fail
                 Key: FLINK-20542
                 URL: https://issues.apache.org/jira/browse/FLINK-20542
             Project: Flink
          Issue Type: Bug
          Environment: <flink.version>1.11.2</flink.version>
<scala.binary.version>2.11</scala.binary.version>

tEnv.getConfig.setIdleStateRetentionTime(Time.hours(12), Time.hours(24))

env.enableCheckpointing(2 * 60 * 1000L)
val checkpointConfig = env.getCheckpointConfig
checkpointConfig.setCheckpointTimeout(20 * 60 * 1000)
checkpointConfig.setTolerableCheckpointFailureNumber(2)

env.setStateBackend(
  new RocksDBStateBackend(checkpointPath, true)
    .asInstanceOf[StateBackend]
)
            Reporter: 谢波


I have a SQL job that joins one stream with another stream, but the job fails whenever a savepoint is taken.

I cannot trigger a savepoint manually right now, because the job dies as soon as the savepoint is triggered.

Can anyone help? Thanks.
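
For reference, the job is structured roughly as in the sketch below. This is a simplified, self-contained approximation rather than the production code: the table definitions, column subset, checkpoint path, and the datagen/blackhole connectors are placeholders for illustration only; the real job reads the two streams from external sources and joins them as shown in the operator name in the stack trace below.

import org.apache.flink.api.common.time.Time
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object StreamJoinStreamJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Checkpoint every 2 minutes, 20 minute timeout, tolerate 2 failed checkpoints.
    env.enableCheckpointing(2 * 60 * 1000L)
    val checkpointConfig = env.getCheckpointConfig
    checkpointConfig.setCheckpointTimeout(20 * 60 * 1000)
    checkpointConfig.setTolerableCheckpointFailureNumber(2)

    // RocksDB state backend with incremental checkpoints.
    // checkpointPath is a placeholder here, not the production path.
    val checkpointPath = "hdfs:///tmp/flink/checkpoints"
    env.setStateBackend(
      new RocksDBStateBackend(checkpointPath, true).asInstanceOf[StateBackend]
    )

    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tEnv = StreamTableEnvironment.create(env, settings)
    // Idle join state is kept between 12 and 24 hours.
    tEnv.getConfig.setIdleStateRetentionTime(Time.hours(12), Time.hours(24))

    // The real job reads the two streams from external systems; datagen/blackhole
    // are used here only so the sketch is runnable on its own.
    tEnv.executeSql(
      """CREATE TABLE retail_detail (
        |  transNumber STRING,
        |  goodsId STRING,
        |  salesAmount DOUBLE
        |) WITH ('connector' = 'datagen')""".stripMargin)
    tEnv.executeSql(
      """CREATE TABLE retail_head (
        |  transNumber STRING,
        |  returnFlag INT,
        |  channelFlag INT
        |) WITH ('connector' = 'datagen')""".stripMargin)
    tEnv.executeSql(
      """CREATE TABLE sink_table (
        |  transNumber STRING,
        |  goodsId STRING,
        |  salesAmount DOUBLE,
        |  returnFlag INT,
        |  channelFlag INT
        |) WITH ('connector' = 'blackhole')""".stripMargin)

    // Regular (non-windowed) left outer join between the two streams,
    // which keeps rows from both sides in state.
    tEnv.executeSql(
      """INSERT INTO sink_table
        |SELECT d.transNumber, d.goodsId, d.salesAmount, h.returnFlag, h.channelFlag
        |FROM retail_detail AS d
        |LEFT JOIN retail_head AS h ON d.transNumber = h.transNumber""".stripMargin)
  }
}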

 

!image-2020-12-09-10-59-58-757.png!

 

2020-12-09 10:46:21,801 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-xiebo.
2020-12-09 10:46:22,024 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/home/xiebo/module/flink/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2020-12-09 10:46:22,414 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2020-12-09 10:46:22,575 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface shd183.yonghui.cn:45855 of application 'application_1594105654926_6844100'.
Triggering savepoint for job 749a7a45d6324f57ac859d3eab55a56c.
Waiting for response...
------------------------------------------------------------
 The program finished with the following exception:
org.apache.flink.util.FlinkException: Triggering a savepoint for the job 749a7a45d6324f57ac859d3eab55a56c failed.
	at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:668)
	at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:646)
	at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:864)
	at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:643)
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:934)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
	at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
	at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$triggerSavepoint$3(SchedulerBase.java:764)
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$null$0(CheckpointCoordinator.java:467)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
	at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abort(PendingCheckpoint.java:494)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1663)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:932)
	at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$declineCheckpoint$5(SchedulerBase.java:827)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
	at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abort(PendingCheckpoint.java:493)
	... 10 more
Caused by: java.lang.Exception: Could not materialize checkpoint 439 for operator Join(joinType=[LeftOuterJoin], where=[(transNumber = transNumber0)], select=[transNumber, retailNumber, werks, businessDayDate, goodsId, retailQuantity, salesAmount, rtPlu, updateTime, keyBy, busi_grp_id, busi_grp_name, sales_dist, sales_dist_name, region_id_new, region_name_new, prov_code, prov_name, city_code, city_name, shop_name, vendor_id, vendor_name, firm_id, firm_name, div_id, dept_id, dept_name, catg_l_id, catg_l_name, catg_m_id, catg_m_name, catg_s_id, catg_s_name, goodsname, brand, brand_name, operation_type_id, operation_type_name, efct_sign_id, efct_sign_name, shop_goods_sts_id, shop_goods_sts_name, bravo_region_id, bravo_region_name, shop_belong, shop_belong_desc, zone_id, zone_name, bd_id, bd_name, firm_g1_id, firm_g1_name, firm_g2_id, firm_g2_name, pur_tax_rate, cost_price, vendor_name_new, vendor_id_new, transNumber0, returnFlag, channelFlag, commodityType, endTimestamp], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) -> Calc(select=[businessDayDate AS sdt, werks AS shopid, CAST(((CAST((endTimestamp IS NULL IF updateTime IF endTimestamp)) UNIX_TIMESTAMP _UTF-16LE'yyyy-MM-dd HH:mm:ss.SSS') * 1000)) AS updatetime, retailNumber AS serialid, transNumber AS sheetid, goodsId AS goodsid, keyBy AS key_by, CAST((endTimestamp IS NULL IF (FLAG(HOUR) EXTRACT updateTime) IF (FLAG(HOUR) EXTRACT endTimestamp))) AS timeframe, region_id_new AS regionid, region_name_new AS regionname, shop_name AS shopname, bravo_region_id AS serviceregionid, bravo_region_name AS serviceregionname, shop_belong AS shopbelongid, shop_belong_desc AS shopbelongname, sales_dist AS distid, sales_dist_name AS distname, prov_code AS provinceid, prov_name AS provincename, city_code AS cityid, city_name AS cityname, rtPlu AS pluid, salesAmount AS salevalue, commodityType AS commoditytype, CAST(_UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS memnum, busi_grp_id AS groupid, busi_grp_name AS groupname, div_id AS categoryid, goodsname, (((retailQuantity * cost_price) * (1 + (pur_tax_rate / 100))) ROUND 2) AS cost, retailQuantity AS amount, CAST(channelFlag) AS channelflag, CAST(returnFlag) AS isreturn, firm_id AS firmid, firm_name AS firmname, dept_id AS deptid, dept_name AS deptname, brand, brand_name AS brandname, operation_type_id AS operationtypeid, operation_type_name AS operationtypename, efct_sign_id AS efctsignid, efct_sign_name AS efctsignname, shop_goods_sts_id AS shopgoodsstsid, shop_goods_sts_name AS shopgoodsstsname, vendor_id AS vendorid, vendor_name AS vendorname, vendor_id_new AS vendoridnew, vendor_name_new AS vendornamenew, catg_l_id AS catglid, catg_l_name AS catglname, catg_m_id AS catgmid, catg_m_name AS catgmname, catg_s_id AS catgsid, catg_s_name AS catgsname, zone_id AS zoneid, zone_name AS zonename, bd_id AS bdid, bd_name AS bdname, firm_g1_id AS firmg1id, firm_g1_name AS firmg1name, firm_g2_id AS firmg2id, firm_g2_name AS firmg2name, ((cost_price * (1 + (pur_tax_rate / 100))) ROUND 2) AS cycleunitprice, businessDayDate AS saledate]) (1/10).
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:191)
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:138)
	... 3 more
Caused by: java.util.concurrent.CancellationException
	at java.util.concurrent.FutureTask.report(FutureTask.java:121)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:479)
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:50)
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:102)
	... 3 more



--
This message was sent by Atlassian Jira
(v8.3.4#803005)