You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by yilun zhang <il...@gmail.com> on 2021/02/24 08:21:55 UTC

using python sdk+kafka under k8s

Hey,

Our team is trying to use beam with connector Kafka and runner flink to
gather information and process data. We adopt python sdk and build in java
11 in python 3.7 sdk image as java runtime for kafka expansion service.
 so :
image: beam python 3.7 docker image + build in java 11
connector: kafka
runner: flink
container: kubernetes

We encounter an docker not found error when running:
 python3 -m kafka_test --runner=FlinkRunner
--flink_master=flink-job-manager:8081 --flink_submit_uber_jar
--environment_type=EXTERNAL --environment_config=localhost:50000

We notice that in https://beam.apache.org/roadmap/portability/ it mentioned
the prerequisite also includes Docker. We wonder what is the docker usage
here? Is there any suggested way to build docker in k8s container?
(something maybe like sysbox for docker in docker?)

Or maybe we should not use beam sdk+runner in k8s?

Thanks,
Yilun

Re: using python sdk+kafka under k8s

Posted by Kyle Weaver <kc...@google.com>.
The problem is that Kafka is a "cross-language" transform that is
implemented in Java. You have  configured your Python pipeline to run with
environment_type=EXTERNAL. However the Kafka transform has its own
environment that has environment_type=DOCKER, it does not respect the
environment_type you set for the pipeline. Currently I don't think there's
a way to configure the environment for an external transform; I brought up
this issue in a recent thread [1]. The reason for the error you are seeing
is that environment_type=DOCKER tries to start up Docker inside your Flink
workers, which must not have Docker installed.

[1]
https://lists.apache.org/thread.html/r6f6fc207ed62e1bf2a1d41deeeab554e35cd2af389ce38289a303cea%40%3Cdev.beam.apache.org%3E

On Thu, Mar 4, 2021 at 2:28 AM yilun zhang <il...@gmail.com> wrote:

