Posted to user@beam.apache.org by Chiara Troiani <t....@gmail.com> on 2021/10/18 09:43:19 UTC

[Question] Beam+Python+Flink

Hi,


I am trying to follow this tutorial, specifically the Portable (Python) section:

http://beam.apache.org/documentation/runners/flink/


I am not able to execute a Beam pipeline on a Flink cluster.

I am running a Flink Session Cluster with docker-compose.


This is my docker-compose file:


——————

version: "2.2"
services:
  jobmanager:
    image: flink:1.13.2-scala_2.11
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager

  taskmanager:
    image: flink:1.13.2-scala_2.11
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2

—————


I run the examples from a Python 3.8 virtual environment with
apache-beam==2.32.0, on macOS Catalina 10.15.7 with Docker Desktop 4.1.1.


When I run:

python -m apache_beam.examples.wordcount --input=text.txt --output=out.txt
--runner=FlinkRunner --flink_master=localhost:8081
--environment_type=LOOPBACK


I get this error:

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The user defined 'open()' method caused an exception: org.apache.beam.vendor.grpc.v1p36p0.io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:513)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:360)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.beam.vendor.grpc.v1p36p0.io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)
    at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202)
    at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open(FlinkExecutableStageFunction.java:157)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:508)
    ... 4 more
Caused by: org.apache.beam.vendor.grpc.v1p36p0.io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
    at org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.startWorker(BeamFnExternalWorkerPoolGrpc.java:224)
    at org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory.createEnvironment(ExternalEnvironmentFactory.java:116)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:252)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:231)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
    ... 16 more
Caused by: org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.AbstractChannel$AnnotatedConnectException: finishConnect(..) failed: Connection refused: localhost/127.0.0.1:57464
Caused by: java.net.ConnectException: finishConnect(..) failed: Connection refused
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.unix.Errors.throwConnectException(Errors.java:124)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.unix.Socket.finishConnect(Socket.java:251)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.doFinishConnect(AbstractEpollChannel.java:672)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.finishConnect(AbstractEpollChannel.java:649)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:529)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:465)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

—————-




When I run it in the default DOCKER environment mode:

python -m apache_beam.examples.wordcount --input=text.txt --output=out.txt
--runner=FlinkRunner --flink_master=localhost:8081


I can see the job submitted to Flink UI on localhost:8081, but it fails.


I get this error:

2021-10-15 17:52:12
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The user defined 'open()' method caused an exception: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:513)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:360)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)
    at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202)
    at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open(FlinkExecutableStageFunction.java:157)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:508)
    ... 4 more
Caused by: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
    at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:189)
    at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:171)
    at org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:95)
    at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:131)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:252)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:231)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
    ... 12 more
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method)
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
    at java.lang.ProcessImpl.start(ProcessImpl.java:134)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
    ... 26 more

————


Do you have any suggestions?


I also tried the Portable (Java/Python/Go) tutorial, with this difference:
docker run -p 8099:8099 -p 8098:8098 -p 8097:8097
apache/beam_flink1.13_job_server:latest --flink-master=localhost:8081


But I am ending up with similar issues.


Many thanks,

Chiara

Re: [Question] Beam+Python+Flink

Posted by Sam Bourne <sa...@gmail.com>.
Hey Chiara,

I went through a lot of the same struggles a while back and made this repo
to showcase how I accomplished something similar.
https://github.com/sambvfx/beam-flink-k8s
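
One thing that helped me debug the "Connection refused" errors was first checking, from the machine submitting the job, whether anything is actually listening on the worker endpoint. A quick stdlib-only sketch (the host/port are whatever you pass via environment_config; 50000 is the worker_pool default, so that part is an assumption about your setup):

```python
# Probe a host:port to see whether a Beam worker pool (or anything else)
# is accepting TCP connections there. Stdlib only, so it can run from the
# same virtualenv used to submit the pipeline.
import socket


def endpoint_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # 50000 is the default worker_pool port; adjust to your setup.
    print(endpoint_open("localhost", 50000))
```

If this prints False where the taskmanager runs, the gRPC "UNAVAILABLE: io exception" / "Connection refused" in the Flink logs is just this same unreachable endpoint.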

It shouldn't be hard to convert to a docker-compose setup (I actually had it
set up like this originally for testing, before porting it to Kubernetes).
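
For docker-compose, the piece that usually trips people up is making the worker pool reachable as localhost from the taskmanager. A rough, untested sketch of the extra service you could add to the compose file from the original message (the service name and image tag are assumptions; the tag should match your Beam version):

```yaml
  # Beam Python SDK harness worker pool. network_mode makes it share the
  # taskmanager's network namespace, so the taskmanager can reach it at
  # localhost:50000 (the worker_pool default port).
  beam-worker-pool:
    image: apache/beam_python3.8_sdk:2.32.0
    command: --worker_pool
    network_mode: "service:taskmanager"
    depends_on:
      - taskmanager
```

You would then submit the pipeline with --environment_type=EXTERNAL --environment_config=localhost:50000. Sharing the network namespace this way is the docker-compose analogue of the Kubernetes sidecar approach.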


On Thu, Oct 28, 2021 at 10:41 AM Kyle Weaver <kc...@google.com> wrote:

> > I still cannot figure out how to make sure that the worker_pool is
> > accessible via the 'localhost' hostname.
>
> In Kubernetes, we make the worker pool a sidecar of the Flink task manager
> container. Perhaps there is a similar feature available in docker compose?
>
>
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/0310df76d6e2128cd5d2bc51fae4e842d370c463/examples/beam/without_job_server/beam_flink_cluster.yaml#L17-L20
>
> On Thu, Oct 28, 2021 at 10:30 AM Chiara Troiani <
> t.chiara.for.dev@gmail.com> wrote:
>
>> Hi Jan,
>>
>> Thank you very much for your answer, and sorry for the late reply.
>>
>> I can see the job in flink UI, but it still fails, with this
>> exception among others:
>>
>> Caused by:
>> org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.AbstractChannel$AnnotatedConnectException:
>> finishConnect(..) failed: Connection refused: localhost/127.0.0.1:50000
>>
>> Caused by: java.net.ConnectException: finishConnect(..) failed:
>> Connection refused
>>
>>
>> I guess it is a Docker problem now.
>> I still cannot figure out how to make sure that the worker_pool is
>> accessible via the 'localhost' hostname.
>>
>> Many thanks,
>> Chiara
>>
>> On Mon, Oct 18, 2021 at 11:56 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi Chiara,
>>>
>>> environment_type LOOPBACK is meant for local execution only. The default
>>> is DOCKER, which is not ideal when you use docker-compose (docker in
>>> docker), so the other option is to use the EXTERNAL environment. With this
>>> environment, you must manually start the Python SDK harness as a separate
>>> container using the apache/beam_python3.8_sdk docker image with args set to
>>> '--worker_pool'. That will run a container that takes care of running the
>>> Python harness processes. By default it listens on port 50000; it must be
>>> accessible from the taskmanager container as localhost:50000, and you then
>>> pass that address via environment_config, e.g.:
>>>
>>>  --environment_type=EXTERNAL --environment_config=localhost:50000
>>>
>>> That should do the trick. Because of limitations of the 'worker_pool',
>>> you must make sure that it is accessible via the 'localhost' hostname. For
>>> more information see [1].
>>>
>>>  Jan
>>>
>>> [1] https://beam.apache.org/documentation/runtime/sdk-harness-config/
>>> On 10/18/21 11:43, Chiara Troiani wrote:
>>>
>>> Hi,
>>>
>>>
>>> I am trying to follow these tutorials
>>>
>>> http://beam.apache.org/documentation/runners/flink/
>>>
>>> For the Portable (Python)
>>>
>>>
>>> I am not able to execute a Beam pipeline on a Flink cluster.
>>>
>>> I am running a Flink Session Cluster with docker-compose,
>>>
>>>
>>> This is my docker-compose file:
>>>
>>>
>>> ——————
>>>
>>> version: "2.2"
>>>
>>> services:
>>>
>>>   jobmanager:
>>>
>>>     image: flink:1.13.2-scala_2.11
>>>
>>>     ports:
>>>
>>>       - "8081:8081"
>>>
>>>     command: jobmanager
>>>
>>>     environment:
>>>
>>>       - |
>>>
>>>         FLINK_PROPERTIES=
>>>
>>>         jobmanager.rpc.address: jobmanager
>>>
>>>
>>>   taskmanager:
>>>
>>>     image: flink:1.13.2-scala_2.11
>>>
>>>     depends_on:
>>>
>>>       - jobmanager
>>>
>>>     command: taskmanager
>>>
>>>     scale: 1
>>>
>>>     environment:
>>>
>>>       - |
>>>
>>>         FLINK_PROPERTIES=
>>>
>>>         jobmanager.rpc.address: jobmanager
>>>
>>>         taskmanager.numberOfTaskSlots: 2
>>>
>>>
>>> —————
>>>
>>>
>>> I run the examples from a virtual environment, python3.8,
>>> apache-beam==2.32.0
>>>
>>> macOS Catalina 10.15.7
>>>
>>> Docker desktop 4.1.1
>>>
>>>
>>> When I run:
>>>
>>> python -m apache_beam.examples.wordcount --input=text.txt
>>> --output=out.txt --runner=FlinkRunner --flink_master=localhost:8081
>>> --environment_type=LOOPBACK
>>>
>>>
>>> I get this error:
>>>
>>> *org.apache.flink.runtime.JobException: Recovery is suppressed by
>>> NoRestartBackoffTimeStrategy*
>>>
>>> * at
>>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)*
>>>
>>> * at
>>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)*
>>>
>>> * at
>>> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)*
>>>
>>> * at
>>> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)*
>>>
>>> * at
>>> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)*
>>>
>>> * at
>>> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)*
>>>
>>> * at
>>> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)*
>>>
>>> * at
>>> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)*
>>>
>>> * at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)*
>>>
>>> * at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)*
>>>
>>> * at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)*
>>>
>>> * at java.lang.reflect.Method.invoke(Method.java:498)*
>>>
>>> * at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)*
>>>
>>> * at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)*
>>>
>>> * at
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)*
>>>
>>> * at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)*
>>>
>>> * at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)*
>>>
>>> * at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)*
>>>
>>> * at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)*
>>>
>>> * at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)*
>>>
>>> * at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)*
>>>
>>> * at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)*
>>>
>>> * at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)*
>>>
>>> * at akka.actor.Actor$class.aroundReceive(Actor.scala:517)*
>>>
>>> * at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)*
>>>
>>> * at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)*
>>>
>>> * at akka.actor.ActorCell.invoke(ActorCell.scala:561)*
>>>
>>> * at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)*
>>>
>>> * at akka.dispatch.Mailbox.run(Mailbox.scala:225)*
>>>
>>> * at akka.dispatch.Mailbox.exec(Mailbox.scala:235)*
>>>
>>> * at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)*
>>>
>>> * at
>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)*
>>>
>>> * at
>>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)*
>>>
>>> * at
>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)*
>>>
>>> *Caused by: java.lang.Exception: The user defined 'open()' method caused
>>> an exception:
>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.StatusRuntimeException:
>>> UNAVAILABLE: io exception*
>>>
>>> * at
>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:513)*
>>>
>>> * at
>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:360)*
>>>
>>> * at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)*
>>>
>>> * at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)*
>>>
>>> * at java.lang.Thread.run(Thread.java:748)*
>>>
>>> *Caused by:
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.StatusRuntimeException:
>>> UNAVAILABLE: io exception*
>>>
>>> * at
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)*
>>>
>>> * at
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)*
>>>
>>> * at
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)*
>>>
>>> * at
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)*
>>>
>>> * at
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)*
>>>
>>> * at
>>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451)*
>>>
>>> * at
>>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436)*
>>>
>>> * at
>>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)*
>>>
>>> * at
>>> org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)*
>>>
>>> * at
>>> org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202)*
>>>
>>> * at
>>> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open(FlinkExecutableStageFunction.java:157)*
>>>
>>> * at
>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)*
>>>
>>> * at
>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:508)*
>>>
>>> * ... 4 more*
>>>
>>> *Caused by:
>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.StatusRuntimeException:
>>> UNAVAILABLE: io exception*
>>>
>>> * at
>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)*
>>>
>>> * at
>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)*
>>>
>>> * at
>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)*
>>>
>>> * at
>>> org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.startWorker(BeamFnExternalWorkerPoolGrpc.java:224)*
>>>
>>> * at
>>> org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory.createEnvironment(ExternalEnvironmentFactory.java:116)*
>>>
>>> * at
>>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:252)*
>>>
>>> * at
>>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:231)*
>>>
>>> * at
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)*
>>>
>>> * at
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)*
>>>
>>> * at
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)*
>>>
>>> * at
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)*
>>>
>>> * ... 16 more*
>>>
>>> *Caused by:
>>> org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.AbstractChannel$AnnotatedConnectException:
>>> finishConnect(..) failed: Connection refused: localhost/127.0.0.1:57464
>>> <http://127.0.0.1:57464/>*
>>>
>>> *Caused by: java.net.ConnectException: finishConnect(..) failed:
>>> Connection refused*
>>>
>>> * at
>>> org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.unix.Errors.throwConnectException(Errors.java:124)*
>>>
>>> * at
>>> org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.unix.Socket.finishConnect(Socket.java:251)*
>>>
>>> * at
>>> org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.doFinishConnect(AbstractEpollChannel.java:672)*
>>>
>>> * at
>>> org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.finishConnect(AbstractEpollChannel.java:649)*
>>>
>>> * at
>>> org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:529)*
>>>
>>> * at
>>> org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:465)*
>>>
>>> * at
>>> org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)*
>>>
>>> * at
>>> org.apache.beam.vendor.grpc.v1p36p0.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)*
>>>
>>> * at
>>> org.apache.beam.vendor.grpc.v1p36p0.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)*
>>>
>>> * at
>>> org.apache.beam.vendor.grpc.v1p36p0.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)*
>>>
>>> * at java.lang.Thread.run(Thread.java:748)*
>>>
>>> —————-
>>>
>>>
>>>
>>>
>>> When I run it in default docker mode:
>>>
>>> python -m apache_beam.examples.wordcount --input=text.txt
>>> --output=out.txt --runner=FlinkRunner --flink_master=localhost:8081
>>>
>>>
>>> I can see the job submitted to Flink UI on localhost:8081, but it fails.
>>>
>>>
>>> I get this error:
>>>
>>> 2021-10-15 17:52:12
>>> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
>>>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
>>>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
>>>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by: java.lang.Exception: The user defined 'open()' method caused an exception: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:513)
>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:360)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
>>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
>>>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451)
>>>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436)
>>>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)
>>>     at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
>>>     at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202)
>>>     at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open(FlinkExecutableStageFunction.java:157)
>>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:508)
>>>     ... 4 more
>>> Caused by: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
>>>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
>>>     at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:189)
>>>     at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:171)
>>>     at org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:95)
>>>     at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:131)
>>>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:252)
>>>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:231)
>>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
>>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
>>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
>>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
>>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
>>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
>>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
>>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
>>>     ... 12 more
>>> Caused by: java.io.IOException: error=2, No such file or directory
>>>     at java.lang.UNIXProcess.forkAndExec(Native Method)
>>>     at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
>>>     at java.lang.ProcessImpl.start(ProcessImpl.java:134)
>>>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
>>>     ... 26 more
>>>
>>> ————
>>>
>>> Do you have any suggestions?
>>>
>>> I also tried the Portable (Java/Python/Go) tutorial, with this
>>> difference:
>>>
>>> docker run -p 8099:8099 -p 8098:8098 -p 8097:8097 \
>>>     apache/beam_flink1.13_job_server:latest --flink-master=localhost:8081
>>>
>>> But I end up with similar issues.
>>>
>>> Many thanks,
>>>
>>> Chiara
>>>
>>>

Re: [Question] Beam+Python+Flink

Posted by Kyle Weaver <kc...@google.com>.
> I still cannot figure out how to make sure that the worker_pool is
> accessible via 'localhost' hostname.

In Kubernetes, we make the worker pool a sidecar of the Flink task manager
container. Perhaps there is a similar feature available in Docker Compose?

https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/0310df76d6e2128cd5d2bc51fae4e842d370c463/examples/beam/without_job_server/beam_flink_cluster.yaml#L17-L20
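[Editor's note: a rough Docker Compose sketch of that sidecar idea, untested;
the service names follow the docker-compose file quoted earlier in the thread,
and the SDK image tag matching apache-beam==2.32.0 is an assumption.]

```yaml
  # Additional service for the docker-compose file shown earlier in the thread.
  beam-worker-pool:
    image: apache/beam_python3.8_sdk:2.32.0
    command: --worker_pool
    # Join the taskmanager's network namespace, mimicking a Kubernetes
    # sidecar: the pool's default port 50000 then appears as
    # localhost:50000 inside the taskmanager container.
    network_mode: "service:taskmanager"
    depends_on:
      - taskmanager
```

Note that with `network_mode: "service:..."` the worker pool cannot publish
its own ports; it relies entirely on the taskmanager's network namespace,
which is exactly what makes it reachable as 'localhost' there.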

On Thu, Oct 28, 2021 at 10:30 AM Chiara Troiani <t....@gmail.com>
wrote:

> Hi Jan,
>
> Thank you very much for your answer, and sorry for the late reply.
>
> I can see the job in the Flink UI, but it still fails, with this
> exception among others:
>
> Caused by: org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.AbstractChannel$AnnotatedConnectException: finishConnect(..) failed: Connection refused: localhost/127.0.0.1:50000
>
> Caused by: java.net.ConnectException: finishConnect(..) failed: Connection refused
>
>
> I guess it is a Docker problem now.
> I still cannot figure out how to make sure that the worker_pool is
> accessible via  'localhost' hostname.
>
> Many thanks,
> Chiara
>
> On Mon, Oct 18, 2021 at 11:56 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Chiara,
>>
>> environment_type LOOPBACK is meant for local execution only. The default
>> is DOCKER, which is not ideal when you use docker-compose (Docker in
>> Docker), so the other option is the EXTERNAL environment. With this
>> environment, you must manually start the Python SDK harness as a separate
>> container, using the apache/beam_python3.8_sdk Docker image with args set
>> to '--worker_pool'. That runs a container that takes care of starting the
>> Python harness processes. By default it listens on port 50000; it must be
>> accessible from the taskmanager container via localhost:50000, and you
>> then pass that address via environment_config, e.g.:
>>
>>  --environment_type=EXTERNAL --environment_config=localhost:50000
>>
>> That should do the trick. Because of limitations of the 'worker_pool', you
>> must make sure that it is accessible via the 'localhost' hostname. For
>> more information see [1].
>>
>>  Jan
>>
>> [1] https://beam.apache.org/documentation/runtime/sdk-harness-config/
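
[Editor's note: concretely, the flow Jan describes could look like the
following sketch. It is not a verified setup: the 2.32.0 image tag matching
the poster's apache-beam version is an assumption, and with docker-compose
the pool must additionally be reachable as 'localhost' from inside the
taskmanager container, not just from the host.]

```shell
# 1. Start the SDK worker pool; by default it listens on port 50000.
docker run --rm -p 50000:50000 apache/beam_python3.8_sdk:2.32.0 --worker_pool

# 2. Submit the pipeline against the external worker pool.
python -m apache_beam.examples.wordcount \
    --input=text.txt --output=out.txt \
    --runner=FlinkRunner --flink_master=localhost:8081 \
    --environment_type=EXTERNAL --environment_config=localhost:50000
```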

Re: [Question] Beam+Python+Flink

Posted by Chiara Troiani <t....@gmail.com>.
Hi Jan,

Thank you very much for your answer, and sorry for the late reply.

I can see the job in the Flink UI, but it still fails, with this
exception among others:

Caused by: org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.AbstractChannel$AnnotatedConnectException: finishConnect(..) failed: Connection refused: localhost/127.0.0.1:50000

Caused by: java.net.ConnectException: finishConnect(..) failed: Connection refused


I guess it is a Docker problem now.
I still cannot figure out how to make sure that the worker_pool is
accessible via the 'localhost' hostname.

Many thanks,
Chiara

On Mon, Oct 18, 2021 at 11:56 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Chiara,
>
> environment_type LOOPBACK is meant for local execution only. The default
> is DOCKER, which is not ideal when you use docker-compose (Docker in
> Docker), so the other option is the EXTERNAL environment. With this
> environment, you must manually start the Python SDK harness as a separate
> container, using the apache/beam_python3.8_sdk Docker image with args set
> to '--worker_pool'. That runs a container that takes care of starting the
> Python harness processes. By default it listens on port 50000; it must be
> accessible from the taskmanager container via localhost:50000, and you
> then pass that address via environment_config, e.g.:
>
>  --environment_type=EXTERNAL --environment_config=localhost:50000
>
> That should do the trick. Because of limitations of the 'worker_pool', you
> must make sure that it is accessible via the 'localhost' hostname. For
> more information see [1].
>
>  Jan
>
> [1] https://beam.apache.org/documentation/runtime/sdk-harness-config/
> On 10/18/21 11:43, Chiara Troiani wrote:
>
> Hi,
>
>
> I am trying to follow these tutorials
>
> http://beam.apache.org/documentation/runners/flink/
>
> For the Portable (Python)
>
>
> I am not able to execute a Beam pipeline on a Flink cluster.
>
> I am running a Flink Session Cluster with docker-compose,
>
>
> This is my docker-compose file:
>
>
> ——————
>
> version: "2.2"
>
> services:
>
>   jobmanager:
>
>     image: flink:1.13.2-scala_2.11
>
>     ports:
>
>       - "8081:8081"
>
>     command: jobmanager
>
>     environment:
>
>       - |
>
>         FLINK_PROPERTIES=
>
>         jobmanager.rpc.address: jobmanager
>
>
>   taskmanager:
>
>     image: flink:1.13.2-scala_2.11
>
>     depends_on:
>
>       - jobmanager
>
>     command: taskmanager
>
>     scale: 1
>
>     environment:
>
>       - |
>
>         FLINK_PROPERTIES=
>
>         jobmanager.rpc.address: jobmanager
>
>         taskmanager.numberOfTaskSlots: 2
>
>
> —————
>
>
> I run the examples from a virtual environment, python3.8,
> apache-beam==2.32.0
>
> macOS Catalina 10.15.7
>
> Docker desktop 4.1.1
>
>
> When I run:
>
> python -m apache_beam.examples.wordcount --input=text.txt --output=out.txt
> --runner=FlinkRunner --flink_master=localhost:8081
> --environment_type=LOOPBACK
>
>
> I get this error:
>
> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: The user defined 'open()' method caused an exception: org.apache.beam.vendor.grpc.v1p36p0.io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:513)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:360)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.beam.vendor.grpc.v1p36p0.io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)
>     at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
>     at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202)
>     at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open(FlinkExecutableStageFunction.java:157)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:508)
>     ... 4 more
> Caused by: org.apache.beam.vendor.grpc.v1p36p0.io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
>     at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
>     at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
>     at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
>     at org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.startWorker(BeamFnExternalWorkerPoolGrpc.java:224)
>     at org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory.createEnvironment(ExternalEnvironmentFactory.java:116)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:252)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:231)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
>     ... 16 more
> Caused by: org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.AbstractChannel$AnnotatedConnectException: finishConnect(..) failed: Connection refused: localhost/127.0.0.1:57464
> Caused by: java.net.ConnectException: finishConnect(..) failed: Connection refused
>     at org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.unix.Errors.throwConnectException(Errors.java:124)
>     at org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.unix.Socket.finishConnect(Socket.java:251)
>     at org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.doFinishConnect(AbstractEpollChannel.java:672)
>     at org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.finishConnect(AbstractEpollChannel.java:649)
>     at org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:529)
>     at org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:465)
>     at org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
>     at org.apache.beam.vendor.grpc.v1p36p0.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>     at org.apache.beam.vendor.grpc.v1p36p0.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>     at org.apache.beam.vendor.grpc.v1p36p0.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>     at java.lang.Thread.run(Thread.java:748)
>
> —————-
>
>
>
>
> When I run it with the default DOCKER environment_type:
>
> python -m apache_beam.examples.wordcount --input=text.txt --output=out.txt
> --runner=FlinkRunner --flink_master=localhost:8081
>
>
> I can see the job submitted to Flink UI on localhost:8081, but it fails.
>
>
> I get this error:
>
> 2021-10-15 17:52:12
> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: The user defined 'open()' method caused an exception: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:513)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:360)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)
>     at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
>     at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202)
>     at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open(FlinkExecutableStageFunction.java:157)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:508)
>     ... 4 more
> Caused by: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
>     at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:189)
>     at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:171)
>     at org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:95)
>     at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:131)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:252)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:231)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
>     ... 12 more
> Caused by: java.io.IOException: error=2, No such file or directory
>     at java.lang.UNIXProcess.forkAndExec(Native Method)
>     at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
>     at java.lang.ProcessImpl.start(ProcessImpl.java:134)
>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
>     ... 26 more
>
> ————
>
>
> Do you have any suggestions?
>
>
> I also tried the Portable (Java/Python/Go) tutorial, with this difference:
> docker run -p 8099:8099 -p 8098:8098 -p 8097:8097
> apache/beam_flink1.13_job_server:latest --flink-master=localhost:8081
>
>
> But I am ending up with similar issues.
>
>
> Many thanks,
>
> Chiara
>
>

Re: [Question] Beam+Python+Flink

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Chiara,

environment_type=LOOPBACK is meant for local execution only. The default 
is DOCKER, which is not ideal when the cluster itself runs under 
docker-compose (it would require Docker inside the Flink containers), so 
the remaining option is the EXTERNAL environment. With this environment 
you must manually start the Python SDK harness as a separate container, 
using the apache/beam_python3.8_sdk docker image with its args set to 
'--worker_pool'. That runs a container which takes care of spawning the 
Python harness processes. By default it listens on port 50000; it must 
be reachable from the taskmanager container as localhost:50000, and you 
then pass that address via environment_config, e.g.:

  --environment_type=EXTERNAL --environment_config=localhost:50000

That should do the trick. Because of limitations of the 'worker_pool', 
you must make sure it is reachable via the 'localhost' hostname. For 
more information see [1].
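
For example, one way to wire this into the docker-compose file from your 
question is to run the worker pool as an extra service that shares the 
taskmanager's network namespace, so the taskmanager sees it at 
localhost:50000. This is only a sketch; the image tag and the 
network_mode approach are my assumptions, adjust them to your setup:

```yaml
  # Beam Python SDK harness running as an external worker pool.
  # network_mode "service:taskmanager" puts this container into the
  # taskmanager's network namespace, so the taskmanager can reach the
  # worker pool at localhost:50000 as the EXTERNAL environment requires.
  beam-worker-pool:
    image: apache/beam_python3.8_sdk:2.32.0   # match your apache-beam version
    command: --worker_pool
    network_mode: "service:taskmanager"
    depends_on:
      - taskmanager
```

You would then submit the pipeline with --environment_type=EXTERNAL 
--environment_config=localhost:50000 instead of LOOPBACK.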

  Jan

[1] https://beam.apache.org/documentation/runtime/sdk-harness-config/

On 10/18/21 11:43, Chiara Troiani wrote:
> [quoted message identical to the original question above; quote trimmed]
>
> /at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)/
>
> /at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)/
>
> /at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)/
>
> /... 12 more/
>
> /Caused by: java.io.IOException: error=2, No such file or directory/
>
> /at java.lang.UNIXProcess.forkAndExec(Native Method)/
>
> /at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)/
>
> /at java.lang.ProcessImpl.start(ProcessImpl.java:134)/
>
> /at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)/
>
> /... 26 more/
>
> ————
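If I am reading the trace correctly, the taskmanager is trying to start the Beam Python SDK harness as a Docker container (DockerEnvironmentFactory), but the stock flink image does not ship a docker binary, hence "Cannot run program \"docker\"". As an untested sketch, one workaround I have seen suggested is to give the taskmanager access to the host's Docker daemon via a compose override like the following (note this alone is not enough: the taskmanager image would also need a docker CLI installed, e.g. via a custom Dockerfile on top of flink:1.13.2):

```yaml
# Hypothetical docker-compose override (untested):
# mount the host Docker socket into the taskmanager so that
# "docker run" issued by Beam's DockerEnvironmentFactory reaches
# the host daemon. A docker CLI must still be present in the image.
  taskmanager:
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
```

Does that sound like the right direction, or is LOOPBACK supposed to bypass Docker entirely here?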
>
>
> Do you have any suggestions?
>
>
> I also tried the Portable (Java/Python/Go) tutorial, with this difference:
>
> docker run -p 8099:8099 -p 8098:8098 -p 8097:8097 apache/beam_flink1.13_job_server:latest --flink-master=localhost:8081
>
> But I end up with similar issues.
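One thing I am not sure about: with the job server running inside a container, `localhost:8081` refers to the job server container itself, not to the host where the Flink REST port is published. A variant I considered (untested, using Docker Desktop's special host name on macOS) would be:

```shell
# Hypothetical variant (untested): from inside the job server container,
# "localhost" is the container itself, so point --flink-master at the
# host-published Flink REST port instead. host.docker.internal resolves
# to the host on Docker Desktop for Mac.
docker run --rm -p 8099:8099 -p 8098:8098 -p 8097:8097 \
  apache/beam_flink1.13_job_server:latest \
  --flink-master=host.docker.internal:8081
```

Would that be the expected way to wire the job server to a compose-based Flink cluster?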
>
>
> Many thanks,
>
> Chiara
>