You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "macdoor615 (Jira)" <ji...@apache.org> on 2020/12/28 08:55:00 UTC

[jira] [Updated] (FLINK-20784) .staging_xxx does not exist, when insert into hive

     [ https://issues.apache.org/jira/browse/FLINK-20784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

macdoor615 updated FLINK-20784:
-------------------------------
    Description: 
Environment: standalone cluster, batch mode.

The following SQL is executed periodically with "sql-client.sh"; only the date parameter changes between runs:
{quote}{{insert overwrite snmpprobehive.snmpprobe.p_port_traffic_5m}}
 {{select}}
 ipp.binaryid as binaryid,
 ipp.id as id,
 'all' as ver,
 p0m.coltime as coltime,
 p5m.ifhcinoctets - p0m.ifhcinoctets as inoctets,
 p5m.ifhcoutoctets - p0m.ifhcoutoctets as outoctets,
 (p5m.ifhcinoctets - p0m.ifhcinoctets) + (p5m.ifhcoutoctets - p0m.ifhcoutoctets) as bi_octets,
 if((p5m.ifhcinoctets - p0m.ifhcinoctets) >= (p5m.ifhcoutoctets - p0m.ifhcoutoctets), (p5m.ifhcinoctets - p0m.ifhcinoctets), (p5m.ifhcoutoctets - p0m.ifhcoutoctets)) as unimax_octets,
 cast((p5m.ifhcinoctets - p0m.ifhcinoctets) as double)/(5*60)*8 as in_speed,
 cast((p5m.ifhcoutoctets - p0m.ifhcoutoctets) as double)/(5*60)*8 as out_speed,
 cast(((p5m.ifhcinoctets - p0m.ifhcinoctets) + (p5m.ifhcoutoctets - p0m.ifhcoutoctets)) as double)/(5*60)*8 as bi_speed,
 cast(if((p5m.ifhcinoctets - p0m.ifhcinoctets) >= (p5m.ifhcoutoctets - p0m.ifhcoutoctets), (p5m.ifhcinoctets - p0m.ifhcinoctets), (p5m.ifhcoutoctets - p0m.ifhcoutoctets)) as double)/(5*60)*8 unimax_speed,
 cast((p5m.ifhcinoctets - p0m.ifhcinoctets) as double)/(5*60)*8/(cast(p0m.ifhighspeed as bigint)*1000000) as in_util,
 cast((p5m.ifhcoutoctets - p0m.ifhcoutoctets) as double)/(5*60)*8/(cast(p0m.ifhighspeed as bigint)*1000000) as out_util,
 cast(((p5m.ifhcinoctets - p0m.ifhcinoctets) + (p5m.ifhcoutoctets - p0m.ifhcoutoctets)) as double)/(5*60)*8/(cast(p0m.ifhighspeed as bigint)*1000000*2) as bi_util,
 cast(if((p5m.ifhcinoctets - p0m.ifhcinoctets) >= (p5m.ifhcoutoctets - p0m.ifhcoutoctets), (p5m.ifhcinoctets - p0m.ifhcinoctets), (p5m.ifhcoutoctets - p0m.ifhcoutoctets)) as double)/(5*60)*8/(p0m.ifhighspeed*1000000) as unimax_util,
 case
 when (p5m.ifhcoutoctets - p0m.ifhcoutoctets) =0 or (p5m.ifhcoutoctets - p0m.ifhcoutoctets) is null then null
 else cast((p5m.ifhcinoctets - p0m.ifhcinoctets) as double)/(p5m.ifhcoutoctets - p0m.ifhcoutoctets)
 end as inout_ratio,
 p0m.ifhighspeed as bandwidth,
 p0m.ip as origin,
 now(),
 p0m.dt, p0m.hh, p0m.mi
from snmpprobehive.snmpprobe.p_snmp_ifxtable p0m
inner join snmpprobehive.snmpprobe.p_snmp_ifxtable p5m on p0m.id=p5m.id and p0m.mibindex=p5m.mibindex
inner join snmpprobehive.snmpprobe.rv_ip_port_hive ipp on ipp.did=p0m.id and ipp.ifindex=p0m.mibindex
where p5m.dt='2020-12-28' and p5m.hh='15' and p5m.mi='20' 
and p0m.dt='2020-12-28' and p0m.hh='15' and p0m.mi='15' 
and p0m.ifhighspeed > 0 
 {{and ipp.operstatus ='up'}}
{quote}
When inserting into the Hive table, the job randomly fails with a java.io.FileNotFoundException error.
{quote}{{org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy}}
 \{{ at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)}}
 \{{ at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:89)}}
 \{{ at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:240)}}
 \{{ at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyGlobalFailure(UpdateSchedulerNgOnInternalFailuresListener.java:65)}}
 \{{ at org.apache.flink.runtime.executiongraph.ExecutionGraph.failGlobal(ExecutionGraph.java:1055)}}
 \{{ at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1305)}}
 \{{ at org.apache.flink.runtime.executiongraph.ExecutionVertex.executionFinished(ExecutionVertex.java:849)}}
 \{{ at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:1127)}}
 \{{ at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateStateInternal(ExecutionGraph.java:1512)}}
 \{{ at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1485)}}
 \{{ at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:604)}}
 \{{ at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)}}
 \{{ at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)}}
 \{{ at sun.reflect.GeneratedMethodAccessor56.invoke(Unknown Source)}}
 \{{ at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)}}
 \{{ at java.lang.reflect.Method.invoke(Method.java:498)}}
 \{{ at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)}}
 \{{ at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)}}
 \{{ at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)}}
 \{{ at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)}}
 \{{ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)}}
 \{{ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)}}
 \{{ at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)}}
 \{{ at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)}}
 \{{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)}}
 \{{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)}}
 \{{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)}}
 \{{ at akka.actor.Actor$class.aroundReceive(Actor.scala:517)}}
 \{{ at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)}}
 \{{ at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)}}
 \{{ at akka.actor.ActorCell.invoke(ActorCell.scala:561)}}
 \{{ at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)}}
 \{{ at akka.dispatch.Mailbox.run(Mailbox.scala:225)}}
 \{{ at akka.dispatch.Mailbox.exec(Mailbox.scala:235)}}
 \{{ at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)}}
 \{{ at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)}}
 \{{ at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)}}
 \{{ at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)}}
 {{Caused by: java.lang.Exception: Failed to finalize execution on master}}
 \{{ ... 33 more}}
 {{Caused by: org.apache.flink.table.api.TableException: Exception in finalizeGlobal}}
 \{{ at org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:97)}}
 \{{ at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:131)}}
 \{{ at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1299)}}
 \{{ ... 32 more}}
 {{Caused by: java.io.FileNotFoundException: File hdfs://service1/user/hive/warehouse/snmpprobe.db/p_port_packet_loss_5m/.staging_1609143677636 does not exist.}}
 \{{ at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:901)}}
 \{{ at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:112)}}
 \{{ at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:961)}}
 \{{ at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:958)}}
 \{{ at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)}}
 \{{ at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:958)}}
 \{{ at org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:165)}}
 \{{ at org.apache.flink.table.filesystem.PartitionTempFileManager.headCheckpoints(PartitionTempFileManager.java:140)}}
 \{{ at org.apache.flink.table.filesystem.FileSystemCommitter.commitUpToCheckpoint(FileSystemCommitter.java:98)}}
 \{{ at org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:95)}}
 \{{ ... 34 more}}
{quote}

  was:
