Posted to user-zh@flink.apache.org by jing <m1...@163.com> on 2020/11/03 01:22:45 UTC

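Re: [PyFlink] Error when writing data to Kafka in Csv() format, and Python UDF fails to start

Hi, jincheng.

I recently ran into a similar problem. Do you have any ideas on how to approach it?

The flink-python jar is present, and its version is correct.

The version is 1.11.1. The problem appears only once a UDF is introduced; everything worked before that.

I also tried packaging the Python environment with virtualenv, but that had no effect.

The error is as follows: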

Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.beam.sdk.options.PipelineOptionsFactory
        at org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:152)
        at org.apache.flink.table.runtime.runners.python.scalar.AbstractGeneralPythonScalarFunctionRunner.open(AbstractGeneralPythonScalarFunctionRunner.java:65)
        at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator$ProjectUdfInputPythonScalarFunctionRunner.open(AbstractStatelessFunctionOperator.java:186)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:143)
        at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:131)
        at org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:88)
        at org.apache.flink.table.runtime.operators.python.scalar.AbstractRowDataPythonScalarFunctionOperator.open(AbstractRowDataPythonScalarFunctionOperator.java:80)
        at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.open(RowDataPythonScalarFunctionOperator.java:64)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
        at java.lang.Thread.run(Thread.java:748)

org.apache.flink.client.program.ProgramAbortException
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)




--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: [PyFlink] Error when writing data to Kafka in Csv() format, and Python UDF fails to start

Posted by Xingbo Huang <hx...@gmail.com>.
Hi,

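1. python demo.py runs fine, which shows that neither your code nor your local Python environment is at fault.
2. The error you report now is not the same as the compile-time error in your previous email. Let's take the problems one at a time.
3. This runtime error means the beam package in the Python environment of your cluster's runtime conflicts. As I said in my first reply, you need to check whether the Python environment in the cluster runtime meets the requirements.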
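
A minimal sketch of such a check, assuming it is run with the same Python interpreter that the TaskManagers use: it compares the installed apache-flink and apache-beam versions against the ones named in this thread (1.11.1 and 2.19.0). The function names are illustrative only, and importlib.metadata needs Python 3.8 or later; on Python 3.7, pip show apache-beam reports the same information.

```python
from importlib import metadata  # standard library, Python 3.8+


def installed_version(package):
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


def check_environment(expected):
    """Map each package name to (installed, expected, matches)."""
    report = {}
    for package, wanted in expected.items():
        found = installed_version(package)
        report[package] = (found, wanted, found == wanted)
    return report


if __name__ == "__main__":
    # Versions taken from this thread; adjust them for other Flink releases.
    expected = {"apache-flink": "1.11.1", "apache-beam": "2.19.0"}
    for package, (found, wanted, ok) in check_environment(expected).items():
        status = "OK" if ok else "MISMATCH"
        print(f"{package}: installed={found} expected={wanted} {status}")
```

Running it inside each container (JobManager, TaskManager, and the client that submits the job) makes it easy to spot an environment that differs from the local one where python demo.py works.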

Best,
Xingbo

jing <m1...@163.com> wrote on Tue, Nov 3, 2020 at 6:09 PM:

> 1. Reinstalling did not solve it.
> Both local and remote submission fail. Running it as python demo.py works fine.
> 2. The job has in fact been submitted.
> The console prints: Job has been submitted with JobID 05fcaebfec3aca731df408418ebea80c
> and then the following error appears immediately:
>
> Namely: Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.beam.sdk.options.PipelineOptionsFactory

Re: [PyFlink] Error when writing data to Kafka in Csv() format, and Python UDF fails to start

Posted by jing <m1...@163.com>.
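1. Reinstalling did not solve it.
Both local and remote submission fail. Running it as python demo.py works fine.
2. The job has in fact been submitted.
The console prints: Job has been submitted with JobID 05fcaebfec3aca731df408418ebea80c
and then the following error appears immediately:

Namely: Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.beam.sdk.options.PipelineOptionsFactory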



Traceback (most recent call last):
  File "docs/examples/udf/demo.py", line 37, in <module>
    word_count()
  File "docs/examples/udf/demo.py", line 32, in word_count
    t_env.execute("word_count")
  File "/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 1057, in execute
    return JobExecutionResult(self._j_tenv.execute(job_name))
  File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o4.execute.
: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 05fcaebfec3aca731df408418ebea80c)
        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
        at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116)
        at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
        at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 05fcaebfec3aca731df408418ebea80c)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116)
        at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
        at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085)
        at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        ... 1 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:114)
        ... 18 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
        at jdk.internal.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.beam.sdk.options.PipelineOptionsFactory
        at org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:152)
        at org.apache.flink.table.runtime.runners.python.scalar.AbstractGeneralPythonScalarFunctionRunner.open(AbstractGeneralPythonScalarFunctionRunner.java:65)
        at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator$ProjectUdfInputPythonScalarFunctionRunner.open(AbstractStatelessFunctionOperator.java:186)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:143)
        at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:131)
        at org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:88)
        at org.apache.flink.table.runtime.operators.python.scalar.AbstractRowDataPythonScalarFunctionOperator.open(AbstractRowDataPythonScalarFunctionOperator.java:80)
        at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.open(RowDataPythonScalarFunctionOperator.java:64)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
        at java.base/java.lang.Thread.run(Thread.java:834)

org.apache.flink.client.program.ProgramAbortException
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
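
In a trace this long, the innermost Caused by entry is usually the one to read first. The helper below is a small sketch for pulling it out of pasted text, even when the mail client has wrapped the message over two lines; root_cause is a hypothetical name, and the function assumes the trace carries no "> " quote prefix. Note also that a NoClassDefFoundError reading "Could not initialize class" generally means the class's static initializer already failed once in the same JVM, so an earlier ExceptionInInitializerError in the TaskManager log may carry the underlying reason.

```python
def root_cause(trace):
    """Return the innermost 'Caused by:' message of a Java stack trace, or '' if none."""
    lines = [line.strip() for line in trace.splitlines()]
    # The innermost cause is printed last, so take the last matching line.
    starts = [i for i, line in enumerate(lines) if line.startswith("Caused by:")]
    if not starts:
        return ""
    parts = []
    for line in lines[starts[-1]:]:
        # A blank line or a frame line ("at ...", or a bare wrapped "at") ends the message.
        if not line or line == "at" or line.startswith("at "):
            break
        parts.append(line)
    return " ".join(parts)[len("Caused by:"):].strip()


if __name__ == "__main__":
    sample = """Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
        at
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class
org.apache.beam.sdk.options.PipelineOptionsFactory
        at
org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:152)
"""
    print(root_cause(sample))
```

The same function can be pointed at a saved TaskManager log to find the last cause reported there.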





Re: [PyFlink] Error when writing data to Kafka in Csv() format, and Python UDF fails to start

Posted by Xingbo Huang <hx...@gmail.com>.
Hi,
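1. Was the earlier error resolved after reinstalling pyflink (does python demo.py run fine locally)? Did that earlier error occur when running locally, or only when submitting remotely?
2. The current error is raised at the compile stage of job submission, before the job actually runs. The error log is visible in the console where you submit the job. Could you provide it?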

Best,
Xingbo

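jing <m1...@163.com> wrote on Tue, Nov 3, 2020 at 5:36 PM:

> Hi, xingbo.
> After checking, I found no other versions of beam. The flink-python version is 2.11-1.11.1. Whether or not I run pip install apache-beam==2.19.0, the problem is the same.
> The UDF example code fails as well. Running python demo.py locally works fine.
> Only flink run -m localhost:8081 -py demo.py keeps hitting this problem.
> I also tried pyflink-shell.sh remote localhost 8081, with the same result.
>
> The example code is as follows: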
>
> import logging
> import sys
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
> from pyflink.table.udf import udf
>
>
> def word_count():
>     content = "line Licensed to the Apache Software Foundation ASF under one " \
>               "line or more contributor license agreements See the NOTICE file " \
>               "line distributed with this work for additional information " \
>               "line regarding copyright ownership The ASF licenses this file " \
>               "to you under the Apache License Version the " \
>               "License you may not use this file except in compliance " \
>               "with the License"
>     # Streaming table environment on the blink planner
>     t_env = StreamTableEnvironment.create(
>         StreamExecutionEnvironment.get_execution_environment(),
>         environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
>     )
>     # Let the Python worker use Flink managed memory
>     t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)
>     sink_ddl = """
>         create table Results(word VARCHAR,`count` BIGINT) with ('connector' = 'print')
>         """
>     # Scalar Python UDF: adds 1024 to a BIGINT
>     add = udf(lambda i: i + 1024, [DataTypes.BIGINT()], DataTypes.BIGINT())
>     t_env.register_function("add_test", add)
>     t_env.sql_update(sink_ddl)
>     # One (word, 1) row per word, then count per word and apply the UDF
>     elements = [(word, 1) for word in content.split(" ")]
>     t_env.from_elements(elements, ["word", "count"]) \
>         .group_by("word") \
>         .select("word, add_test(count(1)) as count") \
>         .insert_into("Results")
>     t_env.execute("word_count")
>
>
> if __name__ == '__main__':
>     logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
>     word_count()
>
>
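> The environment is based on the official Docker image flink:1.11.1-scala_2.11. The JobManager and TaskManager both work normally, and jobs without a UDF run fine. The only jars installed are the jdbc, kafka, and es connectors, plus the csv jar.
>
> Does anything else need to be installed in this setup, or does some configuration need to change?
>
> The log shows: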
>
> 2020-11-03 09:24:05,792 ERROR org.apache.flink.client.python.PythonDriver [] - Run python process failed
> java.lang.RuntimeException: Python process exits with code: 1
>         at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:88) ~[flink-python_2.11-1.11.1.jar:1.11.1]
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_265]
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_265]
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_265]
>         at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_265]
>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) [flink-dist_2.11-1.11.1.jar:1.11.1]
>         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) [flink-dist_2.11-1.11.1.jar:1.11.1]
> 2020-11-03 09:24:05,798 ERROR org.apache.flink.client.cli.CliFrontend [] - Fatal error while running command line interface.
> org.apache.flink.client.program.ProgramAbortException: null
>         at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95) ~[flink-python_2.11-1.11.1.jar:1.11.1]
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_265]
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_265]
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_265]
>         at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_265]
>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at
>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at
>
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
> org.apache.flink.client.program.ProgramAbortException
>         at
> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>         at
>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>         at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>         at
>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>         at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>         at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>         at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>         at
>
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>         at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>

Re: 【PyFlink】Error when writing data to Kafka in Csv() format, and the UDF fails to start when using a Python UDF

Posted by jing <m1...@163.com>.
Hi, xingbo.
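After checking, I did not find any other version of beam. The flink-python
version is 2.11-1.11.1. The problem is the same whether or not I run
pip install apache-beam==2.19.0.
The UDF sample code fails as well. Running python demo.py locally works fine.
Only flink run -m localhost:8081 -py demo.py keeps hitting this problem.
I also tried pyflink-shell.sh remote localhost 8081, with the same result.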
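
Since python demo.py works locally but flink run does not, it may help to confirm which interpreter and which package versions are actually in use on the machine where flink run is invoked. The following is a minimal sketch, not part of the original message; the helper names installed_version and beam_status are hypothetical, and it assumes Python 3.8+ for importlib.metadata (older interpreters would need the importlib_metadata backport).

```python
import sys
from importlib import metadata  # assumption: Python 3.8 or newer

# Beam version that this thread says Flink 1.11.1 depends on.
EXPECTED_BEAM = "2.19.0"


def installed_version(package):
    """Return the installed version of `package`, or None if it is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


def beam_status(found, expected=EXPECTED_BEAM):
    """Classify the installed apache-beam version against the expected one."""
    if found is None:
        return "missing"
    return "ok" if found == expected else "mismatch"


if __name__ == "__main__":
    # Print the interpreter path so it can be compared across environments.
    print("interpreter:", sys.executable)
    for name in ("apache-flink", "apache-beam"):
        print(name, installed_version(name))
    print("beam status:", beam_status(installed_version("apache-beam")))
```

Running the same script with the interpreter used inside the container and with the local one makes any difference between the two environments visible.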

The sample code is as follows:

import logging
import sys
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udf


def word_count():
    content = "line Licensed to the Apache Software Foundation ASF under one " \
              "line or more contributor license agreements See the NOTICE file " \
              "line distributed with this work for additional information " \
              "line regarding copyright ownership The ASF licenses this file " \
              "to you under the Apache License Version the " \
              "License you may not use this file except in compliance " \
              "with the License"
    t_env = StreamTableEnvironment.create(
        StreamExecutionEnvironment.get_execution_environment(),
        environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
    )
    t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)
    sink_ddl = """
        create table Results(word VARCHAR,`count` BIGINT) with ( 'connector' = 'print')
        """
    add = udf(lambda i: i + 1024, [DataTypes.BIGINT()], DataTypes.BIGINT())
    t_env.register_function("add_test", add)
    t_env.sql_update(sink_ddl)
    elements = [(word, 1) for word in content.split(" ")]
    t_env.from_elements(elements, ["word", "count"]) \
        .group_by("word") \
        .select("word, add_test(count(1)) as count") \
        .insert_into("Results")
    t_env.execute("word_count")


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    word_count()
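
For reference, the result the job above is meant to produce can be reproduced in plain Python without Flink. This is only a sketch of the intended logic (group by word, count, then apply the add_test UDF); the function name expected_results is hypothetical, and the streaming job may print intermediate updates before reaching these final values.

```python
from collections import Counter


def add(i):
    # Same logic as the lambda registered as "add_test" in the sample.
    return i + 1024


def expected_results(content):
    """Map each word to add(count), mirroring group_by + count + the UDF."""
    counts = Counter(content.split(" "))
    return {word: add(n) for word, n in counts.items()}
```

A word that occurs twice in the input would therefore map to 1026, and a word that occurs once to 1025.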


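The environment is based on the official Docker image flink:1.11.1-scala_2.11.
The JobManager and TaskManager both run normally, and jobs without a UDF work
fine. The only extra jars installed are the jdbc, kafka, and es connectors,
plus the csv jar.

Does anything else need to be installed in this case, or does some
configuration need to change?

The log shows: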

2020-11-03 09:24:05,792 ERROR org.apache.flink.client.python.PythonDriver [] - Run python process failed
java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:88) ~[flink-python_2.11-1.11.1.jar:1.11.1]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_265]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_265]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_265]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_265]
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) [flink-dist_2.11-1.11.1.jar:1.11.1]
2020-11-03 09:24:05,798 ERROR org.apache.flink.client.cli.CliFrontend [] - Fatal error while running command line interface.
org.apache.flink.client.program.ProgramAbortException: null
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95) ~[flink-python_2.11-1.11.1.jar:1.11.1]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_265]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_265]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_265]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_265]
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) [flink-dist_2.11-1.11.1.jar:1.11.1]
org.apache.flink.client.program.ProgramAbortException
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 【PyFlink】Error when writing data to Kafka in Csv() format, and the UDF fails to start when using a Python UDF

Posted by Xingbo Huang <hx...@gmail.com>.
Hi,
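The error is a NoClassDefFoundError, which most likely means a package conflict. Did you install some other version of apache-beam yourself (1.11.1 depends on beam 2.19.0)? Alternatively, you could check whether there is a conflict among the other packages that the beam class PipelineOptions depends on.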
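
One way to look for the kind of conflict described above is to list which jars bundle the class named in the error. The sketch below is an illustration only, not something from the thread: the helper name jars_containing is hypothetical, and the directory /opt/flink/lib is an assumption about where the Docker image keeps its jars (other directories, such as opt, may need the same check). Note also that the "Could not initialize class" form of NoClassDefFoundError generally means an earlier static-initializer failure for that class, so the first related exception in the TaskManager log is usually the more informative one.

```python
import zipfile
from pathlib import Path

# Class named in the NoClassDefFoundError reported in this thread.
TARGET = "org/apache/beam/sdk/options/PipelineOptionsFactory.class"


def jars_containing(lib_dir, entry=TARGET):
    """Return the sorted names of jars directly under lib_dir that bundle `entry`."""
    hits = []
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as zf:
                if entry in zf.namelist():
                    hits.append(jar.name)
        except zipfile.BadZipFile:
            continue  # skip files that are not valid archives
    return hits


if __name__ == "__main__":
    # Assumption: adjust this path to wherever the jars actually live.
    for name in jars_containing("/opt/flink/lib"):
        print(name)
```

If more than one jar shows up, that would be consistent with the package-conflict explanation. An empty list for a given directory only shows that the class is not bundled there.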

Best,
Xingbo

jing <m1...@163.com> wrote on Tue, Nov 3, 2020 at 10:16 AM:

> Hi, jincheng.
>
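> I recently ran into a similar problem. Do you have any ideas?
>
> The flink-python jar is present, and its version is correct.
>
> The version is 1.11.1. The problem mainly appeared after a UDF was
> introduced; before that everything worked.
>
> I also tried packaging the Python environment with virtualenv, with no
> effect.
>
> The specific error is as follows: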
>
> Caused by: java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.beam.sdk.options.PipelineOptionsFactory
>         at
>
> org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:152)
>         at
>
> org.apache.flink.table.runtime.runners.python.scalar.AbstractGeneralPythonScalarFunctionRunner.open(AbstractGeneralPythonScalarFunctionRunner.java:65)
>         at
>
> org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator$ProjectUdfInputPythonScalarFunctionRunner.open(AbstractStatelessFunctionOperator.java:186)
>         at
>
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:143)
>         at
>
> org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:131)
>         at
>
> org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:88)
>         at
>
> org.apache.flink.table.runtime.operators.python.scalar.AbstractRowDataPythonScalarFunctionOperator.open(AbstractRowDataPythonScalarFunctionOperator.java:80)
>         at
>
> org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.open(RowDataPythonScalarFunctionOperator.java:64)
>         at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
>         at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
>         at
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>         at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
>         at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>         at java.lang.Thread.run(Thread.java:748)
>
> org.apache.flink.client.program.ProgramAbortException
>         at
> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>         at
>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>         at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>         at
>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>         at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>         at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>         at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>         at
>
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>         at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>