Posted to user@flink.apache.org by "harshit.varshney@iktara.ai" <Ha...@iktara.ai> on 2022/11/21 12:50:56 UTC
Facing Issue in running Python Flink Program in flink cluster (Version=1.15.2)
Dear Team,
I am facing an issue while running a PyFlink program on a Flink cluster: it
stops running while reading the machine learning model.
This is the error:
./bin/flink run --python /home/desktop/ProjectFiles/test_new.py
Job has been submitted with JobID 0a561cb330eeac5aa7b40ac047d3c6a3
/home/desktop/.local/lib/python3.8/site-packages/sklearn/base.py:329: UserWarning: Trying to unpickle estimator LabelEncoder from version 1.1.1 when using version 1.1.2. This might lead to breaking code or invalid results. Use at your own risk. For more info please refer to:
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
warnings.warn(
/home/desktop/.local/lib/python3.8/site-packages/xgboost/compat.py:31: FutureWarning: pandas.Int64Index is deprecated and will be removed from pandas in a future version. Use pandas.Index with the appropriate dtype instead.
from pandas import MultiIndex, Int64Index
/home/desktop/.local/lib/python3.8/site-packages/pkg_resources/__init__.py:116: PkgResourcesDeprecationWarning: 0.1.36ubuntu1 is an invalid version and will not be supported in a future release
warnings.warn(
/home/desktop/.local/lib/python3.8/site-packages/pkg_resources/__init__.py:116: PkgResourcesDeprecationWarning: 0.23ubuntu1 is an invalid version and will not be supported in a future release
warnings.warn(
Traceback (most recent call last):
File "/usr/lib/python3.8/runpy.py", line 194, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/usr/lib/python3.8/runpy.py", line 87, in _run_code
exec(code, run_globals)
File "/tmp/pyflink/bc5f6233-c53d-45a0-b8f2-21b41ea0c6ad/71b84799-5210-4a85-ae80-9efe0fb192f1/test_new.py", line 510, in <module>
new_user_ratings(envir)
File "/tmp/pyflink/bc5f6233-c53d-45a0-b8f2-21b41ea0c6ad/71b84799-5210-4a85-ae80-9efe0fb192f1/test_new.py", line 504, in new_user_ratings
environment.execute('new_user_ratings')
File "/home/desktop/Downloads/flink/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py", line 761, in execute
File "/home/desktop/Downloads/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1321, in __call__
File "/home/desktop/Downloads/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
File "/home/desktop/Downloads/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o8.execute.
: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 0a561cb330eeac5aa7b40ac047d3c6a3)
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:173)
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:123)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 0a561cb330eeac5aa7b40ac047d3c6a3)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$26(RestClusterClient.java:708)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:128)
... 24 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443)
at sun.reflect.GeneratedMethodAccessor125.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.flink.runtime.taskmanager.AsynchronousException: Caught exception while processing timer.
at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1535)
at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1510)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1650)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$21(StreamTask.java:1639)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:750)
Caused by: TimerException{java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush}
... 14 more
Caused by: java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush
at org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:106)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:299)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:115)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1648)
... 13 more
Caused by: java.lang.RuntimeException: Failed to close remote bundle
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:382)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:366)
at org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.lambda$invokeFinishBundle$0(AbstractExternalPythonFunctionOperator.java:85)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
response = task()
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 606, in do_instruction
return getattr(self, request_type)(
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 637, in process_bundle
bundle_processor = self.bundle_processor_cache.get(
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 463, in get
processor = bundle_processor.BundleProcessor(
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 868, in __init__
self.ops = self.create_execution_tree(self.process_bundle_descriptor)
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 921, in create_execution_tree
return collections.OrderedDict([(
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 924, in <listcomp>
get_operation(transform_id))) for transform_id in sorted(
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
result = cache[args] = func(*args)
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 903, in get_operation
transform_consumers = {
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <dictcomp>
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <listcomp>
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
result = cache[args] = func(*args)
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 908, in get_operation
return transform_factory.create_operation(
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1198, in create_operation
return creator(self, transform_id, transform_proto, payload, consumers)
File "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 129, in create_data_stream_keyed_process_function
return _create_user_defined_function_operation(
File "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 200, in _create_user_defined_function_operation
return beam_operation_cls(
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 198, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 129, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 202, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
File "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/operations.py", line 75, in __init__
extract_stateless_function(
File "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/operations.py", line 155, in extract_stateless_function
user_defined_func = pickle.loads(user_defined_function_proto.payload)
File "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads
return cloudpickle.loads(payload)
File "/home/desktop/.local/lib/python3.8/site-packages/xgboost/__init__.py", line 9, in <module>
from .core import DMatrix, DeviceQuantileDMatrix, Booster
File "/home/desktop/.local/lib/python3.8/site-packages/xgboost/core.py", line 23, in <module>
from .compat import (STRING_TYPES, DataFrame, py_str, PANDAS_INSTALLED,
File "/home/desktop/.local/lib/python3.8/site-packages/xgboost/compat.py", line 110, in <module>
import sparse
File "/home/desktop/.local/lib/python3.8/site-packages/sparse/__init__.py", line 1, in <module>
from ._coo import COO, as_coo
File "/home/desktop/.local/lib/python3.8/site-packages/sparse/_coo/__init__.py", line 1, in <module>
from .core import COO, as_coo
File "/home/desktop/.local/lib/python3.8/site-packages/sparse/_coo/core.py", line 9, in <module>
import numba
File "/home/desktop/.local/lib/python3.8/site-packages/numba/__init__.py", line 38, in <module>
from numba.core.decorators import (cfunc, generated_jit, jit, njit, stencil,
File "/home/desktop/.local/lib/python3.8/site-packages/numba/core/decorators.py", line 12, in <module>
from numba.stencils.stencil import stencil
File "/home/desktop/.local/lib/python3.8/site-packages/numba/stencils/stencil.py", line 11, in <module>
from numba.core import types, typing, utils, ir, config, ir_utils, registry
File "/home/desktop/.local/lib/python3.8/site-packages/numba/core/registry.py", line 4, in <module>
from numba.core import utils, typing, dispatcher, cpu
File "/home/desktop/.local/lib/python3.8/site-packages/numba/core/dispatcher.py", line 13, in <module>
from numba.core import (
File "/home/desktop/.local/lib/python3.8/site-packages/numba/core/compiler.py", line 6, in <module>
from numba.core import (utils, errors, typing, interpreter, bytecode, postproc,
File "/home/desktop/.local/lib/python3.8/site-packages/numba/core/callconv.py", line 12, in <module>
from numba.core.base import PYOBJECT, GENERIC_POINTER
File "/home/desktop/.local/lib/python3.8/site-packages/numba/core/base.py", line 24, in <module>
from numba.cpython import builtins
File "/home/desktop/.local/lib/python3.8/site-packages/numba/cpython/builtins.py", line 524, in <module>
from numba.core.typing.builtins import IndexValue, IndexValueType
File "/home/desktop/.local/lib/python3.8/site-packages/numba/core/typing/builtins.py", line 22, in <module>
@infer_global(print)
File "/home/desktop/.local/lib/python3.8/site-packages/numba/core/typing/templates.py", line 1278, in register_global
if getattr(mod, val.__name__) is not val:
AttributeError: module 'pyflink.fn_execution.beam.beam_sdk_worker_main' has no attribute 'print'
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:380)
... 7 more
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
response = task()
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 606, in do_instruction
return getattr(self, request_type)(
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 637, in process_bundle
bundle_processor = self.bundle_processor_cache.get(
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 463, in get
processor = bundle_processor.BundleProcessor(
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 868, in __init__
self.ops = self.create_execution_tree(self.process_bundle_descriptor)
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 921, in create_execution_tree
return collections.OrderedDict([(
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 924, in <listcomp>
get_operation(transform_id))) for transform_id in sorted(
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
result = cache[args] = func(*args)
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 903, in get_operation
transform_consumers = {
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <dictcomp>
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <listcomp>
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
result = cache[args] = func(*args)
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 908, in get_operation
return transform_factory.create_operation(
File "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1198, in create_operation
return creator(self, transform_id, transform_proto, payload, consumers)
File "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 129, in create_data_stream_keyed_process_function
return _create_user_defined_function_operation(
File "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 200, in _create_user_defined_function_operation
return beam_operation_cls(
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 198, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 129, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 202, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
File "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/operations.py", line 75, in __init__
extract_stateless_function(
File "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/operations.py", line 155, in extract_stateless_function
user_defined_func = pickle.loads(user_defined_function_proto.payload)
File "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads
return cloudpickle.loads(payload)
File "/home/desktop/.local/lib/python3.8/site-packages/xgboost/__init__.py", line 9, in <module>
from .core import DMatrix, DeviceQuantileDMatrix, Booster
File "/home/desktop/.local/lib/python3.8/site-packages/xgboost/core.py", line 23, in <module>
from .compat import (STRING_TYPES, DataFrame, py_str, PANDAS_INSTALLED,
File "/home/desktop/.local/lib/python3.8/site-packages/xgboost/compat.py", line 110, in <module>
import sparse
File "/home/desktop/.local/lib/python3.8/site-packages/sparse/__init__.py", line 1, in <module>
from ._coo import COO, as_coo
File "/home/desktop/.local/lib/python3.8/site-packages/sparse/_coo/__init__.py", line 1, in <module>
from .core import COO, as_coo
File "/home/desktop/.local/lib/python3.8/site-packages/sparse/_coo/core.py", line 9, in <module>
import numba
File "/home/desktop/.local/lib/python3.8/site-packages/numba/__init__.py", line 38, in <module>
from numba.core.decorators import (cfunc, generated_jit, jit, njit, stencil,
File "/home/desktop/.local/lib/python3.8/site-packages/numba/core/decorators.py", line 12, in <module>
from numba.stencils.stencil import stencil
File "/home/desktop/.local/lib/python3.8/site-packages/numba/stencils/stencil.py", line 11, in <module>
from numba.core import types, typing, utils, ir, config, ir_utils, registry
File "/home/desktop/.local/lib/python3.8/site-packages/numba/core/registry.py", line 4, in <module>
from numba.core import utils, typing, dispatcher, cpu
File "/home/desktop/.local/lib/python3.8/site-packages/numba/core/dispatcher.py", line 13, in <module>
from numba.core import (
File "/home/desktop/.local/lib/python3.8/site-packages/numba/core/compiler.py", line 6, in <module>
from numba.core import (utils, errors, typing, interpreter, bytecode, postproc,
File "/home/desktop/.local/lib/python3.8/site-packages/numba/core/callconv.py", line 12, in <module>
from numba.core.base import PYOBJECT, GENERIC_POINTER
File "/home/desktop/.local/lib/python3.8/site-packages/numba/core/base.py", line 24, in <module>
from numba.cpython import builtins
File "/home/desktop/.local/lib/python3.8/site-packages/numba/cpython/builtins.py", line 524, in <module>
from numba.core.typing.builtins import IndexValue, IndexValueType
File "/home/desktop/.local/lib/python3.8/site-packages/numba/core/typing/builtins.py", line 22, in <module>
@infer_global(print)
File "/home/desktop/.local/lib/python3.8/site-packages/numba/core/typing/templates.py", line 1278, in register_global
if getattr(mod, val.__name__) is not val:
AttributeError: module 'pyflink.fn_execution.beam.beam_sdk_worker_main' has no attribute 'print'
at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
... 3 more
org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:836)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1078)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1156)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1156)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
... 13 more
Can anyone tell me the reason for this error, or offer any suggestions?
With Regards,
Harshit Varshney
Re: Facing Issue in running Python Flink Program in flink cluster (Version=1.15.2)
Posted by Xingbo Huang <hx...@gmail.com>.
Hi Harshit,
According to the stack trace you provided, I guess you defined your Python
function in the main file, and that xgboost is imported globally (at module
level) there. The error occurs because the xgboost library is difficult
for cloudpickle to serialize. There are two ways to solve this:
1. Move `import xgboost` inside the Python function.
2. Move the Python function to another Python file, and then add that
Python file as a dependency via `add_python_file`[1].
Best,
Xingbo
[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/dependency_management/#python-libraries
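For readers following along, option 1 might look roughly like the sketch below. The original test_new.py was not posted, so the names `MODEL_PATH` and `predict`, the model file format, and the shape of the input record are all assumptions made for illustration. The point is only that `xgboost` is imported inside the function body rather than at the top of the main script.

```python
# Sketch of option 1: keep the xgboost import out of the main script's
# global scope. MODEL_PATH and predict are hypothetical names.

MODEL_PATH = "/path/to/model.json"  # placeholder; point this at your own model file


def predict(features):
    """Score one record, given as a sequence of floats."""
    # Deferred imports: these run on the Python worker when the function is
    # called, instead of being captured as globals of the main script.
    import numpy as np
    import xgboost as xgb

    booster = xgb.Booster()
    booster.load_model(MODEL_PATH)  # reloaded on every call, for brevity only
    matrix = xgb.DMatrix(np.asarray([features], dtype=float))
    return float(booster.predict(matrix)[0])


# In the job itself, the function would then be applied to a stream, e.g.:
#     ds.map(predict)
```

In a real job the model would normally be loaded once per worker rather than once per record, for example in the `open()` method of a `MapFunction` subclass. For option 2, the same function would simply live in its own file, registered through `add_python_file` as described in [1].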
> Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>
> at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>
> at
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>
> at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>
> at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>
> at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>
> at java.lang.Thread.run(Thread.java:750)
>
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
> failed (JobID: 0a561cb330eeac5aa7b40ac047d3c6a3)
>
> at
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130)
>
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>
> at
> org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403)
>
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$26(RestClusterClient.java:708)
>
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>
> at
> org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403)
>
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>
> at
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
>
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
>
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
> ... 1 more
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
>
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
>
> at
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:128)
>
> ... 24 more
>
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed
> by NoRestartBackoffTimeStrategy
>
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301)
>
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291)
>
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282)
>
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739)
>
> at
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
>
> at
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443)
>
> at sun.reflect.GeneratedMethodAccessor125.invoke(Unknown
> Source)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304)
>
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
>
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
>
> at
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>
> at
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>
> at
> scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>
> at
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>
> at
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>
> at akka.actor.Actor.aroundReceive(Actor.scala:537)
>
> at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>
> at
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
>
> at akka.actor.ActorCell.invoke(ActorCell.scala:548)
>
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>
> at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>
> at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>
> at
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>
> at
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>
> at
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>
> at
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
>
> Caused by: org.apache.flink.runtime.taskmanager.AsynchronousException:
> Caught exception while processing timer.
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1535)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1510)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1650)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$21(StreamTask.java:1639)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
>
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
>
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>
> at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>
> at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>
> at java.lang.Thread.run(Thread.java:750)
>
> Caused by: TimerException{java.lang.RuntimeException: Error while waiting
> for BeamPythonFunctionRunner flush}
>
> ... 14 more
>
> Caused by: java.lang.RuntimeException: Error while waiting for
> BeamPythonFunctionRunner flush
>
> at
> org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:106)
>
> at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:299)
>
> at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:115)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1648)
>
> ... 13 more
>
> Caused by: java.lang.RuntimeException: Failed to close remote bundle
>
> at
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:382)
>
> at
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:366)
>
> at
> org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.lambda$invokeFinishBundle$0(AbstractExternalPythonFunctionOperator.java:85)
>
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
> ... 1 more
>
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.RuntimeException: Error received from SDK harness for instruction
> 1: Traceback (most recent call last):
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 289, in _execute
>
> response = task()
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 362, in <lambda>
>
> lambda: self.create_worker().do_instruction(request), request)
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 606, in do_instruction
>
> return getattr(self, request_type)(
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 637, in process_bundle
>
> bundle_processor = self.bundle_processor_cache.get(
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 463, in get
>
> processor = bundle_processor.BundleProcessor(
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 868, in __init__
>
> self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 921, in create_execution_tree
>
> return collections.OrderedDict([(
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 924, in <listcomp>
>
> get_operation(transform_id))) for transform_id in sorted(
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 812, in wrapper
>
> result = cache[args] = func(*args)
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 903, in get_operation
>
> transform_consumers = {
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 904, in <dictcomp>
>
> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 904, in <listcomp>
>
> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 812, in wrapper
>
> result = cache[args] = func(*args)
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 908, in get_operation
>
> return transform_factory.create_operation(
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 1198, in create_operation
>
> return creator(self, transform_id, transform_proto, payload, consumers)
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py",
> line 129, in create_data_stream_keyed_process_function
>
> return _create_user_defined_function_operation(
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py",
> line 200, in _create_user_defined_function_operation
>
> return beam_operation_cls(
>
> File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 198, in
> pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
>
> File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 129, in
> pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
>
> File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 202, in
> pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/operations.py",
> line 75, in __init__
>
> extract_stateless_function(
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/operations.py",
> line 155, in extract_stateless_function
>
> user_defined_func = pickle.loads(user_defined_function_proto.payload)
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/pickle.py",
> line 29, in loads
>
> return cloudpickle.loads(payload)
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/xgboost/__init__.py",
> line 9, in <module>
>
> from .core import DMatrix, DeviceQuantileDMatrix, Booster
>
> File "/home/desktop/.local/lib/python3.8/site-packages/xgboost/core.py",
> line 23, in <module>
>
> from .compat import (STRING_TYPES, DataFrame, py_str, PANDAS_INSTALLED,
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/xgboost/compat.py", line
> 110, in <module>
>
> import sparse
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/sparse/__init__.py", line
> 1, in <module>
>
> from ._coo import COO, as_coo
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/sparse/_coo/__init__.py",
> line 1, in <module>
>
> from .core import COO, as_coo
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/sparse/_coo/core.py",
> line 9, in <module>
>
> import numba
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/__init__.py", line
> 38, in <module>
>
> from numba.core.decorators import (cfunc, generated_jit, jit, njit,
> stencil,
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/decorators.py",
> line 12, in <module>
>
> from numba.stencils.stencil import stencil
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/stencils/stencil.py",
> line 11, in <module>
>
> from numba.core import types, typing, utils, ir, config, ir_utils,
> registry
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/registry.py",
> line 4, in <module>
>
> from numba.core import utils, typing, dispatcher, cpu
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/dispatcher.py",
> line 13, in <module>
>
> from numba.core import (
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/compiler.py",
> line 6, in <module>
>
> from numba.core import (utils, errors, typing, interpreter, bytecode,
> postproc,
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/callconv.py",
> line 12, in <module>
>
> from numba.core.base import PYOBJECT, GENERIC_POINTER
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/base.py", line
> 24, in <module>
>
> from numba.cpython import builtins
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/cpython/builtins.py",
> line 524, in <module>
>
> from numba.core.typing.builtins import IndexValue, IndexValueType
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/typing/builtins.py",
> line 22, in <module>
>
> @infer_global(print)
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/typing/templates.py",
> line 1278, in register_global
>
> if getattr(mod, val.__name__) is not val:
>
> AttributeError: module 'pyflink.fn_execution.beam.beam_sdk_worker_main'
> has no attribute 'print'
>
>
>
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>
> at
> org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
>
> at
> org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
>
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
>
> at
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:380)
>
> ... 7 more
>
> Caused by: java.lang.RuntimeException: Error received from SDK harness for
> instruction 1: Traceback (most recent call last):
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 289, in _execute
>
> response = task()
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 362, in <lambda>
>
> lambda: self.create_worker().do_instruction(request), request)
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 606, in do_instruction
>
> return getattr(self, request_type)(
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 637, in process_bundle
>
> bundle_processor = self.bundle_processor_cache.get(
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 463, in get
>
> processor = bundle_processor.BundleProcessor(
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 868, in __init__
>
> self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 921, in create_execution_tree
>
> return collections.OrderedDict([(
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 924, in <listcomp>
>
> get_operation(transform_id))) for transform_id in sorted(
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 812, in wrapper
>
> result = cache[args] = func(*args)
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 903, in get_operation
>
> transform_consumers = {
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 904, in <dictcomp>
>
> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 904, in <listcomp>
>
> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 812, in wrapper
>
> result = cache[args] = func(*args)
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 908, in get_operation
>
> return transform_factory.create_operation(
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 1198, in create_operation
>
> return creator(self, transform_id, transform_proto, payload, consumers)
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py",
> line 129, in create_data_stream_keyed_process_function
>
> return _create_user_defined_function_operation(
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py",
> line 200, in _create_user_defined_function_operation
>
> return beam_operation_cls(
>
> File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 198, in
> pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
>
> File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 129, in
> pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
>
> File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 202, in
> pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/operations.py",
> line 75, in __init__
>
> extract_stateless_function(
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/operations.py",
> line 155, in extract_stateless_function
>
> user_defined_func = pickle.loads(user_defined_function_proto.payload)
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/pickle.py",
> line 29, in loads
>
> return cloudpickle.loads(payload)
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/xgboost/__init__.py",
> line 9, in <module>
>
> from .core import DMatrix, DeviceQuantileDMatrix, Booster
>
> File "/home/desktop/.local/lib/python3.8/site-packages/xgboost/core.py",
> line 23, in <module>
>
> from .compat import (STRING_TYPES, DataFrame, py_str, PANDAS_INSTALLED,
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/xgboost/compat.py", line
> 110, in <module>
>
> import sparse
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/sparse/__init__.py", line
> 1, in <module>
>
> from ._coo import COO, as_coo
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/sparse/_coo/__init__.py",
> line 1, in <module>
>
> from .core import COO, as_coo
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/sparse/_coo/core.py",
> line 9, in <module>
>
> import numba
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/__init__.py", line
> 38, in <module>
>
> from numba.core.decorators import (cfunc, generated_jit, jit, njit,
> stencil,
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/decorators.py",
> line 12, in <module>
>
> from numba.stencils.stencil import stencil
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/stencils/stencil.py",
> line 11, in <module>
>
> from numba.core import types, typing, utils, ir, config, ir_utils,
> registry
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/registry.py",
> line 4, in <module>
>
> from numba.core import utils, typing, dispatcher, cpu
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/dispatcher.py",
> line 13, in <module>
>
> from numba.core import (
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/compiler.py",
> line 6, in <module>
>
> from numba.core import (utils, errors, typing, interpreter, bytecode,
> postproc,
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/callconv.py",
> line 12, in <module>
>
> from numba.core.base import PYOBJECT, GENERIC_POINTER
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/base.py", line
> 24, in <module>
>
> from numba.cpython import builtins
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/cpython/builtins.py",
> line 524, in <module>
>
> from numba.core.typing.builtins import IndexValue, IndexValueType
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/typing/builtins.py",
> line 22, in <module>
>
> @infer_global(print)
>
> File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/typing/templates.py",
> line 1278, in register_global
>
> if getattr(mod, val.__name__) is not val:
>
> AttributeError: module 'pyflink.fn_execution.beam.beam_sdk_worker_main'
> has no attribute 'print'
>
>
>
> at
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
>
> at
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
>
> at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
>
> at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
>
> at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
>
> at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
>
> at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
>
> at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
>
> at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>
> at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>
> ... 3 more
>
>
>
> org.apache.flink.client.program.ProgramAbortException:
> java.lang.RuntimeException: Python process exits with code: 1
>
> at
> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>
> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:836)
>
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
>
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1078)
>
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1156)
>
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1156)
>
> Caused by: java.lang.RuntimeException: Python process exits with code: 1
>
> at
> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
>
> ... 13 more
>
>
>
>
>
> Can anyone tell me the reason for this error, or offer any suggestions?
>
>
>
> With Regards,
>
> Harshit Varshney
>
>
>