> hmmm, looks like I may fail due to docker environment:
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.7/runpy.py", line 193, in
> _run_module_as_main
>     "__main__", mod_spec)
>   File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code
>     exec(code, run_globals)
>   File "/tmp/kafka_test.py", line 26, in <module>
>     run_pipeline()
>   File "/tmp/kafka_test.py", line 22, in run_pipeline
>     |beam.Map(print)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 583, in __exit__
>     self.result.wait_until_finish()
>   File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py",
> line 581, in wait_until_finish
>     raise self._runtime_exception
> RuntimeError: Pipeline job-0f33f7f0-4fb4-4a57-a0fe-c4b2c34caff8 failed in
> state FAILED:
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
> java.io.IOException: Cannot run program "docker": error=2, No such file or
> directory
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)
> at
> org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
> at
> org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202)
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:243)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:426)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Cannot run program "docker": error=2, No
> such file or directory
> at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
> at
> org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:189)
> at
> org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:171)
> at
> org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:95)
> at
> org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:131)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:252)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:231)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
> ... 14 more
> Caused by: java.io.IOException: error=2, No such file or directory
> at java.lang.UNIXProcess.forkAndExec(Native Method)
> at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
> at java.lang.ProcessImpl.start(ProcessImpl.java:134)
> at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
> ... 28 more
>
>
> I tried to create yaml like to mount docker in local like:
> apiVersion: v1
> kind: Pod
> metadata:
>   name: beam-pod
>   namespace: default
> spec:
>   volumes:
>     - name: docker-sock
>       hostPath:
>         path: "/var/run/docker.sock"
>         type: Socket
>     - name: docker-directory
>       hostPath:
>         path: "/var/lib/docker"
>         type: Directory
>   containers:
>   - image: python3.7sdk_java11_beam:2.27.0
>     command: ["sleep","3600"]
>     name: beam-pod
>     volumeMounts:
>       - mountPath: "/var/run/docker.sock"
>         name: docker-sock
>         readOnly: false
>       - mountPath: "/var/lib/docker"
>         name: docker-directory
>         readOnly: false
>     securityContext:
>       privileged: true
>       runAsUser: 0
>     imagePullPolicy: Never
>   restartPolicy: Never
>
> And hello-world example runs fine:
> root@beam-pod:/tmp# docker run hello-world
>
> Hello from Docker!
>
>
> Thanks again!
> Yilun
>
> On Thu, Mar 4, 2021 at 4:44 PM yilun zhang <il...@gmail.com> wrote:
>
>> We create a custom docker image, which include java runtime, python and
>> docker environment to run our job. But encountered timeout exception:
>>
>> root@beam-pod:/tmp# PYTHONPATH='./' python  -m kafka_test
>> --runner=FlinkRunner --flink_master=beam-flink-cluster-jobmanager:8081
>> --flink_submit_uber_jar --environment_type=EXTERNAL
>> --environment_config=localhost:50000
>> WARNING:root:Make sure that locally built Python SDK docker image has
>> Python 3.7 interpreter.
>> ERROR:root:java.util.concurrent.TimeoutException: The heartbeat of
>> TaskManager with id 10.190.29.80:6122-88ce88  timed out.
>> at
>> org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(ResourceManager.java:1442)
>> at
>> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>>
>> Our test code is super simple:
>>
>>
>> import apache_beam as beam
>> import apache_beam.transforms.window as window
>> from apache_beam.options.pipeline_options import PipelineOptions
>> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>> from apache_beam.io import WriteToText
>>
>> def run_pipeline():
>>   with beam.Pipeline(options=PipelineOptions( runner="FlinkRunner",
>>             flink_master="beam-flink-cluster-jobmanager:8081",
>>             environment_type="EXTERNAL",
>>             environment_config="localhost:50000")) as p:
>>     (p
>>      | 'Read from Kafka' >>
>> ReadFromKafka(consumer_config={'bootstrap.servers':
>> 'zookeeper.libra.ubiquant:31090',
>>
>>  'auto.offset.reset': 'latest'},
>>                                           topics=['test001'])
>>      | 'Par with 1' >> beam.Map(lambda word: (word, 1))
>>      | 'Window of 10 seconds' >> beam.WindowInto(window.FixedWindows(5))
>>      | 'Group by key' >> beam.GroupByKey()
>>      | 'Sum word counts' >> beam.Map(lambda kv: (kv[0], sum(kv[1])))
>>    #  | "Write to Kafka" >>
>> WriteToKafka(producer_config={'bootstrap.servers':
>> 'zookeeper.libra.ubiquant:31090'}, topic='test001')
>>      | 'Write to text' >> WriteToText("/tmp/output2")
>>     )
>>
>> if __name__ == '__main__':
>>   run_pipeline()
>>
>>
>> Is there any suggestion on debugging direction? In flink UI, it looks
>> like it failed from first step, ReadFromKafka.
>>
>> Thanks,
>> Yilun
>>
>> On Sat, Feb 27, 2021 at 2:16 AM Kyle Weaver <kc...@google.com> wrote:
>>
>>> In Beam, the Kafka connector does not know anything about the underlying
>>> execution engine (here Flink). It is instead translated by the runner into
>>> a user defined function in Flink. So it is expected that the resulting DAG
>>> does not look the same as it would with a native Flink source.
>>>
>>> On Fri, Feb 26, 2021 at 5:18 AM yilun zhang <il...@gmail.com> wrote:
>>>
>>>> So sorry for subscribing errors on my side resulted in multiple
>>>> duplicate email!
>>>>
>>>> Thanks for reply and it does help!
>>>>
>>>> I am confused when submitting beam job with kafka connector to flink, I
>>>> noticed that flink DAG diagram will included readFromKafka as part of flink
>>>> workflow. while if we submit a pyflink job(connected with kafka) directly
>>>> to flink, the flink workflow will exclude reading from kafka(which is the
>>>> resource) but only has data processing parts.
>>>>
>>>> Is that how beam want flink to do?
>>>>
>>>> Thanks a lot and sincerely apologize again for silly duplicated emails!
>>>>
>>>> Yilun
>>>>
>>>> Sam Bourne <sa...@gmail.com>于2021年2月25日 周四上午11:58写道:
>>>>
>>>>> Hi Yilun!
>>>>>
>>>>> I made a quick proof of concept repo showcasing how to run a beam
>>>>> pipeline in flink on k8s. It may be useful for you as reference.
>>>>>
>>>>> https://github.com/sambvfx/beam-flink-k8s
>>>>>
>>>>>
>>>>> On Wed, Feb 24, 2021, 8:13 AM yilun zhang <il...@gmail.com> wrote:
>>>>>
>>>>>> Hey,
>>>>>>
>>>>>> Our team is trying to use beam with connector Kafka and runner flink
>>>>>> to gather information and process data. We adopt python sdk and build in
>>>>>> java 11 in python 3.7 sdk image as java runtime for kafka expansion service.
>>>>>>  so :
>>>>>> image: beam python 3.7 docker image + build in java 11
>>>>>> connector: kafka
>>>>>> runner: flink
>>>>>> container: kubernetes
>>>>>>
>>>>>> We encounter an docker not found error when running:
>>>>>>  python3 -m kafka_test --runner=FlinkRunner
>>>>>> --flink_master=flink-job-manager:8081 --flink_submit_uber_jar
>>>>>> --environment_type=EXTERNAL --environment_config=localhost:50000
>>>>>>
>>>>>> We notice that in https://beam.apache.org/roadmap/portability/ it
>>>>>> mentioned the prerequisite also includes Docker. We wonder what is the
>>>>>> docker usage here? Is there any suggested way to build docker in
>>>>>> k8s container? (something maybe like sysbox for docker in docker?)
>>>>>>
>>>>>> Or maybe we should not use beam sdk+runner in k8s?
>>>>>>
>>>>>> Thanks,
>>>>>> Yilun
>>>>>>
>>>>>

