Posted to user@flink.apache.org by Robert Cullen <ci...@gmail.com> on 2021/02/22 14:01:21 UTC

Install/Run Streaming Anomaly Detection R package in Flink

My customer wants us to install this package in our Flink Cluster:

https://github.com/twitter/AnomalyDetection

One of our engineers developed a python version:

https://pypi.org/project/streaming-anomaly-detection/

Is there a way to install this in our cluster?

-- 
Robert Cullen
240-475-4490

Re: Install/Run Streaming Anomaly Detection R package in Flink

Posted by Robert Cullen <ci...@gmail.com>.
Wei,

Thank you for pointing to those examples. Here is a code sample showing how
it's configured on my end:

        env = StreamExecutionEnvironment.get_execution_environment()
        env.set_parallelism(1)
        env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
        # Ship the zipped virtual environment to the cluster; it is extracted
        # into the working directory of each Python worker.
        env.add_python_archive("/Users/admin/pyflink/venv.zip")
        # Point the Python workers at the interpreter inside the extracted archive.
        env.set_python_executable("venv.zip/venv/bin/python")
...

But when I run the job with this virtual environment on my cluster, I’m getting this error:

2021-03-29 15:42:35
org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Failed to execute the command:
venv.zip/venv/bin/python -c import pyflink;import os;print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 'bin'))
output: venv.zip/venv/bin/python: 1: venv.zip/venv/bin/python: Syntax error: "(" unexpected

    at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:198)
    at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
    at org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:181)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:340)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:259)
    at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:512)
    at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
    at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
    at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:522)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:262)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:123)
    at org.apache.flink.streaming.api.operators.python.OneInputPythonFunctionOperator.open(OneInputPythonFunctionOperator.java:126)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)


On Tue, Feb 23, 2021 at 10:10 PM Wei Zhong <we...@gmail.com> wrote:

> Hi Robert,
>
> If you do not want to install the library on every machine of the cluster,
> the Python dependency management API can be used to upload the required
> dependencies to the cluster and use them there.
>
> For this case, I recommend building a portable python environment that
> contains all the required dependencies. You can call `add_python_archive`
> to upload the environment to your cluster and call `set_python_executable`
> to set the path of the python interpreter in your cluster.
>
> For more detailed information, you can refer to the following links.
>
> Documentation of the Python dependency management API and configuration:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/dependency_management.html#python-dependency-in-python-program
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-archives
>
> How to build a portable python environment:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/faq.html#preparing-python-virtual-environment
>
> Best,
> Wei
>
> On Feb 24, 2021, at 01:38, Roman Khachatryan <ro...@apache.org> wrote:
>
> Hi,
>
> I'm pulling in Wei Zhong and Xingbo Huang who know PyFlink better.
>
> Regards,
> Roman
>
>
> On Mon, Feb 22, 2021 at 3:01 PM Robert Cullen <ci...@gmail.com>
> wrote:
>
>> My customer wants us to install this package in our Flink Cluster:
>>
>> https://github.com/twitter/AnomalyDetection
>>
>> One of our engineers developed a python version:
>>
>> https://pypi.org/project/streaming-anomaly-detection/
>>
>> Is there a way to install this in our cluster?
>>
>> --
>> Robert Cullen
>> 240-475-4490
>>
>
>

-- 
Robert Cullen
240-475-4490

Re: Install/Run Streaming Anomaly Detection R package in Flink

Posted by Wei Zhong <we...@gmail.com>.
Hi Robert,

If you do not want to install the library on every machine of the cluster, the Python dependency management API can be used to upload the required dependencies to the cluster and use them there.

For this case, I recommend building a portable python environment that contains all the required dependencies. You can call `add_python_archive` to upload the environment to your cluster and call `set_python_executable` to set the path of the python interpreter in your cluster.

For more detailed information, you can refer to the following links.

Documentation of the Python dependency management API and configuration:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/dependency_management.html#python-dependency-in-python-program
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-archives

How to build a portable python environment:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/faq.html#preparing-python-virtual-environment
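
For reference, a minimal sketch of this setup with the DataStream API could look like the code below. The archive path, the job contents, and the `score` function are placeholders for illustration only; the archive itself has to be built beforehand with the required dependencies pip-installed into it, as described in the FAQ link above.

    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    # Ship a zipped virtual environment that already contains the third-party
    # dependencies (e.g. the anomaly detection package) to the cluster. It is
    # extracted into the working directory of each Python worker.
    # "/path/to/venv.zip" is a placeholder for the locally built archive.
    env.add_python_archive("/path/to/venv.zip")

    # Use the interpreter inside the extracted archive on the worker side.
    env.set_python_executable("venv.zip/venv/bin/python")

    def score(value):
        # Placeholder UDF: packages bundled in the archive can be imported
        # here, on the worker, once the environment above is available.
        return value

    ds = env.from_collection([1.0, 2.0, 3.0], type_info=Types.DOUBLE())
    ds.map(score, output_type=Types.DOUBLE()).print()

    env.execute("dependency management sketch")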

Best,
Wei

> On Feb 24, 2021, at 01:38, Roman Khachatryan <ro...@apache.org> wrote:
> 
> Hi,
> 
> I'm pulling in Wei Zhong and Xingbo Huang who know PyFlink better.
> 
> Regards,
> Roman
> 
> 
> On Mon, Feb 22, 2021 at 3:01 PM Robert Cullen <cinquaterra@gmail.com> wrote:
> My customer wants us to install this package in our Flink Cluster:
> 
> https://github.com/twitter/AnomalyDetection
> 
> One of our engineers developed a python version:
> 
> https://pypi.org/project/streaming-anomaly-detection/
> 
> Is there a way to install this in our cluster?
> 
> -- 
> Robert Cullen
> 240-475-4490


Re: Install/Run Streaming Anomaly Detection R package in Flink

Posted by Roman Khachatryan <ro...@apache.org>.
Hi,

I'm pulling in Wei Zhong and Xingbo Huang who know PyFlink better.

Regards,
Roman


On Mon, Feb 22, 2021 at 3:01 PM Robert Cullen <ci...@gmail.com> wrote:

> My customer wants us to install this package in our Flink Cluster:
>
> https://github.com/twitter/AnomalyDetection
>
> One of our engineers developed a python version:
>
> https://pypi.org/project/streaming-anomaly-detection/
>
> Is there a way to install this in our cluster?
>
> --
> Robert Cullen
> 240-475-4490
>