Posted to user-zh@flink.apache.org by Peihui He <pe...@gmail.com> on 2020/09/04 10:02:54 UTC

flink sql 1.11.1 FileSystem SQL Connector path directory slow

hi, all
When I create a table with the Flink SQL client:

CREATE TABLE MyUserTable (
  column_name1 INT,
  column_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH (
  'connector' = 'filesystem',           -- required: specify the connector
  'path' = 'file:///path/to/whatever',  -- required: path to a directory
  'format' = 'json'                     -- required: file system connector
);

When the path has an extra trailing "/", for example 'path' = 'file:///path/to/whatever/',
the SQL client is very slow to submit the job and eventually fails with:

Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has already been submitted.
    at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:280)
    at sun.reflect.GeneratedMethodAccessor127.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
End of exception on server side>]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)


The job pages of the Flink session cluster are basically unreachable and only come back after a long while. In the end I could see that the job had in fact been submitted successfully.

Has anyone run into this before?

Best Wishes.
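One plausible reading of the DuplicateJobSubmissionException above, given that the job did end up being submitted, is that the client retried a submission the server was still working on, and the retry collided with the original attempt. That is an assumption rather than anything confirmed in this thread; the Java sketch below is purely illustrative, and submitOnce and DuplicateSubmissionException are hypothetical stand-ins, not Flink APIs.

import java.util.concurrent.TimeoutException;

public class ResubmitRaceSketch {

    /** Hypothetical stand-in for the server rejecting a job id it already knows. */
    static class DuplicateSubmissionException extends RuntimeException {}

    /** Hypothetical submission call; times out while the server is still busy. */
    interface Submitter {
        void submitOnce(String jobId) throws TimeoutException;
    }

    static void submitWithRetry(Submitter submitter, String jobId, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                submitter.submitOnce(jobId);
                return; // accepted
            } catch (TimeoutException e) {
                // The first attempt may still succeed on the server side after this
                // timeout; resubmitting the same jobId can then collide with it and
                // surface as a "duplicate submission" error even though the job is
                // actually running.
            } catch (DuplicateSubmissionException e) {
                return; // an earlier attempt already won the race
            }
        }
    }
}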

Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow

Posted by Peihui He <pe...@gmail.com>.
Hi,

We are in fact already using HDFS.

Jingsong Li <ji...@gmail.com> wrote on Mon, Sep 7, 2020 at 11:16 AM:

> Also, could this be related to using the local file system? Could you try switching to HDFS?

Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow

Posted by Jingsong Li <ji...@gmail.com>.
Also, could this be related to using the local file system? Could you try switching to HDFS?

On Mon, Sep 7, 2020 at 11:15 AM Jingsong Li <ji...@gmail.com> wrote:

> Hi,
>
> Could you take a jstack on the JobMaster and see where exactly it is stuck?


-- 
Best, Jingsong Lee

Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow

Posted by Peihui He <pe...@gmail.com>.
Hi,

The detailed jstack output is attached.

Peihui He <pe...@gmail.com> wrote on Mon, Sep 7, 2020 at 7:22 PM:

> Hi,
>
> From the jstack it looks like it is stuck at the point below. Judging from the code, it seems it has to walk every file under the specified HDFS path. Is that right? If so, wouldn't it be very slow when there are many files?

Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow

Posted by Peihui He <pe...@gmail.com>.
Hi,

From the jstack it looks like it is stuck at the point below. Judging from the code, it seems it has to walk every file under the specified HDFS path. Is that right? If so, wouldn't it be very slow when there are many files?

"flink-akka.actor.default-dispatcher-30" #103 prio=5 os_prio=0 tid=0x00007f6264001000 nid=0x4a93 in Object.wait() [0x00007f62964f1000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
    at org.apache.hadoop.util.concurrent.AsyncGet$Util.wait(AsyncGet.java:59)
    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1533)
    - locked <0x00000000ebd49e50> (a org.apache.hadoop.ipc.Client$Call)
    at org.apache.hadoop.ipc.Client.call(Client.java:1491)
    at org.apache.hadoop.ipc.Client.call(Client.java:1388)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:118)
    at com.sun.proxy.$Proxy45.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:324)
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
    - locked <0x00000000ebd49d40> (a org.apache.hadoop.io.retry.RetryInvocationHandler$Call)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
    at com.sun.proxy.$Proxy46.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:864)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:853)
    at org.apache.hadoop.hdfs.DFSClient.getBlockLocations(DFSClient.java:910)
    at org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:267)
    at org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:264)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:274)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:248)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileBlockLocations(HadoopFileSystem.java:98)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:652)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:62)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:258)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:814)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:228)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242)
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119)
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
    at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140)
    at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388)
    at org.apache.flink.runtime.dispatcher.Dispatcher$$Lambda$241/1691741073.get(Unknown Source)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
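The bottom frames suggest the JobMaster performs one NameNode round trip per file while creating input splits, so a directory with many files (the thread reports close to 2k) means as many synchronous RPCs before the ExecutionGraph can even be built. Below is a simplified standalone sketch of that access pattern, my own approximation against the plain Hadoop FileSystem API rather than Flink's actual FileInputFormat code; the path is a placeholder.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SplitEnumerationSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder path; substitute the table's 'path' option.
        Path tablePath = new Path("hdfs:///path/to/whatever");
        FileSystem fs = tablePath.getFileSystem(new Configuration());

        long files = 0;
        for (FileStatus status : fs.listStatus(tablePath)) {
            if (!status.isFile()) {
                continue;
            }
            files++;
            // One NameNode round trip per file, analogous to the
            // getFileBlockLocations(...) frames in the jstack above.
            fs.getFileBlockLocations(status, 0, status.getLen());
        }
        System.out.println("files enumerated (== block-location RPCs): " + files);
    }
}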



Jingsong Li <ji...@gmail.com> wrote on Mon, Sep 7, 2020 at 11:15 AM:

> Hi,
>
> Could you take a jstack on the JobMaster and see where exactly it is stuck?

Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow

Posted by Jingsong Li <ji...@gmail.com>.
Hi,

Could you take a jstack on the JobMaster and see where exactly it is stuck?

On Sat, Sep 5, 2020 at 11:11 PM Peihui He <pe...@gmail.com> wrote:

> Hi, all
>
> After several days of testing I have found that the situation above shows up when the HDFS directory contains a lot of files; in my case the file count is close to 2k.
> A quick test shows that with only 1 or 2 files the job is submitted quickly and the Flink session web UI does not feel stuck either.
>
> Is there a good way to handle this?
>
> Best Wishes.

-- 
Best, Jingsong Lee

Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow

Posted by Peihui He <pe...@gmail.com>.
Hi, all

After several days of testing I have found that the situation above shows up when the HDFS directory contains a lot of files; in my case the file count is close to 2k.
A quick test shows that with only 1 or 2 files the job is submitted quickly and the Flink session web UI does not feel stuck either.

Is there a good way to handle this?

Best Wishes.
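As a quick check of the scale involved, the standalone sketch below (my own illustration, not part of Flink; the path is a placeholder) counts how many files the connector would have to enumerate under the table path. For HDFS, this recursive listing also returns each file's block locations as part of the listing response, which is far cheaper than issuing one getBlockLocations call per file.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class CountTableFiles {
    public static void main(String[] args) throws Exception {
        // Placeholder path; substitute the table's 'path' option.
        Path tablePath = new Path("hdfs:///path/to/whatever");
        FileSystem fs = tablePath.getFileSystem(new Configuration());

        long files = 0;
        // listFiles(..., true) walks the directory tree recursively and hands back
        // LocatedFileStatus objects that already carry block locations.
        RemoteIterator<LocatedFileStatus> it = fs.listFiles(tablePath, true);
        while (it.hasNext()) {
            it.next();
            files++;
        }
        System.out.println("files under table path: " + files);
    }
}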

Peihui He <pe...@gmail.com> wrote on Fri, Sep 4, 2020 at 6:25 PM:

> Hi, all
>
> When partitions are specified, this problem can no longer be worked around via the path either.

Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow

Posted by Peihui He <pe...@gmail.com>.
Hi, all

When partitions are specified, this problem can no longer be worked around via the path either.

CREATE TABLE MyUserTable (
  column_name1 INT,
  column_name2 STRING,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',           -- required: specify the connector
  'path' = 'file:///path/to/whatever',  -- required: path to a directory
  'format' = 'json'                     -- required: file system connector
);


select  * from  MyUserTable  limit 10;

The job just stays stuck at one point:
[image: image.png]

How should this be resolved?

Peihui He <pe...@gmail.com> wrote on Fri, Sep 4, 2020 at 6:02 PM:

> hi, all
> When I create a table with the Flink SQL client