Re: using python sdk+kafka under k8s

Posted by yilun zhang <il...@gmail.com>.
hmmm, looks like I may fail due to docker environment:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/tmp/kafka_test.py", line 26, in <module>
    run_pipeline()
  File "/tmp/kafka_test.py", line 22, in run_pipeline
    |beam.Map(print)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py",
line 583, in __exit__
    self.result.wait_until_finish()
  File
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py",
line 581, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline job-0f33f7f0-4fb4-4a57-a0fe-c4b2c34caff8 failed in
state FAILED:
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
java.io.IOException: Cannot run program "docker": error=2, No such file or
directory
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)
at
org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
at
org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202)
at
org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:243)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:426)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Cannot run program "docker": error=2, No
such file or directory
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
at
org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:189)
at
org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:171)
at
org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:95)
at
org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:131)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:252)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:231)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
... 14 more
Caused by: java.io.IOException: error=2, No such file or directory
at java.lang.UNIXProcess.forkAndExec(Native Method)
at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
at java.lang.ProcessImpl.start(ProcessImpl.java:134)
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
... 28 more


I tried to create yaml like to mount docker in local like:
apiVersion: v1
kind: Pod
metadata:
  name: beam-pod
  namespace: default
spec:
  volumes:
    - name: docker-sock
      hostPath:
        path: "/var/run/docker.sock"
        type: Socket
    - name: docker-directory
      hostPath:
        path: "/var/lib/docker"
        type: Directory
  containers:
  - image: python3.7sdk_java11_beam:2.27.0
    command: ["sleep","3600"]
    name: beam-pod
    volumeMounts:
      - mountPath: "/var/run/docker.sock"
        name: docker-sock
        readOnly: false
      - mountPath: "/var/lib/docker"
        name: docker-directory
        readOnly: false
    securityContext:
      privileged: true
      runAsUser: 0
    imagePullPolicy: Never
  restartPolicy: Never

And hello-world example runs fine:
root@beam-pod:/tmp# docker run hello-world

Hello from Docker!


Thanks again!
Yilun

On Thu, Mar 4, 2021 at 4:44 PM yilun zhang <il...@gmail.com> wrote:

