You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by yilun zhang <il...@gmail.com> on 2021/03/04 08:44:31 UTC

Re: using python sdk+kafka under k8s

We create a custom docker image, which include java runtime, python and
docker environment to run our job. But encountered timeout exception:

root@beam-pod:/tmp# PYTHONPATH='./' python  -m kafka_test
--runner=FlinkRunner --flink_master=beam-flink-cluster-jobmanager:8081
--flink_submit_uber_jar --environment_type=EXTERNAL
--environment_config=localhost:50000
WARNING:root:Make sure that locally built Python SDK docker image has
Python 3.7 interpreter.
ERROR:root:java.util.concurrent.TimeoutException: The heartbeat of
TaskManager with id 10.190.29.80:6122-88ce88  timed out.
at
org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(ResourceManager.java:1442)
at
org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



Our test code is super simple:


import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
from apache_beam.io import WriteToText

def run_pipeline():
  with beam.Pipeline(options=PipelineOptions( runner="FlinkRunner",
            flink_master="beam-flink-cluster-jobmanager:8081",
            environment_type="EXTERNAL",
            environment_config="localhost:50000")) as p:
    (p
     | 'Read from Kafka' >>
ReadFromKafka(consumer_config={'bootstrap.servers':
'zookeeper.libra.ubiquant:31090',

 'auto.offset.reset': 'latest'},
                                          topics=['test001'])
     | 'Par with 1' >> beam.Map(lambda word: (word, 1))
     | 'Window of 10 seconds' >> beam.WindowInto(window.FixedWindows(5))
     | 'Group by key' >> beam.GroupByKey()
     | 'Sum word counts' >> beam.Map(lambda kv: (kv[0], sum(kv[1])))
   #  | "Write to Kafka" >>
WriteToKafka(producer_config={'bootstrap.servers':
'zookeeper.libra.ubiquant:31090'}, topic='test001')
     | 'Write to text' >> WriteToText("/tmp/output2")
    )

if __name__ == '__main__':
  run_pipeline()


Is there any suggestion on debugging direction? In flink UI, it looks like
it failed from first step, ReadFromKafka.

Thanks,
Yilun

On Sat, Feb 27, 2021 at 2:16 AM Kyle Weaver <kc...@google.com> wrote:

> In Beam, the Kafka connector does not know anything about the underlying
> execution engine (here Flink). It is instead translated by the runner into
> a user defined function in Flink. So it is expected that the resulting DAG
> does not look the same as it would with a native Flink source.
>
> On Fri, Feb 26, 2021 at 5:18 AM yilun zhang <il...@gmail.com> wrote:
>
>> So sorry for subscribing errors on my side resulted in multiple duplicate
>> email!
>>
>> Thanks for reply and it does help!
>>
>> I am confused when submitting beam job with kafka connector to flink, I
>> noticed that flink DAG diagram will included readFromKafka as part of flink
>> workflow. while if we submit a pyflink job(connected with kafka) directly
>> to flink, the flink workflow will exclude reading from kafka(which is the
>> resource) but only has data processing parts.
>>
>> Is that how beam want flink to do?
>>
>> Thanks a lot and sincerely apologize again for silly duplicated emails!
>>
>> Yilun
>>
>> Sam Bourne <sa...@gmail.com>于2021年2月25日 周四上午11:58写道:
>>
>>> Hi Yilun!
>>>
>>> I made a quick proof of concept repo showcasing how to run a beam
>>> pipeline in flink on k8s. It may be useful for you as reference.
>>>
>>> https://github.com/sambvfx/beam-flink-k8s
>>>
>>>
>>> On Wed, Feb 24, 2021, 8:13 AM yilun zhang <il...@gmail.com> wrote:
>>>
>>>> Hey,
>>>>
>>>> Our team is trying to use beam with connector Kafka and runner flink to
>>>> gather information and process data. We adopt python sdk and build in java
>>>> 11 in python 3.7 sdk image as java runtime for kafka expansion service.
>>>>  so :
>>>> image: beam python 3.7 docker image + build in java 11
>>>> connector: kafka
>>>> runner: flink
>>>> container: kubernetes
>>>>
>>>> We encounter an docker not found error when running:
>>>>  python3 -m kafka_test --runner=FlinkRunner
>>>> --flink_master=flink-job-manager:8081 --flink_submit_uber_jar
>>>> --environment_type=EXTERNAL --environment_config=localhost:50000
>>>>
>>>> We notice that in https://beam.apache.org/roadmap/portability/ it
>>>> mentioned the prerequisite also includes Docker. We wonder what is the
>>>> docker usage here? Is there any suggested way to build docker in
>>>> k8s container? (something maybe like sysbox for docker in docker?)
>>>>
>>>> Or maybe we should not use beam sdk+runner in k8s?
>>>>
>>>> Thanks,
>>>> Yilun
>>>>
>>>