standalone cluster, batch mode,

periodically execute this sql with "sql-client.sh", only change date parameter:
{quote}{{insert overwrite snmpprobehive.snmpprobe.p_port_traffic_5m}}
{{select}}
{{ ipp.binaryid as binaryid,}}
{{ ipp.id as id,}}
{{ 'all' as ver,}}
{{ p0m.coltime as coltime,}}
{{ p5m.ifhcinoctets - p0m.ifhcinoctets as inoctets,}}
{{ p5m.ifhcoutoctets - p0m.ifhcoutoctets as outoctets,}}
{{ (p5m.ifhcinoctets - p0m.ifhcinoctets) + (p5m.ifhcoutoctets - p0m.ifhcoutoctets) as bi_octets,}}
{{ if((p5m.ifhcinoctets - p0m.ifhcinoctets) >= (p5m.ifhcoutoctets - p0m.ifhcoutoctets), (p5m.ifhcinoctets - p0m.ifhcinoctets), (p5m.ifhcoutoctets - p0m.ifhcoutoctets)) as unimax_octets,}}
{{ cast((p5m.ifhcinoctets - p0m.ifhcinoctets) as double)/(5*60)*8 as in_speed,}}
{{ cast((p5m.ifhcoutoctets - p0m.ifhcoutoctets) as double)/(5*60)*8 as out_speed,}}
{{ cast(((p5m.ifhcinoctets - p0m.ifhcinoctets) + (p5m.ifhcoutoctets - p0m.ifhcoutoctets)) as double)/(5*60)*8 as bi_speed,}}
{{ cast(if((p5m.ifhcinoctets - p0m.ifhcinoctets) >= (p5m.ifhcoutoctets - p0m.ifhcoutoctets), (p5m.ifhcinoctets - p0m.ifhcinoctets), (p5m.ifhcoutoctets - p0m.ifhcoutoctets)) as double)/(5*60)*8 unimax_speed,}}
{{ cast((p5m.ifhcinoctets - p0m.ifhcinoctets) as double)/(5*60)*8/(cast(p0m.ifhighspeed as bigint)*1000000) as in_util,}}
{{ cast((p5m.ifhcoutoctets - p0m.ifhcoutoctets) as double)/(5*60)*8/(cast(p0m.ifhighspeed as bigint)*1000000) as out_util,}}
{{ cast(((p5m.ifhcinoctets - p0m.ifhcinoctets) + (p5m.ifhcoutoctets - p0m.ifhcoutoctets)) as double)/(5*60)*8/(cast(p0m.ifhighspeed as bigint)*1000000*2) as bi_util,}}
{{ cast(if((p5m.ifhcinoctets - p0m.ifhcinoctets) >= (p5m.ifhcoutoctets - p0m.ifhcoutoctets), (p5m.ifhcinoctets - p0m.ifhcinoctets), (p5m.ifhcoutoctets - p0m.ifhcoutoctets)) as double)/(5*60)*8/(p0m.ifhighspeed*1000000) as unimax_util,}}
{{ case}}
{{ when (p5m.ifhcoutoctets - p0m.ifhcoutoctets) =0 or (p5m.ifhcoutoctets - p0m.ifhcoutoctets) is null then null}}
{{ else cast((p5m.ifhcinoctets - p0m.ifhcinoctets) as double)/(p5m.ifhcoutoctets - p0m.ifhcoutoctets)}}
{{ end as inout_ratio,}}
{{ p0m.ifhighspeed as bandwidth,}}
{{ p0m.ip as origin,}}
{{ now(),}}
{{ p0m.dt, p0m.hh, p0m.mi}}
{{from snmpprobehive.snmpprobe.p_snmp_ifxtable p0m}}
{{inner join snmpprobehive.snmpprobe.p_snmp_ifxtable p5m on p0m.id=p5m.id and p0m.mibindex=p5m.mibindex}}
{{inner join snmpprobehive.snmpprobe.rv_ip_port_hive ipp on ipp.did=p0m.id and ipp.ifindex=p0m.mibindex}}
{{where p5m.dt='2020-12-28' and p5m.hh='15' and p5m.mi='20' }}
{{and p0m.dt='2020-12-28' and p0m.hh='15' and p0m.mi='15' }}
{{and p0m.ifhighspeed > 0 }}
{{and ipp.operstatus ='up'}}
{quote}
when insert into hive table, randomly get  java.io.FileNotFoundException error.
{quote}{{org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy}}
{{ at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)}}
{{ at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:89)}}
{{ at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:240)}}
{{ at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyGlobalFailure(UpdateSchedulerNgOnInternalFailuresListener.java:65)}}
{{ at org.apache.flink.runtime.executiongraph.ExecutionGraph.failGlobal(ExecutionGraph.java:1055)}}
{{ at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1305)}}
{{ at org.apache.flink.runtime.executiongraph.ExecutionVertex.executionFinished(ExecutionVertex.java:849)}}
{{ at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:1127)}}
{{ at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateStateInternal(ExecutionGraph.java:1512)}}
{{ at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1485)}}
{{ at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:604)}}
{{ at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)}}
{{ at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)}}
{{ at sun.reflect.GeneratedMethodAccessor56.invoke(Unknown Source)}}
{{ at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)}}
{{ at java.lang.reflect.Method.invoke(Method.java:498)}}
{{ at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)}}
{{ at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)}}
{{ at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)}}
{{ at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)}}
{{ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)}}
{{ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)}}
{{ at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)}}
{{ at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)}}
{{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)}}
{{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)}}
{{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)}}
{{ at akka.actor.Actor$class.aroundReceive(Actor.scala:517)}}
{{ at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)}}
{{ at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)}}
{{ at akka.actor.ActorCell.invoke(ActorCell.scala:561)}}
{{ at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)}}
{{ at akka.dispatch.Mailbox.run(Mailbox.scala:225)}}
{{ at akka.dispatch.Mailbox.exec(Mailbox.scala:235)}}
{{ at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)}}
{{ at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)}}
{{ at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)}}
{{ at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)}}
{{Caused by: java.lang.Exception: Failed to finalize execution on master}}
{{ ... 33 more}}
{{Caused by: org.apache.flink.table.api.TableException: Exception in finalizeGlobal}}
{{ at org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:97)}}
{{ at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:131)}}
{{ at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1299)}}
{{ ... 32 more}}
{{Caused by: java.io.FileNotFoundException: File hdfs://service1/user/hive/warehouse/snmpprobe.db/p_port_packet_loss_5m/.staging_1609143677636 does not exist.}}
{{ at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:901)}}
{{ at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:112)}}
{{ at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:961)}}
{{ at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:958)}}
{{ at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)}}
{{ at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:958)}}
{{ at org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:165)}}
{{ at org.apache.flink.table.filesystem.PartitionTempFileManager.headCheckpoints(PartitionTempFileManager.java:140)}}
{{ at org.apache.flink.table.filesystem.FileSystemCommitter.commitUpToCheckpoint(FileSystemCommitter.java:98)}}
{{ at org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:95)}}
{{ ... 34 more}}{quote}