> We create a custom docker image, which include java runtime, python and
> docker environment to run our job. But encountered timeout exception:
>
> root@beam-pod:/tmp# PYTHONPATH='./' python  -m kafka_test
> --runner=FlinkRunner --flink_master=beam-flink-cluster-jobmanager:8081
> --flink_submit_uber_jar --environment_type=EXTERNAL
> --environment_config=localhost:50000
> WARNING:root:Make sure that locally built Python SDK docker image has
> Python 3.7 interpreter.
> ERROR:root:java.util.concurrent.TimeoutException: The heartbeat of
> TaskManager with id 10.190.29.80:6122-88ce88  timed out.
> at
> org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(ResourceManager.java:1442)
> at
> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
> Our test code is super simple:
>
>
> import apache_beam as beam
> import apache_beam.transforms.window as window
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
> from apache_beam.io import WriteToText
>
> def run_pipeline():
>   with beam.Pipeline(options=PipelineOptions( runner="FlinkRunner",
>             flink_master="beam-flink-cluster-jobmanager:8081",
>             environment_type="EXTERNAL",
>             environment_config="localhost:50000")) as p:
>     (p
>      | 'Read from Kafka' >>
> ReadFromKafka(consumer_config={'bootstrap.servers':
> 'zookeeper.libra.ubiquant:31090',
>
>  'auto.offset.reset': 'latest'},
>                                           topics=['test001'])
>      | 'Par with 1' >> beam.Map(lambda word: (word, 1))
>      | 'Window of 10 seconds' >> beam.WindowInto(window.FixedWindows(5))
>      | 'Group by key' >> beam.GroupByKey()
>      | 'Sum word counts' >> beam.Map(lambda kv: (kv[0], sum(kv[1])))
>    #  | "Write to Kafka" >>
> WriteToKafka(producer_config={'bootstrap.servers':
> 'zookeeper.libra.ubiquant:31090'}, topic='test001')
>      | 'Write to text' >> WriteToText("/tmp/output2")
>     )
>
> if __name__ == '__main__':
>   run_pipeline()
>
>
> Is there any suggestion on debugging direction? In flink UI, it looks like
> it failed from first step, ReadFromKafka.
>
> Thanks,
> Yilun
>
> On Sat, Feb 27, 2021 at 2:16 AM Kyle Weaver <kc...@google.com> wrote:
>
>> In Beam, the Kafka connector does not know anything about the underlying
>> execution engine (here Flink). It is instead translated by the runner into
>> a user defined function in Flink. So it is expected that the resulting DAG
>> does not look the same as it would with a native Flink source.
>>
>> On Fri, Feb 26, 2021 at 5:18 AM yilun zhang <il...@gmail.com> wrote:
>>
>>> So sorry for subscribing errors on my side resulted in multiple
>>> duplicate email!
>>>
>>> Thanks for reply and it does help!
>>>
>>> I am confused when submitting beam job with kafka connector to flink, I
>>> noticed that flink DAG diagram will included readFromKafka as part of flink
>>> workflow. while if we submit a pyflink job(connected with kafka) directly
>>> to flink, the flink workflow will exclude reading from kafka(which is the
>>> resource) but only has data processing parts.
>>>
>>> Is that how beam want flink to do?
>>>
>>> Thanks a lot and sincerely apologize again for silly duplicated emails!
>>>
>>> Yilun
>>>
>>> Sam Bourne <sa...@gmail.com>于2021年2月25日 周四上午11:58写道:
>>>
>>>> Hi Yilun!
>>>>
>>>> I made a quick proof of concept repo showcasing how to run a beam
>>>> pipeline in flink on k8s. It may be useful for you as reference.
>>>>
>>>> https://github.com/sambvfx/beam-flink-k8s
>>>>
>>>>
>>>> On Wed, Feb 24, 2021, 8:13 AM yilun zhang <il...@gmail.com> wrote:
>>>>
>>>>> Hey,
>>>>>
>>>>> Our team is trying to use beam with connector Kafka and runner flink
>>>>> to gather information and process data. We adopt python sdk and build in
>>>>> java 11 in python 3.7 sdk image as java runtime for kafka expansion service.
>>>>>  so :
>>>>> image: beam python 3.7 docker image + build in java 11
>>>>> connector: kafka
>>>>> runner: flink
>>>>> container: kubernetes
>>>>>
>>>>> We encounter an docker not found error when running:
>>>>>  python3 -m kafka_test --runner=FlinkRunner
>>>>> --flink_master=flink-job-manager:8081 --flink_submit_uber_jar
>>>>> --environment_type=EXTERNAL --environment_config=localhost:50000
>>>>>
>>>>> We notice that in https://beam.apache.org/roadmap/portability/ it
>>>>> mentioned the prerequisite also includes Docker. We wonder what is the
>>>>> docker usage here? Is there any suggested way to build docker in
>>>>> k8s container? (something maybe like sysbox for docker in docker?)
>>>>>
>>>>> Or maybe we should not use beam sdk+runner in k8s?
>>>>>
>>>>> Thanks,
>>>>> Yilun
>>>>>
>>>>

