Posted to user@beam.apache.org by Yu Watanabe <yu...@gmail.com> on 2019/09/12 11:50:58 UTC

How do you write portable runner pipeline on separate python code ?

Hello.

I would like to ask for help with my sample code that uses the portable runner
with Apache Flink.
I was able to get wordcount.py working by following this page:

https://beam.apache.org/roadmap/portability/

I got the two files below under /tmp.

-rw-r--r-- 1 ywatanabe ywatanabe    185 Sep 12 19:56 py-wordcount-direct-00001-of-00002
-rw-r--r-- 1 ywatanabe ywatanabe    190 Sep 12 19:56 py-wordcount-direct-00000-of-00002

Then I wrote some sample code with the following steps.

1. Installed apache_beam using pip3, separately from the source code directory.
2. Wrote the sample code below and named it "test-portable-runner.py".
   Placed it in a directory separate from the source code.
-----------------------------------------------------------------------------------
(python) ywatanabe@debian-09-00:~$ ls -ltr
total 16
drwxr-xr-x 18 ywatanabe ywatanabe 4096 Sep 12 19:06 beam (<- source code directory)
-rw-r--r--  1 ywatanabe ywatanabe  634 Sep 12 20:25 test-portable-runner.py

-----------------------------------------------------------------------------------
3. Executed the code with "python3 test-portable-runner.py".

==========================================================================================
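#!/usr/bin/env python3

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import WriteToText


def printMsg(line):
    # Print each element as it is processed, then pass it through unchanged.
    print("OUTPUT: {0}".format(line))
    return line


# Submit the job to the Flink job server listening on localhost:8099.
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--shutdown_sources_on_final_watermark",
])

p = beam.Pipeline(options=options)

output = (p
          | 'create' >> beam.Create(["a", "b", "c"])
          | beam.Map(printMsg))

output | 'write' >> WriteToText('/tmp/sample.txt')

# Run the pipeline and block until the job reaches a terminal state.
result = p.run()
result.wait_until_finish()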
=======================================================================================

The job seemed to go all the way to the "FINISHED" state.
-------------------------------------------------------------------------------------------------------------------------------------------------------
[DataSource (Impulse) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task
- Registering task at network: DataSource (Impulse) (1/1)
(9df528f4d493d8c3b62c8be23dc889a8) [DEPLOYING].
[DataSource (Impulse) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task
- DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) switched
from DEPLOYING to RUNNING.
[flink-akka.actor.default-dispatcher-3] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSource
(Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING
to RUNNING.
[flink-akka.actor.default-dispatcher-3] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition
(MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at
core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0])
(1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from CREATED to SCHEDULED.
[flink-akka.actor.default-dispatcher-3] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition
(MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at
core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0])
(1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from SCHEDULED to
DEPLOYING.
[flink-akka.actor.default-dispatcher-3] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying CHAIN
MapPartition (MapPartition at
[2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>),
Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1) (attempt #0)
to aad5f142-dbb7-4900-a5ae-af8a6fdcc787 @ localhost (dataPort=-1)
[flink-akka.actor.default-dispatcher-2] INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task CHAIN
MapPartition (MapPartition at
[2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>),
Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1).
[DataSource (Impulse) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task
- Loading JAR files for task DataSource (Impulse) (1/1)
(d51e4171c0af881342eaa8a7ab06def8) [DEPLOYING].
[DataSource (Impulse) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task
- Registering task at network: DataSource (Impulse) (1/1)
(d51e4171c0af881342eaa8a7ab06def8) [DEPLOYING].
[DataSource (Impulse) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task
- DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) switched
from RUNNING to FINISHED.
-------------------------------------------------------------------------------------------------------------------------------------------------------

But I ended up with a Docker error on the client side.

-------------------------------------------------------------------------------------------------------------------------------------------------------
(python) ywatanabe@debian-09-00:~$ python3 test-portable-runner.py
/home/ywatanabe/python/lib/python3.5/site-packages/apache_beam/__init__.py:84:
UserWarning: Some syntactic constructs of Python 3 are not yet fully
supported by Apache Beam.
  'Some syntactic constructs of Python 3 are not yet fully supported by '
ERROR:root:java.io.IOException: Received exit code 125 for command 'docker
run -d --network=host --env=DOCKER_MAC_CONTAINER=null
ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=13-1
--logging_endpoint=localhost:39049 --artifact_endpoint=localhost:33779
--provision_endpoint=localhost:34827 --control_endpoint=localhost:36079'.
stderr: Unable to find image '
ywatanabe-docker-apache.bintray.io/beam/python3:latest' locallydocker:
Error response from daemon: unknown: Subject ywatanabe was not found.See
'docker run --help'.
Traceback (most recent call last):
  File "test-portable-runner.py", line 27, in <module>
    result.wait_until_finish()
  File
"/home/ywatanabe/python/lib/python3.5/site-packages/apache_beam/runners/portability/portable_runner.py",
line 446, in wait_until_finish
    self._job_id, self._state, self._last_error_message()))
RuntimeError: Pipeline
BeamApp-ywatanabe-0912112516-22d84d63_d3b5e778-d771-4118-9ba7-c9dde85938e9
failed in state FAILED: java.io.IOException: Received exit code 125 for
command 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null
ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=13-1
--logging_endpoint=localhost:39049 --artifact_endpoint=localhost:33779
--provision_endpoint=localhost:34827 --control_endpoint=localhost:36079'.
stderr: Unable to find image '
ywatanabe-docker-apache.bintray.io/beam/python3:latest' locallydocker:
Error response from daemon: unknown: Subject ywatanabe was not found.See
'docker run --help'.

-------------------------------------------------------------------------------------------------------------------------------------------------------
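When a job fails this way, the useful detail is the image name inside the Docker error. A minimal sketch, not part of Beam, that extracts the image name from an error string shaped like the one above; `missing_docker_image` is a hypothetical helper. With the name in hand you could check `docker images` for it, or switch to one of the non-Docker environments suggested in the replies.

```python
import re
from typing import Optional

# Matches Docker's "Unable to find image '<name>' locally" message.
# The leading \s* tolerates the line break the mail archive inserted
# right after the opening quote.
_IMAGE_RE = re.compile(r"Unable to find image '\s*([^']+)'\s*locally")


def missing_docker_image(error_text: str) -> Optional[str]:
    """Return the image name Docker could not find, or None if absent.

    Hypothetical helper for reading runner error messages; not a Beam API.
    """
    match = _IMAGE_RE.search(error_text)
    return match.group(1) if match else None


if __name__ == "__main__":
    sample = (
        "stderr: Unable to find image '\n"
        "ywatanabe-docker-apache.bintray.io/beam/python3:latest' locallydocker:\n"
        "Error response from daemon: unknown: Subject ywatanabe was not found."
    )
    # prints: ywatanabe-docker-apache.bintray.io/beam/python3:latest
    print(missing_docker_image(sample))
```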

As a result, I got nothing under /tmp. The code works when using the DirectRunner.
Where should I look in order to get the pipeline to write its results to
text files under /tmp?

Best Regards,
Yu Watanabe

-- 
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.tennis@gmail.com
LinkedIn: <https://www.linkedin.com/in/yuwatanabe1>  Twitter: <https://twitter.com/yuwtennis>

Re: How do you write portable runner pipeline on separate python code ?

Posted by Kyle Weaver <kc...@google.com>.
> Is it one of the best guarded secrets? ;-)
Apparently so!

I filed a few related JIRAs and assigned them to myself.
[1] https://issues.apache.org/jira/browse/BEAM-8214
[2] https://issues.apache.org/jira/browse/BEAM-8232
[3] https://issues.apache.org/jira/browse/BEAM-8233

Kyle Weaver | Software Engineer | github.com/ibzib | kcweaver@google.com


On Fri, Sep 13, 2019 at 9:57 AM Robert Bradshaw <ro...@google.com> wrote:

> Note that loopback won't fix the problem for, say, cross-language IOs.
> But, yes, it's really handy and should probably be used more.
>
> On Fri, Sep 13, 2019 at 8:29 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> And/or update the wiki/website with some how-tos...
>>
>> On Fri, Sep 13, 2019 at 7:51 AM Thomas Weise <th...@apache.org> wrote:
>>
>>> I agree that loopback would be preferable for this purpose. I just
>>> wasn't aware this even works with the portable Flink runner. Is it one of
>>> the best guarded secrets? ;-)
>>>
>>> Kyle, can you please post the pipeline options you would use for Flink?
>>>
>>>
>>> On Thu, Sep 12, 2019 at 5:57 PM Kyle Weaver <kc...@google.com> wrote:
>>>
>>>> I prefer loopback because a) it writes output files to the local
>>>> filesystem, as the user expects, and b) you don't have to pull or build
>>>> docker images, or even have docker installed on your system -- which is one
>>>> less point of failure.
>>>>
>>>> Kyle Weaver | Software Engineer | github.com/ibzib |
>>>> kcweaver@google.com
>>>>
>>>>
>>>> On Thu, Sep 12, 2019 at 5:48 PM Thomas Weise <th...@apache.org> wrote:
>>>>
>>>>> This should become much better with 2.16 when we have the Docker
>>>>> images prebuilt.
>>>>>
>>>>> Docker is probably still the best option for Python on a JVM-based
>>>>> runner in a local environment that does not have a development setup.
>>>>>
>>>>>
>>>>> On Thu, Sep 12, 2019 at 1:09 PM Kyle Weaver <kc...@google.com>
>>>>> wrote:
>>>>>
>>>>>> +dev <de...@beam.apache.org> I think we should probably point new
>>>>>> users of the portable Flink/Spark runners to use loopback or some other
>>>>>> non-docker environment, as Docker adds some operational complexity that
>>>>>> isn't really needed to run a word count example. For example, Yu's pipeline
>>>>>> errored here because the expected Docker container wasn't built before
>>>>>> running.
>>>>>>
>>>>>> Kyle Weaver | Software Engineer | github.com/ibzib |
>>>>>> kcweaver@google.com
>>>>>>
>>>>>>
>>>>>> On Thu, Sep 12, 2019 at 11:27 AM Robert Bradshaw <ro...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> On this note, making local files easy to read is something we'd
>>>>>>> definitely like to improve, as the current behavior is quite surprising.
>>>>>>> This could be useful not just for running with docker and the portable
>>>>>>> runner locally, but more generally when running on a distributed system
>>>>>>> (e.g. a Flink/Spark cluster or Dataflow). It would be very convenient if we
>>>>>>> could automatically stage local files to be read as artifacts that could be
>>>>>>> consumed by any worker (possibly via external directory mounting in the
>>>>>>> local docker case rather than an actual copy), and conversely copy small
>>>>>>> outputs back to the local machine (with the similar optimization for local
>>>>>>> docker).
>>>>>>>
>>>>>>> At the very least, however, we should add obvious messaging when the
>>>>>>> local filesystem is used from within Docker, which is often a
>>>>>>> non-obvious and hard-to-debug mistake.
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik <lc...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> When you use a local filesystem path and a docker environment,
>>>>>>>> "/tmp" is written inside the container. You can solve this issue by:
>>>>>>>> * Using a "remote" filesystem such as HDFS/S3/GCS/...
>>>>>>>> * Mounting an external directory into the container so that any
>>>>>>>> "local" writes appear outside the container
>>>>>>>> * Using a non-docker environment such as external or process.
>>>>>>>>
>>>>>>>>
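A minimal sketch of the options discussed above (it is not Kyle's actual configuration). It assumes Beam's `--environment_type` flag accepts `DOCKER`, `PROCESS`, `EXTERNAL`, and `LOOPBACK` in your installed version, and that `--environment_config` carries the per-environment setting (for `DOCKER`, the image name). `portable_flink_args` is a hypothetical helper that only builds the argument list, so it runs without Beam installed; the endpoint and `--shutdown_sources_on_final_watermark` are copied from Yu's script. It avoids f-strings because the traceback shows Python 3.5.

```python
from typing import List, Optional

# Environment types assumed to be accepted by Beam's --environment_type flag;
# check PortableOptions in your installed Beam version.
KNOWN_ENVIRONMENTS = {"DOCKER", "PROCESS", "EXTERNAL", "LOOPBACK"}


def portable_flink_args(
    job_endpoint: str = "localhost:8099",
    environment_type: str = "LOOPBACK",
    environment_config: Optional[str] = None,
) -> List[str]:
    """Build PortableRunner arguments for a local Flink job server.

    Hypothetical helper; pass the result to PipelineOptions(...).
    """
    env = environment_type.upper()
    if env not in KNOWN_ENVIRONMENTS:
        raise ValueError("unknown environment type: %s" % environment_type)
    args = [
        "--runner=PortableRunner",
        "--job_endpoint=" + job_endpoint,
        "--environment_type=" + env,
        "--shutdown_sources_on_final_watermark",
    ]
    # LOOPBACK needs no config: as Kyle describes, the SDK worker then runs
    # alongside the submitting Python process, so /tmp is this machine's /tmp.
    # DOCKER takes an image name here; PROCESS and EXTERNAL take their own
    # config strings (see the Beam docs for their format).
    if environment_config is not None:
        args.append("--environment_config=" + environment_config)
    return args
```

In the original script, `PipelineOptions(portable_flink_args())` would replace the hard-coded list. With `environment_type="DOCKER"`, naming an image that actually exists as `environment_config` should sidestep the "Unable to find image" failure, in line with Thomas's point about prebuilt images.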

Re: How do you write portable runner pipeline on separate python code ?

Posted by Kyle Weaver <kc...@google.com>.
> Is it one of the best guarded secrets? ;-)
Apparently so!

Filed a few related jiras and assigned to myself.
[1] https://issues.apache.org/jira/browse/BEAM-8214
[2] https://issues.apache.org/jira/browse/BEAM-8232
[3] https://issues.apache.org/jira/browse/BEAM-8233

Kyle Weaver | Software Engineer | github.com/ibzib | kcweaver@google.com


On Fri, Sep 13, 2019 at 9:57 AM Robert Bradshaw <ro...@google.com> wrote:

> Note that loopback won't fix the problem for, say, cross-language IOs.
> But, yes, it's really handy and should probably be used more.
>
> On Fri, Sep 13, 2019 at 8:29 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> And/or update the wiki/website with some how to's...
>>
>> On Fri, Sep 13, 2019 at 7:51 AM Thomas Weise <th...@apache.org> wrote:
>>
>>> I agree that loopback would be preferable for this purpose. I just
>>> wasn't aware this even works with the portable Flink runner. Is it one of
>>> the best guarded secrets? ;-)
>>>
>>> Kyle, can you please post the pipeline options you would use for Flink?
>>>
>>>
>>> On Thu, Sep 12, 2019 at 5:57 PM Kyle Weaver <kc...@google.com> wrote:
>>>
>>>> I prefer loopback because a) it writes output files to the local
>>>> filesystem, as the user expects, and b) you don't have to pull or build
>>>> docker images, or even have docker installed on your system -- which is one
>>>> less point of failure.
>>>>
>>>> Kyle Weaver | Software Engineer | github.com/ibzib |
>>>> kcweaver@google.com
>>>>
>>>>
>>>> On Thu, Sep 12, 2019 at 5:48 PM Thomas Weise <th...@apache.org> wrote:
>>>>
>>>>> This should become much better with 2.16 when we have the Docker
>>>>> images prebuilt.
>>>>>
>>>>> Docker is probably still the best option for Python on a JVM based
>>>>> runner in a local environment that does not have a development setup.
>>>>>
>>>>>
>>>>> On Thu, Sep 12, 2019 at 1:09 PM Kyle Weaver <kc...@google.com>
>>>>> wrote:
>>>>>
>>>>>> +dev <de...@beam.apache.org> I think we should probably point new
>>>>>> users of the portable Flink/Spark runners to use loopback or some other
>>>>>> non-docker environment, as Docker adds some operational complexity that
>>>>>> isn't really needed to run a word count example. For example, Yu's pipeline
>>>>>> errored here because the expected Docker container wasn't built before
>>>>>> running.
>>>>>>
>>>>>> Kyle Weaver | Software Engineer | github.com/ibzib |
>>>>>> kcweaver@google.com
>>>>>>
>>>>>>
>>>>>> On Thu, Sep 12, 2019 at 11:27 AM Robert Bradshaw <ro...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> On this note, making local files easy to read is something we'd
>>>>>>> definitely like to improve, as the current behavior is quite surprising.
>>>>>>> This could be useful not just for running with docker and the portable
>>>>>>> runner locally, but more generally when running on a distributed system
>>>>>>> (e.g. a Flink/Spark cluster or Dataflow). It would be very convenient if we
>>>>>>> could automatically stage local files to be read as artifacts that could be
>>>>>>> consumed by any worker (possibly via external directory mounting in the
>>>>>>> local docker case rather than an actual copy), and conversely copy small
>>>>>>> outputs back to the local machine (with the similar optimization for local
>>>>>>> docker).
>>>>>>>
>>>>>>> At the very least, however, obvious messaging when the local
>>>>>>> filesystem is used from within docker, which is often a (non-obvious and
>>>>>>> hard to debug) mistake should be added.
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik <lc...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> When you use a local filesystem path and a docker environment,
>>>>>>>> "/tmp" is written inside the container. You can solve this issue by:
>>>>>>>> * Using a "remote" filesystem such as HDFS/S3/GCS/...
>>>>>>>> * Mounting an external directory into the container so that any
>>>>>>>> "local" writes appear outside the container
>>>>>>>> * Using a non-docker environment such as external or process.
>>>>>>>>
>>>>>>>> On Thu, Sep 12, 2019 at 4:51 AM Yu Watanabe <yu...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hello.
>>>>>>>>>
>>>>>>>>> I would like to ask for help with my sample code using portable
>>>>>>>>> runner using apache flink.
>>>>>>>>> I was able to work out the wordcount.py using this page.
>>>>>>>>>
>>>>>>>>> https://beam.apache.org/roadmap/portability/
>>>>>>>>>
>>>>>>>>> I got below two files under /tmp.
>>>>>>>>>
>>>>>>>>> -rw-r--r-- 1 ywatanabe ywatanabe    185 Sep 12 19:56
>>>>>>>>> py-wordcount-direct-00001-of-00002
>>>>>>>>> -rw-r--r-- 1 ywatanabe ywatanabe    190 Sep 12 19:56
>>>>>>>>> py-wordcount-direct-00000-of-00002
>>>>>>>>>
>>>>>>>>> Then I wrote sample code with below steps.
>>>>>>>>>
>>>>>>>>> 1.Install apache_beam using pip3 separate from source code
>>>>>>>>> directory.
>>>>>>>>> 2. Wrote sample code as below and named it
>>>>>>>>> "test-protable-runner.py".  Placed it separate directory from source code.
>>>>>>>>>
>>>>>>>>> -----------------------------------------------------------------------------------
>>>>>>>>> (python) ywatanabe@debian-09-00:~$ ls -ltr
>>>>>>>>> total 16
>>>>>>>>> drwxr-xr-x 18 ywatanabe ywatanabe 4096 Sep 12 19:06 beam (<-
>>>>>>>>> source code directory)
>>>>>>>>> -rw-r--r--  1 ywatanabe ywatanabe  634 Sep 12 20:25
>>>>>>>>> test-portable-runner.py
>>>>>>>>>
>>>>>>>>> -----------------------------------------------------------------------------------
>>>>>>>>> 3. Executed the code with "python3 test-protable-ruuner.py"
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ==========================================================================================
>>>>>>>>> #!/usr/bin/env
>>>>>>>>>
>>>>>>>>> import apache_beam as beam
>>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>> from apache_beam.io import WriteToText
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> def printMsg(line):
>>>>>>>>>
>>>>>>>>>     print("OUTPUT: {0}".format(line))
>>>>>>>>>
>>>>>>>>>     return line
>>>>>>>>>
>>>>>>>>> options = PipelineOptions(["--runner=PortableRunner",
>>>>>>>>> "--job_endpoint=localhost:8099", "--shutdown_sources_on_final_watermark"])
>>>>>>>>>
>>>>>>>>> p = beam.Pipeline(options=options)
>>>>>>>>>
>>>>>>>>> output = ( p | 'create' >> beam.Create(["a", "b", "c"])
>>>>>>>>>              | beam.Map(printMsg)
>>>>>>>>>          )
>>>>>>>>>
>>>>>>>>> output | 'write' >> WriteToText('/tmp/sample.txt')
>>>>>>>>>
>>>>>>>>> =======================================================================================
>>>>>>>>>
>>>>>>>>> Job seemed to went all the way to "FINISHED" state.
>>>>>>>>>
>>>>>>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>>>>> [DataSource (Impulse) (1/1)] INFO
>>>>>>>>> org.apache.flink.runtime.taskmanager.Task - Registering task at network:
>>>>>>>>> DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) [DEPLOYING].
>>>>>>>>> [DataSource (Impulse) (1/1)] INFO
>>>>>>>>> org.apache.flink.runtime.taskmanager.Task - DataSource (Impulse) (1/1)
>>>>>>>>> (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING to RUNNING.
>>>>>>>>> [flink-akka.actor.default-dispatcher-3] INFO
>>>>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSource
>>>>>>>>> (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING
>>>>>>>>> to RUNNING.
>>>>>>>>> [flink-akka.actor.default-dispatcher-3] INFO
>>>>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition
>>>>>>>>> (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at
>>>>>>>>> core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0])
>>>>>>>>> (1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from CREATED to SCHEDULED.
>>>>>>>>> [flink-akka.actor.default-dispatcher-3] INFO
>>>>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition
>>>>>>>>> (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at
>>>>>>>>> core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0])
>>>>>>>>> (1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from SCHEDULED to
>>>>>>>>> DEPLOYING.
>>>>>>>>> [flink-akka.actor.default-dispatcher-3] INFO
>>>>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying CHAIN
>>>>>>>>> MapPartition (MapPartition at
>>>>>>>>> [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>),
>>>>>>>>> Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1) (attempt #0)
>>>>>>>>> to aad5f142-dbb7-4900-a5ae-af8a6fdcc787 @ localhost (dataPort=-1)
>>>>>>>>> [flink-akka.actor.default-dispatcher-2] INFO
>>>>>>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task CHAIN
>>>>>>>>> MapPartition (MapPartition at
>>>>>>>>> [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>),
>>>>>>>>> Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1).
>>>>>>>>> [DataSource (Impulse) (1/1)] INFO
>>>>>>>>> org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task
>>>>>>>>> DataSource (Impulse) (1/1) (d51e4171c0af881342eaa8a7ab06def8) [DEPLOYING].
>>>>>>>>> [DataSource (Impulse) (1/1)] INFO
>>>>>>>>> org.apache.flink.runtime.taskmanager.Task - Registering task at network:
>>>>>>>>> DataSource (Impulse) (1/1) (d51e4171c0af881342eaa8a7ab06def8) [DEPLOYING].
>>>>>>>>> [DataSource (Impulse) (1/1)] INFO
>>>>>>>>> org.apache.flink.runtime.taskmanager.Task - DataSource (Impulse) (1/1)
>>>>>>>>> (9df528f4d493d8c3b62c8be23dc889a8) switched from RUNNING to FINISHED.
>>>>>>>>>
>>>>>>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>>>>>
>>>>>>>>> But I ended up with docker error on client side.
>>>>>>>>>
>>>>>>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>>>>> (python) ywatanabe@debian-09-00:~$ python3 test-portable-runner.py
>>>>>>>>> /home/ywatanabe/python/lib/python3.5/site-packages/apache_beam/__init__.py:84:
>>>>>>>>> UserWarning: Some syntactic constructs of Python 3 are not yet fully
>>>>>>>>> supported by Apache Beam.
>>>>>>>>>   'Some syntactic constructs of Python 3 are not yet fully
>>>>>>>>> supported by '
>>>>>>>>> ERROR:root:java.io.IOException: Received exit code 125 for command
>>>>>>>>> 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null
>>>>>>>>> ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=13-1
>>>>>>>>> --logging_endpoint=localhost:39049 --artifact_endpoint=localhost:33779
>>>>>>>>> --provision_endpoint=localhost:34827 --control_endpoint=localhost:36079'.
>>>>>>>>> stderr: Unable to find image '
>>>>>>>>> ywatanabe-docker-apache.bintray.io/beam/python3:latest'
>>>>>>>>> locallydocker: Error response from daemon: unknown: Subject ywatanabe was
>>>>>>>>> not found.See 'docker run --help'.
>>>>>>>>> Traceback (most recent call last):
>>>>>>>>>   File "test-portable-runner.py", line 27, in <module>
>>>>>>>>>     result.wait_until_finish()
>>>>>>>>>   File
>>>>>>>>> "/home/ywatanabe/python/lib/python3.5/site-packages/apache_beam/runners/portability/portable_runner.py",
>>>>>>>>> line 446, in wait_until_finish
>>>>>>>>>     self._job_id, self._state, self._last_error_message()))
>>>>>>>>> RuntimeError: Pipeline
>>>>>>>>> BeamApp-ywatanabe-0912112516-22d84d63_d3b5e778-d771-4118-9ba7-c9dde85938e9
>>>>>>>>> failed in state FAILED: java.io.IOException: Received exit code 125 for
>>>>>>>>> command 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null
>>>>>>>>> ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=13-1
>>>>>>>>> --logging_endpoint=localhost:39049 --artifact_endpoint=localhost:33779
>>>>>>>>> --provision_endpoint=localhost:34827 --control_endpoint=localhost:36079'.
>>>>>>>>> stderr: Unable to find image '
>>>>>>>>> ywatanabe-docker-apache.bintray.io/beam/python3:latest'
>>>>>>>>> locally
>>>>>>>>> docker: Error response from daemon: unknown: Subject ywatanabe was
>>>>>>>>> not found.
>>>>>>>>> See 'docker run --help'.
>>>>>>>>>
>>>>>>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>>>>>
>>>>>>>>> As a result, I got nothing under /tmp. The code works when using
>>>>>>>>> DirectRunner.
>>>>>>>>> Where should I look in order to get the pipeline
>>>>>>>>> to write results to text files under /tmp?
>>>>>>>>>
>>>>>>>>> Best Regards,
>>>>>>>>> Yu Watanabe
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Yu Watanabe
>>>>>>>>> Weekend Freelancer who loves to challenge building data platform
>>>>>>>>> yu.w.tennis@gmail.com
>>>>>>>>> [image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1>  [image:
>>>>>>>>> Twitter icon] <https://twitter.com/yuwtennis>
>>>>>>>>>
>>>>>>>>
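The failure above comes from the job server's `docker run` (exit code 125). The SDK harness image `ywatanabe-docker-apache.bintray.io/beam/python3:latest` was not in the local image cache, and Docker could not pull it either. A small pre-flight check can catch this before the pipeline is submitted. This is only a sketch: it assumes Docker and the job server run on the same machine as the client, as they do in this thread. `docker_image_available` is a hypothetical helper, not part of Beam.

```python
import shutil
import subprocess


def docker_image_available(image):
    """Return True if `image` is already in the local Docker image cache.

    Hypothetical helper: it only inspects the local cache and never pulls,
    so it returns False when Docker is not installed, the daemon is
    unreachable, or the image has not been built or pulled yet.
    """
    if shutil.which("docker") is None:
        return False
    result = subprocess.run(
        ["docker", "image", "inspect", image],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    return result.returncode == 0
```

For example, you could call `docker_image_available` with the image name from the error before calling `p.run()`. A `False` result means one of three things has to happen first: build the image, pull the image, or switch to a non-Docker environment.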

Re: How do you write portable runner pipeline on separate python code ?

Posted by Robert Bradshaw <ro...@google.com>.
Note that loopback won't fix the problem for, say, cross-language IOs. But,
yes, it's really handy and should probably be used more.
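For reference, here is a sketch of the original script switched to the LOOPBACK environment discussed in this thread. It makes two assumptions: the Flink job server from the original post is listening on `localhost:8099`, and the installed Beam release accepts `--environment_type=LOOPBACK`. `loopback_args` and `run` are illustrative names. With LOOPBACK, the SDK worker runs in the submitting process rather than in a container, so `WriteToText('/tmp/sample.txt')` should write its shards to the client machine's `/tmp`. As noted above, this does not help with cross-language IOs.

```python
def loopback_args(job_endpoint="localhost:8099"):
    """Build pipeline options for a portable run with a LOOPBACK SDK worker."""
    return [
        "--runner=PortableRunner",
        "--job_endpoint=" + job_endpoint,
        # Run the Python SDK worker in this process instead of a Docker
        # container, so local paths refer to this machine's filesystem.
        "--environment_type=LOOPBACK",
    ]


def print_msg(line):
    print("OUTPUT: {0}".format(line))
    return line


def run(argv):
    # Imported here so the option helper above works without Beam installed.
    import apache_beam as beam
    from apache_beam.io import WriteToText
    from apache_beam.options.pipeline_options import PipelineOptions

    # Leaving the `with` block runs the pipeline and waits for it to finish.
    with beam.Pipeline(options=PipelineOptions(argv)) as p:
        (p
         | "create" >> beam.Create(["a", "b", "c"])
         | "print" >> beam.Map(print_msg)
         | "write" >> WriteToText("/tmp/sample.txt"))
```

Usage, with the job server running: `run(loopback_args())`.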

On Fri, Sep 13, 2019 at 8:29 AM Lukasz Cwik <lc...@google.com> wrote:

> And/or update the wiki/website with some how-tos...
>
> On Fri, Sep 13, 2019 at 7:51 AM Thomas Weise <th...@apache.org> wrote:
>
>> I agree that loopback would be preferable for this purpose. I just wasn't
>> aware this even works with the portable Flink runner. Is it one of the best
>> guarded secrets? ;-)
>>
>> Kyle, can you please post the pipeline options you would use for Flink?
>>
>>
>> On Thu, Sep 12, 2019 at 5:57 PM Kyle Weaver <kc...@google.com> wrote:
>>
>>> I prefer loopback because a) it writes output files to the local
>>> filesystem, as the user expects, and b) you don't have to pull or build
>>> docker images, or even have docker installed on your system -- which is one
>>> less point of failure.
>>>
>>> Kyle Weaver | Software Engineer | github.com/ibzib | kcweaver@google.com
>>>
>>>
>>> On Thu, Sep 12, 2019 at 5:48 PM Thomas Weise <th...@apache.org> wrote:
>>>
>>>> This should become much better with 2.16 when we have the Docker images
>>>> prebuilt.
>>>>
>>>> Docker is probably still the best option for Python on a JVM based
>>>> runner in a local environment that does not have a development setup.
>>>>
>>>>
>>>> On Thu, Sep 12, 2019 at 1:09 PM Kyle Weaver <kc...@google.com>
>>>> wrote:
>>>>
>>>>> +dev <de...@beam.apache.org> I think we should probably point new users
>>>>> of the portable Flink/Spark runners to use loopback or some other
>>>>> non-docker environment, as Docker adds some operational complexity that
>>>>> isn't really needed to run a word count example. For example, Yu's pipeline
>>>>> errored here because the expected Docker container wasn't built before
>>>>> running.
>>>>>
>>>>> Kyle Weaver | Software Engineer | github.com/ibzib |
>>>>> kcweaver@google.com
>>>>>
>>>>>
>>>>> On Thu, Sep 12, 2019 at 11:27 AM Robert Bradshaw <ro...@google.com>
>>>>> wrote:
>>>>>
>>>>>> On this note, making local files easy to read is something we'd
>>>>>> definitely like to improve, as the current behavior is quite surprising.
>>>>>> This could be useful not just for running with docker and the portable
>>>>>> runner locally, but more generally when running on a distributed system
>>>>>> (e.g. a Flink/Spark cluster or Dataflow). It would be very convenient if we
>>>>>> could automatically stage local files to be read as artifacts that could be
>>>>>> consumed by any worker (possibly via external directory mounting in the
>>>>>> local docker case rather than an actual copy), and conversely copy small
>>>>>> outputs back to the local machine (with the similar optimization for local
>>>>>> docker).
>>>>>>
>>>>>> At the very least, however, we should add obvious messaging when the
>>>>>> local filesystem is used from within Docker, which is often a
>>>>>> (non-obvious and hard-to-debug) mistake.
>>>>>>
>>>>>>
>>>>>> On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik <lc...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> When you use a local filesystem path and a docker environment,
>>>>>>> "/tmp" is written inside the container. You can solve this issue by:
>>>>>>> * Using a "remote" filesystem such as HDFS/S3/GCS/...
>>>>>>> * Mounting an external directory into the container so that any
>>>>>>> "local" writes appear outside the container
>>>>>>> * Using a non-docker environment such as external or process.
>>>>>>>
>>>>>>>
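The "obvious messaging" suggested above could be approximated on the client side. This is a minimal sketch built on two assumptions. First, the SDK environment defaults to DOCKER when `--environment_type` is not given, as in the original script. Second, the listed URL schemes are only an illustrative subset of remote filesystems. `warn_if_local_output` is a hypothetical helper, not a Beam API.

```python
import warnings

# Illustrative subset of URL schemes that workers can reach regardless of
# where they run (an assumption; actual filesystem support varies by release).
REMOTE_SCHEMES = ("gs://", "s3://", "hdfs://")


def warn_if_local_output(path, environment_type="DOCKER"):
    """Warn when a local output path is used with a containerized SDK worker.

    Returns True if a warning was issued, False otherwise.
    """
    is_local = not path.startswith(REMOTE_SCHEMES)
    if is_local and environment_type.upper() == "DOCKER":
        warnings.warn(
            "Output path {!r} is inside the Docker container running the SDK "
            "worker; files will not appear on the host. Use a remote "
            "filesystem, mount a host directory, or use a non-Docker "
            "environment.".format(path))
        return True
    return False
```

Calling `warn_if_local_output('/tmp/sample.txt')` before building the pipeline would have flagged the original script's output path.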

Re: How do you write portable runner pipeline on separate python code ?

Posted by Lukasz Cwik <lc...@google.com>.
And/or update the wiki/website with some how-tos...
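In the spirit of such a how-to, here is a sketch of the non-Docker environments listed in the quoted reply below (external or process). It relies on two option formats. PROCESS takes a JSON `--environment_config` with a `command` key naming an SDK harness boot executable, and that executable must exist on every Flink worker. EXTERNAL takes the `host:port` of an SDK worker pool that you start yourself. The boot path and worker-pool address used below are placeholders, and `environment_args` is a hypothetical helper.

```python
import json


def environment_args(environment_type, job_endpoint="localhost:8099",
                     boot_command=None, worker_pool=None):
    """Build portable-runner options for a non-Docker SDK environment.

    boot_command: path to an SDK harness boot executable (PROCESS only).
    worker_pool: host:port of an already-running SDK worker pool (EXTERNAL).
    """
    args = ["--runner=PortableRunner",
            "--job_endpoint=" + job_endpoint,
            "--environment_type=" + environment_type]
    if environment_type == "PROCESS":
        if not boot_command:
            raise ValueError("PROCESS requires boot_command")
        # The runner launches this command to start each SDK worker.
        args.append("--environment_config=" +
                    json.dumps({"command": boot_command}))
    elif environment_type == "EXTERNAL":
        if not worker_pool:
            raise ValueError("EXTERNAL requires worker_pool")
        # The runner connects to this address instead of starting workers.
        args.append("--environment_config=" + worker_pool)
    else:
        raise ValueError("unsupported environment: " + environment_type)
    return args
```

The resulting list can be passed straight to `PipelineOptions(...)`. Because it is a Python list rather than a shell command, the JSON needs no extra quoting.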

On Fri, Sep 13, 2019 at 7:51 AM Thomas Weise <th...@apache.org> wrote:

> I agree that loopback would be preferable for this purpose. I just wasn't
> aware this even works with the portable Flink runner. Is it one of the best
> guarded secrets? ;-)
>
> Kyle, can you please post the pipeline options you would use for Flink?
>
>
> On Thu, Sep 12, 2019 at 5:57 PM Kyle Weaver <kc...@google.com> wrote:
>
>> I prefer loopback because a) it writes output files to the local
>> filesystem, as the user expects, and b) you don't have to pull or build
>> docker images, or even have docker installed on your system -- which is one
>> less point of failure.
>>
>> Kyle Weaver | Software Engineer | github.com/ibzib | kcweaver@google.com
>>
>>
>> On Thu, Sep 12, 2019 at 5:48 PM Thomas Weise <th...@apache.org> wrote:
>>
>>> This should become much better with 2.16 when we have the Docker images
>>> prebuilt.
>>>
>>> Docker is probably still the best option for Python on a JVM based
>>> runner in a local environment that does not have a development setup.
>>>
>>>
>>> On Thu, Sep 12, 2019 at 1:09 PM Kyle Weaver <kc...@google.com> wrote:
>>>
>>>> +dev <de...@beam.apache.org> I think we should probably point new users
>>>> of the portable Flink/Spark runners to use loopback or some other
>>>> non-docker environment, as Docker adds some operational complexity that
>>>> isn't really needed to run a word count example. For example, Yu's pipeline
>>>> errored here because the expected Docker container wasn't built before
>>>> running.
>>>>
>>>> Kyle Weaver | Software Engineer | github.com/ibzib |
>>>> kcweaver@google.com
>>>>
>>>>
>>>> On Thu, Sep 12, 2019 at 11:27 AM Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>>
>>>>> On this note, making local files easy to read is something we'd
>>>>> definitely like to improve, as the current behavior is quite surprising.
>>>>> This could be useful not just for running with docker and the portable
>>>>> runner locally, but more generally when running on a distributed system
>>>>> (e.g. a Flink/Spark cluster or Dataflow). It would be very convenient if we
>>>>> could automatically stage local files to be read as artifacts that could be
>>>>> consumed by any worker (possibly via external directory mounting in the
>>>>> local docker case rather than an actual copy), and conversely copy small
>>>>> outputs back to the local machine (with the similar optimization for local
>>>>> docker).
>>>>>
>>>>> At the very least, however, we should add obvious messaging when the
>>>>> local filesystem is used from within Docker, which is often a
>>>>> (non-obvious and hard-to-debug) mistake.
>>>>>
>>>>>
>>>>> On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> When you use a local filesystem path and a docker environment, "/tmp"
>>>>>> is written inside the container. You can solve this issue by:
>>>>>> * Using a "remote" filesystem such as HDFS/S3/GCS/...
>>>>>> * Mounting an external directory into the container so that any
>>>>>> "local" writes appear outside the container
>>>>>> * Using a non-docker environment such as external or process.
>>>>>>
>>>>>> On Thu, Sep 12, 2019 at 4:51 AM Yu Watanabe <yu...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello.
>>>>>>>
>>>>>>> I would like to ask for help with my sample code using the portable
>>>>>>> runner with Apache Flink.
>>>>>>> I was able to get wordcount.py working by following this page.
>>>>>>>
>>>>>>> https://beam.apache.org/roadmap/portability/
>>>>>>>
>>>>>>> I got the two files below under /tmp.
>>>>>>>
>>>>>>> -rw-r--r-- 1 ywatanabe ywatanabe    185 Sep 12 19:56
>>>>>>> py-wordcount-direct-00001-of-00002
>>>>>>> -rw-r--r-- 1 ywatanabe ywatanabe    190 Sep 12 19:56
>>>>>>> py-wordcount-direct-00000-of-00002
>>>>>>>
>>>>>>> Then I wrote sample code with the steps below.
>>>>>>>
>>>>>>> 1. Installed apache_beam using pip3, separately from the source code directory.
>>>>>>> 2. Wrote the sample code below and named it
>>>>>>> "test-portable-runner.py". Placed it in a directory separate from the source code.
>>>>>>>
>>>>>>> -----------------------------------------------------------------------------------
>>>>>>> (python) ywatanabe@debian-09-00:~$ ls -ltr
>>>>>>> total 16
>>>>>>> drwxr-xr-x 18 ywatanabe ywatanabe 4096 Sep 12 19:06 beam (<- source
>>>>>>> code directory)
>>>>>>> -rw-r--r--  1 ywatanabe ywatanabe  634 Sep 12 20:25
>>>>>>> test-portable-runner.py
>>>>>>>
>>>>>>> -----------------------------------------------------------------------------------
>>>>>>> 3. Executed the code with "python3 test-portable-runner.py".
>>>>>>>
>>>>>>>
>>>>>>> ==========================================================================================
>>>>>>> #!/usr/bin/env python3
>>>>>>>
>>>>>>> import apache_beam as beam
>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>> from apache_beam.io import WriteToText
>>>>>>>
>>>>>>>
>>>>>>> def printMsg(line):
>>>>>>>
>>>>>>>     print("OUTPUT: {0}".format(line))
>>>>>>>
>>>>>>>     return line
>>>>>>>
>>>>>>> options = PipelineOptions(["--runner=PortableRunner",
>>>>>>> "--job_endpoint=localhost:8099", "--shutdown_sources_on_final_watermark"])
>>>>>>>
>>>>>>> p = beam.Pipeline(options=options)
>>>>>>>
>>>>>>> output = ( p | 'create' >> beam.Create(["a", "b", "c"])
>>>>>>>              | beam.Map(printMsg)
>>>>>>>          )
>>>>>>>
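>>>>>>> output | 'write' >> WriteToText('/tmp/sample.txt')
>>>>>>>
>>>>>>> # Submit the job to the job server and block until it finishes.
>>>>>>> result = p.run()
>>>>>>> result.wait_until_finish()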
>>>>>>>
>>>>>>> =======================================================================================
>>>>>>>
>>>>>>> The job seemed to go all the way to the "FINISHED" state.
>>>>>>>
>>>>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>>> [DataSource (Impulse) (1/1)] INFO
>>>>>>> org.apache.flink.runtime.taskmanager.Task - Registering task at network:
>>>>>>> DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) [DEPLOYING].
>>>>>>> [DataSource (Impulse) (1/1)] INFO
>>>>>>> org.apache.flink.runtime.taskmanager.Task - DataSource (Impulse) (1/1)
>>>>>>> (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING to RUNNING.
>>>>>>> [flink-akka.actor.default-dispatcher-3] INFO
>>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSource
>>>>>>> (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING
>>>>>>> to RUNNING.
>>>>>>> [flink-akka.actor.default-dispatcher-3] INFO
>>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition
>>>>>>> (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at
>>>>>>> core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0])
>>>>>>> (1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from CREATED to SCHEDULED.
>>>>>>> [flink-akka.actor.default-dispatcher-3] INFO
>>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition
>>>>>>> (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at
>>>>>>> core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0])
>>>>>>> (1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from SCHEDULED to
>>>>>>> DEPLOYING.
>>>>>>> [flink-akka.actor.default-dispatcher-3] INFO
>>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying CHAIN
>>>>>>> MapPartition (MapPartition at
>>>>>>> [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>),
>>>>>>> Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1) (attempt #0)
>>>>>>> to aad5f142-dbb7-4900-a5ae-af8a6fdcc787 @ localhost (dataPort=-1)
>>>>>>> [flink-akka.actor.default-dispatcher-2] INFO
>>>>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task CHAIN
>>>>>>> MapPartition (MapPartition at
>>>>>>> [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>),
>>>>>>> Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1).
>>>>>>> [DataSource (Impulse) (1/1)] INFO
>>>>>>> org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task
>>>>>>> DataSource (Impulse) (1/1) (d51e4171c0af881342eaa8a7ab06def8) [DEPLOYING].
>>>>>>> [DataSource (Impulse) (1/1)] INFO
>>>>>>> org.apache.flink.runtime.taskmanager.Task - Registering task at network:
>>>>>>> DataSource (Impulse) (1/1) (d51e4171c0af881342eaa8a7ab06def8) [DEPLOYING].
>>>>>>> [DataSource (Impulse) (1/1)] INFO
>>>>>>> org.apache.flink.runtime.taskmanager.Task - DataSource (Impulse) (1/1)
>>>>>>> (9df528f4d493d8c3b62c8be23dc889a8) switched from RUNNING to FINISHED.
>>>>>>>
>>>>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>>>
>>>>>>> But I ended up with docker error on client side.
>>>>>>>
>>>>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>>> (python) ywatanabe@debian-09-00:~$ python3 test-portable-runner.py
>>>>>>> /home/ywatanabe/python/lib/python3.5/site-packages/apache_beam/__init__.py:84:
>>>>>>> UserWarning: Some syntactic constructs of Python 3 are not yet fully
>>>>>>> supported by Apache Beam.
>>>>>>>   'Some syntactic constructs of Python 3 are not yet fully supported
>>>>>>> by '
>>>>>>> ERROR:root:java.io.IOException: Received exit code 125 for command
>>>>>>> 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null
>>>>>>> ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=13-1
>>>>>>> --logging_endpoint=localhost:39049 --artifact_endpoint=localhost:33779
>>>>>>> --provision_endpoint=localhost:34827 --control_endpoint=localhost:36079'.
>>>>>>> stderr: Unable to find image '
>>>>>>> ywatanabe-docker-apache.bintray.io/beam/python3:latest'
>>>>>>> locallydocker: Error response from daemon: unknown: Subject ywatanabe was
>>>>>>> not found.See 'docker run --help'.
>>>>>>> Traceback (most recent call last):
>>>>>>>   File "test-portable-runner.py", line 27, in <module>
>>>>>>>     result.wait_until_finish()
>>>>>>>   File
>>>>>>> "/home/ywatanabe/python/lib/python3.5/site-packages/apache_beam/runners/portability/portable_runner.py",
>>>>>>> line 446, in wait_until_finish
>>>>>>>     self._job_id, self._state, self._last_error_message()))
>>>>>>> RuntimeError: Pipeline
>>>>>>> BeamApp-ywatanabe-0912112516-22d84d63_d3b5e778-d771-4118-9ba7-c9dde85938e9
>>>>>>> failed in state FAILED: java.io.IOException: Received exit code 125 for
>>>>>>> command 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null
>>>>>>> ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=13-1
>>>>>>> --logging_endpoint=localhost:39049 --artifact_endpoint=localhost:33779
>>>>>>> --provision_endpoint=localhost:34827 --control_endpoint=localhost:36079'.
>>>>>>> stderr: Unable to find image '
>>>>>>> ywatanabe-docker-apache.bintray.io/beam/python3:latest'
>>>>>>> locallydocker: Error response from daemon: unknown: Subject ywatanabe was
>>>>>>> not found.See 'docker run --help'.
>>>>>>>
>>>>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>>>
>>>>>>> As a result , I got nothing under /tmp . Code works when using
>>>>>>> DirectRunner.
>>>>>>> May I ask , where should I look for in order to get the pipeline to
>>>>>>> write results to text files under /tmp ?
>>>>>>>
>>>>>>> Best Regards,
>>>>>>> Yu Watanabe
>>>>>>>
>>>>>>> --
>>>>>>> Yu Watanabe
>>>>>>> Weekend Freelancer who loves to challenge building data platform
>>>>>>> yu.w.tennis@gmail.com
>>>>>>> [image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1>  [image:
>>>>>>> Twitter icon] <https://twitter.com/yuwtennis>
>>>>>>>
>>>>>>

Re: How do you write portable runner pipeline on separate python code ?

Posted by Lukasz Cwik <lc...@google.com>.
And/or update the wiki/website with some how-tos...

On Fri, Sep 13, 2019 at 7:51 AM Thomas Weise <th...@apache.org> wrote:

> I agree that loopback would be preferable for this purpose. I just wasn't
> aware this even works with the portable Flink runner. Is it one of the best
> guarded secrets? ;-)
>
> Kyle, can you please post the pipeline options you would use for Flink?
>
>
> On Thu, Sep 12, 2019 at 5:57 PM Kyle Weaver <kc...@google.com> wrote:
>
>> I prefer loopback because a) it writes output files to the local
>> filesystem, as the user expects, and b) you don't have to pull or build
>> docker images, or even have docker installed on your system -- which is one
>> less point of failure.
>>
>> Kyle Weaver | Software Engineer | github.com/ibzib | kcweaver@google.com
>>
>>
>> On Thu, Sep 12, 2019 at 5:48 PM Thomas Weise <th...@apache.org> wrote:
>>
>>> This should become much better with 2.16 when we have the Docker images
>>> prebuilt.
>>>
>>> Docker is probably still the best option for Python on a JVM-based
>>> runner in a local environment that does not have a development setup.
>>>
>>>
>>> On Thu, Sep 12, 2019 at 1:09 PM Kyle Weaver <kc...@google.com> wrote:
>>>
>>>> +dev <de...@beam.apache.org> I think we should probably point new users
>>>> of the portable Flink/Spark runners to use loopback or some other
>>>> non-docker environment, as Docker adds some operational complexity that
>>>> isn't really needed to run a word count example. For example, Yu's pipeline
>>>> errored here because the expected Docker container wasn't built before
>>>> running.
>>>>
>>>> Kyle Weaver | Software Engineer | github.com/ibzib |
>>>> kcweaver@google.com
>>>>
>>>>
>>>> On Thu, Sep 12, 2019 at 11:27 AM Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>>
>>>>> On this note, making local files easy to read is something we'd
>>>>> definitely like to improve, as the current behavior is quite surprising.
>>>>> This could be useful not just for running with docker and the portable
>>>>> runner locally, but more generally when running on a distributed system
>>>>> (e.g. a Flink/Spark cluster or Dataflow). It would be very convenient if we
>>>>> could automatically stage local files to be read as artifacts that could be
>>>>> consumed by any worker (possibly via external directory mounting in the
>>>>> local docker case rather than an actual copy), and conversely copy small
>>>>> outputs back to the local machine (with the similar optimization for local
>>>>> docker).
>>>>>
>>>>> At the very least, however, we should add obvious messaging when the
>>>>> local filesystem is used from within docker, which is often a
>>>>> non-obvious and hard-to-debug mistake.
>>>>>
>>>>>
>>>>> On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> When you use a local filesystem path and a docker environment, "/tmp"
>>>>>> is written inside the container. You can solve this issue by:
>>>>>> * Using a "remote" filesystem such as HDFS/S3/GCS/...
>>>>>> * Mounting an external directory into the container so that any
>>>>>> "local" writes appear outside the container
>>>>>> * Using a non-docker environment such as external or process.
>>>>>>
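As a possible how-to, here is a minimal sketch of Kyle's loopback suggestion applied to the script from the original post. It assumes the Flink job server from the post is listening on localhost:8099 and that your Beam release accepts --environment_type=LOOPBACK with the portable Flink runner (Kyle reports that it does, while Thomas was not aware of it, so verify this against your version). With LOOPBACK the Python SDK worker runs inside the submitting process, so WriteToText('/tmp/sample.txt') should write to the local /tmp instead of a Docker container's /tmp. The names portable_flags, print_msg and run are illustrative only, not part of Beam.

```python
#!/usr/bin/env python3
# Sketch: the pipeline from the original post, submitted in LOOPBACK mode.

JOB_ENDPOINT = "localhost:8099"  # Flink job server address from the original post


def portable_flags(environment_type="LOOPBACK", job_endpoint=JOB_ENDPOINT):
    """Build the command-line style flags handed to PipelineOptions."""
    return [
        "--runner=PortableRunner",
        "--job_endpoint={}".format(job_endpoint),
        # LOOPBACK runs the Python SDK worker inside this submitting process,
        # so local paths such as /tmp refer to this machine, not a container.
        "--environment_type={}".format(environment_type),
        # Kept from the original post.
        "--shutdown_sources_on_final_watermark",
    ]


def print_msg(line):
    print("OUTPUT: {0}".format(line))
    return line


def run(output_prefix="/tmp/sample.txt"):
    # Beam is imported here so portable_flags() can be inspected without it.
    import apache_beam as beam
    from apache_beam.io import WriteToText
    from apache_beam.options.pipeline_options import PipelineOptions

    p = beam.Pipeline(options=PipelineOptions(portable_flags()))
    (p
     | "create" >> beam.Create(["a", "b", "c"])
     | "print" >> beam.Map(print_msg)
     | "write" >> WriteToText(output_prefix))
    # Submit the job and block until the runner reports a terminal state.
    result = p.run()
    result.wait_until_finish()
```

Calling run() submits the job. If loopback works as described, the output shards should show up locally with names like /tmp/sample.txt-00000-of-NNNNN, in the same style as the wordcount output in the original post.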

Re: How do you write portable runner pipeline on separate python code ?

Posted by Thomas Weise <th...@apache.org>.
I agree that loopback would be preferable for this purpose. I just wasn't
aware this even works with the portable Flink runner. Is it one of the
best-guarded secrets? ;-)

Kyle, can you please post the pipeline options you would use for Flink?


On Thu, Sep 12, 2019 at 5:57 PM Kyle Weaver <kc...@google.com> wrote:

> I prefer loopback because a) it writes output files to the local
> filesystem, as the user expects, and b) you don't have to pull or build
> docker images, or even have docker installed on your system -- which is one
> less point of failure.
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcweaver@google.com
>
>
> On Thu, Sep 12, 2019 at 5:48 PM Thomas Weise <th...@apache.org> wrote:
>
>> This should become much better with 2.16 when we have the Docker images
>> prebuilt.
>>
>> Docker is probably still the best option for Python on a JVM-based runner
>> in a local environment that does not have a development setup.
>>
>>
>> On Thu, Sep 12, 2019 at 1:09 PM Kyle Weaver <kc...@google.com> wrote:
>>
>>> +dev <de...@beam.apache.org> I think we should probably point new users
>>> of the portable Flink/Spark runners to use loopback or some other
>>> non-docker environment, as Docker adds some operational complexity that
>>> isn't really needed to run a word count example. For example, Yu's pipeline
>>> errored here because the expected Docker container wasn't built before
>>> running.
>>>
>>> Kyle Weaver | Software Engineer | github.com/ibzib | kcweaver@google.com
>>>
>>>
>>> On Thu, Sep 12, 2019 at 11:27 AM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>>
>>>> On this note, making local files easy to read is something we'd
>>>> definitely like to improve, as the current behavior is quite surprising.
>>>> This could be useful not just for running with docker and the portable
>>>> runner locally, but more generally when running on a distributed system
>>>> (e.g. a Flink/Spark cluster or Dataflow). It would be very convenient if we
>>>> could automatically stage local files to be read as artifacts that could be
>>>> consumed by any worker (possibly via external directory mounting in the
>>>> local docker case rather than an actual copy), and conversely copy small
>>>> outputs back to the local machine (with the similar optimization for local
>>>> docker).
>>>>
>>>> At the very least, however, we should add obvious messaging when the
>>>> local filesystem is used from within docker, which is often a
>>>> non-obvious and hard-to-debug mistake.
>>>>
>>>>
>>>> On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> When you use a local filesystem path and a docker environment, "/tmp"
>>>>> is written inside the container. You can solve this issue by:
>>>>> * Using a "remote" filesystem such as HDFS/S3/GCS/...
>>>>> * Mounting an external directory into the container so that any
>>>>> "local" writes appear outside the container
>>>>> * Using a non-docker environment such as external or process.
>>>>>
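Robert's minimum fix, an obvious warning when a local path is used from inside Docker, could look roughly like the check below. This is a hypothetical helper, not an existing Beam API. It treats a path with no URL scheme (or a file:// URL) as local, assumes POSIX-style paths such as /tmp/sample.txt, and warns only for the DOCKER environment type, because LOOPBACK and PROCESS workers run directly on the machine rather than in a container. The warning text simply restates Lukasz's three remedies.

```python
import warnings
from urllib.parse import urlparse


def is_local_path(path):
    """Return True if path has no URL scheme or uses file://.

    Hypothetical helper; assumes POSIX-style paths such as /tmp/sample.txt.
    """
    return urlparse(path).scheme in ("", "file")


def warn_if_container_local(environment_type, path):
    """Warn, and return True, when a DOCKER worker would write a local path.

    Such writes land in the container's filesystem, not on the host.
    """
    if environment_type.upper() == "DOCKER" and is_local_path(path):
        warnings.warn(
            "Output path {!r} is local, but the SDK worker runs in a Docker "
            "container, so files will be written inside the container. "
            "Consider a remote filesystem (HDFS/S3/GCS/...), mounting a host "
            "directory into the container, or a non-docker environment such "
            "as LOOPBACK or PROCESS.".format(path))
        return True
    return False
```

For the original post, warn_if_container_local("DOCKER", "/tmp/sample.txt") warns and returns True, while the same path under LOOPBACK, or a remote path such as gs://bucket/out under DOCKER, returns False without warning.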

Re: How do you write portable runner pipeline on separate python code ?

Posted by Thomas Weise <th...@apache.org>.
I agree that loopback would be preferable for this purpose. I just wasn't
aware this even works with the portable Flink runner. Is it one of the best
guarded secrets? ;-)

Kyle, can you please post the pipeline options you would use for Flink?


On Thu, Sep 12, 2019 at 5:57 PM Kyle Weaver <kc...@google.com> wrote:

> I prefer loopback because a) it writes output files to the local
> filesystem, as the user expects, and b) you don't have to pull or build
> docker images, or even have docker installed on your system -- which is one
> less point of failure.
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcweaver@google.com
>
>
> On Thu, Sep 12, 2019 at 5:48 PM Thomas Weise <th...@apache.org> wrote:
>
>> This should become much better with 2.16 when we have the Docker images
>> prebuilt.
>>
>> Docker is probably still the best option for Python on a JVM based runner
>> in a local environment that does not have a development setup.
>>
>>
>> On Thu, Sep 12, 2019 at 1:09 PM Kyle Weaver <kc...@google.com> wrote:
>>
>>> +dev <de...@beam.apache.org> I think we should probably point new users
>>> of the portable Flink/Spark runners to use loopback or some other
>>> non-docker environment, as Docker adds some operational complexity that
>>> isn't really needed to run a word count example. For example, Yu's pipeline
>>> errored here because the expected Docker container wasn't built before
>>> running.
>>>
>>> Kyle Weaver | Software Engineer | github.com/ibzib | kcweaver@google.com
>>>
>>>
>>> On Thu, Sep 12, 2019 at 11:27 AM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>>
>>>> On this note, making local files easy to read is something we'd
>>>> definitely like to improve, as the current behavior is quite surprising.
>>>> This could be useful not just for running with docker and the portable
>>>> runner locally, but more generally when running on a distributed system
>>>> (e.g. a Flink/Spark cluster or Dataflow). It would be very convenient if we
>>>> could automatically stage local files to be read as artifacts that could be
>>>> consumed by any worker (possibly via external directory mounting in the
>>>> local docker case rather than an actual copy), and conversely copy small
>>>> outputs back to the local machine (with the similar optimization for local
>>>> docker).
>>>>
>>>> At the very least, however, obvious messaging when the local filesystem
>>>> is used from within docker, which is often a (non-obvious and hard to
>>>> debug) mistake should be added.
>>>>
>>>>
>>>> On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> When you use a local filesystem path with a Docker environment, "/tmp"
>>>>> refers to the directory inside the SDK worker container, so that is
>>>>> where the files are written. You can solve this issue by:
>>>>> * Using a "remote" filesystem such as HDFS/S3/GCS/...
>>>>> * Mounting an external directory into the container so that any
>>>>> "local" writes appear outside the container
>>>>> * Using a non-Docker environment such as EXTERNAL or PROCESS.
>>>>>
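Robert's suggestion of obvious messaging, combined with Lukasz's list of workarounds, can be sketched as a small pre-submission check. This is a hypothetical helper, not part of Beam: it assumes that a path with no URI scheme (or a `file://` scheme) is local, and that `DOCKER` is the environment in effect when `--environment_type` is not set, which is assumed here to be the portable runner's default.

```python
from urllib.parse import urlparse


def local_path_warning(path, environment_type="DOCKER"):
    """Return a warning if `path` would be written inside the SDK worker's
    Docker container rather than on the machine that submitted the job.

    Hypothetical helper: Beam does not ship this check.
    """
    scheme = urlparse(path).scheme
    # No scheme (e.g. "/tmp/sample.txt") or file:// is treated as local.
    is_local = scheme in ("", "file")
    if environment_type.upper() == "DOCKER" and is_local:
        return (
            f"{path!r} is local to the SDK worker container. Write to a remote "
            "filesystem (e.g. gs://, s3://, hdfs://), mount a host directory "
            "into the container, or use a non-Docker environment "
            "(LOOPBACK, PROCESS or EXTERNAL)."
        )
    return None


if __name__ == "__main__":
    # Yu's original output path under the assumed default Docker environment.
    print(local_path_warning("/tmp/sample.txt"))
```

A check like this would only catch the obvious case; it cannot tell whether a host directory has actually been mounted into the container.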

Re: How do you write portable runner pipeline on separate python code ?

Posted by Kyle Weaver <kc...@google.com>.
I prefer loopback because a) it writes output files to the local
filesystem, as the user expects, and b) you don't have to pull or build
Docker images, or even have Docker installed on your system, which is one
less point of failure.
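Kyle's loopback suggestion can be sketched as a revised version of Yu's script. It assumes the Flink job server from the thread is still listening on `localhost:8099`. `--environment_type=LOOPBACK` is an existing portable-runner option, while the helper names `loopback_args`, `print_msg` and `run` are illustrative only. Beam is imported inside `run` so that the argument helper can be exercised without Beam installed.

```python
def loopback_args(job_endpoint="localhost:8099", extra=()):
    """PipelineOptions arguments for a PortableRunner job whose Python SDK
    workers run in the submitting process (LOOPBACK), so local paths such
    as /tmp refer to this machine. Hypothetical helper."""
    args = [
        "--runner=PortableRunner",
        f"--job_endpoint={job_endpoint}",
        # With LOOPBACK no Docker image is pulled or started.
        "--environment_type=LOOPBACK",
    ]
    args.extend(extra)
    return args


def print_msg(line):
    # With LOOPBACK this prints to the submitting process's stdout.
    print(f"OUTPUT: {line}")
    return line


def run(argv):
    """Yu's pipeline, writing /tmp/sample.txt shards on the local machine."""
    import apache_beam as beam  # requires apache_beam to be installed
    from apache_beam.io import WriteToText
    from apache_beam.options.pipeline_options import PipelineOptions

    # Leaving the `with` block runs the pipeline and waits for it to finish;
    # the process must stay alive because it hosts the SDK workers.
    with beam.Pipeline(options=PipelineOptions(argv)) as p:
        (p
         | "create" >> beam.Create(["a", "b", "c"])
         | "print" >> beam.Map(print_msg)
         | "write" >> WriteToText("/tmp/sample.txt"))
```

With a job server running, `run(loopback_args(extra=["--shutdown_sources_on_final_watermark"]))` would submit the job with Yu's extra flag; the output shards should then appear under /tmp on the submitting machine rather than inside a container.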

Kyle Weaver | Software Engineer | github.com/ibzib | kcweaver@google.com


On Thu, Sep 12, 2019 at 5:48 PM Thomas Weise <th...@apache.org> wrote:

> This should become much better with 2.16 when we have the Docker images
> prebuilt.
>
> Docker is probably still the best option for Python on a JVM-based runner
> in a local environment that does not have a development setup.

Re: How do you write portable runner pipeline on separate python code ?

Posted by Thomas Weise <th...@apache.org>.
This should become much better with Beam 2.16, when we will have the Docker
images prebuilt.

Docker is probably still the best option for Python on a JVM-based runner
in a local environment that does not have a development setup.
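If Docker is kept, as Thomas suggests, the environment can be pinned to a specific SDK container image through `--environment_config`, which for the `DOCKER` environment type is the image name. In Yu's run the runner tried to start a default image name containing the local user name, and that image had not been built locally. The helper below is a hypothetical sketch that assembles flags for any of the four environment types; the image name in it is a placeholder, not a published image.

```python
def portable_env_args(environment_type, environment_config=None,
                      job_endpoint="localhost:8099"):
    """Assemble PortableRunner arguments for one SDK worker environment.

    Meaning of environment_config per environment type:
      DOCKER   - container image name (optional; runner default if omitted)
      PROCESS  - JSON such as '{"command": "/path/to/boot"}'
      EXTERNAL - address of a running worker pool, e.g. 'localhost:50000'
      LOOPBACK - not used
    """
    env = environment_type.upper()
    if env not in ("DOCKER", "PROCESS", "EXTERNAL", "LOOPBACK"):
        raise ValueError(f"unknown environment type: {environment_type}")
    if env in ("PROCESS", "EXTERNAL") and not environment_config:
        raise ValueError(f"{env} requires an environment_config")
    args = [
        "--runner=PortableRunner",
        f"--job_endpoint={job_endpoint}",
        f"--environment_type={env}",
    ]
    if environment_config and env != "LOOPBACK":
        args.append(f"--environment_config={environment_config}")
    return args


if __name__ == "__main__":
    # Placeholder image name; substitute an image that exists locally or in
    # a registry the Flink task managers can pull from.
    print(portable_env_args("DOCKER", "my-registry/beam-python3-sdk:2.16.0"))
```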


On Thu, Sep 12, 2019 at 1:09 PM Kyle Weaver <kc...@google.com> wrote:

> +dev <de...@beam.apache.org> I think we should probably point new users of
> the portable Flink/Spark runners to use loopback or some other non-Docker
> environment, as Docker adds some operational complexity that isn't really
> needed to run a word count example. For example, Yu's pipeline errored here
> because the expected Docker container wasn't built before running.
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcweaver@google.com
>
>
> On Thu, Sep 12, 2019 at 11:27 AM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On this note, making local files easy to read is something we'd
>> definitely like to improve, as the current behavior is quite surprising.
>> This could be useful not just for running with docker and the portable
>> runner locally, but more generally when running on a distributed system
>> (e.g. a Flink/Spark cluster or Dataflow). It would be very convenient if we
>> could automatically stage local files to be read as artifacts that could be
>> consumed by any worker (possibly via external directory mounting in the
>> local docker case rather than an actual copy), and conversely copy small
>> outputs back to the local machine (with a similar optimization for local
>> docker).
>>
>> At the very least, however, we should add obvious messaging when the local
>> filesystem is used from within docker, since that is often a non-obvious
>> and hard-to-debug mistake.
>>
>>
>> On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> When you use a local filesystem path and a docker environment, "/tmp" is
>>> written inside the container. You can solve this issue by:
>>> * Using a "remote" filesystem such as HDFS/S3/GCS/...
>>> * Mounting an external directory into the container so that any "local"
>>> writes appear outside the container
>>> * Using a non-docker environment such as external or process.
>>>
>>> On Thu, Sep 12, 2019 at 4:51 AM Yu Watanabe <yu...@gmail.com>
>>> wrote:
>>>
>>>> Hello.
>>>>
>>>> I would like to ask for help with my sample code, which uses the portable
>>>> runner with Apache Flink.
>>>> I was able to get wordcount.py working by following this page.
>>>>
>>>> https://beam.apache.org/roadmap/portability/
>>>>
>>>> I got the two files below under /tmp.
>>>>
>>>> -rw-r--r-- 1 ywatanabe ywatanabe    185 Sep 12 19:56 py-wordcount-direct-00001-of-00002
>>>> -rw-r--r-- 1 ywatanabe ywatanabe    190 Sep 12 19:56 py-wordcount-direct-00000-of-00002
>>>>
>>>> Then I wrote sample code with the steps below.
>>>>
>>>> 1. Installed apache_beam using pip3, separately from the source code directory.
>>>> 2. Wrote the sample code below and named it "test-portable-runner.py".
>>>> Placed it in a directory separate from the source code.
>>>>
>>>> -----------------------------------------------------------------------------------
>>>> (python) ywatanabe@debian-09-00:~$ ls -ltr
>>>> total 16
>>>> drwxr-xr-x 18 ywatanabe ywatanabe 4096 Sep 12 19:06 beam (<- source code directory)
>>>> -rw-r--r--  1 ywatanabe ywatanabe  634 Sep 12 20:25 test-portable-runner.py
>>>>
>>>> -----------------------------------------------------------------------------------
>>>> 3. Executed the code with "python3 test-portable-runner.py".
>>>>
>>>>
>>>> ==========================================================================================
>>>> #!/usr/bin/env python3
>>>>
>>>> import apache_beam as beam
>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>> from apache_beam.io import WriteToText
>>>>
>>>>
>>>> def printMsg(line):
>>>>     # Executed by the SDK harness, not by this client process.
>>>>     print("OUTPUT: {0}".format(line))
>>>>     return line
>>>>
>>>> options = PipelineOptions(["--runner=PortableRunner",
>>>>                            "--job_endpoint=localhost:8099",
>>>>                            "--shutdown_sources_on_final_watermark"])
>>>>
>>>> p = beam.Pipeline(options=options)
>>>>
>>>> output = (p | 'create' >> beam.Create(["a", "b", "c"])
>>>>             | beam.Map(printMsg))
>>>>
>>>> output | 'write' >> WriteToText('/tmp/sample.txt')
>>>>
>>>> # Submit the job and block until it reaches a terminal state.
>>>> result = p.run()
>>>> result.wait_until_finish()
>>>>
>>>> =======================================================================================
>>>>
>>>> The job seemed to go all the way to the "FINISHED" state.
>>>>
>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>> [DataSource (Impulse) (1/1)] INFO
>>>> org.apache.flink.runtime.taskmanager.Task - Registering task at network:
>>>> DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) [DEPLOYING].
>>>> [DataSource (Impulse) (1/1)] INFO
>>>> org.apache.flink.runtime.taskmanager.Task - DataSource (Impulse) (1/1)
>>>> (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING to RUNNING.
>>>> [flink-akka.actor.default-dispatcher-3] INFO
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSource
>>>> (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING
>>>> to RUNNING.
>>>> [flink-akka.actor.default-dispatcher-3] INFO
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition
>>>> (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at
>>>> core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0])
>>>> (1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from CREATED to SCHEDULED.
>>>> [flink-akka.actor.default-dispatcher-3] INFO
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition
>>>> (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at
>>>> core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0])
>>>> (1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from SCHEDULED to
>>>> DEPLOYING.
>>>> [flink-akka.actor.default-dispatcher-3] INFO
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying CHAIN
>>>> MapPartition (MapPartition at
>>>> [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>),
>>>> Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1) (attempt #0)
>>>> to aad5f142-dbb7-4900-a5ae-af8a6fdcc787 @ localhost (dataPort=-1)
>>>> [flink-akka.actor.default-dispatcher-2] INFO
>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task CHAIN
>>>> MapPartition (MapPartition at
>>>> [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>),
>>>> Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1).
>>>> [DataSource (Impulse) (1/1)] INFO
>>>> org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task
>>>> DataSource (Impulse) (1/1) (d51e4171c0af881342eaa8a7ab06def8) [DEPLOYING].
>>>> [DataSource (Impulse) (1/1)] INFO
>>>> org.apache.flink.runtime.taskmanager.Task - Registering task at network:
>>>> DataSource (Impulse) (1/1) (d51e4171c0af881342eaa8a7ab06def8) [DEPLOYING].
>>>> [DataSource (Impulse) (1/1)] INFO
>>>> org.apache.flink.runtime.taskmanager.Task - DataSource (Impulse) (1/1)
>>>> (9df528f4d493d8c3b62c8be23dc889a8) switched from RUNNING to FINISHED.
>>>>
>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>
>>>> But I ended up with a Docker error on the client side.
>>>>
>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>> (python) ywatanabe@debian-09-00:~$ python3 test-portable-runner.py
>>>> /home/ywatanabe/python/lib/python3.5/site-packages/apache_beam/__init__.py:84:
>>>> UserWarning: Some syntactic constructs of Python 3 are not yet fully
>>>> supported by Apache Beam.
>>>>   'Some syntactic constructs of Python 3 are not yet fully supported by
>>>> '
>>>> ERROR:root:java.io.IOException: Received exit code 125 for command
>>>> 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null
>>>> ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=13-1
>>>> --logging_endpoint=localhost:39049 --artifact_endpoint=localhost:33779
>>>> --provision_endpoint=localhost:34827 --control_endpoint=localhost:36079'.
>>>> stderr: Unable to find image
>>>> 'ywatanabe-docker-apache.bintray.io/beam/python3:latest' locally
>>>> docker: Error response from daemon: unknown: Subject ywatanabe was not found.
>>>> See 'docker run --help'.
>>>> Traceback (most recent call last):
>>>>   File "test-portable-runner.py", line 27, in <module>
>>>>     result.wait_until_finish()
>>>>   File
>>>> "/home/ywatanabe/python/lib/python3.5/site-packages/apache_beam/runners/portability/portable_runner.py",
>>>> line 446, in wait_until_finish
>>>>     self._job_id, self._state, self._last_error_message()))
>>>> RuntimeError: Pipeline
>>>> BeamApp-ywatanabe-0912112516-22d84d63_d3b5e778-d771-4118-9ba7-c9dde85938e9
>>>> failed in state FAILED: java.io.IOException: Received exit code 125 for
>>>> command 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null
>>>> ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=13-1
>>>> --logging_endpoint=localhost:39049 --artifact_endpoint=localhost:33779
>>>> --provision_endpoint=localhost:34827 --control_endpoint=localhost:36079'.
>>>> stderr: Unable to find image
>>>> 'ywatanabe-docker-apache.bintray.io/beam/python3:latest' locally
>>>> docker: Error response from daemon: unknown: Subject ywatanabe was not found.
>>>> See 'docker run --help'.
>>>>
>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>
>>>> As a result, I got nothing under /tmp. The code works when using
>>>> DirectRunner.
>>>> May I ask where I should look to get the pipeline to write its
>>>> results to text files under /tmp?
>>>>
>>>> Best Regards,
>>>> Yu Watanabe
>>>>
>>>> --
>>>> Yu Watanabe
>>>> Weekend Freelancer who loves to challenge building data platform
>>>> yu.w.tennis@gmail.com
>>>> LinkedIn <https://www.linkedin.com/in/yuwatanabe1>  Twitter <https://twitter.com/yuwtennis>
>>>>
>>>
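The non-Docker workaround suggested in the replies above can be sketched as follows. This assumes a Beam release whose PortableOptions accept `--environment_type=LOOPBACK` (older releases may not), and a job server on localhost:8099 as in the original post. With LOOPBACK the SDK harness runs inside the submitting Python process, so a local path like /tmp refers to the local machine. EXTERNAL and PROCESS environments also need `--environment_config`; check the PortableOptions help of your version for its format. The helpers `portable_args` and `run` are hypothetical names.

```python
def portable_args(job_endpoint="localhost:8099", environment_type="LOOPBACK",
                  environment_config=None):
    """Build PortableRunner flags for the given SDK harness environment."""
    args = [
        "--runner=PortableRunner",
        "--job_endpoint=" + job_endpoint,
        "--environment_type=" + environment_type,
    ]
    if environment_config is not None:
        args.append("--environment_config=" + environment_config)
    return args


def run(output="/tmp/sample.txt", **kwargs):
    """Submit a tiny pipeline; extra kwargs are forwarded to portable_args."""
    # Imported lazily so the flag builder above works without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(portable_args(**kwargs))
    # Leaving the `with` block runs the pipeline and waits for it to finish.
    with beam.Pipeline(options=options) as p:
        (p
         | "create" >> beam.Create(["a", "b", "c"])
         | "write" >> beam.io.WriteToText(output))
```

With LOOPBACK, `run()` should write files named like /tmp/sample.txt-00000-of-00001 on the submitting machine. Passing `output="gs://your-bucket/sample"` (a hypothetical bucket) instead corresponds to the remote-filesystem option and needs the matching filesystem dependencies installed, e.g. `apache_beam[gcp]` for GCS.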

Re: How do you write portable runner pipeline on separate python code ?

Posted by Yu Watanabe <yu...@gmail.com>.
Kyle,

Thank you for the advice.

> For example, Yu's pipeline errored here because the expected Docker
> container wasn't built before running.

I was able to spin up the harness container and submit the job to the job
service once I had prepared the container properly.
I needed to do some extra steps that are not in the online instructions.
You probably already know these, but here is what I did.

The steps marked (*) below are the extra ones.

============================================================================
https://beam.apache.org/documentation/runners/flink/

Executing a Beam pipeline on a Flink Cluster

1. Only required once: Build the SDK harness container (optionally replace
py35 with the Python version of your choice):
./gradlew :sdks:python:container:py35:docker
*2. Prepare a Bintray account (https://bintray.com/).
*3. Push the image to the Bintray registry using "docker push" (as mentioned
in https://github.com/apache/beam/blob/release-2.15.0/sdks/CONTAINERS.md).
*4. Log in to the Bintray account with "docker login".
5. Start the JobService endpoint:
./gradlew :runners:flink:1.5:job-server:runShadow

The JobService is the central instance where you submit your Beam pipeline
to. The JobService will create a Flink job for the pipeline and execute the
job. To execute the job on a Flink cluster, the Beam JobService needs to be
provided with the Flink JobManager address.

6. Submit the Python pipeline to the above endpoint by using the
PortableRunner and job_endpoint set to localhost:8099 (this is the default
address of the JobService). For example:
============================================================================
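A sketch of step 6 that names the SDK harness image explicitly and fails fast if it is missing, instead of waiting for the job to fail with "Unable to find image". It assumes `--environment_config` is read as the Docker image name when `--environment_type=DOCKER`, which is how the Python PortableOptions behaved at the time; verify this for your version. Any image tag you pass is yours to choose: use whatever `docker images` shows for the container built in step 1. Because `docker run` uses a locally cached image before trying to pull, pointing at a locally present image might make the registry steps (*2 to *4) unnecessary, though that is an assumption worth testing. The helpers `docker_image_present`, `docker_env_args`, and `submit` are hypothetical.

```python
import shutil
import subprocess


def docker_image_present(image):
    """Return True if `image` is in the local Docker image cache.

    `docker image inspect` exits non-zero for unknown images; returns False
    when the docker CLI is not installed at all.
    """
    if shutil.which("docker") is None:
        return False
    result = subprocess.run(["docker", "image", "inspect", image],
                            stdout=subprocess.DEVNULL,
                            stderr=subprocess.DEVNULL)
    return result.returncode == 0


def docker_env_args(image, job_endpoint="localhost:8099"):
    """PortableRunner flags that pin the SDK harness image explicitly."""
    return [
        "--runner=PortableRunner",
        "--job_endpoint=" + job_endpoint,
        "--environment_type=DOCKER",
        "--environment_config=" + image,  # assumed: read as the image name
    ]


def submit(image, output="/tmp/sample.txt"):
    """Check the image locally, then run a tiny pipeline against it."""
    if not docker_image_present(image):
        raise RuntimeError("image %r not found locally; build it first "
                           "(step 1) or check `docker images`" % image)
    # Lazy import: the helpers above need only the standard library.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Note: with DOCKER, a local path like /tmp/... is written inside the
    # container, not on this machine.
    with beam.Pipeline(options=PipelineOptions(docker_env_args(image))) as p:
        p | "create" >> beam.Create(["a", "b", "c"]) | "write" >> beam.io.WriteToText(output)
```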

Thanks,
Yu Watanabe

On Fri, Sep 13, 2019 at 5:09 AM Kyle Weaver <kc...@google.com> wrote:

> +dev <de...@beam.apache.org> I think we should probably point new users of
> the portable Flink/Spark runners to use loopback or some other non-docker
> environment, as Docker adds some operational complexity that isn't really
> needed to run a word count example. For example, Yu's pipeline errored here
> because the expected Docker container wasn't built before running.
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcweaver@google.com
>
>
> On Thu, Sep 12, 2019 at 11:27 AM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On this note, making local files easy to read is something we'd
>> definitely like to improve, as the current behavior is quite surprising.
>> This could be useful not just for running with docker and the portable
>> runner locally, but more generally when running on a distributed system
>> (e.g. a Flink/Spark cluster or Dataflow). It would be very convenient if we
>> could automatically stage local files to be read as artifacts that could be
>> consumed by any worker (possibly via external directory mounting in the
>> local docker case rather than an actual copy), and conversely copy small
>> outputs back to the local machine (with a similar optimization for local
>> docker).
>>
>> At the very least, however, we should add obvious messaging when the local
>> filesystem is used from within docker, since that is often a non-obvious
>> and hard-to-debug mistake.
>>
>>
>> On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> When you use a local filesystem path and a docker environment, "/tmp" is
>>> written inside the container. You can solve this issue by:
>>> * Using a "remote" filesystem such as HDFS/S3/GCS/...
>>> * Mounting an external directory into the container so that any "local"
>>> writes appear outside the container
>>> * Using a non-docker environment such as external or process.
>>>
>>> On Thu, Sep 12, 2019 at 4:51 AM Yu Watanabe <yu...@gmail.com>
>>> wrote:
>>>
>>>> Hello.
>>>>
>>>> I would like to ask for help with my sample code, which uses the portable
>>>> runner with Apache Flink.
>>>> I was able to get wordcount.py working by following this page.
>>>>
>>>> https://beam.apache.org/roadmap/portability/
>>>>
>>>> I got the two files below under /tmp.
>>>>
>>>> -rw-r--r-- 1 ywatanabe ywatanabe    185 Sep 12 19:56 py-wordcount-direct-00001-of-00002
>>>> -rw-r--r-- 1 ywatanabe ywatanabe    190 Sep 12 19:56 py-wordcount-direct-00000-of-00002
>>>>
>>>> Then I wrote sample code with the steps below.
>>>>
>>>> 1. Installed apache_beam using pip3, separately from the source code directory.
>>>> 2. Wrote the sample code below and named it "test-portable-runner.py".
>>>> Placed it in a directory separate from the source code.
>>>>
>>>> -----------------------------------------------------------------------------------
>>>> (python) ywatanabe@debian-09-00:~$ ls -ltr
>>>> total 16
>>>> drwxr-xr-x 18 ywatanabe ywatanabe 4096 Sep 12 19:06 beam (<- source code directory)
>>>> -rw-r--r--  1 ywatanabe ywatanabe  634 Sep 12 20:25 test-portable-runner.py
>>>>
>>>> -----------------------------------------------------------------------------------
>>>> 3. Executed the code with "python3 test-portable-runner.py".
>>>>
>>>>
>>>> ==========================================================================================
>>>> #!/usr/bin/env python3
>>>>
>>>> import apache_beam as beam
>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>> from apache_beam.io import WriteToText
>>>>
>>>>
>>>> def printMsg(line):
>>>>     # Executed by the SDK harness, not by this client process.
>>>>     print("OUTPUT: {0}".format(line))
>>>>     return line
>>>>
>>>> options = PipelineOptions(["--runner=PortableRunner",
>>>>                            "--job_endpoint=localhost:8099",
>>>>                            "--shutdown_sources_on_final_watermark"])
>>>>
>>>> p = beam.Pipeline(options=options)
>>>>
>>>> output = (p | 'create' >> beam.Create(["a", "b", "c"])
>>>>             | beam.Map(printMsg))
>>>>
>>>> output | 'write' >> WriteToText('/tmp/sample.txt')
>>>>
>>>> # Submit the job and block until it reaches a terminal state.
>>>> result = p.run()
>>>> result.wait_until_finish()
>>>>
>>>> =======================================================================================
>>>>
>>>> The job seemed to go all the way to the "FINISHED" state.
>>>>
>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>> [DataSource (Impulse) (1/1)] INFO
>>>> org.apache.flink.runtime.taskmanager.Task - Registering task at network:
>>>> DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) [DEPLOYING].
>>>> [DataSource (Impulse) (1/1)] INFO
>>>> org.apache.flink.runtime.taskmanager.Task - DataSource (Impulse) (1/1)
>>>> (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING to RUNNING.
>>>> [flink-akka.actor.default-dispatcher-3] INFO
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSource
>>>> (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING
>>>> to RUNNING.
>>>> [flink-akka.actor.default-dispatcher-3] INFO
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition
>>>> (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at
>>>> core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0])
>>>> (1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from CREATED to SCHEDULED.
>>>> [flink-akka.actor.default-dispatcher-3] INFO
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition
>>>> (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at
>>>> core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0])
>>>> (1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from SCHEDULED to
>>>> DEPLOYING.
>>>> [flink-akka.actor.default-dispatcher-3] INFO
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying CHAIN
>>>> MapPartition (MapPartition at
>>>> [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>),
>>>> Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1) (attempt #0)
>>>> to aad5f142-dbb7-4900-a5ae-af8a6fdcc787 @ localhost (dataPort=-1)
>>>> [flink-akka.actor.default-dispatcher-2] INFO
>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task CHAIN
>>>> MapPartition (MapPartition at
>>>> [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>),
>>>> Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1).
>>>> [DataSource (Impulse) (1/1)] INFO
>>>> org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task
>>>> DataSource (Impulse) (1/1) (d51e4171c0af881342eaa8a7ab06def8) [DEPLOYING].
>>>> [DataSource (Impulse) (1/1)] INFO
>>>> org.apache.flink.runtime.taskmanager.Task - Registering task at network:
>>>> DataSource (Impulse) (1/1) (d51e4171c0af881342eaa8a7ab06def8) [DEPLOYING].
>>>> [DataSource (Impulse) (1/1)] INFO
>>>> org.apache.flink.runtime.taskmanager.Task - DataSource (Impulse) (1/1)
>>>> (9df528f4d493d8c3b62c8be23dc889a8) switched from RUNNING to FINISHED.
>>>>
>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>
>>>> But I ended up with a Docker error on the client side.
>>>>
>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>> (python) ywatanabe@debian-09-00:~$ python3 test-portable-runner.py
>>>> /home/ywatanabe/python/lib/python3.5/site-packages/apache_beam/__init__.py:84:
>>>> UserWarning: Some syntactic constructs of Python 3 are not yet fully
>>>> supported by Apache Beam.
>>>>   'Some syntactic constructs of Python 3 are not yet fully supported by
>>>> '
>>>> ERROR:root:java.io.IOException: Received exit code 125 for command
>>>> 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null
>>>> ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=13-1
>>>> --logging_endpoint=localhost:39049 --artifact_endpoint=localhost:33779
>>>> --provision_endpoint=localhost:34827 --control_endpoint=localhost:36079'.
>>>> stderr: Unable to find image
>>>> 'ywatanabe-docker-apache.bintray.io/beam/python3:latest' locally
>>>> docker: Error response from daemon: unknown: Subject ywatanabe was not found.
>>>> See 'docker run --help'.
>>>> Traceback (most recent call last):
>>>>   File "test-portable-runner.py", line 27, in <module>
>>>>     result.wait_until_finish()
>>>>   File
>>>> "/home/ywatanabe/python/lib/python3.5/site-packages/apache_beam/runners/portability/portable_runner.py",
>>>> line 446, in wait_until_finish
>>>>     self._job_id, self._state, self._last_error_message()))
>>>> RuntimeError: Pipeline
>>>> BeamApp-ywatanabe-0912112516-22d84d63_d3b5e778-d771-4118-9ba7-c9dde85938e9
>>>> failed in state FAILED: java.io.IOException: Received exit code 125 for
>>>> command 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null
>>>> ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=13-1
>>>> --logging_endpoint=localhost:39049 --artifact_endpoint=localhost:33779
>>>> --provision_endpoint=localhost:34827 --control_endpoint=localhost:36079'.
>>>> stderr: Unable to find image
>>>> 'ywatanabe-docker-apache.bintray.io/beam/python3:latest' locally
>>>> docker: Error response from daemon: unknown: Subject ywatanabe was not found.
>>>> See 'docker run --help'.
>>>>
>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>
>>>> As a result, I got nothing under /tmp. The code works when using
>>>> DirectRunner.
>>>> May I ask where I should look to get the pipeline to write its
>>>> results to text files under /tmp?
>>>>
>>>> Best Regards,
>>>> Yu Watanabe
>>>>
>>>> --
>>>> Yu Watanabe
>>>> Weekend Freelancer who loves to challenge building data platform
>>>> yu.w.tennis@gmail.com
>>>> LinkedIn <https://www.linkedin.com/in/yuwatanabe1>  Twitter <https://twitter.com/yuwtennis>
>>>>
>>>

-- 
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.tennis@gmail.com
LinkedIn <https://www.linkedin.com/in/yuwatanabe1>  Twitter <https://twitter.com/yuwtennis>

Re: How do you write portable runner pipeline on separate python code ?

Posted by Yu Watanabe <yu...@gmail.com>.
Kyle,

Thank you for the advice.

> For example, Yu's pipeline errored here because the expected Docker
> container wasn't built before running.

I was able to spin up the harness container and submit the job to the job
service once I had prepared the container properly.
I needed to do some extra steps that are not in the online instructions.
You probably already know these, but here is what I did.

The steps marked (*) below are the extra ones.

============================================================================
https://beam.apache.org/documentation/runners/flink/

Executing a Beam pipeline on a Flink Cluster

1. Only required once: Build the SDK harness container (optionally replace
py35 with the Python version of your choice): ./gradlew
:sdks:python:container:py35:docker
*2. Prepare a Bintray account (https://bintray.com/).
*3. Log in to the Bintray registry with "docker login".
*4. Push the image to the Bintray registry with "docker push" (described in
https://github.com/apache/beam/blob/release-2.15.0/sdks/CONTAINERS.md).
5. Start the JobService endpoint: ./gradlew
:runners:flink:1.5:job-server:runShadow

The JobService is the central instance where you submit your Beam pipeline
to. The JobService will create a Flink job for the pipeline and execute the
job. To execute the job on a Flink cluster, the Beam JobService needs to be
provided with the Flink JobManager address.

6. Submit the Python pipeline to the above endpoint by using the
PortableRunner and job_endpoint set to localhost:8099 (this is the default
address of the JobService). For example:
============================================================================
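A minimal sketch of step 6, assuming the job service and Flink run on this same machine: rather than letting the job service derive an image name from your user name (the ywatanabe-docker-apache.bintray.io/beam/python3:latest image in the earlier error), the Python SDK's `--environment_type=DOCKER` and `--environment_config` options can name the image built in step 1. The image tag you pass is a placeholder, and `docker_env_args` and `image_exists_locally` are illustrative helpers, not Beam APIs.

```python
import subprocess


def docker_env_args(image, job_endpoint="localhost:8099"):
    """Build PortableRunner flags that pin the SDK harness to `image`.

    `image` is a placeholder: pass the tag your local container build produced.
    """
    if not image:
        raise ValueError("an explicit Docker image name is required")
    return [
        "--runner=PortableRunner",
        "--job_endpoint=" + job_endpoint,
        "--environment_type=DOCKER",
        # For DOCKER environments, environment_config names the image to run.
        "--environment_config=" + image,
    ]


def image_exists_locally(image, run=subprocess.run):
    """Return True if `docker image inspect` finds `image` on this machine."""
    try:
        result = run(["docker", "image", "inspect", image],
                     stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except FileNotFoundError:  # the docker CLI is not installed
        return False
    return result.returncode == 0
```

Pass the returned list to `PipelineOptions(...)` in place of the hard-coded flags in the original script. If the image already exists locally, Docker does not need to pull it, which may make the Bintray steps (2 to 4) unnecessary for a single-machine setup. On a multi-node Flink cluster the image must be available on every TaskManager host, so a local check is not sufficient there.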

Thanks,
Yu Watanabe

On Fri, Sep 13, 2019 at 5:09 AM Kyle Weaver <kc...@google.com> wrote:

> +dev <de...@beam.apache.org> I think we should probably point new users of
> the portable Flink/Spark runners to use loopback or some other non-docker
> environment, as Docker adds some operational complexity that isn't really
> needed to run a word count example. For example, Yu's pipeline errored here
> because the expected Docker container wasn't built before running.
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcweaver@google.com
>
>
> On Thu, Sep 12, 2019 at 11:27 AM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On this note, making local files easy to read is something we'd
>> definitely like to improve, as the current behavior is quite surprising.
>> This could be useful not just for running with docker and the portable
>> runner locally, but more generally when running on a distributed system
>> (e.g. a Flink/Spark cluster or Dataflow). It would be very convenient if we
>> could automatically stage local files to be read as artifacts that could be
>> consumed by any worker (possibly via external directory mounting in the
>> local docker case rather than an actual copy), and conversely copy small
>> outputs back to the local machine (with the similar optimization for local
>> docker).
>>
>> At the very least, however, obvious messaging when the local filesystem
>> is used from within docker, which is often a (non-obvious and hard to
>> debug) mistake should be added.
>>
>>
>> On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> When you use a local filesystem path and a docker environment, "/tmp" is
>>> written inside the container. You can solve this issue by:
>>> * Using a "remote" filesystem such as HDFS/S3/GCS/...
>>> * Mounting an external directory into the container so that any "local"
>>> writes appear outside the container
>>> * Using a non-docker environment such as external or process.
>>>
>>> On Thu, Sep 12, 2019 at 4:51 AM Yu Watanabe <yu...@gmail.com>
>>> wrote:


Re: How do you write portable runner pipeline on separate python code ?

Posted by Kyle Weaver <kc...@google.com>.
+dev <de...@beam.apache.org> I think we should probably point new users of
the portable Flink/Spark runners to use loopback or some other non-docker
environment, as Docker adds some operational complexity that isn't really
needed to run a word count example. For example, Yu's pipeline errored here
because the expected Docker container wasn't built before running.
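Following the suggestion above, a sketch of the flags for the two non-Docker environments, assuming a Beam release whose Python SDK accepts `--environment_type=LOOPBACK` and `PROCESS`. `portable_args` is a hypothetical helper, and the boot command path is a placeholder you would replace with a real SDK worker executable on the runner's host.

```python
import json


def portable_args(environment_type="LOOPBACK", job_endpoint="localhost:8099",
                  boot_command=None):
    """Build PortableRunner flags for a non-Docker SDK harness environment.

    LOOPBACK runs the SDK worker inside the submitting Python process.
    PROCESS launches the worker with a command on the runner's host;
    `boot_command` is a placeholder path that you must supply.
    """
    args = [
        "--runner=PortableRunner",
        "--job_endpoint=" + job_endpoint,
        "--environment_type=" + environment_type,
    ]
    if environment_type == "PROCESS":
        if not boot_command:
            raise ValueError("PROCESS environments need a boot command")
        # PROCESS expects a JSON object naming the command to launch.
        args.append("--environment_config=" + json.dumps({"command": boot_command}))
    elif environment_type != "LOOPBACK":
        raise ValueError("expected LOOPBACK or PROCESS, got %r" % environment_type)
    return args
```

With LOOPBACK the SDK worker runs in the submitting Python process, so when the job service is on the same machine, `WriteToText('/tmp/sample.txt')` should write to this machine's /tmp rather than into a container.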

Kyle Weaver | Software Engineer | github.com/ibzib | kcweaver@google.com


On Thu, Sep 12, 2019 at 11:27 AM Robert Bradshaw <ro...@google.com>
wrote:

> On this note, making local files easy to read is something we'd definitely
> like to improve, as the current behavior is quite surprising. This could be
> useful not just for running with docker and the portable runner locally,
> but more generally when running on a distributed system (e.g. a Flink/Spark
> cluster or Dataflow). It would be very convenient if we could automatically
> stage local files to be read as artifacts that could be consumed by any
> worker (possibly via external directory mounting in the local docker case
> rather than an actual copy), and conversely copy small outputs back to the
> local machine (with the similar optimization for local docker).
>
> At the very least, however, obvious messaging when the local filesystem is
> used from within docker, which is often a (non-obvious and hard to debug)
> mistake should be added.
>
>
> On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> When you use a local filesystem path and a docker environment, "/tmp" is
>> written inside the container. You can solve this issue by:
>> * Using a "remote" filesystem such as HDFS/S3/GCS/...
>> * Mounting an external directory into the container so that any "local"
>> writes appear outside the container
>> * Using a non-docker environment such as external or process.
>>
>> On Thu, Sep 12, 2019 at 4:51 AM Yu Watanabe <yu...@gmail.com>
>> wrote:
>>

Re: How do you write portable runner pipeline on separate python code ?

Posted by Robert Bradshaw <ro...@google.com>.
On this note, making local files easy to read is something we'd definitely
like to improve, as the current behavior is quite surprising. This could be
useful not just for running with docker and the portable runner locally,
but more generally when running on a distributed system (e.g. a Flink/Spark
cluster or Dataflow). It would be very convenient if we could automatically
stage local files to be read as artifacts that could be consumed by any
worker (possibly via external directory mounting in the local docker case
rather than an actual copy), and conversely copy small outputs back to the
local machine (with a similar optimization for local docker).

At the very least, however, we should add obvious messaging when the local
filesystem is used from within docker, which is often a non-obvious and
hard-to-debug mistake.
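One possible shape for the messaging suggested above, as a client-side check before submission. `check_output_path` is hypothetical and not part of Beam; it only looks at the URL scheme of the output path, so it cannot detect a host directory that has been mounted into the container.

```python
import warnings
from urllib.parse import urlparse

# Schemes this check treats as "local disk of whatever machine runs the worker".
_LOCAL_SCHEMES = ("", "file")


def check_output_path(path, environment_type):
    """Warn when a local output path is combined with a Docker SDK environment.

    Returns True if the path looks safe, False if a warning was issued.
    """
    scheme = urlparse(path).scheme
    if environment_type.upper() == "DOCKER" and scheme in _LOCAL_SCHEMES:
        warnings.warn(
            "%r is a local path but the SDK harness runs in Docker; output "
            "will be written inside the container. Consider a remote "
            "filesystem (gs://, s3://, hdfs://) or a non-Docker environment."
            % path)
        return False
    return True
```

For the original script, `check_output_path('/tmp/sample.txt', 'DOCKER')` would warn and return False, while a gs://, s3:// or hdfs:// path, or a LOOPBACK environment, passes.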


On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik <lc...@google.com> wrote:

> When you use a local filesystem path and a docker environment, "/tmp" is
> written inside the container. You can solve this issue by:
> * Using a "remote" filesystem such as HDFS/S3/GCS/...
> * Mounting an external directory into the container so that any "local"
> writes appear outside the container
> * Using a non-docker environment such as external or process.

Re: How do you write portable runner pipeline on separate python code ?

Posted by Yu Watanabe <yu...@gmail.com>.
Lukasz

Thank you for the reply.

> * Using a "remote" filesystem such as HDFS/S3/GCS/...
> * Mounting an external directory into the container so that any "local"
> writes appear outside the container
> * Using a non-docker environment such as external or process.

Understood.

Thanks,
Yu Watanabe

On Fri, Sep 13, 2019 at 2:34 AM Lukasz Cwik <lc...@google.com> wrote:

> When you use a local filesystem path and a docker environment, "/tmp" is
> written inside the container. You can solve this issue by:
> * Using a "remote" filesystem such as HDFS/S3/GCS/...
> * Mounting an external directory into the container so that any "local"
> writes appear outside the container
> * Using a non-docker environment such as external or process.


Re: How do you write portable runner pipeline on separate python code ?

Posted by Lukasz Cwik <lc...@google.com>.
When you use a local filesystem path and a docker environment, "/tmp" is
written inside the container. You can solve this issue by:
* Using a "remote" filesystem such as HDFS/S3/GCS/...
* Mounting an external directory into the container so that any "local"
writes appear outside the container
* Using a non-docker environment such as external or process.
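The first and third options can be sketched as pipeline flags for the code in the original post. This is a minimal sketch assuming a Beam Python SDK that accepts `--environment_type` (DOCKER, PROCESS, EXTERNAL, LOOPBACK) and `--environment_config`; the worker-pool address `localhost:50000` and the bucket `gs://my-bucket` are hypothetical. With LOOPBACK (if your Beam version supports it) the SDK harness runs inside the submitting Python process, so `/tmp` is the local machine's `/tmp`. With EXTERNAL you start an SDK worker pool yourself outside docker; newer releases ship an `apache_beam.runners.worker.worker_pool_main` module for that.

```python
# Sketch of options 1 and 3 for the pipeline in the original post.
# Assumptions: the SDK accepts --environment_type/--environment_config;
# the worker-pool address and GCS bucket below are hypothetical.

JOB_ENDPOINT = "localhost:8099"  # Flink job server from the original post


def portable_args(environment_type, environment_config=None):
    """Build PortableRunner flags for the given SDK harness environment."""
    args = [
        "--runner=PortableRunner",
        "--job_endpoint=" + JOB_ENDPOINT,
        "--environment_type=" + environment_type,
    ]
    if environment_config is not None:
        args.append("--environment_config=" + environment_config)
    return args


# Option 3, LOOPBACK: the SDK harness runs in this Python process, so
# '/tmp/sample.txt' is written to this machine's /tmp.
LOOPBACK_ARGS = portable_args("LOOPBACK")

# Option 3, EXTERNAL: a worker pool you started yourself outside docker.
EXTERNAL_ARGS = portable_args("EXTERNAL", "localhost:50000")

# Option 1: keep the docker environment but write to a remote filesystem
# (gs:// needs the GCP extras installed, e.g. apache_beam[gcp]).
DOCKER_ARGS = portable_args("DOCKER")
REMOTE_OUTPUT = "gs://my-bucket/sample.txt"


def run(argv, output_path):
    """Build and run the pipeline from the original post with the given flags."""
    # Imported lazily so the flag helpers above work without Beam installed.
    import apache_beam as beam
    from apache_beam.io import WriteToText
    from apache_beam.options.pipeline_options import PipelineOptions

    # Leaving the `with` block runs the pipeline and waits for it to finish.
    with beam.Pipeline(options=PipelineOptions(argv)) as p:
        (p
         | "create" >> beam.Create(["a", "b", "c"])
         | "write" >> WriteToText(output_path))
```

For example, `run(LOOPBACK_ARGS, "/tmp/sample.txt")` should leave sharded files named like `/tmp/sample.txt-00000-of-00001` on the local machine. `run(DOCKER_ARGS, REMOTE_OUTPUT)` still needs a pullable SDK image, which the error above shows was missing.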
