Posted to user@flink.apache.org by "harshit.varshney@iktara.ai" <Ha...@iktara.ai> on 2022/11/21 12:50:56 UTC

Facing Issue in running Python Flink Program in flink cluster (Version=1.15.2)

Dear Team,

I am facing an issue while running a PyFlink program on a Flink cluster: it
stops running while reading the machine learning model.

 

This is the error:

 

./bin/flink run --python /home/desktop/ProjectFiles/test_new.py

 

Job has been submitted with JobID 0a561cb330eeac5aa7b40ac047d3c6a3

/home/desktop/.local/lib/python3.8/site-packages/sklearn/base.py:329:
UserWarning: Trying to unpickle estimator LabelEncoder from version 1.1.1
when using version 1.1.2. This might lead to breaking code or invalid
results. Use at your own risk. For more info please refer to:

https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations

  warnings.warn(

/home/desktop/.local/lib/python3.8/site-packages/xgboost/compat.py:31:
FutureWarning: pandas.Int64Index is deprecated and will be removed from
pandas in a future version. Use pandas.Index with the appropriate dtype
instead.

  from pandas import MultiIndex, Int64Index

/home/desktop/.local/lib/python3.8/site-packages/pkg_resources/__init__.py:116:
PkgResourcesDeprecationWarning: 0.1.36ubuntu1 is an invalid version and
will not be supported in a future release

  warnings.warn(

/home/desktop/.local/lib/python3.8/site-packages/pkg_resources/__init__.py:116:
PkgResourcesDeprecationWarning: 0.23ubuntu1 is an invalid version and
will not be supported in a future release

  warnings.warn(

Traceback (most recent call last):

  File "/usr/lib/python3.8/runpy.py", line 194, in _run_module_as_main

    return _run_code(code, main_globals, None,

  File "/usr/lib/python3.8/runpy.py", line 87, in _run_code

    exec(code, run_globals)

  File
"/tmp/pyflink/bc5f6233-c53d-45a0-b8f2-21b41ea0c6ad/71b84799-5210-4a85-ae80-9
efe0fb192f1/test_new.py", line 510, in <module>

    new_user_ratings(envir)

  File
"/tmp/pyflink/bc5f6233-c53d-45a0-b8f2-21b41ea0c6ad/71b84799-5210-4a85-ae80-9
efe0fb192f1/test_new.py", line 504, in new_user_ratings

    environment.execute('new_user_ratings')

  File
"/home/desktop/Downloads/flink/opt/python/pyflink.zip/pyflink/datastream/str
eam_execution_environment.py", line 761, in execute

  File
"/home/desktop/Downloads/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/java_ga
teway.py", line 1321, in __call__

  File
"/home/desktop/Downloads/flink/opt/python/pyflink.zip/pyflink/util/exception
s.py", line 146, in deco

  File
"/home/desktop/Downloads/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/protoco
l.py", line 326, in get_return_value

py4j.protocol.Py4JJavaError: An error occurred while calling o8.execute.

: java.util.concurrent.ExecutionException:
org.apache.flink.client.program.ProgramInvocationException: Job failed
(JobID: 0a561cb330eeac5aa7b40ac047d3c6a3)

                at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)

                at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)

                at
org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResu
lt(StreamContextEnvironment.java:173)

                at
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamConte
xtEnvironment.java:123)

                at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)

                at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62
)

                at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl
.java:43)

                at java.lang.reflect.Method.invoke(Method.java:498)

                at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(Meth
odInvoker.java:244)

                at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(R
eflectionEngine.java:357)

                at
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)

                at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMetho
d(AbstractCommand.java:132)

                at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCom
mand.java:79)

                at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnect
ion.java:238)

                at java.lang.Thread.run(Thread.java:750)

Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
failed (JobID: 0a561cb330eeac5aa7b40ac047d3c6a3)

                at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null
$6(ClusterClientJobClientAdapter.java:130)

                at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)

                at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.ja
va:591)

                at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:4
88)

                at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)

                at
org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$
9(FutureUtils.java:403)

                at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.jav
a:774)

                at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFu
ture.java:750)

                at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:4
88)

                at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)

                at
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAs
ync$26(RestClusterClient.java:708)

                at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.jav
a:774)

                at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFu
ture.java:750)

                at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:4
88)

                at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)

                at
org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$
9(FutureUtils.java:403)

                at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.jav
a:774)

                at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFu
ture.java:750)

                at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:4
88)

                at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)

                at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.
java:943)

                at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java
:456)

                at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:11
49)

                at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:6
24)

                ... 1 more

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.

                at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.
java:144)

                at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null
$6(ClusterClientJobClientAdapter.java:128)

                ... 24 more

Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy

                at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandl
er.handleFailure(ExecutionFailureHandler.java:138)

                at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandl
er.getFailureHandlingResult(ExecutionFailureHandler.java:82)

                at
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(Defaul
tScheduler.java:301)

                at
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(D
efaultScheduler.java:291)

                at
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionState
Internal(DefaultScheduler.java:282)

                at
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(Sc
hedulerBase.java:739)

                at
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(Sche
dulerNG.java:78)

                at
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMas
ter.java:443)

                at sun.reflect.GeneratedMethodAccessor125.invoke(Unknown
Source)

                at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl
.java:43)

                at java.lang.reflect.Method.invoke(Method.java:498)

                at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(
AkkaRpcActor.java:304)

                at
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextCla
ssLoader(ClassLoadingUtils.java:83)

                at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcAc
tor.java:302)

                at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor
.java:217)

                at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(Fenced
AkkaRpcActor.java:78)

                at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.ja
va:163)

                at
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)

                at
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)

                at
scala.PartialFunction.applyOrElse(PartialFunction.scala:123)

                at
scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)

                at
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)

                at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

                at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)

                at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)

                at akka.actor.Actor.aroundReceive(Actor.scala:537)

                at akka.actor.Actor.aroundReceive$(Actor.scala:535)

                at
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)

                at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)

                at akka.actor.ActorCell.invoke(ActorCell.scala:548)

                at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)

                at akka.dispatch.Mailbox.run(Mailbox.scala:231)

                at akka.dispatch.Mailbox.exec(Mailbox.scala:243)

                at
java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)

                at
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)

                at
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)

                at
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)

Caused by: org.apache.flink.runtime.taskmanager.AsynchronousException:
Caught exception while processing timer.

                at
org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncException
Handler.handleAsyncException(StreamTask.java:1535)

                at
org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(Str
eamTask.java:1510)

                at
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCall
back(StreamTask.java:1650)

                at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$21(StreamTas
k.java:1639)

                at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$Synchroniz
edStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)

                at
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)

                at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMai
lsWhenDefaultActionUnavailable(MailboxProcessor.java:338)

                at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMai
l(MailboxProcessor.java:324)

                at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailbox
Loop(MailboxProcessor.java:201)

                at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTas
k.java:804)

                at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:7
53)

                at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.j
ava:948)

                at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)

                at
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)

                at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)

                at java.lang.Thread.run(Thread.java:750)

Caused by: TimerException{java.lang.RuntimeException: Error while waiting
for BeamPythonFunctionRunner flush}

                ... 14 more

Caused by: java.lang.RuntimeException: Error while waiting for
BeamPythonFunctionRunner flush

                at
org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFuncti
onOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:10
6)

                at
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperat
or.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:299)

                at
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperat
or.lambda$open$0(AbstractPythonFunctionOperator.java:115)

                at
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCall
back(StreamTask.java:1648)

                ... 13 more

Caused by: java.lang.RuntimeException: Failed to close remote bundle

                at
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.
finishBundle(BeamPythonFunctionRunner.java:382)

                at
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.
flush(BeamPythonFunctionRunner.java:366)

                at
org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFuncti
onOperator.lambda$invokeFinishBundle$0(AbstractExternalPythonFunctionOperato
r.java:85)

                at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

                at java.util.concurrent.FutureTask.run(FutureTask.java:266)

                at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:11
49)

                at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:6
24)

                ... 1 more

Caused by: java.util.concurrent.ExecutionException:
java.lang.RuntimeException: Error received from SDK harness for instruction
1: Traceback (most recent call last):

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/sdk_worker.py", line 289, in _execute

    response = task()

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/sdk_worker.py", line 362, in <lambda>

    lambda: self.create_worker().do_instruction(request), request)

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/sdk_worker.py", line 606, in do_instruction

    return getattr(self, request_type)(

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/sdk_worker.py", line 637, in process_bundle

    bundle_processor = self.bundle_processor_cache.get(

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/sdk_worker.py", line 463, in get

    processor = bundle_processor.BundleProcessor(

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/bundle_processor.py", line 868, in __init__

    self.ops = self.create_execution_tree(self.process_bundle_descriptor)

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/bundle_processor.py", line 921, in create_execution_tree

    return collections.OrderedDict([(

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/bundle_processor.py", line 924, in <listcomp>

    get_operation(transform_id))) for transform_id in sorted(

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/bundle_processor.py", line 812, in wrapper

    result = cache[args] = func(*args)

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/bundle_processor.py", line 903, in get_operation

    transform_consumers = {

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/bundle_processor.py", line 904, in <dictcomp>

    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/bundle_processor.py", line 904, in <listcomp>

    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/bundle_processor.py", line 812, in wrapper

    result = cache[args] = func(*args)

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/bundle_processor.py", line 908, in get_operation

    return transform_factory.create_operation(

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/bundle_processor.py", line 1198, in create_operation

    return creator(self, transform_id, transform_proto, payload, consumers)

  File
"/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/beam/
beam_operations.py", line 129, in create_data_stream_keyed_process_function

    return _create_user_defined_function_operation(

  File
"/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/beam/
beam_operations.py", line 200, in _create_user_defined_function_operation

    return beam_operation_cls(

  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 198, in
pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__
init__

  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 129, in
pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__

  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 202, in
pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.ge
nerate_operation

  File
"/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/datas
tream/operations.py", line 75, in __init__

    extract_stateless_function(

  File
"/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/datas
tream/operations.py", line 155, in extract_stateless_function

    user_defined_func = pickle.loads(user_defined_function_proto.payload)

  File
"/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/pickl
e.py", line 29, in loads

    return cloudpickle.loads(payload)

  File
"/home/desktop/.local/lib/python3.8/site-packages/xgboost/__init__.py", line
9, in <module>

    from .core import DMatrix, DeviceQuantileDMatrix, Booster

  File "/home/desktop/.local/lib/python3.8/site-packages/xgboost/core.py",
line 23, in <module>

    from .compat import (STRING_TYPES, DataFrame, py_str, PANDAS_INSTALLED,

  File "/home/desktop/.local/lib/python3.8/site-packages/xgboost/compat.py",
line 110, in <module>

    import sparse

  File
"/home/desktop/.local/lib/python3.8/site-packages/sparse/__init__.py", line
1, in <module>

    from ._coo import COO, as_coo

  File
"/home/desktop/.local/lib/python3.8/site-packages/sparse/_coo/__init__.py",
line 1, in <module>

    from .core import COO, as_coo

  File
"/home/desktop/.local/lib/python3.8/site-packages/sparse/_coo/core.py", line
9, in <module>

    import numba

  File "/home/desktop/.local/lib/python3.8/site-packages/numba/__init__.py",
line 38, in <module>

    from numba.core.decorators import (cfunc, generated_jit, jit, njit,
stencil,

  File
"/home/desktop/.local/lib/python3.8/site-packages/numba/core/decorators.py",
line 12, in <module>

    from numba.stencils.stencil import stencil

  File
"/home/desktop/.local/lib/python3.8/site-packages/numba/stencils/stencil.py"
, line 11, in <module>

    from numba.core import types, typing, utils, ir, config, ir_utils,
registry

  File
"/home/desktop/.local/lib/python3.8/site-packages/numba/core/registry.py",
line 4, in <module>

    from numba.core import utils, typing, dispatcher, cpu

  File
"/home/desktop/.local/lib/python3.8/site-packages/numba/core/dispatcher.py",
line 13, in <module>

    from numba.core import (

  File
"/home/desktop/.local/lib/python3.8/site-packages/numba/core/compiler.py",
line 6, in <module>

    from numba.core import (utils, errors, typing, interpreter, bytecode,
postproc,

  File
"/home/desktop/.local/lib/python3.8/site-packages/numba/core/callconv.py",
line 12, in <module>

    from numba.core.base import PYOBJECT, GENERIC_POINTER

  File
"/home/desktop/.local/lib/python3.8/site-packages/numba/core/base.py", line
24, in <module>

    from numba.cpython import builtins

  File
"/home/desktop/.local/lib/python3.8/site-packages/numba/cpython/builtins.py"
, line 524, in <module>

    from numba.core.typing.builtins import IndexValue, IndexValueType

  File
"/home/desktop/.local/lib/python3.8/site-packages/numba/core/typing/builtins
.py", line 22, in <module>

    @infer_global(print)

  File
"/home/desktop/.local/lib/python3.8/site-packages/numba/core/typing/template
s.py", line 1278, in register_global

    if getattr(mod, val.__name__) is not val:

AttributeError: module 'pyflink.fn_execution.beam.beam_sdk_worker_main' has
no attribute 'print'

 

                at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)

                at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)

                at
org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)

                at
org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor
$ActiveBundle.close(SdkHarnessClient.java:504)

                at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleSt
ageBundleFactory$1.close(DefaultJobBundleFactory.java:555)

                at
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.
finishBundle(BeamPythonFunctionRunner.java:380)

                ... 7 more

Caused by: java.lang.RuntimeException: Error received from SDK harness for
instruction 1: Traceback (most recent call last):

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/sdk_worker.py", line 289, in _execute

    response = task()

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/sdk_worker.py", line 362, in <lambda>

    lambda: self.create_worker().do_instruction(request), request)

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/sdk_worker.py", line 606, in do_instruction

    return getattr(self, request_type)(

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/sdk_worker.py", line 637, in process_bundle

    bundle_processor = self.bundle_processor_cache.get(

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/sdk_worker.py", line 463, in get

    processor = bundle_processor.BundleProcessor(

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/bundle_processor.py", line 868, in __init__

    self.ops = self.create_execution_tree(self.process_bundle_descriptor)

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/bundle_processor.py", line 921, in create_execution_tree

    return collections.OrderedDict([(

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/bundle_processor.py", line 924, in <listcomp>

    get_operation(transform_id))) for transform_id in sorted(

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/bundle_processor.py", line 812, in wrapper

    result = cache[args] = func(*args)

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/bundle_processor.py", line 903, in get_operation

    transform_consumers = {

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/bundle_processor.py", line 904, in <dictcomp>

    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/bundle_processor.py", line 904, in <listcomp>

    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/bundle_processor.py", line 812, in wrapper

    result = cache[args] = func(*args)

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/bundle_processor.py", line 908, in get_operation

    return transform_factory.create_operation(

  File
"/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker
/bundle_processor.py", line 1198, in create_operation

    return creator(self, transform_id, transform_proto, payload, consumers)

  File
"/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/beam/
beam_operations.py", line 129, in create_data_stream_keyed_process_function

    return _create_user_defined_function_operation(

  File
"/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/beam/
beam_operations.py", line 200, in _create_user_defined_function_operation

    return beam_operation_cls(

  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 198, in
pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__
init__

  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 129, in
pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__

  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 202, in
pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.ge
nerate_operation

  File
"/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/datas
tream/operations.py", line 75, in __init__

    extract_stateless_function(

  File
"/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/datas
tream/operations.py", line 155, in extract_stateless_function

    user_defined_func = pickle.loads(user_defined_function_proto.payload)

  File
"/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/pickl
e.py", line 29, in loads

    return cloudpickle.loads(payload)

  File
"/home/desktop/.local/lib/python3.8/site-packages/xgboost/__init__.py", line
9, in <module>

    from .core import DMatrix, DeviceQuantileDMatrix, Booster

  File "/home/desktop/.local/lib/python3.8/site-packages/xgboost/core.py",
line 23, in <module>

    from .compat import (STRING_TYPES, DataFrame, py_str, PANDAS_INSTALLED,

  File "/home/desktop/.local/lib/python3.8/site-packages/xgboost/compat.py",
line 110, in <module>

    import sparse

  File
"/home/desktop/.local/lib/python3.8/site-packages/sparse/__init__.py", line
1, in <module>

    from ._coo import COO, as_coo

  File
"/home/desktop/.local/lib/python3.8/site-packages/sparse/_coo/__init__.py",
line 1, in <module>

    from .core import COO, as_coo

  File
"/home/desktop/.local/lib/python3.8/site-packages/sparse/_coo/core.py", line
9, in <module>

    import numba

  File "/home/desktop/.local/lib/python3.8/site-packages/numba/__init__.py",
line 38, in <module>

    from numba.core.decorators import (cfunc, generated_jit, jit, njit,
stencil,

  File
"/home/desktop/.local/lib/python3.8/site-packages/numba/core/decorators.py",
line 12, in <module>

    from numba.stencils.stencil import stencil

  File
"/home/desktop/.local/lib/python3.8/site-packages/numba/stencils/stencil.py"
, line 11, in <module>

    from numba.core import types, typing, utils, ir, config, ir_utils,
registry

  File
"/home/desktop/.local/lib/python3.8/site-packages/numba/core/registry.py",
line 4, in <module>

    from numba.core import utils, typing, dispatcher, cpu

  File
"/home/desktop/.local/lib/python3.8/site-packages/numba/core/dispatcher.py",
line 13, in <module>

    from numba.core import (

  File
"/home/desktop/.local/lib/python3.8/site-packages/numba/core/compiler.py",
line 6, in <module>

    from numba.core import (utils, errors, typing, interpreter, bytecode,
postproc,

  File
"/home/desktop/.local/lib/python3.8/site-packages/numba/core/callconv.py",
line 12, in <module>

    from numba.core.base import PYOBJECT, GENERIC_POINTER

  File
"/home/desktop/.local/lib/python3.8/site-packages/numba/core/base.py", line
24, in <module>

    from numba.cpython import builtins

  File
"/home/desktop/.local/lib/python3.8/site-packages/numba/cpython/builtins.py"
, line 524, in <module>

    from numba.core.typing.builtins import IndexValue, IndexValueType

  File
"/home/desktop/.local/lib/python3.8/site-packages/numba/core/typing/builtins
.py", line 22, in <module>

    @infer_global(print)

  File
"/home/desktop/.local/lib/python3.8/site-packages/numba/core/typing/template
s.py", line 1278, in register_global

    if getattr(mod, val.__name__) is not val:

AttributeError: module 'pyflink.fn_execution.beam.beam_sdk_worker_main' has
no attribute 'print'

 

                at
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStrea
mObserver.onNext(FnApiControlClient.java:180)

                at
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStrea
mObserver.onNext(FnApiControlClient.java:160)

                at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServer
CallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)

                at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onM
essage(ForwardingServerCallListener.java:33)

                at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCal
lListener.onMessage(Contexts.java:76)

                at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerSt
reamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)

                at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerSt
reamListenerImpl.messagesAvailable(ServerCallImpl.java:292)

                at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplic
ationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.j
ava:782)

                at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(Con
textRunnable.java:37)

                at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run
(SerializingExecutor.java:123)

                ... 3 more

 

org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1

                at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)

                at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

                at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

                at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

                at java.lang.reflect.Method.invoke(Method.java:498)

                at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)

                at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)

                at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)

                at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:836)

                at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)

                at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1078)

                at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1156)

                at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)

                at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1156)

Caused by: java.lang.RuntimeException: Python process exits with code: 1

                at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)

                ... 13 more

 

 

Can anyone tell me the reason for this error, or offer any suggestions?

 

With Regards,

Harshit Varshney

 


Re: Facing Issue in running Python Flink Program in flink cluster (Version=1.15.2)

Posted by Xingbo Huang <hx...@gmail.com>.
Hi Harshit,

According to the stack trace you provided, I guess you define your Python
function in the main file, and the Python function imports xgboost
globally. The error occurs because the xgboost library is difficult for
cloudpickle to serialize. There are two ways to solve this:

1. Move `import xgboost` to the inside of the Python function.

2. Move the Python function to another Python file, and then add that
Python file as a dependency via `add_python_file`[1].

Best,
Xingbo

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/dependency_management/#python-libraries
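
Option 2 could be sketched roughly as follows. The file name `udfs.py`, its path, and the helper `register_udf_file` are hypothetical; only `add_python_file` comes from the reply above. The helper takes the execution environment as an argument, so the sketch itself does not need PyFlink to be importable.

```python
# Sketch of option 2. `udfs.py`, its path, and this helper are hypothetical;
# `add_python_file` is the PyFlink method named in the reply.

UDF_FILE = "/path/to/udfs.py"  # hypothetical file holding the moved function


def register_udf_file(env, udf_file=UDF_FILE):
    """Ship `udf_file` with the job so the Python workers can import it.

    `env` is expected to be a PyFlink StreamExecutionEnvironment.
    """
    env.add_python_file(udf_file)
    return env


# In the main script (requires PyFlink, so shown as comments only):
#   from pyflink.datastream import StreamExecutionEnvironment
#   env = register_udf_file(StreamExecutionEnvironment.get_execution_environment())
#   from udfs import predict   # assumes udfs.py is also importable at submit time
```

With this layout the function is no longer defined in the main file, so the main file does not need a top-level `import xgboost` for it.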

harshit.varshney@iktara.ai <Ha...@iktara.ai> wrote on Mon, Nov 21, 2022
at 20:51:

> Dear Team,
>
> I am facing a issue while running pyflink  program in flink cluster as it
> stop running while reading the machine learning model
>
>
>
> This is the error :
>
>
>
> ./bin/flink run --python /home/desktop/ProjectFiles/test_new.py
>
>
>
> Job has been submitted with JobID 0a561cb330eeac5aa7b40ac047d3c6a3
>
> /home/desktop/.local/lib/python3.8/site-packages/sklearn/base.py:329:
> UserWarning: Trying to unpickle estimator LabelEncoder from version 1.1.1
> when using version 1.1.2. This might lead to breaking code or invalid
> results. Use at your own risk. For more info please refer to:
>
>
> https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
>
>   warnings.warn(
>
> /home/desktop/.local/lib/python3.8/site-packages/xgboost/compat.py:31:
> FutureWarning: pandas.Int64Index is deprecated and will be removed from
> pandas in a future version. Use pandas.Index with the appropriate dtype
> instead.
>
>   from pandas import MultiIndex, Int64Index
>
> /home/desktop/.local/lib/python3.8/site-packages/pkg_resources/__init__.py:116:
> PkgResourcesDeprecationWarning: 0.1.36ubuntu1 is an invalid version and
> will not be supported in a future release
>
>   warnings.warn(
>
> /home/desktop/.local/lib/python3.8/site-packages/pkg_resources/__init__.py:116:
> PkgResourcesDeprecationWarning: 0.23ubuntu1 is an invalid version and will
> not be supported in a future release
>
>   warnings.warn(
>
> Traceback (most recent call last):
>
>   File "/usr/lib/python3.8/runpy.py", line 194, in _run_module_as_main
>
>     return _run_code(code, main_globals, None,
>
>   File "/usr/lib/python3.8/runpy.py", line 87, in _run_code
>
>     exec(code, run_globals)
>
>   File
> "/tmp/pyflink/bc5f6233-c53d-45a0-b8f2-21b41ea0c6ad/71b84799-5210-4a85-ae80-9efe0fb192f1/test_new.py",
> line 510, in <module>
>
>     new_user_ratings(envir)
>
>   File
> "/tmp/pyflink/bc5f6233-c53d-45a0-b8f2-21b41ea0c6ad/71b84799-5210-4a85-ae80-9efe0fb192f1/test_new.py",
> line 504, in new_user_ratings
>
>     environment.execute('new_user_ratings')
>
>   File
> "/home/desktop/Downloads/flink/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py",
> line 761, in execute
>
>   File
> "/home/desktop/Downloads/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/java_gateway.py",
> line 1321, in __call__
>
>   File
> "/home/desktop/Downloads/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py",
> line 146, in deco
>
>   File
> "/home/desktop/Downloads/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/protocol.py",
> line 326, in get_return_value
>
> py4j.protocol.Py4JJavaError: An error occurred while calling o8.execute.
>
> : java.util.concurrent.ExecutionException:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: 0a561cb330eeac5aa7b40ac047d3c6a3)
>
>                 at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>
>                 at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>
>                 at
> org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:173)
>
>                 at
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:123)
>
>                 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
>
>                 at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
>                 at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>                 at java.lang.reflect.Method.invoke(Method.java:498)
>
>                 at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>
>                 at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>
>                 at
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>
>                 at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>
>                 at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>
>                 at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>
>                 at java.lang.Thread.run(Thread.java:750)
>
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
> failed (JobID: 0a561cb330eeac5aa7b40ac047d3c6a3)
>
>                 at
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130)
>
>                 at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>
>                 at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>
>                 at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>
>                 at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>
>                 at
> org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403)
>
>                 at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>
>                 at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>
>                 at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>
>                 at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>
>                 at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$26(RestClusterClient.java:708)
>
>                 at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>
>                 at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>
>                 at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>
>                 at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>
>                 at
> org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403)
>
>                 at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>
>                 at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>
>                 at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>
>                 at
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
>
>                 at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
>
>                 at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>
>                 at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
>                 at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
>                 ... 1 more
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
>
>                 at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
>
>                 at
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:128)
>
>                 ... 24 more
>
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed
> by NoRestartBackoffTimeStrategy
>
>                 at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>
>                 at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>
>                 at
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301)
>
>                 at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291)
>
>                 at
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282)
>
>                 at
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739)
>
>                 at
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
>
>                 at
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443)
>
>                 at sun.reflect.GeneratedMethodAccessor125.invoke(Unknown
> Source)
>
>                 at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>                 at java.lang.reflect.Method.invoke(Method.java:498)
>
>                 at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304)
>
>                 at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
>
>                 at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302)
>
>                 at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
>
>                 at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
>
>                 at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
>
>                 at
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>
>                 at
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>
>                 at
> scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>
>                 at
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>
>                 at
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>
>                 at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>
>                 at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>
>                 at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>
>                 at akka.actor.Actor.aroundReceive(Actor.scala:537)
>
>                 at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>
>                 at
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>
>                 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
>
>                 at akka.actor.ActorCell.invoke(ActorCell.scala:548)
>
>                 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>
>                 at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>
>                 at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>
>                 at
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>
>                 at
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>
>                 at
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>
>                 at
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
>
> Caused by: org.apache.flink.runtime.taskmanager.AsynchronousException:
> Caught exception while processing timer.
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1535)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1510)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1650)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$21(StreamTask.java:1639)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>
>                 at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>
>                 at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>
>                 at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>
>                 at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>
>                 at java.lang.Thread.run(Thread.java:750)
>
> Caused by: TimerException{java.lang.RuntimeException: Error while waiting
> for BeamPythonFunctionRunner flush}
>
>                 ... 14 more
>
> Caused by: java.lang.RuntimeException: Error while waiting for
> BeamPythonFunctionRunner flush
>
>                 at
> org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:106)
>
>                 at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:299)
>
>                 at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:115)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1648)
>
>                 ... 13 more
>
> Caused by: java.lang.RuntimeException: Failed to close remote bundle
>
>                 at
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:382)
>
>                 at
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:366)
>
>                 at
> org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.lambda$invokeFinishBundle$0(AbstractExternalPythonFunctionOperator.java:85)
>
>                 at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
>                 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
>                 at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
>                 at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
>                 ... 1 more
>
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.RuntimeException: Error received from SDK harness for instruction
> 1: Traceback (most recent call last):
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 289, in _execute
>
>     response = task()
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 362, in <lambda>
>
>     lambda: self.create_worker().do_instruction(request), request)
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 606, in do_instruction
>
>     return getattr(self, request_type)(
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 637, in process_bundle
>
>     bundle_processor = self.bundle_processor_cache.get(
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 463, in get
>
>     processor = bundle_processor.BundleProcessor(
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 868, in __init__
>
>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 921, in create_execution_tree
>
>     return collections.OrderedDict([(
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 924, in <listcomp>
>
>     get_operation(transform_id))) for transform_id in sorted(
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 812, in wrapper
>
>     result = cache[args] = func(*args)
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 903, in get_operation
>
>     transform_consumers = {
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 904, in <dictcomp>
>
>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 904, in <listcomp>
>
>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 812, in wrapper
>
>     result = cache[args] = func(*args)
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 908, in get_operation
>
>     return transform_factory.create_operation(
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 1198, in create_operation
>
>     return creator(self, transform_id, transform_proto, payload, consumers)
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py",
> line 129, in create_data_stream_keyed_process_function
>
>     return _create_user_defined_function_operation(
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py",
> line 200, in _create_user_defined_function_operation
>
>     return beam_operation_cls(
>
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 198, in
> pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
>
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 129, in
> pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
>
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 202, in
> pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/operations.py",
> line 75, in __init__
>
>     extract_stateless_function(
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/operations.py",
> line 155, in extract_stateless_function
>
>     user_defined_func = pickle.loads(user_defined_function_proto.payload)
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/pickle.py",
> line 29, in loads
>
>     return cloudpickle.loads(payload)
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/xgboost/__init__.py",
> line 9, in <module>
>
>     from .core import DMatrix, DeviceQuantileDMatrix, Booster
>
>   File "/home/desktop/.local/lib/python3.8/site-packages/xgboost/core.py",
> line 23, in <module>
>
>     from .compat import (STRING_TYPES, DataFrame, py_str, PANDAS_INSTALLED,
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/xgboost/compat.py", line
> 110, in <module>
>
>     import sparse
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/sparse/__init__.py", line
> 1, in <module>
>
>     from ._coo import COO, as_coo
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/sparse/_coo/__init__.py",
> line 1, in <module>
>
>     from .core import COO, as_coo
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/sparse/_coo/core.py",
> line 9, in <module>
>
>     import numba
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/__init__.py", line
> 38, in <module>
>
>     from numba.core.decorators import (cfunc, generated_jit, jit, njit,
> stencil,
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/decorators.py",
> line 12, in <module>
>
>     from numba.stencils.stencil import stencil
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/stencils/stencil.py",
> line 11, in <module>
>
>     from numba.core import types, typing, utils, ir, config, ir_utils,
> registry
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/registry.py",
> line 4, in <module>
>
>     from numba.core import utils, typing, dispatcher, cpu
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/dispatcher.py",
> line 13, in <module>
>
>     from numba.core import (
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/compiler.py",
> line 6, in <module>
>
>     from numba.core import (utils, errors, typing, interpreter, bytecode,
> postproc,
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/callconv.py",
> line 12, in <module>
>
>     from numba.core.base import PYOBJECT, GENERIC_POINTER
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/base.py", line
> 24, in <module>
>
>     from numba.cpython import builtins
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/cpython/builtins.py",
> line 524, in <module>
>
>     from numba.core.typing.builtins import IndexValue, IndexValueType
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/typing/builtins.py",
> line 22, in <module>
>
>     @infer_global(print)
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/typing/templates.py",
> line 1278, in register_global
>
>     if getattr(mod, val.__name__) is not val:
>
> AttributeError: module 'pyflink.fn_execution.beam.beam_sdk_worker_main'
> has no attribute 'print'
>
>
>
>                 at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>
>                 at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>
>                 at
> org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
>
>                 at
> org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
>
>                 at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
>
>                 at
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:380)
>
>                 ... 7 more
>
> Caused by: java.lang.RuntimeException: Error received from SDK harness for
> instruction 1: Traceback (most recent call last):
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 289, in _execute
>
>     response = task()
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 362, in <lambda>
>
>     lambda: self.create_worker().do_instruction(request), request)
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 606, in do_instruction
>
>     return getattr(self, request_type)(
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 637, in process_bundle
>
>     bundle_processor = self.bundle_processor_cache.get(
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 463, in get
>
>     processor = bundle_processor.BundleProcessor(
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 868, in __init__
>
>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 921, in create_execution_tree
>
>     return collections.OrderedDict([(
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 924, in <listcomp>
>
>     get_operation(transform_id))) for transform_id in sorted(
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 812, in wrapper
>
>     result = cache[args] = func(*args)
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 903, in get_operation
>
>     transform_consumers = {
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 904, in <dictcomp>
>
>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 904, in <listcomp>
>
>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 812, in wrapper
>
>     result = cache[args] = func(*args)
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 908, in get_operation
>
>     return transform_factory.create_operation(
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 1198, in create_operation
>
>     return creator(self, transform_id, transform_proto, payload, consumers)
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py",
> line 129, in create_data_stream_keyed_process_function
>
>     return _create_user_defined_function_operation(
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py",
> line 200, in _create_user_defined_function_operation
>
>     return beam_operation_cls(
>
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 198, in
> pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
>
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 129, in
> pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
>
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 202, in
> pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/operations.py",
> line 75, in __init__
>
>     extract_stateless_function(
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/operations.py",
> line 155, in extract_stateless_function
>
>     user_defined_func = pickle.loads(user_defined_function_proto.payload)
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/pickle.py",
> line 29, in loads
>
>     return cloudpickle.loads(payload)
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/xgboost/__init__.py",
> line 9, in <module>
>
>     from .core import DMatrix, DeviceQuantileDMatrix, Booster
>
>   File "/home/desktop/.local/lib/python3.8/site-packages/xgboost/core.py",
> line 23, in <module>
>
>     from .compat import (STRING_TYPES, DataFrame, py_str, PANDAS_INSTALLED,
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/xgboost/compat.py", line
> 110, in <module>
>
>     import sparse
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/sparse/__init__.py", line
> 1, in <module>
>
>     from ._coo import COO, as_coo
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/sparse/_coo/__init__.py",
> line 1, in <module>
>
>     from .core import COO, as_coo
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/sparse/_coo/core.py",
> line 9, in <module>
>
>     import numba
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/__init__.py", line
> 38, in <module>
>
>     from numba.core.decorators import (cfunc, generated_jit, jit, njit,
> stencil,
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/decorators.py",
> line 12, in <module>
>
>     from numba.stencils.stencil import stencil
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/stencils/stencil.py",
> line 11, in <module>
>
>     from numba.core import types, typing, utils, ir, config, ir_utils,
> registry
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/registry.py",
> line 4, in <module>
>
>     from numba.core import utils, typing, dispatcher, cpu
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/dispatcher.py",
> line 13, in <module>
>
>     from numba.core import (
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/compiler.py",
> line 6, in <module>
>
>     from numba.core import (utils, errors, typing, interpreter, bytecode,
> postproc,
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/callconv.py",
> line 12, in <module>
>
>     from numba.core.base import PYOBJECT, GENERIC_POINTER
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/base.py", line
> 24, in <module>
>
>     from numba.cpython import builtins
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/cpython/builtins.py",
> line 524, in <module>
>
>     from numba.core.typing.builtins import IndexValue, IndexValueType
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/typing/builtins.py",
> line 22, in <module>
>
>     @infer_global(print)
>
>   File
> "/home/desktop/.local/lib/python3.8/site-packages/numba/core/typing/templates.py",
> line 1278, in register_global
>
>     if getattr(mod, val.__name__) is not val:
>
> AttributeError: module 'pyflink.fn_execution.beam.beam_sdk_worker_main' has no attribute 'print'
>
>
>
>                 at
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
>
>                 at
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
>
>                 at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
>
>                 at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
>
>                 at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
>
>                 at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
>
>                 at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
>
>                 at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
>
>                 at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>
>                 at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>
>                 ... 3 more
>
>
>
> org.apache.flink.client.program.ProgramAbortException:
> java.lang.RuntimeException: Python process exits with code: 1
>
>                 at
> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
>
>                 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
>
>                 at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
>                 at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>                 at java.lang.reflect.Method.invoke(Method.java:498)
>
>                 at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>
>                 at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>
>                 at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>
>                 at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:836)
>
>                 at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
>
>                 at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1078)
>
>                 at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1156)
>
>                 at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>
>                 at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1156)
>
> Caused by: java.lang.RuntimeException: Python process exits with code: 1
>
>                 at
> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
>
>                 ... 13 more
>
>
>
>
>
> Can anyone tell me the reason for this error, or offer any suggestions?
>
>
>
> With Regards,
>
> Harshit Varshney
>
>
>
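A note on the final Python frame in the quoted traceback: the failing line is the check `if getattr(mod, val.__name__) is not val:` in numba's `register_global`, reached through `@infer_global(print)` while xgboost imports sparse and then numba. The error message suggests that, inside the Python worker, `print` no longer reports `builtins` as its module. The sketch below reproduces that kind of failure in plain Python, without PyFlink or numba. It rests on an assumption not confirmed in this message: that the worker replaces `builtins.print` with a method defined in `beam_sdk_worker_main`. The names `fake_worker_main`, `CustomPrint`, and `is_globally_reachable` are hypothetical and exist only for illustration.

```python
import builtins
import sys
import types

# Hypothetical stand-in for the worker entry-point module named in the error.
worker = types.ModuleType("fake_worker_main")
sys.modules[worker.__name__] = worker


class CustomPrint:
    """Hypothetical wrapper that intercepts print() calls."""

    def __init__(self, original_print):
        self._print = original_print

    def print(self, *args, **kwargs):
        self._print(*args, **kwargs)


# Pretend the wrapper was defined inside the stand-in worker module.
CustomPrint.print.__module__ = worker.__name__


def is_globally_reachable(val):
    """Mimic the check shown in the traceback (not numba's actual code)."""
    mod = sys.modules[val.__module__]
    return getattr(mod, val.__name__) is val


original_print = builtins.print
reachable_before = is_globally_reachable(builtins.print)

# Assumed behaviour: print is swapped for a bound method named "print".
builtins.print = CustomPrint(original_print).print
try:
    is_globally_reachable(builtins.print)
    error_message = None
except AttributeError as exc:
    error_message = str(exc)
finally:
    # Always restore the real print so the rest of the process is unaffected.
    builtins.print = original_print
```

With the built-in `print` the check passes, because `builtins.print` is reachable as an attribute of the `builtins` module. After the swap, the replacement's `__module__` points at the stand-in module, which has no top-level attribute called `print`, so the lookup raises an `AttributeError` of the same shape as the one quoted above. If the assumption holds, the failure is triggered by importing numba inside the worker rather than by the model file itself.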