Re: using python sdk+kafka under k8s

Posted by yilun zhang <il...@gmail.com>.
We create a custom docker image, which include java runtime, python and
docker environment to run our job. But encountered timeout exception:

root@beam-pod:/tmp# PYTHONPATH='./' python  -m kafka_test
--runner=FlinkRunner --flink_master=beam-flink-cluster-jobmanager:8081
--flink_submit_uber_jar --environment_type=EXTERNAL
--environment_config=localhost:50000
WARNING:root:Make sure that locally built Python SDK docker image has
Python 3.7 interpreter.
ERROR:root:java.util.concurrent.TimeoutException: The heartbeat of
TaskManager with id 10.190.29.80:6122-88ce88  timed out.
at
org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(ResourceManager.java:1442)
at
org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



Our test code is super simple:


import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
from apache_beam.io import WriteToText

def run_pipeline():
  with beam.Pipeline(options=PipelineOptions( runner="FlinkRunner",
            flink_master="beam-flink-cluster-jobmanager:8081",
            environment_type="EXTERNAL",
            environment_config="localhost:50000")) as p:
    (p
     | 'Read from Kafka' >>
ReadFromKafka(consumer_config={'bootstrap.servers':
'zookeeper.libra.ubiquant:31090',

 'auto.offset.reset': 'latest'},
                                          topics=['test001'])
     | 'Par with 1' >> beam.Map(lambda word: (word, 1))
     | 'Window of 10 seconds' >> beam.WindowInto(window.FixedWindows(5))
     | 'Group by key' >> beam.GroupByKey()
     | 'Sum word counts' >> beam.Map(lambda kv: (kv[0], sum(kv[1])))
   #  | "Write to Kafka" >>
WriteToKafka(producer_config={'bootstrap.servers':
'zookeeper.libra.ubiquant:31090'}, topic='test001')
     | 'Write to text' >> WriteToText("/tmp/output2")
    )

if __name__ == '__main__':
  run_pipeline()


Is there any suggestion on debugging direction? In flink UI, it looks like
it failed from first step, ReadFromKafka.

Thanks,
Yilun

On Sat, Feb 27, 2021 at 2:16 AM Kyle Weaver <kc...@google.com> wrote:

> In Beam, the Kafka connector does not know anything about the underlying
> execution engine (here Flink). It is instead translated by the runner into
> a user defined function in Flink. So it is expected that the resulting DAG
> does not look the same as it would with a native Flink source.
>
> On Fri, Feb 26, 2021 at 5:18 AM yilun zhang <il...@gmail.com> wrote:
>
>> So sorry for subscribing errors on my side resulted in multiple duplicate
>> email!
>>
>> Thanks for reply and it does help!
>>
>> I am confused when submitting beam job with kafka connector to flink, I
>> noticed that flink DAG diagram will included readFromKafka as part of flink
>> workflow. while if we submit a pyflink job(connected with kafka) directly
>> to flink, the flink workflow will exclude reading from kafka(which is the
>> resource) but only has data processing parts.
>>
>> Is that how beam want flink to do?
>>
>> Thanks a lot and sincerely apologize again for silly duplicated emails!
>>
>> Yilun
>>
>> Sam Bourne <sa...@gmail.com>于2021年2月25日 周四上午11:58写道:
>>
>>> Hi Yilun!
>>>
>>> I made a quick proof of concept repo showcasing how to run a beam
>>> pipeline in flink on k8s. It may be useful for you as reference.
>>>
>>> https://github.com/sambvfx/beam-flink-k8s
>>>
>>>
>>> On Wed, Feb 24, 2021, 8:13 AM yilun zhang <il...@gmail.com> wrote:
>>>
>>>> Hey,
>>>>
>>>> Our team is trying to use beam with connector Kafka and runner flink to
>>>> gather information and process data. We adopt python sdk and build in java
>>>> 11 in python 3.7 sdk image as java runtime for kafka expansion service.
>>>>  so :
>>>> image: beam python 3.7 docker image + build in java 11
>>>> connector: kafka
>>>> runner: flink
>>>> container: kubernetes
>>>>
>>>> We encounter an docker not found error when running:
>>>>  python3 -m kafka_test --runner=FlinkRunner
>>>> --flink_master=flink-job-manager:8081 --flink_submit_uber_jar
>>>> --environment_type=EXTERNAL --environment_config=localhost:50000
>>>>
>>>> We notice that in https://beam.apache.org/roadmap/portability/ it
>>>> mentioned the prerequisite also includes Docker. We wonder what is the
>>>> docker usage here? Is there any suggested way to build docker in
>>>> k8s container? (something maybe like sysbox for docker in docker?)
>>>>
>>>> Or maybe we should not use beam sdk+runner in k8s?
>>>>
>>>> Thanks,
>>>> Yilun
>>>>
>>>