Re: using python sdk+kafka under k8s

Posted by Kyle Weaver <kc...@google.com>.
The problem is that Kafka is a "cross-language" transform that is
implemented in Java. You have  configured your Python pipeline to run with
environment_type=EXTERNAL. However the Kafka transform has its own
environment that has environment_type=DOCKER, it does not respect the
environment_type you set for the pipeline. Currently I don't think there's
a way to configure the environment for an external transform; I brought up
this issue in a recent thread [1]. The reason for the error you are seeing
is that environment_type=DOCKER tries to start up Docker inside your Flink
workers, which must not have Docker installed.

[1]
https://lists.apache.org/thread.html/r6f6fc207ed62e1bf2a1d41deeeab554e35cd2af389ce38289a303cea%40%3Cdev.beam.apache.org%3E

On Thu, Mar 4, 2021 at 2:28 AM yilun zhang <il...@gmail.com> wrote:

> hmmm, looks like I may fail due to docker environment:
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.7/runpy.py", line 193, in
> _run_module_as_main
>     "__main__", mod_spec)
>   File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code
>     exec(code, run_globals)
>   File "/tmp/kafka_test.py", line 26, in <module>
>     run_pipeline()
>   File "/tmp/kafka_test.py", line 22, in run_pipeline
>     |beam.Map(print)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 583, in __exit__
>     self.result.wait_until_finish()
>   File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py",
> line 581, in wait_until_finish
>     raise self._runtime_exception
> RuntimeError: Pipeline job-0f33f7f0-4fb4-4a57-a0fe-c4b2c34caff8 failed in
> state FAILED:
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
> java.io.IOException: Cannot run program "docker": error=2, No such file or
> directory
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)
> at
> org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
> at
> org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202)
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:243)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:426)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Cannot run program "docker": error=2, No
> such file or directory
> at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
> at
> org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:189)
> at
> org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:171)
> at
> org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:95)
> at
> org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:131)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:252)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:231)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
> ... 14 more
> Caused by: java.io.IOException: error=2, No such file or directory
> at java.lang.UNIXProcess.forkAndExec(Native Method)
> at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
> at java.lang.ProcessImpl.start(ProcessImpl.java:134)
> at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
> ... 28 more
>
>
> I tried to create yaml like to mount docker in local like:
> apiVersion: v1
> kind: Pod
> metadata:
>   name: beam-pod
>   namespace: default
> spec:
>   volumes:
>     - name: docker-sock
>       hostPath:
>         path: "/var/run/docker.sock"
>         type: Socket
>     - name: docker-directory
>       hostPath:
>         path: "/var/lib/docker"
>         type: Directory
>   containers:
>   - image: python3.7sdk_java11_beam:2.27.0
>     command: ["sleep","3600"]
>     name: beam-pod
>     volumeMounts:
>       - mountPath: "/var/run/docker.sock"
>         name: docker-sock
>         readOnly: false
>       - mountPath: "/var/lib/docker"
>         name: docker-directory
>         readOnly: false
>     securityContext:
>       privileged: true
>       runAsUser: 0
>     imagePullPolicy: Never
>   restartPolicy: Never
>
> And hello-world example runs fine:
> root@beam-pod:/tmp# docker run hello-world
>
> Hello from Docker!
>
>
> Thanks again!
> Yilun
>
> On Thu, Mar 4, 2021 at 4:44 PM yilun zhang <il...@gmail.com> wrote:
>
>> We create a custom docker image, which include java runtime, python and
>> docker environment to run our job. But encountered timeout exception:
>>
>> root@beam-pod:/tmp# PYTHONPATH='./' python  -m kafka_test
>> --runner=FlinkRunner --flink_master=beam-flink-cluster-jobmanager:8081
>> --flink_submit_uber_jar --environment_type=EXTERNAL
>> --environment_config=localhost:50000
>> WARNING:root:Make sure that locally built Python SDK docker image has
>> Python 3.7 interpreter.
>> ERROR:root:java.util.concurrent.TimeoutException: The heartbeat of
>> TaskManager with id 10.190.29.80:6122-88ce88  timed out.
>> at
>> org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(ResourceManager.java:1442)
>> at
>> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>>
>> Our test code is super simple:
>>
>>
>> import apache_beam as beam
>> import apache_beam.transforms.window as window
>> from apache_beam.options.pipeline_options import PipelineOptions
>> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>> from apache_beam.io import WriteToText
>>
>> def run_pipeline():
>>   with beam.Pipeline(options=PipelineOptions( runner="FlinkRunner",
>>             flink_master="beam-flink-cluster-jobmanager:8081",
>>             environment_type="EXTERNAL",
>>             environment_config="localhost:50000")) as p:
>>     (p
>>      | 'Read from Kafka' >>
>> ReadFromKafka(consumer_config={'bootstrap.servers':
>> 'zookeeper.libra.ubiquant:31090',
>>
>>  'auto.offset.reset': 'latest'},
>>                                           topics=['test001'])
>>      | 'Par with 1' >> beam.Map(lambda word: (word, 1))
>>      | 'Window of 10 seconds' >> beam.WindowInto(window.FixedWindows(5))
>>      | 'Group by key' >> beam.GroupByKey()
>>      | 'Sum word counts' >> beam.Map(lambda kv: (kv[0], sum(kv[1])))
>>    #  | "Write to Kafka" >>
>> WriteToKafka(producer_config={'bootstrap.servers':
>> 'zookeeper.libra.ubiquant:31090'}, topic='test001')
>>      | 'Write to text' >> WriteToText("/tmp/output2")
>>     )
>>
>> if __name__ == '__main__':
>>   run_pipeline()
>>
>>
>> Is there any suggestion on debugging direction? In flink UI, it looks
>> like it failed from first step, ReadFromKafka.
>>
>> Thanks,
>> Yilun
>>
>> On Sat, Feb 27, 2021 at 2:16 AM Kyle Weaver <kc...@google.com> wrote:
>>
>>> In Beam, the Kafka connector does not know anything about the underlying
>>> execution engine (here Flink). It is instead translated by the runner into
>>> a user defined function in Flink. So it is expected that the resulting DAG
>>> does not look the same as it would with a native Flink source.
>>>
>>> On Fri, Feb 26, 2021 at 5:18 AM yilun zhang <il...@gmail.com> wrote:
>>>
>>>> So sorry for subscribing errors on my side resulted in multiple
>>>> duplicate email!
>>>>
>>>> Thanks for reply and it does help!
>>>>
>>>> I am confused when submitting beam job with kafka connector to flink, I
>>>> noticed that flink DAG diagram will included readFromKafka as part of flink
>>>> workflow. while if we submit a pyflink job(connected with kafka) directly
>>>> to flink, the flink workflow will exclude reading from kafka(which is the
>>>> resource) but only has data processing parts.
>>>>
>>>> Is that how beam want flink to do?
>>>>
>>>> Thanks a lot and sincerely apologize again for silly duplicated emails!
>>>>
>>>> Yilun
>>>>
>>>> Sam Bourne <sa...@gmail.com>于2021年2月25日 周四上午11:58写道:
>>>>
>>>>> Hi Yilun!
>>>>>
>>>>> I made a quick proof of concept repo showcasing how to run a beam
>>>>> pipeline in flink on k8s. It may be useful for you as reference.
>>>>>
>>>>> https://github.com/sambvfx/beam-flink-k8s
>>>>>
>>>>>
>>>>> On Wed, Feb 24, 2021, 8:13 AM yilun zhang <il...@gmail.com> wrote:
>>>>>
>>>>>> Hey,
>>>>>>
>>>>>> Our team is trying to use beam with connector Kafka and runner flink
>>>>>> to gather information and process data. We adopt python sdk and build in
>>>>>> java 11 in python 3.7 sdk image as java runtime for kafka expansion service.
>>>>>>  so :
>>>>>> image: beam python 3.7 docker image + build in java 11
>>>>>> connector: kafka
>>>>>> runner: flink
>>>>>> container: kubernetes
>>>>>>
>>>>>> We encounter an docker not found error when running:
>>>>>>  python3 -m kafka_test --runner=FlinkRunner
>>>>>> --flink_master=flink-job-manager:8081 --flink_submit_uber_jar
>>>>>> --environment_type=EXTERNAL --environment_config=localhost:50000
>>>>>>
>>>>>> We notice that in https://beam.apache.org/roadmap/portability/ it
>>>>>> mentioned the prerequisite also includes Docker. We wonder what is the
>>>>>> docker usage here? Is there any suggested way to build docker in
>>>>>> k8s container? (something maybe like sysbox for docker in docker?)
>>>>>>
>>>>>> Or maybe we should not use beam sdk+runner in k8s?
>>>>>>
>>>>>> Thanks,
>>>>>> Yilun
>>>>>>
>>>>>