> .staging_xxx does not exist, when insert into hive
> --------------------------------------------------
>
>                 Key: FLINK-20784
>                 URL: https://issues.apache.org/jira/browse/FLINK-20784
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Client, Table SQL / Runtime
>    Affects Versions: 1.12.0
>         Environment: standalone cluster, batch mode,
>            Reporter: macdoor615
>            Priority: Major
>             Fix For: 1.13.0, 1.12.1
>
>
> Environment: standalone cluster, batch mode.
> The following SQL is executed periodically with "sql-client.sh"; only the date parameter changes between runs:
> {quote}{{insert overwrite snmpprobehive.snmpprobe.p_port_traffic_5m}}
>  {{select}}
>  ipp.binaryid as binaryid,
>  ipp.id as id,
>  'all' as ver,
>  p0m.coltime as coltime,
>  p5m.ifhcinoctets - p0m.ifhcinoctets as inoctets,
>  p5m.ifhcoutoctets - p0m.ifhcoutoctets as outoctets,
>  (p5m.ifhcinoctets - p0m.ifhcinoctets) + (p5m.ifhcoutoctets - p0m.ifhcoutoctets) as bi_octets,
>  if((p5m.ifhcinoctets - p0m.ifhcinoctets) >= (p5m.ifhcoutoctets - p0m.ifhcoutoctets), (p5m.ifhcinoctets - p0m.ifhcinoctets), (p5m.ifhcoutoctets - p0m.ifhcoutoctets)) as unimax_octets,
>  cast((p5m.ifhcinoctets - p0m.ifhcinoctets) as double)/(5*60)*8 as in_speed,
>  cast((p5m.ifhcoutoctets - p0m.ifhcoutoctets) as double)/(5*60)*8 as out_speed,
>  cast(((p5m.ifhcinoctets - p0m.ifhcinoctets) + (p5m.ifhcoutoctets - p0m.ifhcoutoctets)) as double)/(5*60)*8 as bi_speed,
>  cast(if((p5m.ifhcinoctets - p0m.ifhcinoctets) >= (p5m.ifhcoutoctets - p0m.ifhcoutoctets), (p5m.ifhcinoctets - p0m.ifhcinoctets), (p5m.ifhcoutoctets - p0m.ifhcoutoctets)) as double)/(5*60)*8 unimax_speed,
>  cast((p5m.ifhcinoctets - p0m.ifhcinoctets) as double)/(5*60)*8/(cast(p0m.ifhighspeed as bigint)*1000000) as in_util,
>  cast((p5m.ifhcoutoctets - p0m.ifhcoutoctets) as double)/(5*60)*8/(cast(p0m.ifhighspeed as bigint)*1000000) as out_util,
>  cast(((p5m.ifhcinoctets - p0m.ifhcinoctets) + (p5m.ifhcoutoctets - p0m.ifhcoutoctets)) as double)/(5*60)*8/(cast(p0m.ifhighspeed as bigint)*1000000*2) as bi_util,
>  cast(if((p5m.ifhcinoctets - p0m.ifhcinoctets) >= (p5m.ifhcoutoctets - p0m.ifhcoutoctets), (p5m.ifhcinoctets - p0m.ifhcinoctets), (p5m.ifhcoutoctets - p0m.ifhcoutoctets)) as double)/(5*60)*8/(p0m.ifhighspeed*1000000) as unimax_util,
>  case
>  when (p5m.ifhcoutoctets - p0m.ifhcoutoctets) =0 or (p5m.ifhcoutoctets - p0m.ifhcoutoctets) is null then null
>  else cast((p5m.ifhcinoctets - p0m.ifhcinoctets) as double)/(p5m.ifhcoutoctets - p0m.ifhcoutoctets)
>  end as inout_ratio,
>  p0m.ifhighspeed as bandwidth,
>  p0m.ip as origin,
>  now(),
>  p0m.dt, p0m.hh, p0m.mi
> from snmpprobehive.snmpprobe.p_snmp_ifxtable p0m
> inner join snmpprobehive.snmpprobe.p_snmp_ifxtable p5m on p0m.id=p5m.id and p0m.mibindex=p5m.mibindex
> inner join snmpprobehive.snmpprobe.rv_ip_port_hive ipp on ipp.did=p0m.id and ipp.ifindex=p0m.mibindex
> where p5m.dt='2020-12-28' and p5m.hh='15' and p5m.mi='20' 
> and p0m.dt='2020-12-28' and p0m.hh='15' and p0m.mi='15' 
> and p0m.ifhighspeed > 0 
>  {{and ipp.operstatus ='up'}}
> {quote}
> When inserting into the Hive table, the job randomly fails with a java.io.FileNotFoundException error.
> {quote}{{org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy}}
>  \{{ at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)}}
>  \{{ at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:89)}}
>  \{{ at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:240)}}
>  \{{ at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyGlobalFailure(UpdateSchedulerNgOnInternalFailuresListener.java:65)}}
>  \{{ at org.apache.flink.runtime.executiongraph.ExecutionGraph.failGlobal(ExecutionGraph.java:1055)}}
>  \{{ at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1305)}}
>  \{{ at org.apache.flink.runtime.executiongraph.ExecutionVertex.executionFinished(ExecutionVertex.java:849)}}
>  \{{ at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:1127)}}
>  \{{ at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateStateInternal(ExecutionGraph.java:1512)}}
>  \{{ at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1485)}}
>  \{{ at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:604)}}
>  \{{ at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)}}
>  \{{ at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)}}
>  \{{ at sun.reflect.GeneratedMethodAccessor56.invoke(Unknown Source)}}
>  \{{ at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)}}
>  \{{ at java.lang.reflect.Method.invoke(Method.java:498)}}
>  \{{ at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)}}
>  \{{ at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)}}
>  \{{ at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)}}
>  \{{ at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)}}
>  \{{ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)}}
>  \{{ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)}}
>  \{{ at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)}}
>  \{{ at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)}}
>  \{{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)}}
>  \{{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)}}
>  \{{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)}}
>  \{{ at akka.actor.Actor$class.aroundReceive(Actor.scala:517)}}
>  \{{ at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)}}
>  \{{ at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)}}
>  \{{ at akka.actor.ActorCell.invoke(ActorCell.scala:561)}}
>  \{{ at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)}}
>  \{{ at akka.dispatch.Mailbox.run(Mailbox.scala:225)}}
>  \{{ at akka.dispatch.Mailbox.exec(Mailbox.scala:235)}}
>  \{{ at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)}}
>  \{{ at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)}}
>  \{{ at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)}}
>  \{{ at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)}}
>  {{Caused by: java.lang.Exception: Failed to finalize execution on master}}
>  \{{ ... 33 more}}
>  {{Caused by: org.apache.flink.table.api.TableException: Exception in finalizeGlobal}}
>  \{{ at org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:97)}}
>  \{{ at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:131)}}
>  \{{ at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1299)}}
>  \{{ ... 32 more}}
>  {{Caused by: java.io.FileNotFoundException: File hdfs://service1/user/hive/warehouse/snmpprobe.db/p_port_packet_loss_5m/.staging_1609143677636 does not exist.}}
>  \{{ at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:901)}}
>  \{{ at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:112)}}
>  \{{ at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:961)}}
>  \{{ at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:958)}}
>  \{{ at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)}}
>  \{{ at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:958)}}
>  \{{ at org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:165)}}
>  \{{ at org.apache.flink.table.filesystem.PartitionTempFileManager.headCheckpoints(PartitionTempFileManager.java:140)}}
>  \{{ at org.apache.flink.table.filesystem.FileSystemCommitter.commitUpToCheckpoint(FileSystemCommitter.java:98)}}
>  \{{ at org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:95)}}
>  \{{ ... 34 more}}
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)