Re: using python sdk+kafka under k8s

Posted by Kyle Weaver <kc...@google.com>.
In Beam, the Kafka connector does not know anything about the underlying
execution engine (here Flink). It is instead translated by the runner into
a user defined function in Flink. So it is expected that the resulting DAG
does not look the same as it would with a native Flink source.

On Fri, Feb 26, 2021 at 5:18 AM yilun zhang <il...@gmail.com> wrote:

> So sorry for subscribing errors on my side resulted in multiple duplicate
> email!
>
> Thanks for reply and it does help!
>
> I am confused when submitting beam job with kafka connector to flink, I
> noticed that flink DAG diagram will included readFromKafka as part of flink
> workflow. while if we submit a pyflink job(connected with kafka) directly
> to flink, the flink workflow will exclude reading from kafka(which is the
> resource) but only has data processing parts.
>
> Is that how beam want flink to do?
>
> Thanks a lot and sincerely apologize again for silly duplicated emails!
>
> Yilun
>
> Sam Bourne <sa...@gmail.com>于2021年2月25日 周四上午11:58写道:
>
>> Hi Yilun!
>>
>> I made a quick proof of concept repo showcasing how to run a beam
>> pipeline in flink on k8s. It may be useful for you as reference.
>>
>> https://github.com/sambvfx/beam-flink-k8s
>>
>>
>> On Wed, Feb 24, 2021, 8:13 AM yilun zhang <il...@gmail.com> wrote:
>>
>>> Hey,
>>>
>>> Our team is trying to use beam with connector Kafka and runner flink to
>>> gather information and process data. We adopt python sdk and build in java
>>> 11 in python 3.7 sdk image as java runtime for kafka expansion service.
>>>  so :
>>> image: beam python 3.7 docker image + build in java 11
>>> connector: kafka
>>> runner: flink
>>> container: kubernetes
>>>
>>> We encounter an docker not found error when running:
>>>  python3 -m kafka_test --runner=FlinkRunner
>>> --flink_master=flink-job-manager:8081 --flink_submit_uber_jar
>>> --environment_type=EXTERNAL --environment_config=localhost:50000
>>>
>>> We notice that in https://beam.apache.org/roadmap/portability/ it
>>> mentioned the prerequisite also includes Docker. We wonder what is the
>>> docker usage here? Is there any suggested way to build docker in
>>> k8s container? (something maybe like sysbox for docker in docker?)
>>>
>>> Or maybe we should not use beam sdk+runner in k8s?
>>>
>>> Thanks,
>>> Yilun
>>>
>>

Re: using python sdk+kafka under k8s

Posted by yilun zhang <il...@gmail.com>.
So sorry for subscribing errors on my side resulted in multiple duplicate
email!

Thanks for reply and it does help!

I am confused when submitting beam job with kafka connector to flink, I
noticed that flink DAG diagram will included readFromKafka as part of flink
workflow. while if we submit a pyflink job(connected with kafka) directly
to flink, the flink workflow will exclude reading from kafka(which is the
resource) but only has data processing parts.

Is that how beam want flink to do?

Thanks a lot and sincerely apologize again for silly duplicated emails!