Re: using python sdk+kafka under k8s

Posted by yilun zhang <il...@gmail.com>.
hmmm, looks like I may fail due to docker environment:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/tmp/kafka_test.py", line 26, in <module>
    run_pipeline()
  File "/tmp/kafka_test.py", line 22, in run_pipeline
    |beam.Map(print)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py",
line 583, in __exit__
    self.result.wait_until_finish()
  File
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py",
line 581, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline job-0f33f7f0-4fb4-4a57-a0fe-c4b2c34caff8 failed in
state FAILED:
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
java.io.IOException: Cannot run program "docker": error=2, No such file or
directory
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)
at
org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
at
org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202)
at
org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:243)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:426)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Cannot run program "docker": error=2, No
such file or directory
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
at
org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:189)
at
org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:171)
at
org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:95)
at
org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:131)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:252)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:231)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
... 14 more
Caused by: java.io.IOException: error=2, No such file or directory
at java.lang.UNIXProcess.forkAndExec(Native Method)
at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
at java.lang.ProcessImpl.start(ProcessImpl.java:134)
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
... 28 more


I tried to create yaml like to mount docker in local like:
apiVersion: v1
kind: Pod
metadata:
  name: beam-pod
  namespace: default
spec:
  volumes:
    - name: docker-sock
      hostPath:
        path: "/var/run/docker.sock"
        type: Socket
    - name: docker-directory
      hostPath:
        path: "/var/lib/docker"
        type: Directory
  containers:
  - image: python3.7sdk_java11_beam:2.27.0
    command: ["sleep","3600"]
    name: beam-pod
    volumeMounts:
      - mountPath: "/var/run/docker.sock"
        name: docker-sock
        readOnly: false
      - mountPath: "/var/lib/docker"
        name: docker-directory
        readOnly: false
    securityContext:
      privileged: true
      runAsUser: 0
    imagePullPolicy: Never
  restartPolicy: Never

