Posted to user-zh@flink.apache.org by guanyq <dl...@163.com> on 2020/06/08 13:48:33 UTC

flink1.9 on yarn: garbled Chinese characters when consuming Kafka data

Kafka version 0.11.
First, the data in the Kafka source topic is fine; consuming it with the Kafka client shows no garbled Chinese characters.
1. Running locally in IDEA debug mode: no garbling.
2. Running on the server in Standalone mode: no garbling.
3. Submitting on the server via on-yarn mode: the Chinese characters come out garbled.


The API used to consume Kafka from Flink is:
new FlinkKafkaConsumer<>(topicList, new SimpleStringSchema(), props);


Based on 1, 2, and 3, the problem seems related to YARN. Could anyone advise what else needs investigating to resolve this?
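
One code-level mitigation worth trying (a hedged sketch, not necessarily the fix the thread settled on): SimpleStringSchema also has a constructor that takes an explicit Charset, so decoding no longer depends on the container JVM's default encoding.

import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

// Decode Kafka records as UTF-8 regardless of the container's file.encoding.
new FlinkKafkaConsumer<>(topicList, new SimpleStringSchema(StandardCharsets.UTF_8), props);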

Re: flink1.9 on yarn

Posted by "17610775726@163.com" <17...@163.com>.
Question 1

Every submission with ./bin/flink run -m yarn-cluster produces a new APP-ID, e.g. application_1567067657620_0254.

After running yarn application -kill application_1567067657620_0254,

how can I resubmit with ./bin/flink run -m yarn-cluster without the app ID incrementing?

Question 2

After submitting with ./bin/flink run -m yarn-cluster and cancelling the job, how can I resubmit to the same app ID?

 

Re: Re: flink1.9 on yarn: garbled Chinese characters when consuming Kafka data

Posted by guanyq <dl...@163.com>.
Thanks a lot, the problem is solved!

On 2020-06-09 08:27:47, "马阳阳" <ma...@163.com> wrote:
>
>We ran into this problem too; in our case the default charset on the YARN NodeManagers was ASCII.
>Adding the following setting to flink-conf.yaml fixed it:
>env.java.opts.taskmanager: "-Dfile.encoding=UTF-8"
>
>On 2020-06-08 21:48:33, "guanyq" <dl...@163.com> wrote:
>>Kafka version 0.11.
>>First, the data in the Kafka source topic is fine; consuming it with the Kafka client shows no garbled Chinese characters.
>>1. Running locally in IDEA debug mode: no garbling.
>>2. Running on the server in Standalone mode: no garbling.
>>3. Submitting on the server via on-yarn mode: the Chinese characters come out garbled.
>>
>>
>>The API used to consume Kafka from Flink is:
>>new FlinkKafkaConsumer<>(topicList, new SimpleStringSchema(), props);
>>
>>
>>Based on 1, 2, and 3, the problem seems related to YARN. Could anyone advise what else needs investigating to resolve this?

Re: flink1.9 on yarn

Posted by Roc Marshal <fl...@126.com>.
Hi, guanyq.

Regarding question 1: how to resubmit with ./bin/flink run -m yarn-cluster without the app ID incrementing?
The app ID increment policy is not something Flink generates; if necessary, you can look into hadoop-yarn itself and draw your own conclusion.



Regarding question 2: after submitting with ./bin/flink run -m yarn-cluster and cancelling the job, how to resubmit to that app ID?
May I take it that a Flink yarn-session cluster fits your workload better? The submission mode mentioned in the question is per-job, and there Flink shuts the cluster down once the job finishes.
See: https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html#start-flink-session
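
A minimal sketch of that session workflow (assuming the Flink 1.10 CLI; the job jar path is a placeholder):

# Start a long-running YARN session once (detached); YARN assigns it a single application ID.
./bin/yarn-session.sh -d -jm 1024m -tm 2048m
# Submit, cancel, and resubmit jobs into that same application via -yid.
./bin/flink run -yid application_1567067657620_0254 ./my-job.jar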
Best,
Roc Marshal

On 2020-06-28 09:09:43, "guanyq" <dl...@163.com> wrote:
>Question 1
>
>Every submission with ./bin/flink run -m yarn-cluster produces a new APP-ID, e.g. application_1567067657620_0254.
>
>After running yarn application -kill application_1567067657620_0254,
>
>how can I resubmit with ./bin/flink run -m yarn-cluster without the app ID incrementing?
>
>Question 2
>
>After submitting with ./bin/flink run -m yarn-cluster and cancelling the job, how can I resubmit to the same app ID?
>
> 

Re: flink1.9 on yarn

Posted by LakeShen <sh...@gmail.com>.
Hi guanyq,

Why do you want the app id to stay the same?

Best,
LakeShen

guanyq <dl...@163.com> wrote on Sun, Jun 28, 2020 at 9:10 AM:

> Question 1
>
> Every submission with ./bin/flink run -m yarn-cluster produces a new
> APP-ID, e.g. application_1567067657620_0254.
>
> After running yarn application -kill application_1567067657620_0254,
>
> how can I resubmit with ./bin/flink run -m yarn-cluster without the app ID incrementing?
>
> Question 2
>
> After submitting with ./bin/flink run -m yarn-cluster and cancelling the job, how can I resubmit to the same app ID?
>
>

Re: flink1.9 on yarn

Posted by zhisheng <zh...@gmail.com>.
hi,guanyq

Your submission method is Flink on YARN per-job mode; by design, every newly submitted job gets a new AppID.

Best!
zhisheng

Yangze Guo <ka...@gmail.com> wrote on Sun, Jun 28, 2020 at 9:59 AM:

> I believe you need session mode, i.e. ./bin/yarn-session.sh [1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/yarn_setup.html#flink-yarn-session
>
> Best,
> Yangze Guo
>
> On Sun, Jun 28, 2020 at 9:10 AM guanyq <dl...@163.com> wrote:
> >
> > Question 1
> >
> > Every submission with ./bin/flink run -m yarn-cluster produces a new
> > APP-ID, e.g. application_1567067657620_0254.
> >
> > After running yarn application -kill application_1567067657620_0254,
> >
> > how can I resubmit with ./bin/flink run -m yarn-cluster without the app ID incrementing?
> >
> > Question 2
> >
> > After submitting with ./bin/flink run -m yarn-cluster and cancelling the job, how can I resubmit to the same app ID?
> >
> >
>

Re: flink1.9 on yarn

Posted by Yangze Guo <ka...@gmail.com>.
I believe you need session mode, i.e. ./bin/yarn-session.sh [1]

[1] https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/yarn_setup.html#flink-yarn-session

Best,
Yangze Guo

On Sun, Jun 28, 2020 at 9:10 AM guanyq <dl...@163.com> wrote:
>
> Question 1
>
> Every submission with ./bin/flink run -m yarn-cluster produces a new APP-ID, e.g. application_1567067657620_0254.
>
> After running yarn application -kill application_1567067657620_0254,
>
> how can I resubmit with ./bin/flink run -m yarn-cluster without the app ID incrementing?
>
> Question 2
>
> After submitting with ./bin/flink run -m yarn-cluster and cancelling the job, how can I resubmit to the same app ID?
>
>

Startup exception after upgrading from flink 1.9.2 to 1.12.0

Posted by guanyq <dl...@163.com>.
Help, everyone.


Upgraded from Flink 1.9.2 to 1.12.0.
Deployed with Flink on YARN.


The exception log is as follows:
------------------------------------------------------------
The program finished with the following exception:


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not deploy Yarn job cluster.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:460)
at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1940)
at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1822)
at com.data.processing.publicbroadbandbusiness.tasks.DataProcessA.main(DataProcessA.java:582)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
... 11 more
Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1607308494016_1858 failed 1 times (global limit =2; local limit is =1) due to AM Container for appattempt_1607308494016_1858_000001 exited with exitCode: 2
For more detailed output, check the application tracking page: http://nn01.hadoop.unicom:8088/cluster/app/application_1607308494016_1858 Then click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_e70_1607308494016_1858_01_000001
Exit code: 2
Stack trace: ExitCodeException exitCode=2:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:933)
at org.apache.hadoop.util.Shell.run(Shell.java:844)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1123)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:237)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:317)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:83)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)



Container exited with a non-zero exit code 2
Failing this attempt. Failing the application.
If log aggregation is enabled on your cluster, use this command to further investigate the issue:
yarn logs -applicationId application_1607308494016_1858
at org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1078)
at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:558)
at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:453)
... 22 more

Re: 1.12.0 startup exception in on-yarn per-job mode

Posted by Xintong Song <to...@gmail.com>.
Check whether your job jar also bundles the Hadoop dependencies. Normally the Hadoop dependencies should be set to provided; if the job really needs a
Hadoop version different from the YARN cluster's, shade it.

Thank you~
Xintong Song
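
A minimal POM sketch of that advice (hedged: hadoop-client and version 2.7.4 are illustrative here, match them to your cluster):

<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-client</artifactId>
   <version>2.7.4</version>
   <!-- Visible at compile time but not packaged into the job jar;
        the classes come from the YARN cluster's classpath at runtime. -->
   <scope>provided</scope>
</dependency>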



On Tue, Jan 19, 2021 at 3:31 PM guanyq <dl...@163.com> wrote:

> The error looks like a conflict with hadoop-common-2.7.4.jar, but I don't know how to resolve it.
> Help.
> 2021-01-19 15:12:47,922 ERROR org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager
> [] - Fatal error occurred in ResourceManager.
> org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException:
> Could not start the ResourceManager akka.tcp://flink@dn138.hadoop.unicom
> :45554/user/rpc/resourcemanager_0
> at
> org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:220)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:183)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:551)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:172)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> Caused by: java.lang.NoSuchMethodError:
> org.apache.hadoop.io.retry.RetryPolicies.retryForeverWithFixedSleep(JLjava/util/concurrent/TimeUnit;)Lorg/apache/hadoop/io/retry/RetryPolicy;
> at
> org.apache.hadoop.yarn.client.RMProxy.createRetryPolicy(RMProxy.java:280)
> ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
> at
> org.apache.hadoop.yarn.client.RMProxy.createRetryPolicy(RMProxy.java:211)
> ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
> at
> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.createRetriableProxy(RequestHedgingRMFailoverProxyProvider.java:95)
> ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
> at
> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:77)
> ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
> at
> org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:190)
> ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
> at
> org.apache.hadoop.yarn.client.RMProxy.newProxyInstance(RMProxy.java:120)
> ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
> at org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:94)
> ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
> at
> org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72)
> ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
> at
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.serviceStart(AMRMClientImpl.java:186)
> ~[hadoop-yarn-client-2.7.3.2.6.0.3-8.jar:?]
> at
> org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
> ~[FlinkDataProcess.jar:?]
> at
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl.serviceStart(AMRMClientAsyncImpl.java:93)
> ~[hadoop-yarn-client-2.7.3.2.6.0.3-8.jar:?]
> at
> org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
> ~[FlinkDataProcess.jar:?]
> at
> org.apache.flink.yarn.YarnResourceManagerDriver.initializeInternal(YarnResourceManagerDriver.java:159)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver.initialize(AbstractResourceManagerDriver.java:80)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.initialize(ActiveResourceManager.java:121)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.resourcemanager.ResourceManager.startResourceManagerServices(ResourceManager.java:230)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:218)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> ... 22 more
> 2021-01-19 15:12:47,929 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint
> [] - Fatal error occurred in the cluster entrypoint.
> org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException:
> Could not start the ResourceManager akka.tcp://flink@dn138.hadoop.unicom
> :45554/user/rpc/resourcemanager_0
> at
> org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:220)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:183)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:551)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:172)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> Caused by: java.lang.NoSuchMethodError:
> org.apache.hadoop.io.retry.RetryPolicies.retryForeverWithFixedSleep(JLjava/util/concurrent/TimeUnit;)Lorg/apache/hadoop/io/retry/RetryPolicy;
> at
> org.apache.hadoop.yarn.client.RMProxy.createRetryPolicy(RMProxy.java:280)
> ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
> at
> org.apache.hadoop.yarn.client.RMProxy.createRetryPolicy(RMProxy.java:211)
> ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
> at
> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.createRetriableProxy(RequestHedgingRMFailoverProxyProvider.java:95)
> ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
> at
> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:77)
> ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
> at
> org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:190)
> ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
> at
> org.apache.hadoop.yarn.client.RMProxy.newProxyInstance(RMProxy.java:120)
> ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
> at org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:94)
> ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
> at
> org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72)
> ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
> at
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.serviceStart(AMRMClientImpl.java:186)
> ~[hadoop-yarn-client-2.7.3.2.6.0.3-8.jar:?]
> at
> org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
> ~[FlinkDataProcess.jar:?]
> at
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl.serviceStart(AMRMClientAsyncImpl.java:93)
> ~[hadoop-yarn-client-2.7.3.2.6.0.3-8.jar:?]
> at
> org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
> ~[FlinkDataProcess.jar:?]
> at
> org.apache.flink.yarn.YarnResourceManagerDriver.initializeInternal(YarnResourceManagerDriver.java:159)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver.initialize(AbstractResourceManagerDriver.java:80)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.initialize(ActiveResourceManager.java:121)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.resourcemanager.ResourceManager.startResourceManagerServices(ResourceManager.java:230)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:218)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> ... 22 more
> 2021-01-19 15:12:47,934 INFO org.apache.flink.runtime.blob.BlobServer [] -
> Stopped BLOB server at 0.0.0.0:44328

1.12.0 startup exception in on-yarn per-job mode

Posted by guanyq <dl...@163.com>.
The error looks like a conflict with hadoop-common-2.7.4.jar, but I don't know how to resolve it.
Help.
2021-01-19 15:12:47,922 ERROR org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Fatal error occurred in ResourceManager.
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not start the ResourceManager akka.tcp://flink@dn138.hadoop.unicom:45554/user/rpc/resourcemanager_0
at org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:220) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:183) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:551) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:172) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.12.0.jar:1.12.0]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.12.0.jar:1.12.0]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.12.0.jar:1.12.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.12.0.jar:1.12.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.0.jar:1.12.0]
Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.io.retry.RetryPolicies.retryForeverWithFixedSleep(JLjava/util/concurrent/TimeUnit;)Lorg/apache/hadoop/io/retry/RetryPolicy;
at org.apache.hadoop.yarn.client.RMProxy.createRetryPolicy(RMProxy.java:280) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.RMProxy.createRetryPolicy(RMProxy.java:211) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.createRetriableProxy(RequestHedgingRMFailoverProxyProvider.java:95) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:77) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:190) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.RMProxy.newProxyInstance(RMProxy.java:120) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:94) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.serviceStart(AMRMClientImpl.java:186) ~[hadoop-yarn-client-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193) ~[FlinkDataProcess.jar:?]
at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl.serviceStart(AMRMClientAsyncImpl.java:93) ~[hadoop-yarn-client-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193) ~[FlinkDataProcess.jar:?]
at org.apache.flink.yarn.YarnResourceManagerDriver.initializeInternal(YarnResourceManagerDriver.java:159) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver.initialize(AbstractResourceManagerDriver.java:80) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.initialize(ActiveResourceManager.java:121) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.resourcemanager.ResourceManager.startResourceManagerServices(ResourceManager.java:230) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:218) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
... 22 more
2021-01-19 15:12:47,929 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not start the ResourceManager akka.tcp://flink@dn138.hadoop.unicom:45554/user/rpc/resourcemanager_0
at org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:220) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:183) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:551) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:172) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.12.0.jar:1.12.0]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.12.0.jar:1.12.0]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.12.0.jar:1.12.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.12.0.jar:1.12.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.0.jar:1.12.0]
Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.io.retry.RetryPolicies.retryForeverWithFixedSleep(JLjava/util/concurrent/TimeUnit;)Lorg/apache/hadoop/io/retry/RetryPolicy;
at org.apache.hadoop.yarn.client.RMProxy.createRetryPolicy(RMProxy.java:280) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.RMProxy.createRetryPolicy(RMProxy.java:211) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.createRetriableProxy(RequestHedgingRMFailoverProxyProvider.java:95) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:77) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:190) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.RMProxy.newProxyInstance(RMProxy.java:120) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:94) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.serviceStart(AMRMClientImpl.java:186) ~[hadoop-yarn-client-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193) ~[FlinkDataProcess.jar:?]
at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl.serviceStart(AMRMClientAsyncImpl.java:93) ~[hadoop-yarn-client-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193) ~[FlinkDataProcess.jar:?]
at org.apache.flink.yarn.YarnResourceManagerDriver.initializeInternal(YarnResourceManagerDriver.java:159) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver.initialize(AbstractResourceManagerDriver.java:80) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.initialize(ActiveResourceManager.java:121) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.resourcemanager.ResourceManager.startResourceManagerServices(ResourceManager.java:230) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:218) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
... 22 more
2021-01-19 15:12:47,934 INFO org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:44328

Re: Reading Alibaba MQ from flink1.9

Posted by zhisheng <zh...@gmail.com>.
hi,guanyq

The community edition of Flink does not ship a RocketMQ connector by default; the RocketMQ community project has a module that integrates with Flink:
https://github.com/apache/rocketmq-externals/tree/master/rocketmq-flink

The AccessKey/SecretKey parameters you mention should be for ACL authentication; from reading the code this does not appear to be supported, but you can extend it yourself.

Best!
zhisheng
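
A hedged sketch of such an extension at the RocketMQ client level (assuming the rocketmq-client and rocketmq-acl artifacts, whose AclClientRPCHook/SessionCredentials handle ACL signing; the group, keys, and address are placeholders):

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;

// Build a push consumer whose RPC hook signs every request with the ACL credentials.
AclClientRPCHook aclHook = new AclClientRPCHook(new SessionCredentials("yourAccessKey", "yourSecretKey"));
DefaultMQPushConsumer consumer =
        new DefaultMQPushConsumer("your_consumer_group", aclHook, new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr("localhost:9876");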

guanyq <dl...@163.com> wrote on Fri, Jul 3, 2020 at 11:44 PM:

> Reading Alibaba RocketMQ from flink1.9:
> how do I set the AccessKey/SecretKey parameters?
>
>
>
> final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder().setHost("localhost").setPort(5000)....build();

Re: Reading Alibaba MQ from flink1.9

Posted by 李军 <ho...@163.com>.
        Hello!
        In a custom source extending RichSourceFunction, build the Consumer inside open(); there you can set the AccessKey/SecretKey parameters.


2020-7-4
李军
hold_lijun@163.com
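
A hedged sketch of that advice against the Aliyun ONS client (assuming its PropertyKeyConst constants; all values are placeholders):

import java.util.Properties;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

// Credentials go into the Properties handed to ONSFactory inside open().
Properties properties = new Properties();
properties.put(PropertyKeyConst.AccessKey, "yourAccessKey");
properties.put(PropertyKeyConst.SecretKey, "yourSecretKey");
properties.put(PropertyKeyConst.GROUP_ID, "GID_your_group");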
On 2020-07-03 23:44, guanyq <dl...@163.com> wrote:
Reading Alibaba RocketMQ from flink1.9:
how do I set the AccessKey/SecretKey parameters?


final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder().setHost("localhost").setPort(5000)....build();

Reading Alibaba MQ from flink1.9

Posted by guanyq <dl...@163.com>.
Reading Alibaba RocketMQ from flink1.9:
how do I set the AccessKey/SecretKey parameters?


final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder().setHost("localhost").setPort(5000)....build();

Flink 1.12.0: consuming data from a Kafka 0.10 cluster ==> writing to a Kafka 0.9 cluster

Posted by guanyq <dl...@163.com>.
How should I choose the version of the kafka connector?
If I pick 1.12.0, there is no FlinkKafkaProducer09/FlinkKafkaConsumer09:
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
</dependency>
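
One hedged option if the 0.9 cluster cannot be upgraded: the universal flink-connector-kafka shipped with 1.12 targets brokers 0.10.2.0 and later, so a 0.9 producer would have to come from the last Flink release line that still shipped the version-specific connectors. Mixing connector versions like this is untested here, so treat it as an assumption to verify:

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-kafka-0.9_${scala.binary.version}</artifactId>
   <!-- The 0.9 connector was dropped after the 1.10.x line (assumption: verify on Maven Central). -->
   <version>1.10.3</version>
</dependency>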

Question about implementing a custom source in flink1.9

Posted by guanyq <dl...@163.com>.
The attached image shows the setup: I want to pass the data coming out of the listener to ctx.
How can this data hand-off be implemented?
public class RMQRichParallelSource extends RichParallelSourceFunction<String> implements MessageOrderListener {

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Properties properties = new Properties();

        // Before subscribing, start() must be called to launch the Consumer; calling it once is enough.
        OrderConsumer consumer = ONSFactory.createOrderedConsumer(properties);

        consumer.subscribe(
                "PRODNOC_KB_SYNC_CUST_ORDER",
                "*",
                this);
        consumer.start();
    }

    @Override
    public void run(SourceContext<String> ctx) {

    }

    @Override
    public OrderAction consume(Message message, ConsumeOrderContext consumeOrderContext) {
        try {
            System.out.println(new String(message.getBody(), "UTF-8"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return OrderAction.Success;
    }

    @Override
    public void cancel() {
    }

    @Override
    public void close() throws Exception {
        super.close();
    }
}
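
A hedged sketch of one common hand-off pattern (not from the thread itself): buffer records from the listener callback in a BlockingQueue and drain it in run(). It plugs into the class above; the ONS names (OrderAction, Message, ...) are taken from that snippet, and the queue capacity and poll timeout are illustrative.

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Fields on RMQRichParallelSource: the listener thread puts, run() polls.
private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10_000);
private volatile boolean running = true;

@Override
public OrderAction consume(Message message, ConsumeOrderContext consumeOrderContext) {
    try {
        // Blocks when the queue is full, which back-pressures the MQ listener.
        queue.put(new String(message.getBody(), StandardCharsets.UTF_8));
        return OrderAction.Success;
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return OrderAction.Suspend; // assumption: Suspend asks ONS to redeliver the message
    }
}

@Override
public void run(SourceContext<String> ctx) throws Exception {
    while (running) {
        String record = queue.poll(100, TimeUnit.MILLISECONDS); // hand records from the listener to Flink
        if (record != null) {
            synchronized (ctx.getCheckpointLock()) { // emit under the checkpoint lock
                ctx.collect(record);
            }
        }
    }
}

@Override
public void cancel() {
    running = false;
}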

flink1.9 on yarn

Posted by guanyq <dl...@163.com>.
Question 1

Every submission with ./bin/flink run -m yarn-cluster produces a new APP-ID, e.g. application_1567067657620_0254.

After running yarn application -kill application_1567067657620_0254,

how can I resubmit with ./bin/flink run -m yarn-cluster without the app ID incrementing?

Question 2

After submitting with ./bin/flink run -m yarn-cluster and cancelling the job, how can I resubmit to the same app ID?

 

Re: flink1.9 on yarn: error after running for over two months

Posted by Congxian Qiu <qc...@gmail.com>.
The log you provided does not look like the log from the job's first failure; you probably need to look at the JM log of the earlier job to see what caused the failure.

Best,
Congxian


LakeShen <sh...@gmail.com> wrote on Tue, Jun 23, 2020 at 10:07 PM:

> Hi guanyq,
>
> From the log, I can see that the local storage on the machine hosting the TaskManager is almost full.
>
> Check whether insufficient storage on the TaskManager's machine is the cause.
>
> Best,
> LakeShen
>
> > xueaohui_com@163.com <xu...@163.com> wrote on Sat, Jun 20, 2020 at 9:57 AM:
>
> > Are there detailed logs on the YARN side?
> >
> > Could there be an HDFS permission problem?
> >
> >
> >
> > xueaohui_com@163.com
> >
> > From: guanyq
> > Sent: 2020-06-20 08:48
> > To: user-zh
> > Subject: flink1.9 on yarn: error after running for over two months
> > The error log is attached. Could one of the experts help analyze it?
> >
> >
> >
> >
>

Re: flink1.9 on yarn: error after running for over two months

Posted by LakeShen <sh...@gmail.com>.
Hi guanyq,

From the log, I can see that the local storage on the machine hosting the TaskManager is almost full.

Check whether insufficient storage on the TaskManager's machine is the cause.

Best,
LakeShen

xueaohui_com@163.com <xu...@163.com> wrote on Sat, Jun 20, 2020 at 9:57 AM:

> Are there detailed logs on the YARN side?
>
> Could there be an HDFS permission problem?
>
>
>
> xueaohui_com@163.com
>
> From: guanyq
> Sent: 2020-06-20 08:48
> To: user-zh
> Subject: flink1.9 on yarn: error after running for over two months
> The error log is attached. Could one of the experts help analyze it?
>
>
>
>

Re: flink1.9 on yarn: error after running for over two months

Posted by "xueaohui_com@163.com" <xu...@163.com>.
Are there detailed logs on the YARN side?

Could there be an HDFS permission problem?



xueaohui_com@163.com

From: guanyq
Sent: 2020-06-20 08:48
To: user-zh
Subject: flink1.9 on yarn: error after running for over two months
The error log is attached. Could one of the experts help analyze it?


 

flink1.9 on yarn: error after running for over two months

Posted by guanyq <dl...@163.com>.
The error log is attached. Could one of the experts help analyze it?

Re: flink1.9 on yarn: garbled Chinese characters when consuming Kafka data

Posted by 马阳阳 <ma...@163.com>.


We ran into this problem too; in our case the default charset on the YARN NodeManagers was ASCII.
Adding the following setting to flink-conf.yaml fixed it:
env.java.opts.taskmanager: "-Dfile.encoding=UTF-8"
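
A hedged way to confirm the option took effect on the TaskManagers (a throwaway probe; the class name is illustrative): log the JVM defaults from inside an operator and compare what the TaskManager logs print across deployment modes.

import java.nio.charset.Charset;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class CharsetProbe extends RichMapFunction<String, String> {
    @Override
    public void open(Configuration parameters) {
        // Appears in the TaskManager log; expect UTF-8 once env.java.opts.taskmanager is set.
        System.out.println("file.encoding=" + System.getProperty("file.encoding")
                + ", defaultCharset=" + Charset.defaultCharset());
    }

    @Override
    public String map(String value) {
        return value;
    }
}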

On 2020-06-08 21:48:33, "guanyq" <dl...@163.com> wrote:
>Kafka version 0.11.
>First, the data in the Kafka source topic is fine; consuming it with the Kafka client shows no garbled Chinese characters.
>1. Running locally in IDEA debug mode: no garbling.
>2. Running on the server in Standalone mode: no garbling.
>3. Submitting on the server via on-yarn mode: the Chinese characters come out garbled.
>
>
>The API used to consume Kafka from Flink is:
>new FlinkKafkaConsumer<>(topicList, new SimpleStringSchema(), props);
>
>
>Based on 1, 2, and 3, the problem seems related to YARN. Could anyone advise what else needs investigating to resolve this?