Yilun

Sam Bourne <sa...@gmail.com>于2021年2月25日 周四上午11:58写道:

> Hi Yilun!
>
> I made a quick proof of concept repo showcasing how to run a beam pipeline
> in flink on k8s. It may be useful for you as reference.
>
> https://github.com/sambvfx/beam-flink-k8s
>
>
> On Wed, Feb 24, 2021, 8:13 AM yilun zhang <il...@gmail.com> wrote:
>
>> Hey,
>>
>> Our team is trying to use beam with connector Kafka and runner flink to
>> gather information and process data. We adopt python sdk and build in java
>> 11 in python 3.7 sdk image as java runtime for kafka expansion service.
>>  so :
>> image: beam python 3.7 docker image + build in java 11
>> connector: kafka
>> runner: flink
>> container: kubernetes
>>
>> We encounter an docker not found error when running:
>>  python3 -m kafka_test --runner=FlinkRunner
>> --flink_master=flink-job-manager:8081 --flink_submit_uber_jar
>> --environment_type=EXTERNAL --environment_config=localhost:50000
>>
>> We notice that in https://beam.apache.org/roadmap/portability/ it
>> mentioned the prerequisite also includes Docker. We wonder what is the
>> docker usage here? Is there any suggested way to build docker in
>> k8s container? (something maybe like sysbox for docker in docker?)
>>
>> Or maybe we should not use beam sdk+runner in k8s?
>>
>> Thanks,
>> Yilun
>>
>

Re: using python sdk+kafka under k8s

Posted by Sam Bourne <sa...@gmail.com>.
Hi Yilun!

I made a quick proof of concept repo showcasing how to run a beam pipeline
in flink on k8s. It may be useful for you as reference.

https://github.com/sambvfx/beam-flink-k8s

On Wed, Feb 24, 2021, 8:13 AM yilun zhang <il...@gmail.com> wrote:

> Hey,
>
> Our team is trying to use beam with connector Kafka and runner flink to
> gather information and process data. We adopt python sdk and build in java
> 11 in python 3.7 sdk image as java runtime for kafka expansion service.
>  so :
> image: beam python 3.7 docker image + build in java 11
> connector: kafka
> runner: flink
> container: kubernetes
>
> We encounter an docker not found error when running:
>  python3 -m kafka_test --runner=FlinkRunner
> --flink_master=flink-job-manager:8081 --flink_submit_uber_jar
> --environment_type=EXTERNAL --environment_config=localhost:50000
>
> We notice that in https://beam.apache.org/roadmap/portability/ it
> mentioned the prerequisite also includes Docker. We wonder what is the
> docker usage here? Is there any suggested way to build docker in
> k8s container? (something maybe like sysbox for docker in docker?)
>
> Or maybe we should not use beam sdk+runner in k8s?
>
> Thanks,
> Yilun
>

Re: using python sdk+kafka under k8s

Posted by Sam Bourne <sa...@gmail.com>.
Hi Yilun!

I made a quick proof of concept repo showcasing how to run a beam pipeline
in flink on k8s. It may be useful for you as reference.

https://github.com/sambvfx/beam-flink-k8s

On Wed, Feb 24, 2021, 8:13 AM yilun zhang <il...@gmail.com> wrote:

> Hey,
>
> Our team is trying to use beam with connector Kafka and runner flink to
> gather information and process data. We adopt python sdk and build in java
> 11 in python 3.7 sdk image as java runtime for kafka expansion service.
>  so :
> image: beam python 3.7 docker image + build in java 11
> connector: kafka
> runner: flink
> container: kubernetes
>
> We encounter an docker not found error when running:
>  python3 -m kafka_test --runner=FlinkRunner
> --flink_master=flink-job-manager:8081 --flink_submit_uber_jar
> --environment_type=EXTERNAL --environment_config=localhost:50000
>
> We notice that in https://beam.apache.org/roadmap/portability/ it
> mentioned the prerequisite also includes Docker. We wonder what is the
> docker usage here? Is there any suggested way to build docker in
> k8s container? (something maybe like sysbox for docker in docker?)
>
> Or maybe we should not use beam sdk+runner in k8s?
>
> Thanks,
> Yilun
>