And hello-world example runs fine:
root@beam-pod:/tmp# docker run hello-world

Hello from Docker!


Thanks again!
Yilun

On Thu, Mar 4, 2021 at 4:44 PM yilun zhang <il...@gmail.com> wrote:

> We create a custom docker image, which include java runtime, python and
> docker environment to run our job. But encountered timeout exception:
>
> root@beam-pod:/tmp# PYTHONPATH='./' python  -m kafka_test
> --runner=FlinkRunner --flink_master=beam-flink-cluster-jobmanager:8081
> --flink_submit_uber_jar --environment_type=EXTERNAL
> --environment_config=localhost:50000
> WARNING:root:Make sure that locally built Python SDK docker image has
> Python 3.7 interpreter.
> ERROR:root:java.util.concurrent.TimeoutException: The heartbeat of
> TaskManager with id 10.190.29.80:6122-88ce88  timed out.
> at
> org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(ResourceManager.java:1442)
> at
> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
> Our test code is super simple:
>
>
> import apache_beam as beam
> import apache_beam.transforms.window as window
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
> from apache_beam.io import WriteToText
>
> def run_pipeline():
>   with beam.Pipeline(options=PipelineOptions( runner="FlinkRunner",
>             flink_master="beam-flink-cluster-jobmanager:8081",
>             environment_type="EXTERNAL",
>             environment_config="localhost:50000")) as p:
>     (p
>      | 'Read from Kafka' >>
> ReadFromKafka(consumer_config={'bootstrap.servers':
> 'zookeeper.libra.ubiquant:31090',
>
>  'auto.offset.reset': 'latest'},
>                                           topics=['test001'])
>      | 'Par with 1' >> beam.Map(lambda word: (word, 1))
>      | 'Window of 10 seconds' >> beam.WindowInto(window.FixedWindows(5))
>      | 'Group by key' >> beam.GroupByKey()
>      | 'Sum word counts' >> beam.Map(lambda kv: (kv[0], sum(kv[1])))
>    #  | "Write to Kafka" >>
> WriteToKafka(producer_config={'bootstrap.servers':
> 'zookeeper.libra.ubiquant:31090'}, topic='test001')
>      | 'Write to text' >> WriteToText("/tmp/output2")
>     )
>
> if __name__ == '__main__':
>   run_pipeline()
>
>
> Is there any suggestion on debugging direction? In flink UI, it looks like
> it failed from first step, ReadFromKafka.
>
> Thanks,
> Yilun
>
> On Sat, Feb 27, 2021 at 2:16 AM Kyle Weaver <kc...@google.com> wrote:
>
>> In Beam, the Kafka connector does not know anything about the underlying
>> execution engine (here Flink). It is instead translated by the runner into
>> a user defined function in Flink. So it is expected that the resulting DAG
>> does not look the same as it would with a native Flink source.
>>
>> On Fri, Feb 26, 2021 at 5:18 AM yilun zhang <il...@gmail.com> wrote:
>>
>>> So sorry for subscribing errors on my side resulted in multiple
>>> duplicate email!
>>>
>>> Thanks for reply and it does help!
>>>
>>> I am confused when submitting beam job with kafka connector to flink, I
>>> noticed that flink DAG diagram will included readFromKafka as part of flink
>>> workflow. while if we submit a pyflink job(connected with kafka) directly
>>> to flink, the flink workflow will exclude reading from kafka(which is the
>>> resource) but only has data processing parts.
>>>
>>> Is that how beam want flink to do?
>>>
>>> Thanks a lot and sincerely apologize again for silly duplicated emails!
>>>
>>> Yilun
>>>
>>> Sam Bourne <sa...@gmail.com>于2021年2月25日 周四上午11:58写道:
>>>
>>>> Hi Yilun!
>>>>
>>>> I made a quick proof of concept repo showcasing how to run a beam
>>>> pipeline in flink on k8s. It may be useful for you as reference.
>>>>
>>>> https://github.com/sambvfx/beam-flink-k8s
>>>>
>>>>
>>>> On Wed, Feb 24, 2021, 8:13 AM yilun zhang <il...@gmail.com> wrote:
>>>>
>>>>> Hey,
>>>>>
>>>>> Our team is trying to use beam with connector Kafka and runner flink
>>>>> to gather information and process data. We adopt python sdk and build in
>>>>> java 11 in python 3.7 sdk image as java runtime for kafka expansion service.
>>>>>  so :
>>>>> image: beam python 3.7 docker image + build in java 11
>>>>> connector: kafka
>>>>> runner: flink
>>>>> container: kubernetes
>>>>>
>>>>> We encounter an docker not found error when running:
>>>>>  python3 -m kafka_test --runner=FlinkRunner
>>>>> --flink_master=flink-job-manager:8081 --flink_submit_uber_jar
>>>>> --environment_type=EXTERNAL --environment_config=localhost:50000
>>>>>
>>>>> We notice that in https://beam.apache.org/roadmap/portability/ it
>>>>> mentioned the prerequisite also includes Docker. We wonder what is the
>>>>> docker usage here? Is there any suggested way to build docker in
>>>>> k8s container? (something maybe like sysbox for docker in docker?)
>>>>>
>>>>> Or maybe we should not use beam sdk+runner in k8s?
>>>>>
>>>>> Thanks,
>>>>> Yilun
>>>>>
>>>>