Posted to dev@beam.apache.org by Ruoyun Huang <ru...@google.com> on 2018/11/05 23:40:34 UTC

How to use "PortableRunner" in Python SDK?

Hi, Folks,

     I want to try out the Python PortableRunner using the following command:

In sdk/python:

    python -m apache_beam.examples.wordcount --output=/tmp/test_output --runner PortableRunner

     It fails with the following error message:

Caused by: java.lang.Exception: The user defined 'open()' method caused an exception: java.io.IOException: Cannot run program "docker": error=13, Permission denied
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
    ... 1 more
Caused by: org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.util.concurrent.UncheckedExecutionException: java.io.IOException: Cannot run program "docker": error=13, Permission denied
    at org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4994)
    ... 7 more



My Python 2 environment is configured correctly, since DirectRunner works.
I also tested my Docker installation with 'docker run hello-world' without
any issue.
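error=13 is EACCES: the operating system refused to execute the file. A quick way to see what a launching process sees is the Python 3 sketch below. `check_executable` is a hypothetical helper, not part of Beam; it only checks PATH lookup and execute permission for the current user. Run it in the same environment that launches the SDK harness (if the job server itself runs in a container, that means inside that container, assuming Python is available there), because a working `docker run hello-world` on the host does not show that the job server can run docker.

```python
import os
import shutil


def check_executable(name="docker"):
    """Return (path, runnable) for `name` as seen by the current process.

    Rough diagnostic for "Cannot run program ...: error=13, Permission denied"
    (errno 13 is EACCES on Linux).
    """
    # shutil.which only returns a file that exists and passes os.access(X_OK).
    path = shutil.which(name)
    if path is not None:
        return path, True
    # Otherwise look for a copy on PATH that exists but is not executable.
    for directory in os.environ.get("PATH", "").split(os.pathsep):
        if not directory:
            continue
        candidate = os.path.join(directory, name)
        if os.path.isfile(candidate):
            return candidate, False
    return None, False


if __name__ == "__main__":
    location, runnable = check_executable("docker")
    print("docker found at: {}, runnable: {}".format(location, runnable))
```

A result like `('/usr/bin/docker', False)` points at a permission problem on the binary itself rather than at the Docker daemon.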


Thanks.
-- 
================
Ruoyun  Huang

Re: How to use "PortableRunner" in Python SDK?

Posted by Heejong Lee <he...@google.com>.
You can also try it without the --streaming option. There's a separate
streaming wordcount example in the same directory.

If you want to inspect the output files, it is easier to use an external
target such as gs:// instead of a local file.

    python -m apache_beam.examples.wordcount \
       --input=/etc/profile \
       --output=gs://tmp_location/py-wordcount \
       --runner=PortableRunner \
       --job_endpoint=localhost:8099 \
       --parallelism=1
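For scripting the same submission, here is a minimal Python 3 sketch that assembles the argument list from the flags in the command above. `wordcount_command` is a hypothetical helper; its defaults assume a job server already listening on localhost:8099, and `gs://tmp_location` is a placeholder bucket you would replace with one you can write to.

```python
import sys


def wordcount_command(input_path, output_path, job_endpoint="localhost:8099",
                      parallelism=1, streaming=False):
    """Build argv for the Beam Python wordcount example on a portable job server.

    The flags mirror the command above; the default endpoint assumes a job
    server already listening on localhost:8099.
    """
    cmd = [
        sys.executable, "-m", "apache_beam.examples.wordcount",
        "--input=" + input_path,
        "--output=" + output_path,
        "--runner=PortableRunner",
        "--job_endpoint=" + job_endpoint,
        "--parallelism=" + str(parallelism),
    ]
    if streaming:
        # Opt-in only: the suggestion above is to try the batch path first.
        cmd.append("--streaming")
    return cmd
```

To run it, pass the list to `subprocess.run(cmd, check=True)` in an environment where apache_beam is installed and the job server is up. Building a list rather than a shell string avoids quoting problems with paths.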

On Tue, Jan 22, 2019 at 11:44 AM junwan01@gmail.com <ju...@gmail.com>
wrote:

> Hello,
>
> I tried to follow the instructions at
> https://beam.apache.org/roadmap/portability/#python-on-flink,
>
> 1. I installed a local Flink cluster, ran its SocketWindowWordCount
> example, and confirmed that the cluster works properly.
>
> 2. Started the Flink job server:
> ./gradlew :beam-runners-flink_2.11-job-server:runShadow \
>     -PflinkMasterUrl=localhost:8081
>
> 3. Submitted the job as suggested in an earlier thread:
> python -m apache_beam.examples.wordcount --input=/etc/profile \
>     --output=/tmp/py-wordcount-direct --runner=PortableRunner \
>     --job_endpoint=localhost:8099 --parallelism=1 \
>     --OPTIONALflink_master=localhost:8081 --streaming
>
> But I got the following NullPointerException (sorry for the long text
> below). Any ideas? Thanks
>
> Jun Wan
>
> ---- log starts ----
> [grpc-default-executor-2] INFO
> org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job
> BeamApp-jwan-0121192804-387b3baa_1d32eea3-d71a-45a9-afa8-edbc66bc1d6b
> [grpc-default-executor-2] INFO
> org.apache.beam.runners.flink.FlinkJobInvocation - Starting job invocation
> BeamApp-jwan-0121192804-387b3baa_1d32eea3-d71a-45a9-afa8-edbc66bc1d6b
> [flink-runner-job-server] INFO
> org.apache.beam.runners.flink.FlinkJobInvocation - Translating pipeline to
> Flink program.
> [flink-runner-job-server] INFO
> org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a
> Streaming Environment.
> [flink-runner-job-server] INFO
> org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink
> Master URL localhost:8081.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - class
> org.apache.beam.sdk.transforms.join.RawUnionValue does not contain a setter
> for field unionTag
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - Class class
> org.apache.beam.sdk.transforms.join.RawUnionValue cannot be used as a POJO
> type because not all fields are valid POJO fields, and must be processed as
> GenericType. Please read the Flink documentation on "Data Types &
> Serialization" for details of the effect on performance.
> [flink-runner-job-server] INFO
> org.apache.beam.runners.flink.FlinkExecutionEnvironments - Running remotely
> at localhost:8081
> [flink-runner-job-server] WARN
> org.apache.flink.configuration.Configuration - Config uses deprecated
> configuration key 'jobmanager.rpc.address' instead of proper key
> 'rest.address'
> [flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient -
> Rest client endpoint started.
> [flink-runner-job-server] INFO
> org.apache.flink.client.program.rest.RestClusterClient - Submitting job
> 9a91889c469db1d88ec8f6a6d04a67b7 (detached: false).
> [flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient -
> Shutting down rest endpoint.
> [flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient -
> Rest endpoint shutdown complete.
> [flink-runner-job-server] ERROR
> org.apache.beam.runners.flink.FlinkJobInvocation - Error during job
> invocation
> BeamApp-jwan-0121192804-387b3baa_1d32eea3-d71a-45a9-afa8-edbc66bc1d6b.
> org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result.
>         at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:258)
>         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
>         at org.apache.beam.runners.flink.FlinkExecutionEnvironments$BeamFlinkRemoteStreamEnvironment.executeRemotely(FlinkExecutionEnvironments.java:355)
>         at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:179)
>         at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator$StreamingTranslationContext.execute(FlinkStreamingPortablePipelineTranslator.java:158)
>         at org.apache.beam.runners.flink.FlinkJobInvocation.runPipelineWithTranslator(FlinkJobInvocation.java:142)
>         at org.apache.beam.runners.flink.FlinkJobInvocation.runPipeline(FlinkJobInvocation.java:112)
>         at org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)
>         at org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)
>         at org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
>         at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:371)
>         at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>         at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>         at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>         at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:203)
>         at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>         at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>         at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
>         at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
>         at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>         ... 3 more
> Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
> java.util.concurrent.CompletionException: java.lang.NullPointerException
>         at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>         at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>         at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1107)
>         at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1070)
>         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>         at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>         at org.apache.flink.runtime.jobgraph.JobGraph.writeUserArtifactEntriesToConfiguration(JobGraph.java:586)
>         at org.apache.flink.runtime.client.ClientUtils.setUserArtifactBlobKeys(ClientUtils.java:140)
>         at org.apache.flink.runtime.client.ClientUtils.uploadAndSetUserArtifacts(ClientUtils.java:121)
>         at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:78)
>         at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:168)
>         at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1105)
>         ... 6 more
>
> End of exception on server side>]
>         at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:349)
>         at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:333)
>         at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
>         at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>         ... 4 more
>
> ---- log ends ----
>
> On 2018/11/15 11:01:28, Maximilian Michels <mx...@apache.org> wrote:
> > Hi Ruoyun,
> >
> > The output file will be inside the container, which is deleted after
> > shutdown by default. You can keep the containers if you add the flag
> >
> >    --retain_docker_containers
> >
> > Note: this option comes from ManualDockerEnvironmentOptions.
> >
> > The problem with batch is that it executes in stages and will create
> > multiple containers [1], which don't share the same local file system.
> > So the wordcount only works reliably if you use a distributed file system.
> >
> > Cheers,
> > Max
> >
> > [1] You can prevent multiple containers by using
> >      --environment_cache_millis=10000
> >
> > On 14.11.18 20:44, Ruoyun Huang wrote:
> > > Thanks Thomas!
> > >
> > > My desktop runs Linux. I was using Gradle to run wordcount, and that
> > > was how I got the job hanging. Since both of you got it working, I
> > > guess something is more likely wrong with my setup.
> > >
> > >
> > > Using Thomas's Python command line exactly as is, I am able to see
> > > the job run succeed. However, I have two questions:
> > >
> > > 1)  Did you check whether the output file "/tmp/py-wordcount-direct"
> > > exists or not? I expect there to be a text output, but I don't see
> > > this file afterwards. (I am still at the stage of building confidence
> > > in telling what a successful run is. Maybe I will try DataflowRunner
> > > and cross-check the outputs.)
> > >
> > > 2)  Why does it need a "--streaming" arg? Isn't this a static batch
> > > input, since it feeds a text file as input? In fact, I get a failure
> > > message if I remove '--streaming'; not sure if it is due to my setup
> > > again.
> > >
> > >
> > > On Wed, Nov 14, 2018 at 7:51 AM Thomas Weise <thw@apache.org> wrote:
> > >
> > >     Works for me on macOS as well.
> > >
> > >     In case you don't launch the pipeline through Gradle, this would be
> > >     the command:
> > >
> > >     python -m apache_beam.examples.wordcount \
> > >        --input=/etc/profile \
> > >        --output=/tmp/py-wordcount-direct \
> > >        --runner=PortableRunner \
> > >        --job_endpoint=localhost:8099 \
> > >        --parallelism=1 \
> > >        --OPTIONALflink_master=localhost:8081 \
> > >        --streaming
> > >
> > >     We talked about adding the wordcount to pre-commit.
> > >
> > >     Regarding using the ULR (Universal Local Runner) vs. the Flink
> > >     runner: there seems to be confusion between PortableRunner using
> > >     the user-supplied endpoint vs. trying to launch a job server. I
> > >     commented in the doc.
> > >
> > >     Thomas
> > >
> > >
> > >
> > >     On Wed, Nov 14, 2018 at 3:30 AM Maximilian Michels <mxm@apache.org> wrote:
> > >
> > >         Hi Ruoyun,
> > >
> > >         I just ran the wordcount locally using the instructions on the
> > >         page. I've tried the local file system and GCS. Both times it
> > >         ran successfully and produced valid output.
> > >
> > >         I'm assuming there is some problem with your setup. Which
> > >         platform are you using? I'm on macOS.
> > >
> > >         Could you expand on the planned merge? From my understanding,
> > >         we will always need PortableRunner in Python to be able to
> > >         submit against the Beam JobServer.
> > >
> > >         Thanks,
> > >         Max
> > >
> > >         On 14.11.18 00:39, Ruoyun Huang wrote:
> > >          > A quick follow-up on using the current PortableRunner.
> > >          >
> > >          > I followed exactly the three steps Ankur and Maximilian shared in
> > >          > https://beam.apache.org/roadmap/portability/#python-on-flink. The
> > >          > wordcount example is still hanging after 10 minutes. I also tried
> > >          > specifying explicit input/output args, using either a GCS folder
> > >          > or the local file system, but none of them works.
> > >          >
> > >          > I spent some time looking into it but have no conclusion yet. At
> > >          > this point, though, I guess it does not matter much anymore, given
> > >          > that we already plan to merge PortableRunner into using the Java
> > >          > reference runner (i.e. :beam-runners-reference-job-server).
> > >          >
> > >          > I'd still appreciate it if someone could try out the
> > >          > python-on-flink instructions
> > >          > (https://beam.apache.org/roadmap/portability/#python-on-flink)
> > >          > in case it is just due to my local machine setup. Thanks!
> > >          >
> > >          >
> > >          >
> > >          > On Thu, Nov 8, 2018 at 5:04 PM Ruoyun Huang <ruoyun@google.com> wrote:
> > >          >
> > >          >     Thanks Maximilian!
> > >          >
> > >          >     I am working on migrating the existing PortableRunner to
> > >          >     use the Java ULR (link to notes:
> > >          >     https://docs.google.com/document/d/1S86saZqiDaE_M5wxO0zOQ_rwC6QHv7sp1BmGTm0dLNE/edit#).
> > >          >     If this issue is non-trivial to solve, I would vote for
> > >          >     removing this default behavior as part of the consolidation.
> > >          >     this default behavior as part of the consolidation.
> > >          >
> > >          >     On Thu, Nov 8, 2018 at 2:58 AM Maximilian Michels <mxm@apache.org> wrote:
> > >          >
> > >          >         In the long run, we should get rid of the Docker-inside-Docker
> > >          >         approach, which was only intended for testing anyway. It would
> > >          >         be cleaner to start the SDK harness container alongside the
> > >          >         JobServer container.
> > >          >
> > >          >         Short term, I think it should be easy to either fix the
> > >          >         permissions of the mounted "docker" executable or use a Docker
> > >          >         image for the JobServer that comes with Docker pre-installed.
> > >          >
> > >          >         JIRA: https://issues.apache.org/jira/browse/BEAM-6020
> > >          >
> > >          >         Thanks for reporting this, Ruoyun!
> > >          >
> > >          >         -Max
> > >          >
> > >          >         On 08.11.18 00:10, Ruoyun Huang wrote:
> > >          >          > Thanks Ankur and Maximilian.
> > >          >          >
> > >          >          > Just for reference, in case other people encounter the same
> > >          >          > error message: the "permission denied" error in my original
> > >          >          > email is exactly due to the Docker-inside-Docker issue that
> > >          >          > Ankur mentioned. Thanks Ankur! I didn't make the connection
> > >          >          > when you said it and had to discover it the hard way (I
> > >          >          > thought my Docker installation was messed up).
> > >          >          >
> > >          >          > On Tue, Nov 6, 2018 at 1:53 AM Maximilian Michels <mxm@apache.org> wrote:
> > >          >          >
> > >          >          >     Hi,
> > >          >          >
> > >          >          >     Please follow
> > >          >          >     https://beam.apache.org/roadmap/portability/#python-on-flink
> > >          >          >
> > >          >          >     Cheers,
> > >          >          >     Max
> > >          >          >
> > >          >          >     On 06.11.18 01:14, Ankur Goenka wrote:
> > >          >          >      > Hi,
> > >          >          >      >
> > >          >          >      > The PortableRunner requires a job server URI to work
> > >          >          >      > with. The current default job server Docker image is
> > >          >          >      > broken because of the Docker-inside-Docker issue.
> > >          >          >      >
> > >          >          >      > Please refer to
> > >          >          >      > https://beam.apache.org/roadmap/portability/#python-on-flink
> > >          >          >      > for how to run a wordcount using the portable Flink runner.
> > >          >          >      >
> > >          >          >      > Thanks,
> > >          >          >      > Ankur
> > >          >          >      >
> > >          >          >
> > >          >          >
> > >          >          >
> > >          >          > --
> > >          >          > ================
> > >          >          > Ruoyun  Huang
> > >          >          >
> > >          >
> > >          >
> > >          >
> > >          >     --
> > >          >     ================
> > >          >     Ruoyun  Huang
> > >          >
> > >          >
> > >          >
> > >          > --
> > >          > ================
> > >          > Ruoyun  Huang
> > >          >
> > >
> > >
> > >
> > > --
> > > ================
> > > Ruoyun  Huang
> > >
> >
>
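
For readers hitting the same `Cannot run program "docker": error=13, Permission denied` failure quoted above: error 13 is `EACCES` on Linux, meaning the process found a `docker` file but was not allowed to execute it. That is how `docker run hello-world` can work from your own shell while the job server, running as a different user or inside its own container, still fails. The standard-library Python sketch below checks what a given process sees. The helper name `check_docker_executable` is hypothetical, and the check is only meaningful if you run it as the same user and in the same container as the job server.

```python
import errno
import os
import shutil


def check_docker_executable(path=None):
    """Hypothetical helper: describe whether a `docker` binary is executable.

    If `path` is None, look up the first file named `docker` on PATH,
    whether or not it has execute permission.
    """
    if path is None:
        # mode=os.F_OK finds the file even when it lacks execute permission,
        # which is the case we want to detect.
        path = shutil.which("docker", mode=os.F_OK)
        if path is None:
            return "docker: not found on PATH"
    if not os.path.exists(path):
        return f"{path}: does not exist"
    if not os.access(path, os.X_OK):
        # A missing execute permission is one way to get
        # "error=13, Permission denied" when the JVM launches the program.
        return f"{path}: exists but is not executable (errno {errno.EACCES})"
    return f"{path}: is executable"
```

For example, `print(check_docker_executable())` run inside the job server's environment reports which `docker` file that environment would pick up and whether it may execute it.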

Re: How to use "PortableRunner" in Python SDK?

Posted by Ankur Goenka <go...@google.com>.
Hi Jun,

This error can be caused by a different Flink version.
Please make sure that you are using Flink 1.5.6 for the commands you
mentioned.
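
One way to confirm which Flink version the cluster at localhost:8081 is actually running, before comparing it against 1.5.6, is to ask its REST endpoint. The sketch below assumes that the Flink REST API's `GET /config` response is a JSON object carrying a `flink-version` field (check this against the REST API documentation for your Flink release). The helper names `parse_flink_version`, `fetch_flink_version` and `matches_required` are hypothetical.

```python
import json
import urllib.request

REQUIRED_FLINK_VERSION = "1.5.6"  # version recommended in this thread


def parse_flink_version(config_json):
    """Extract the version string from a Flink REST /config response body.

    Assumes the body is a JSON object with a "flink-version" key.
    """
    return json.loads(config_json)["flink-version"]


def fetch_flink_version(master_url="http://localhost:8081"):
    """Query the (assumed) /config endpoint of a running Flink cluster."""
    url = master_url.rstrip("/") + "/config"
    with urllib.request.urlopen(url, timeout=5) as resp:
        return parse_flink_version(resp.read().decode("utf-8"))


def matches_required(version, required=REQUIRED_FLINK_VERSION):
    """True if the cluster runs exactly the required Flink release."""
    return version.strip() == required
```

For example, `matches_required(fetch_flink_version())` should be `True` before retrying the submission. The exact string comparison is deliberately strict, since the advice here names one specific release.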

Thanks,
Ankur


Re: How to use "PortableRunner" in Python SDK?

Posted by ju...@gmail.com.
Hello,

I tried to follow the instructions at https://beam.apache.org/roadmap/portability/#python-on-flink:

1. I installed a local Flink cluster, ran its SocketWindowWordCount example, and confirmed that the cluster works properly.

2. Started the Flink job server:
./gradlew :beam-runners-flink_2.11-job-server:runShadow -PflinkMasterUrl=localhost:8081

3. Submitted the job as suggested earlier in this thread:
python -m apache_beam.examples.wordcount --input=/etc/profile --output=/tmp/py-wordcount-direct --runner=PortableRunner --job_endpoint=localhost:8099 --parallelism=1 --OPTIONALflink_master=localhost:8081 --streaming

But I got the following NullPointerException (sorry for the long log below). Any ideas? Thanks,

Jun Wan

---- log starts ----
[grpc-default-executor-2] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-jwan-0121192804-387b3baa_1d32eea3-d71a-45a9-afa8-edbc66bc1d6b
[grpc-default-executor-2] INFO org.apache.beam.runners.flink.FlinkJobInvocation - Starting job invocation BeamApp-jwan-0121192804-387b3baa_1d32eea3-d71a-45a9-afa8-edbc66bc1d6b
[flink-runner-job-server] INFO org.apache.beam.runners.flink.FlinkJobInvocation - Translating pipeline to Flink program.
[flink-runner-job-server] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Streaming Environment.
[flink-runner-job-server] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink Master URL localhost:8081.
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.beam.sdk.transforms.join.RawUnionValue does not contain a setter for field unionTag
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.apache.beam.sdk.transforms.join.RawUnionValue cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.beam.sdk.transforms.join.RawUnionValue does not contain a setter for field unionTag
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.apache.beam.sdk.transforms.join.RawUnionValue cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.beam.sdk.transforms.join.RawUnionValue does not contain a setter for field unionTag
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.apache.beam.sdk.transforms.join.RawUnionValue cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.beam.sdk.transforms.join.RawUnionValue does not contain a setter for field unionTag
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.apache.beam.sdk.transforms.join.RawUnionValue cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.beam.sdk.transforms.join.RawUnionValue does not contain a setter for field unionTag
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.apache.beam.sdk.transforms.join.RawUnionValue cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.beam.sdk.transforms.join.RawUnionValue does not contain a setter for field unionTag
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.apache.beam.sdk.transforms.join.RawUnionValue cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Running remotely at localhost:8081
[flink-runner-job-server] WARN org.apache.flink.configuration.Configuration - Config uses deprecated configuration key 'jobmanager.rpc.address' instead of proper key 'rest.address'
[flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
[flink-runner-job-server] INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 9a91889c469db1d88ec8f6a6d04a67b7 (detached: false).
[flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint.
[flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient - Rest endpoint shutdown complete.
[flink-runner-job-server] ERROR org.apache.beam.runners.flink.FlinkJobInvocation - Error during job invocation BeamApp-jwan-0121192804-387b3baa_1d32eea3-d71a-45a9-afa8-edbc66bc1d6b.
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result.
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:258)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
        at org.apache.beam.runners.flink.FlinkExecutionEnvironments$BeamFlinkRemoteStreamEnvironment.executeRemotely(FlinkExecutionEnvironments.java:355)
        at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:179)
        at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator$StreamingTranslationContext.execute(FlinkStreamingPortablePipelineTranslator.java:158)
        at org.apache.beam.runners.flink.FlinkJobInvocation.runPipelineWithTranslator(FlinkJobInvocation.java:142)
        at org.apache.beam.runners.flink.FlinkJobInvocation.runPipeline(FlinkJobInvocation.java:112)
        at org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)
        at org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)
        at org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:371)
        at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
        at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:203)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
        ... 3 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
java.util.concurrent.CompletionException: java.lang.NullPointerException
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
        at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1107)
        at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1070)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at org.apache.flink.runtime.jobgraph.JobGraph.writeUserArtifactEntriesToConfiguration(JobGraph.java:586)
        at org.apache.flink.runtime.client.ClientUtils.setUserArtifactBlobKeys(ClientUtils.java:140)
        at org.apache.flink.runtime.client.ClientUtils.uploadAndSetUserArtifacts(ClientUtils.java:121)
        at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:78)
        at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:168)
        at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1105)
        ... 6 more

End of exception on server side>]
        at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:349)
        at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:333)
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
        ... 4 more

---- log ends ----
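The log shows the job server reaching the Flink master's REST endpoint ("Submitting job ...") and the NullPointerException being raised on the server side while the JobGraph is uploaded, so basic connectivity does not appear to be the problem in this report. For setups that fail earlier, a quick pre-flight check that the two endpoints from steps 2 and 3 are listening can be sketched in Python as follows. `is_listening` is a hypothetical helper; a successful connection only shows that something accepts TCP connections on the port, not that it is a compatible Beam job server or Flink version.

```python
import socket


def is_listening(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection resolves the host and tries each address in turn.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, or timed out
        return False


if __name__ == "__main__":
    # Ports taken from the commands in this thread.
    endpoints = [("Beam job server (--job_endpoint)", 8099),
                 ("Flink master REST endpoint", 8081)]
    for label, port in endpoints:
        state = "listening" if is_listening("localhost", port) else "NOT reachable"
        print("%s on localhost:%d: %s" % (label, port, state))
```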

On 2018/11/15 11:01:28, Maximilian Michels <mx...@apache.org> wrote: 
> Hi Ruoyun,
> 
> The output file will be written inside the container, which is deleted
> on shutdown by default. You can keep the containers by adding the flag
> 
>    --retain_docker_containers
> 
> Note: this flag comes from ManualDockerEnvironmentOptions.
> 
> The problem with batch mode is that it executes in stages and will create
> multiple containers [1], which don't share the same local file system. So
> the wordcount only works reliably if you use a distributed file system.
> 
> Cheers,
> Max
> 
> [1] You can prevent multiple containers by using
>      --environment_cache_millis=10000

Re: How to use "PortableRunner" in Python SDK?

Posted by Maximilian Michels <mx...@apache.org>.
Hi Ruoyun,

The output file will be written inside the container, which is deleted
on shutdown by default. You can keep the containers by adding the flag

   --retain_docker_containers

Note: this flag comes from ManualDockerEnvironmentOptions.

The problem with batch mode is that it executes in stages and will create
multiple containers [1], which don't share the same local file system. So
the wordcount only works reliably if you use a distributed file system.
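Besides the container issue explained above, note that the wordcount example writes its result with `WriteToText`, whose default shard naming appends a suffix such as `-00000-of-00001` to the `--output` prefix. On the host, the file to look for is therefore something like `/tmp/py-wordcount-direct-00000-of-00001` (possibly several shards) rather than `/tmp/py-wordcount-direct` itself. A minimal Python sketch for checking this, assuming that default `-SSSSS-of-NNNNN` shard template; `find_output_shards` is a hypothetical helper:

```python
import glob
import os

# Five-digit shard index / shard count, as in the default '-SSSSS-of-NNNNN' template.
_DIGITS5 = "[0-9]" * 5


def find_output_shards(prefix):
    """Return sorted paths matching <prefix>-NNNNN-of-NNNNN on this machine."""
    pattern = glob.escape(prefix) + "-" + _DIGITS5 + "-of-" + _DIGITS5
    return sorted(glob.glob(pattern))


if __name__ == "__main__":
    shards = find_output_shards("/tmp/py-wordcount-direct")
    if not shards:
        # The files may only have existed inside an SDK harness container's
        # local file system, which is removed on shutdown by default.
        print("no output shards found on this host")
    for path in shards:
        print(path, os.path.getsize(path), "bytes")
```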

Cheers,
Max

[1] You can prevent multiple containers by using
     --environment_cache_millis=10000
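To keep the options from this thread in one place, the following Python sketch assembles the submission command from Thomas's reply (omitting the optional Flink master flag) and optionally appends the two flags mentioned above. It assumes, without verifying it for a particular Beam version, that `--retain_docker_containers` and `--environment_cache_millis` are accepted on the Python command line and forwarded to the job server. `wordcount_command` is a hypothetical helper that only builds and prints the command.

```python
import shlex
import sys


def wordcount_command(output, streaming=True, retain_containers=False,
                      cache_millis=None, job_endpoint="localhost:8099"):
    """Build the argv for submitting the Beam wordcount example via PortableRunner."""
    argv = [
        sys.executable, "-m", "apache_beam.examples.wordcount",
        "--input=/etc/profile",
        "--output=" + output,
        "--runner=PortableRunner",
        "--job_endpoint=" + job_endpoint,
        "--parallelism=1",
    ]
    if streaming:
        argv.append("--streaming")
    if retain_containers:
        # Keep SDK harness containers after shutdown (ManualDockerEnvironmentOptions).
        argv.append("--retain_docker_containers")
    if cache_millis is not None:
        # Cache the SDK environment for this many ms so later stages can reuse it
        # instead of starting additional containers.
        argv.append("--environment_cache_millis=%d" % cache_millis)
    return argv


if __name__ == "__main__":
    cmd = wordcount_command("/tmp/py-wordcount-direct",
                            retain_containers=True, cache_millis=10000)
    # Print rather than run; pass `cmd` to subprocess.run() to actually submit.
    print(" ".join(shlex.quote(arg) for arg in cmd))
```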

On 14.11.18 20:44, Ruoyun Huang wrote:
> Thanks Thomas!
> 
> My desktop runs Linux. I was using Gradle to run the wordcount, and that
> was how I got the job hanging. Since both of you got it working, I guess
> something is more likely wrong with my setup.
> 
> 
> Using Thomas's Python command line exactly as is, I can see the job
> run succeed. However, I have two questions:
> 
> 1)  Did you check whether the output file "/tmp/py-wordcount-direct" exists
> or not? I expect there should be a text output, but I don't see this
> file afterwards. (I am still at the stage of building confidence in
> telling what a successful run looks like. Maybe I will try DataflowRunner
> and cross-check the outputs.)
> 
> 2)  Why does it need a "--streaming" arg? Isn't this a static batch input,
> since it reads a text file? In fact, I got a failure message when I removed
> '--streaming', though I'm not sure whether that is due to my setup again.
> 
> 
> On Wed, Nov 14, 2018 at 7:51 AM Thomas Weise <thw@apache.org> wrote:
> 
>     Works for me on macOS as well.
> 
>     In case you don't launch the pipeline through Gradle, this would be
>     the command:
> 
>     python -m apache_beam.examples.wordcount \
>        --input=/etc/profile \
>        --output=/tmp/py-wordcount-direct \
>        --runner=PortableRunner \
>        --job_endpoint=localhost:8099 \
>        --parallelism=1 \
>        --OPTIONALflink_master=localhost:8081 \
>        --streaming
> 
>     We talked about adding the wordcount to pre-commit.
> 
>     Regarding using the ULR vs. the Flink runner: there seems to be confusion
>     between PortableRunner using the user-supplied endpoint vs. trying
>     to launch a job server. I commented in the doc.
> 
>     Thomas
> 
> 
> 
>     On Wed, Nov 14, 2018 at 3:30 AM Maximilian Michels <mxm@apache.org> wrote:
> 
>         Hi Ruoyun,
> 
>         I just ran the wordcount locally using the instructions on the
>         page. I've tried the local file system and GCS. Both times it ran
>         successfully and produced valid output.
>
>         I'm assuming there is some problem with your setup. Which
>         platform are you using? I'm on MacOS.
>
>         Could you expand on the planned merge? From my understanding we
>         will always need PortableRunner in Python to be able to submit
>         against the Beam JobServer.
> 
>         Thanks,
>         Max
> 
>         On 14.11.18 00:39, Ruoyun Huang wrote:
>          > A quick follow-up on using the current PortableRunner.
>          >
>          > I followed exactly the three steps Ankur and Maximilian shared
>          > in <https://beam.apache.org/roadmap/portability/#python-on-flink>.
>          > The wordcount example is still hanging after 10 minutes. I also
>          > tried specifying explicit input/output args, using either a GCS
>          > folder or the local file system, but neither works.
>          >
>          > I spent some time looking into it but have no conclusion yet. At
>          > this point, though, I guess it does not matter much any more,
>          > given we already plan to migrate PortableRunner to use the Java
>          > reference runner (i.e. :beam-runners-reference-job-server).
>          >
>          > I'd still appreciate it if someone could try out the
>          > python-on-flink instructions
>          > <https://beam.apache.org/roadmap/portability/#python-on-flink>
>          > in case it is just due to my local machine setup. Thanks!
>          >
>          >
>          >
>          > On Thu, Nov 8, 2018 at 5:04 PM Ruoyun Huang <ruoyun@google.com> wrote:
>          >
>          >     Thanks Maximilian!
>          >
>          >     I am working on migrating the existing PortableRunner to use
>          >     the Java ULR (Link to Notes
>          >     <https://docs.google.com/document/d/1S86saZqiDaE_M5wxO0zOQ_rwC6QHv7sp1BmGTm0dLNE/edit#>).
>          >     If this issue is non-trivial to solve, I would vote for
>          >     removing this default behavior as part of the consolidation.
>          >
>          >     On Thu, Nov 8, 2018 at 2:58 AM Maximilian Michels <mxm@apache.org> wrote:
>          >
>          >         In the long run, we should get rid of the
>          >         Docker-inside-Docker approach, which was only intended
>          >         for testing anyway. It would be cleaner to start the SDK
>          >         harness container alongside the JobServer container.
>          >
>          >         Short term, I think it should be easy to either fix the
>          >         permissions of the mounted "docker" executable or use a
>          >         Docker image for the JobServer which comes with Docker
>          >         pre-installed.
>          >
>          >         JIRA: https://issues.apache.org/jira/browse/BEAM-6020
>          >
>          >         Thanks for reporting this Ruoyun!
>          >
>          >         -Max
>          >
>          >         On 08.11.18 00:10, Ruoyun Huang wrote:
>          >          > Thanks Ankur and Maximilian.
>          >          >
>          >          > Just for reference, in case other people encounter the
>          >          > same error message: the "permission denied" error in my
>          >          > original email is exactly due to the Docker-inside-Docker
>          >          > issue that Ankur mentioned. Thanks Ankur!
>          >          > I didn't make the connection when you said it and had to
>          >          > discover it the hard way (I thought my Docker
>          >          > installation was messed up).
>          >          >
>          >          > On Tue, Nov 6, 2018 at 1:53 AM Maximilian Michels <mxm@apache.org> wrote:
>          >          >
>          >          >     Hi,
>          >          >
>          >          >     Please follow
>          >          >     https://beam.apache.org/roadmap/portability/#python-on-flink
>          >          >
>          >          >     Cheers,
>          >          >     Max
>          >          >
>          >          >     On 06.11.18 01:14, Ankur Goenka wrote:
>          >          >      > Hi,
>          >          >      >
>          >          >      > The Portable Runner requires a job server URI to
>          >          >      > work with. The current default job server Docker
>          >          >      > image is broken because of the Docker-inside-Docker
>          >          >      > issue.
>          >          >      >
>          >          >      > Please refer to
>          >          >      > https://beam.apache.org/roadmap/portability/#python-on-flink
>          >          >      > for how to run a wordcount using the Portable Flink Runner.
>          >          >      >
>          >          >      > Thanks,
>          >          >      > Ankur
>          >          > --
>          >          > ================
>          >          > Ruoyun  Huang
>          >          >
>          >
>          >
>          >
>          >     --
>          >     ================
>          >     Ruoyun  Huang
>          >
>          >
>          >
>          > --
>          > ================
>          > Ruoyun  Huang
>          >
> 
> 
> 
> -- 
> ================
> Ruoyun  Huang
> 
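The "Cannot run program "docker": error=13, Permission denied" failure in the original report typically means a `docker` file was found on the PATH but could not be executed, which is consistent with the Docker-inside-Docker explanation in this thread. A small Python sketch for checking this from inside the environment that launches SDK harness containers (for example a shell in the job server container). docker_exec_status is a hypothetical helper; it only checks execute permission on the binary, not access to the Docker daemon.

```python
import os
import shutil


def docker_exec_status(search_path=None):
    """Return (status, path) for the `docker` binary on a PATH-style string.

    status is "ok" if an executable docker is found, "not-executable" if a
    docker file exists but cannot be executed (the error=13 case), or
    "missing" if there is no docker file at all. Defaults to $PATH.
    """
    if search_path is None:
        search_path = os.environ.get("PATH", "")
    # shutil.which only returns files that exist and are executable.
    found = shutil.which("docker", path=search_path)
    if found:
        return "ok", found
    # Fall back to looking for a docker file that lacks execute permission.
    for directory in filter(None, search_path.split(os.pathsep)):
        candidate = os.path.join(directory, "docker")
        if os.path.isfile(candidate):
            return "not-executable", candidate
    return "missing", None


if __name__ == "__main__":
    print(docker_exec_status())
```

A "not-executable" result inside the job server container points at the mounted binary's permissions, one of the short-term fixes suggested above.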

Re: How to use "PortableRunner" in Python SDK?

Posted by Ruoyun Huang <ru...@google.com>.
Thanks Thomas!

My desktop runs Linux. I was using Gradle to run wordcount, and that was
how I got the job hanging. Since both of you got it working, I guess
something is more likely wrong with my setup.


Using Thomas's Python command line exactly as is, I can see the job run
succeed. However, I have two questions:

1)  Did you check whether the output file "/tmp/py-wordcount-direct"
exists? I expect there to be a text output, but I don't see this file
afterwards. (I am still at the stage of building confidence in telling
what a successful run looks like. Maybe I will try DataflowRunner and
cross-check the outputs.)

2)  Why does it need a "--streaming" arg? Isn't this a static batch input,
since it reads a txt file? In fact, I get a failure message if I remove
'--streaming'; not sure if that is due to my setup again.
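On question 1, one thing worth ruling out: Beam's text sink writes sharded files named after the --output prefix, so with the default shard template a run produces files such as /tmp/py-wordcount-direct-00000-of-00001 rather than a file called exactly /tmp/py-wordcount-direct. A Python sketch that lists the shards for a prefix and checks that none are missing, assuming the default -SSSSS-of-NNNNN template and no file name suffix; find_output_shards is a hypothetical helper.

```python
import glob
import re

# Default shard suffix of Beam's text sink: -SSSSS-of-NNNNN (assumption: no
# custom shard_name_template or file_name_suffix was configured).
SHARD_RE = re.compile(r"-(\d{5})-of-(\d{5})$")


def find_output_shards(prefix):
    """Return (sorted shard paths, complete?) for an --output prefix."""
    candidates = glob.glob(glob.escape(prefix) + "-*-of-*")
    shards = sorted(p for p in candidates if SHARD_RE.search(p))
    if not shards:
        return [], False
    matches = [SHARD_RE.search(p) for p in shards]
    totals = {int(m.group(2)) for m in matches}
    indices = {int(m.group(1)) for m in matches}
    # Complete means every index 0..N-1 is present for a single total N.
    complete = len(totals) == 1 and indices == set(range(totals.pop()))
    return shards, complete
```

An empty result on the host does not necessarily mean the job wrote nothing: in a batch run on the portable runner the shards may have been written inside an SDK harness container.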


-- 
================
Ruoyun  Huang

Re: How to use "PortableRunner" in Python SDK?

Posted by Thomas Weise <th...@apache.org>.
Works for me on macOS as well.

In case you don't launch the pipeline through Gradle, this would be the
command:

python -m apache_beam.examples.wordcount \
  --input=/etc/profile \
  --output=/tmp/py-wordcount-direct \
  --runner=PortableRunner \
  --job_endpoint=localhost:8099 \
  --parallelism=1 \
  --OPTIONALflink_master=localhost:8081 \
  --streaming

We talked about adding the wordcount to pre-commit.

Regarding using the ULR vs. the Flink runner: there seems to be confusion
between PortableRunner using the user-supplied endpoint vs. trying to launch
a job server. I commented in the doc.

Thomas
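To launch the same wordcount command from a script rather than Gradle or a shell, a Python sketch that builds the argv list and hands it to subprocess. It uses only flags that appear in the command above, leaves the optional Flink master flag to extra_args, and makes --streaming a parameter since the thread discusses running without it. wordcount_argv and run_wordcount are hypothetical helpers; actually running the pipeline assumes apache_beam is installed and a job server is listening on the endpoint.

```python
import subprocess
import sys


def wordcount_argv(input_path, output_prefix, job_endpoint="localhost:8099",
                   parallelism=1, streaming=True, extra_args=()):
    """Build the argv for the wordcount command (hypothetical helper)."""
    argv = [
        sys.executable, "-m", "apache_beam.examples.wordcount",
        "--input=" + input_path,
        "--output=" + output_prefix,
        "--runner=PortableRunner",
        "--job_endpoint=" + job_endpoint,
        "--parallelism=%d" % parallelism,
    ]
    if streaming:
        argv.append("--streaming")
    # Anything else, e.g. an optional Flink master flag, goes in extra_args.
    argv.extend(extra_args)
    return argv


def run_wordcount(input_path, output_prefix, **kwargs):
    """Launch the pipeline; needs apache_beam and a running job server."""
    return subprocess.run(wordcount_argv(input_path, output_prefix, **kwargs),
                          check=True)
```

Passing the arguments as a list avoids shell quoting issues, and check=True turns a non-zero exit of the pipeline process into a CalledProcessError.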



Re: How to use "PortableRunner" in Python SDK?

Posted by Robert Bradshaw <ro...@google.com>.
We should probably make the job endpoint mandatory for PortableRunner,
and offer a separate FlinkRunner (and others) that provides a default
endpoint and otherwise delegates everything to PortableRunner.
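The proposal above can be sketched as a small class hierarchy. These are hypothetical classes, not the actual Beam runner API: the portable runner refuses to start without an endpoint instead of launching a job server itself, and the Flink-specific runner only supplies a default endpoint before delegating. The default of localhost:8099 is an assumption borrowed from the commands in this thread.

```python
class MissingJobEndpoint(ValueError):
    """Raised when no job server endpoint is configured."""


class PortableRunnerSketch(object):
    """Hypothetical runner that requires an endpoint and never starts a server."""

    def __init__(self, job_endpoint=None):
        if not job_endpoint:
            raise MissingJobEndpoint(
                "set --job_endpoint to the host:port of a running job server")
        self.job_endpoint = job_endpoint

    def run(self, pipeline_name):
        # A real runner would submit the pipeline over the Job API here.
        return "submitted %s to %s" % (pipeline_name, self.job_endpoint)


class FlinkRunnerSketch(PortableRunnerSketch):
    """Hypothetical Flink runner: supplies a default endpoint, delegates the rest."""

    DEFAULT_ENDPOINT = "localhost:8099"  # assumption, taken from this thread

    def __init__(self, job_endpoint=None):
        super(FlinkRunnerSketch, self).__init__(
            job_endpoint or self.DEFAULT_ENDPOINT)
```

Keeping the default in the subclass means the generic runner never has to guess which job server a user meant, which is the confusion discussed in this thread.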

On Thu, Nov 15, 2018 at 12:07 PM Maximilian Michels <mx...@apache.org> wrote:
>
> > 1) The default behavior, where PortableRunner starts a Flink server. It is confusing to new users.
> It does that only if no JobServer endpoint is specified. AFAIK there are
> problems with the bootstrapping; it can definitely be improved.
>
> > 2) All the related docs and inline comments. Similarly, connecting PortableRunner to a Flink server can be very confusing.
> +1 We definitely need to improve docs and usability.
>
> > 3) [Probably no longer an issue.] I couldn't get the Flink server example working, and I could not get the example working on the Java ULR either.
> AFAIK the Java ULR hasn't received any love in a long time.
>
> -Max
>
> On 14.11.18 20:57, Ruoyun Huang wrote:
> > To answer Maximilian's question.
> >
> > I am using Linux, debian distribution.
> >
> > "Planned merge" probably sounded bigger than I intended; what I really
> > meant entails less change. More specifically:
> >
> > 1) The default behavior, where PortableRunner starts a Flink server. It
> > is confusing to new users.
> > 2) All the related docs and inline comments. Similarly, connecting
> > PortableRunner to a Flink server can be very confusing.
> > 3) [Probably no longer an issue.] I couldn't get the Flink server
> > example working, and I could not get the example working on the Java
> > ULR either. Both will require debugging to resolve, so I figured we
> > could focus on one single thing, the Java ULR part, without worrying
> > about the Flink server. Again, this may not be a valid concern, given
> > the Flink problem is most likely due to my setup.
> >
> >
> > On Wed, Nov 14, 2018 at 3:30 AM Maximilian Michels <mxm@apache.org
> > <ma...@apache.org>> wrote:
> >
> >     Hi Ruoyun,
> >
> >     I just ran the wordcount locally using the instructions on the page.
> >     I've tried the local file system and GCS. Both times it ran
> >     successfully
> >     and produced valid output.
> >
> >     I'm assuming there is some problem with your setup. Which platform are
> >     you using? I'm on MacOS.
> >
> >     Could you expand on the planned merge? From my understanding we will
> >     always need PortableRunner in Python to be able to submit against the
> >     Beam JobServer.
> >
> >     Thanks,
> >     Max
> >
> >     On 14.11.18 00:39, Ruoyun Huang wrote:
> >      > A quick follow-up on using current PortableRunner.
> >      >
> >      > I followed the exact three steps as Ankur and Maximilian shared in
> >      > https://beam.apache.org/roadmap/portability/#python-on-flink  ;
> >       The
> >      > wordcount example keeps hanging after 10 minutes.  I also tried
> >      > specifying explicit input/output args, either using gcs folder or
> >     local
> >      > file system, but none of them works.
> >      >
> >      > Spent some time looking into it but conclusion yet.  At this point
> >      > though, I guess it does not matter much any more, given we
> >     already have
> >      > the plan of merging PortableRunner into using java reference runner
> >      > (i.e. :beam-runners-reference-job-server).
> >      >
> >      > Still appreciated if someone can try out the python-on-flink
> >      >
> >     <https://beam.apache.org/roadmap/portability/#python-on-flink>instructions
> >
> >      > in case it is just due to my local machine setup.  Thanks!
> >      >
> >      >
> >      >
> >      > On Thu, Nov 8, 2018 at 5:04 PM Ruoyun Huang <ruoyun@google.com
> >     <ma...@google.com>
> >      > <mailto:ruoyun@google.com <ma...@google.com>>> wrote:
> >      >
> >      >     Thanks Maximilian!
> >      >
> >      >     I am working on migrating existing PortableRunner to using
> >     java ULR
> >      >     (Link to Notes
> >      >
> >       <https://docs.google.com/document/d/1S86saZqiDaE_M5wxO0zOQ_rwC6QHv7sp1BmGTm0dLNE/edit#>).
> >      >     If this issue is non-trivial to solve, I would vote for removing
> >      >     this default behavior as part of the consolidation.
> >      >
> >      >     On Thu, Nov 8, 2018 at 2:58 AM Maximilian Michels <mxm@apache.org> wrote:
> >      >
> >      >         In the long run, we should get rid of the Docker-inside-Docker
> >      >         approach, which was only intended for testing anyway. It would
> >      >         be cleaner to start the SDK harness container alongside the
> >      >         JobServer container.
> >      >
> >      >         Short term, I think it should be easy to either fix the
> >      >         permissions of the mounted "docker" executable or use a Docker
> >      >         image for the JobServer which comes with Docker pre-installed.
> >      >
> >      >         JIRA: https://issues.apache.org/jira/browse/BEAM-6020
> >      >
> >      >         Thanks for reporting this Ruoyun!
> >      >
> >      >         -Max
> >      >
> >      >         On 08.11.18 00:10, Ruoyun Huang wrote:
> >      >          > Thanks Ankur and Maximilian.
> >      >          >
> >      >          > Just for reference, in case other people encounter the same
> >      >          > error message: the "permission denied" error in my original
> >      >          > email is exactly due to the Docker-inside-Docker issue that
> >      >          > Ankur mentioned. Thanks Ankur! I didn't make the connection
> >      >          > when you said it and had to discover it the hard way (I
> >      >          > thought my Docker installation was messed up).
> >      >          >
> >      >          > On Tue, Nov 6, 2018 at 1:53 AM Maximilian Michels <mxm@apache.org> wrote:
> >      >          >
> >      >          >     Hi,
> >      >          >
> >      >          >     Please follow
> >      >          >     https://beam.apache.org/roadmap/portability/#python-on-flink
> >      >          >
> >      >          >     Cheers,
> >      >          >     Max
> >      >          >
> >      >          >     On 06.11.18 01:14, Ankur Goenka wrote:
> >      >          >      > Hi,
> >      >          >      >
> >      >          >      > The Portable Runner requires a job server URI to work
> >      >          >      > with. The current default job server Docker image is
> >      >          >      > broken because of the Docker-inside-Docker issue.
> >      >          >      >
> >      >          >      > Please refer to
> >      >          >      > https://beam.apache.org/roadmap/portability/#python-on-flink
> >      >          >      > for how to run a wordcount using the Portable Flink Runner.
> >      >          >      >
> >      >          >      > Thanks,
> >      >          >      > Ankur
> >      >          >      >
> >      >          >      > On Mon, Nov 5, 2018 at 3:41 PM Ruoyun Huang <ruoyun@google.com> wrote:
> >      >          >      >
> >      >          >      >     Hi, Folks,
> >      >          >      >
> >      >          >      >     I want to try out the Python PortableRunner using the
> >      >          >      >     following command:
> >      >          >      >
> >      >          >      >     sdk/python: python -m apache_beam.examples.wordcount
> >      >          >      >       --output=/tmp/test_output --runner PortableRunner
> >      >          >      >
> >      >          >      >     It fails with the following error message:
> >      >          >      >
> >      >          >      >     Caused by: java.lang.Exception: The user defined 'open()' method caused an exception: java.io.IOException: Cannot run program "docker": error=13, Permission denied
> >      >          >      >     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
> >      >          >      >     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> >      >          >      >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
> >      >          >      >     ... 1 more
> >      >          >      >     Caused by: org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.util.concurrent.UncheckedExecutionException: java.io.IOException: Cannot run program "docker": error=13, Permission denied
> >      >          >      >     at org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4994)
> >      >          >      >
> >      >          >      >     ... 7 more
> >      >          >      >
> >      >          >      >     My py2 environment is properly configured, because
> >      >          >      >     DirectRunner works. I also tested my Docker installation
> >      >          >      >     with 'docker run hello-world', with no issue.
> >      >          >      >
> >      >          >      >     Thanks.
> >      >          >      >     --
> >      >          >      >     ================
> >      >          >      >     Ruoyun  Huang
> >      >          >      >
> >      >          >
> >      >          >
> >      >          >
> >      >          > --
> >      >          > ================
> >      >          > Ruoyun  Huang
> >      >          >
> >      >
> >      >
> >      >
> >      >     --
> >      >     ================
> >      >     Ruoyun  Huang
> >      >
> >      >
> >      >
> >      > --
> >      > ================
> >      > Ruoyun  Huang
> >      >
> >
> >
> >
> > --
> > ================
> > Ruoyun  Huang
> >
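The "error=13, Permission denied" in the original report is errno EACCES: a "docker" binary was found, but the process was not allowed to execute it, which matches Max's short-term suggestion to fix the permissions of the mounted executable. A minimal diagnostic sketch, assuming it is run with Python in the same environment that launches the SDK harness; `check_docker_executable` is a hypothetical helper, not part of Beam:

```python
import errno
import os
import shutil


def check_docker_executable(name="docker", path=None):
    """Report whether `name` can be executed from the given search path.

    Returns (ok, message). `path` defaults to this process's PATH.
    """
    # shutil.which() returns a path only for files that exist AND are
    # executable by the current user.
    found = shutil.which(name, path=path)
    if found:
        return True, "executable: " + found

    # Look for a file that exists but cannot be executed; that is the
    # situation Java reports as "error=13, Permission denied" (EACCES).
    search = path if path is not None else os.environ.get("PATH", os.defpath)
    for directory in search.split(os.pathsep):
        if not directory:
            continue
        candidate = os.path.join(directory, name)
        if os.path.isfile(candidate):
            return False, "found but not executable (errno %d: %s): %s" % (
                errno.EACCES, os.strerror(errno.EACCES), candidate)
    return False, name + " not found on the search path"


if __name__ == "__main__":
    ok, message = check_docker_executable()
    print(message)
```

Note that a working `docker run hello-world` on the host, as in the original report, says nothing about the binary mounted into the job-server container; the check only helps when run where the harness is actually launched.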

Re: How to use "PortableRunner" in Python SDK?

Posted by Maximilian Michels <mx...@apache.org>.
> 1) The default behavior, where PortableRunner starts a flink server. It is confusing to new users
It does that only if no JobServer endpoint is specified. AFAIK there are
problems with the bootstrapping; it can definitely be improved.

> 2) All the related docs and inline comments.  Similarly, it could be very confusing connecting PortableRunner to Flink server.
+1 We definitely need to improve docs and usability.

> 3) [Probably no longer an issue].   I couldn't make the flink server example working.  And I could not make example working on Java-ULR either. 
AFAIK the Java ULR hasn't received much love in a long time.

-Max
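Max's first reply, that PortableRunner only starts its own job server when no JobServer endpoint is specified, amounts to a branch on `--job_endpoint`. A hypothetical sketch of that decision using `argparse`; `choose_job_server` and its return labels are made-up names for illustration, not Beam's actual code:

```python
import argparse


def choose_job_server(argv):
    """Decide how a portable pipeline would reach a job server.

    Mirrors the behavior described in this thread: connect to
    --job_endpoint when it is given, otherwise fall back to starting a
    local job server. All other flags are ignored here.
    """
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument("--job_endpoint", default=None)
    known, _unused = parser.parse_known_args(argv)
    if known.job_endpoint:
        return ("connect", known.job_endpoint)
    # No endpoint: per the thread, this default path used a job server
    # Docker image, which hit the Docker-inside-Docker problem.
    return ("start_local_job_server", None)


if __name__ == "__main__":
    print(choose_job_server(["--runner=PortableRunner",
                             "--job_endpoint=localhost:8099"]))
    print(choose_job_server(["--runner=PortableRunner"]))
```

Passing `--job_endpoint=localhost:8099`, as in the command quoted later in this thread, takes the first branch and skips the bootstrapping path entirely.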

On 14.11.18 20:57, Ruoyun Huang wrote:
> To answer Maximilian's question.
> 
> I am using Linux (Debian).
> 
> It probably sounded like too much when I used the words 'planned merge'.
> What I really meant entails less change than it sounds. More specifically:
> 
> 1) The default behavior, where PortableRunner starts a Flink server. It
> is confusing to new users.
> 2) All the related docs and inline comments. Similarly, connecting
> PortableRunner to a Flink server can be very confusing.
> 3) [Probably no longer an issue.] I couldn't get the Flink server
> example working, and I could not get the example working on the Java ULR
> either. Both will require debugging to resolve, so I figured we could
> focus on a single thing, the Java ULR part, without worrying about the
> Flink server. Again, this may not be a valid concern, given the Flink
> problem is most likely due to my setup.
> 
> 
> On Wed, Nov 14, 2018 at 3:30 AM Maximilian Michels <mxm@apache.org> wrote:
> 
>     Hi Ruoyun,
> 
>     I just ran the wordcount locally using the instructions on the page.
>     I've tried the local file system and GCS. Both times it ran
>     successfully and produced valid output.
> 
>     I'm assuming there is some problem with your setup. Which platform are
>     you using? I'm on macOS.
> 
>     Could you expand on the planned merge? From my understanding, we will
>     always need PortableRunner in Python to be able to submit against the
>     Beam JobServer.
> 
>     Thanks,
>     Max
> 
> 
> 
> 
> -- 
> ================
> Ruoyun  Huang
> 
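Max's point that PortableRunner is how a Python pipeline submits to a Beam JobServer, together with the hang reported above, suggests launching the wordcount against an explicit endpoint and with a wall-clock limit. A minimal sketch using only flags that appear in this thread (`--input`, `--output`, `--runner`, `--job_endpoint`, `--parallelism`); the helper names, the default endpoint and the 600-second timeout are illustrative assumptions:

```python
import subprocess
import sys


def wordcount_command(input_path, output_path,
                      job_endpoint="localhost:8099", parallelism=1):
    """Build the argv for the Python wordcount example on PortableRunner."""
    return [
        sys.executable, "-m", "apache_beam.examples.wordcount",
        "--input=" + input_path,
        "--output=" + output_path,
        "--runner=PortableRunner",
        # A job server must already be listening at this address.
        "--job_endpoint=" + job_endpoint,
        "--parallelism=%d" % parallelism,
    ]


def run_with_timeout(cmd, timeout_s=600):
    """Run `cmd`; return its exit code, or None if it exceeded `timeout_s`."""
    try:
        return subprocess.run(cmd, timeout=timeout_s).returncode
    except subprocess.TimeoutExpired:
        # subprocess.run() kills the child before re-raising TimeoutExpired.
        return None


if __name__ == "__main__":
    cmd = wordcount_command("/etc/profile", "/tmp/py-wordcount")
    print(" ".join(cmd))
    # Uncomment to actually submit (needs apache_beam and a running job server):
    # print(run_with_timeout(cmd))
```

The timeout only bounds how long the client waits; it does not explain why a job stalls, which Max suspected was a local setup issue here.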

Re: How to use "PortableRunner" in Python SDK?

Posted by Ankur Goenka <go...@google.com>.
Hi Jun,

Thanks for reporting the error.
This seems to be caused by a mismatch between the SDK harness (Docker image)
and the Python SDK. As we are actively developing, this can happen.

Can you please retry after rebuilding the Docker images and the Python SDK
from master, and installing the Python SDK into your virtual environment?

Thanks,
Ankur
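The harness traceback quoted below ends in pickle's `load_build` failing on `inst.__dict__`, which fits Ankur's diagnosis of a version mismatch. One way such a failure can arise, assuming the client SDK and the harness define `PaneInfo` differently (one with a per-instance `__dict__`, one without, e.g. slotted or compiled), is that the client pickles the instance state as a dict that the harness cannot apply. The simulation below runs on Python 3 and uses a hypothetical stand-in module `fake_windowed_value`, not Beam itself:

```python
import pickle
import sys
import types

# Hypothetical stand-in for a module both sides import by the same name.
# This is NOT Beam's apache_beam.utils.windowed_value.
MODULE = "fake_windowed_value"

# "Client" build: a plain class; instance attributes live in __dict__.
ClientPaneInfo = type("PaneInfo", (), {"__module__": MODULE})

# "Harness" build: same module and class name, but __slots__ means
# instances have no __dict__.
HarnessPaneInfo = type("PaneInfo", (), {
    "__module__": MODULE,
    "__slots__": ("is_first", "is_last"),
})


def install(cls):
    """Make MODULE.PaneInfo resolve to `cls`, as one installed SDK would."""
    module = sys.modules.setdefault(MODULE, types.ModuleType(MODULE))
    module.PaneInfo = cls


def client_pickle():
    """Serialize a PaneInfo the way the client-side definition stores it."""
    install(ClientPaneInfo)
    pane = ClientPaneInfo()
    pane.is_first, pane.is_last = True, True
    # The pickled state is the instance __dict__.
    return pickle.dumps(pane, protocol=2)


def harness_unpickle(payload):
    """Deserialize with the harness-side definition of the same name."""
    install(HarnessPaneInfo)
    return pickle.loads(payload)


if __name__ == "__main__":
    payload = client_pickle()
    try:
        harness_unpickle(payload)
    except AttributeError as err:
        # Same failure mode as the log: dict state cannot be applied.
        print("version skew:", err)
```

Rebuilding the harness image and reinstalling the SDK from the same source, as Ankur suggests, removes this kind of mismatch because both sides then resolve the name to the same class layout.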

On Tue, Jan 22, 2019 at 11:44 AM junwan01@gmail.com <ju...@gmail.com>
wrote:

> I downgraded Flink from 1.7.1 to 1.5.6 and was able to get further, but it
> still fails. Here is the latest error from Flink. Thanks!
>
> The job command I launched: python -m apache_beam.examples.wordcount
> --input=/etc/profile --output=/tmp/py-wordcount-direct
> --runner=PortableRunner --job_endpoint=localhost:8099 --parallelism=1
> --OPTIONALflink_master=localhost:8081 --streaming
> --experiments=worker_threads=100 --execution_mode_for_batch=BATCH_FORCED
> --experiments=beam_fn_api
>
> Jun
>
> ---- log starts ----
> [flink-runner-job-server] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Running remotely at localhost:8081
> [flink-runner-job-server] WARN org.apache.flink.configuration.Configuration - Config uses deprecated configuration key 'jobmanager.rpc.address' instead of proper key 'rest.address'
> [flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
> [flink-runner-job-server] INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 4ecb5e5cfd4718de440f48cbfaf7216a (detached: false).
> [flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint.
> [flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient - Rest endpoint shutdown complete.
> [flink-runner-job-server] ERROR org.apache.beam.runners.flink.FlinkJobInvocation - Error during job invocation BeamApp-jwan-0121211115-328178bb_d2dadedb-6dbf-4c1e-82d4-208a2d3177e9.
> org.apache.flink.client.program.ProgramInvocationException: Job 4ecb5e5cfd4718de440f48cbfaf7216a failed.
>         at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
>         at org.apache.beam.runners.flink.FlinkExecutionEnvironments$BeamFlinkRemoteStreamEnvironment.executeRemotely(FlinkExecutionEnvironments.java:355)
>         at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:179)
>         at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator$StreamingTranslationContext.execute(FlinkStreamingPortablePipelineTranslator.java:158)
>         at org.apache.beam.runners.flink.FlinkJobInvocation.runPipelineWithTranslator(FlinkJobInvocation.java:142)
>         at org.apache.beam.runners.flink.FlinkJobInvocation.runPipeline(FlinkJobInvocation.java:112)
>         at org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)
>         at org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)
>         at org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>         at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>         at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
>         ... 12 more
> Caused by: java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
>         ... 1 more
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:483)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:694)
>         at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.emitWatermark(DoFnOperator.java:591)
>         at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:581)
>         at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:540)
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
>         ... 7 more
> Caused by: java.lang.RuntimeException: Failed to finish remote bundle
>         at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:626)
>         at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:87)
>         at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>         at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:677)
>         at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.processWatermark(ExecutableStageDoFnOperator.java:471)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:479)
>         ... 12 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 21: Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 148, in _execute
>     response = task()
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 183, in <lambda>
>     self._execute(lambda: worker.do_instruction(work), work)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 256, in do_instruction
>     request.instruction_id)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 270, in process_bundle
>     request.process_bundle_descriptor_reference) as bundle_processor:
>   File "/usr/local/lib/python2.7/contextlib.py", line 17, in __enter__
>     return self.gen.next()
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 292, in get_bundle_processor
>     self.data_channel_factory)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 404, in __init__
>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 448, in create_execution_tree
>     descriptor.transforms, key=topological_height, reverse=True)])
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 381, in wrapper
>     result = cache[args] = func(*args)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 431, in get_operation
>     in descriptor.transforms[transform_id].outputs.items()
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 430, in <dictcomp>
>     for tag, pcoll_id
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 381, in wrapper
>     result = cache[args] = func(*args)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 434, in get_operation
>     transform_id, transform_consumers)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 584, in create_operation
>     return creator(self, transform_id, transform_proto, payload, consumers)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 768, in create
>     serialized_fn, parameter)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 806, in _create_pardo_operation
>     dofn_data = pickler.loads(serialized_fn)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 247, in loads
>     return dill.loads(s)
>   File "/usr/local/lib/python2.7/site-packages/dill/_dill.py", line 316, in loads
>     return load(file, ignore)
>   File "/usr/local/lib/python2.7/site-packages/dill/_dill.py", line 304, in load
>     obj = pik.load()
>   File "/usr/local/lib/python2.7/pickle.py", line 864, in load
>     dispatch[key](self)
>   File "/usr/local/lib/python2.7/pickle.py", line 1230, in load_build
>     d = inst.__dict__
> AttributeError: 'apache_beam.utils.windowed_value.PaneInfo' object has no attribute '__dict__'
>
>         at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>         at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>         at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
>         at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:263)
>         at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:623)
>         ... 17 more
> Caused by: java.lang.RuntimeException: Error received from SDK harness for
> instruction 21: Traceback (most recent call last):
>   File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 148, in _execute
>     response = task()
>   File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 183, in <lambda>
>     self._execute(lambda: worker.do_instruction(work), work)
>   File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 256, in do_instruction
>     request.instruction_id)
>   File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 270, in process_bundle
>     request.process_bundle_descriptor_reference) as bundle_processor:
>   File "/usr/local/lib/python2.7/contextlib.py", line 17, in __enter__
>     return self.gen.next()
>   File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 292, in get_bundle_processor
>     self.data_channel_factory)
>   File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 404, in __init__
>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>   File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 448, in create_execution_tree
>     descriptor.transforms, key=topological_height, reverse=True)])
>   File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 381, in wrapper
>     result = cache[args] = func(*args)
>   File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 431, in get_operation
>     in descriptor.transforms[transform_id].outputs.items()
>   File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 430, in <dictcomp>
>     for tag, pcoll_id
>   File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 381, in wrapper
>     result = cache[args] = func(*args)
>   File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 434, in get_operation
>     transform_id, transform_consumers)
>   File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 584, in create_operation
>     return creator(self, transform_id, transform_proto, payload, consumers)
>   File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 768, in create
>     serialized_fn, parameter)
>   File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 806, in _create_pardo_operation
>     dofn_data = pickler.loads(serialized_fn)
>   File
> "/usr/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py",
> line 247, in loads
>     return dill.loads(s)
>   File "/usr/local/lib/python2.7/site-packages/dill/_dill.py", line 316,
> in loads
>     return load(file, ignore)
>   File "/usr/local/lib/python2.7/site-packages/dill/_dill.py", line 304,
> in load
>     obj = pik.load()
>   File "/usr/local/lib/python2.7/pickle.py", line 864, in load
>     dispatch[key](self)
>   File "/usr/local/lib/python2.7/pickle.py", line 1230, in load_build
>     d = inst.__dict__
> AttributeError: 'apache_beam.utils.windowed_value.PaneInfo' object has no
> attribute '__dict__'
>
>         at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
>         at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:140)
>         at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248)
>         at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
>         at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
>         at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:263)
>         at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:683)
>         at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>         at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         ... 1 more
> ---- log ends ----
>
>
>
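For the original `Cannot run program "docker": error=13, Permission denied` failure at the top of this thread: errno 13 is EACCES, which typically means the process found a `docker` entry but was denied permission to execute it. Maximilian's suggested short-term fix in this thread is to fix the permissions of the mounted "docker" executable. As a rough first check, the Python 3 sketch below, run as the same user and inside the same container as the job server, reports whether a `docker` file is on `PATH` and whether that process may execute it. `find_on_path` is a hypothetical helper, not part of Beam. It does not use `shutil.which` because that function skips files that are not executable, which is exactly the case of interest here. A missing execute bit is only one possible cause; a non-searchable parent directory or mount options would need separate checks.

```python
import os
import stat


def find_on_path(name, path_env=None):
    """Return (path, executable) for the first regular file called `name` on PATH.

    Hypothetical helper (not part of Beam). Unlike shutil.which(), it also
    reports a match that exists but is not executable by the current user,
    one common cause of an EACCES (errno 13) exec failure.
    """
    if path_env is None:
        path_env = os.environ.get("PATH", "")
    for directory in path_env.split(os.pathsep):
        if not directory:
            continue  # skip empty PATH entries
        candidate = os.path.join(directory, name)
        if os.path.isfile(candidate):
            # os.access checks permissions for the real uid/gid of this process.
            return candidate, os.access(candidate, os.X_OK)
    return None, False


if __name__ == "__main__":
    path, executable = find_on_path("docker")
    if path is None:
        print("docker: not found on PATH")
    elif not executable:
        mode = stat.filemode(os.stat(path).st_mode)
        print("docker: found at %s (%s) but not executable by this user" % (path, mode))
    else:
        print("docker: %s is executable" % path)
```

If the script reports a file that is not executable, that points at the permissions of the mounted binary rather than at the Docker installation on the host, which matches what Ruoyun eventually found.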

Re: How to use "PortableRunner" in Python SDK?

Posted by ju...@gmail.com, ju...@gmail.com.
I downgraded Flink from 1.7.1 to 1.5.6 and got further, but the job still fails. Here is the latest error from Flink. Thanks!

The job command I launched: python -m apache_beam.examples.wordcount --input=/etc/profile --output=/tmp/py-wordcount-direct --runner=PortableRunner --job_endpoint=localhost:8099 --parallelism=1 --OPTIONALflink_master=localhost:8081 --streaming --experiments=worker_threads=100 --execution_mode_for_batch=BATCH_FORCED --experiments=beam_fn_api
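Ankur's point in this thread is that the PortableRunner only submits to a job server at the given job endpoint. Before running a command like the one above, it can help to confirm that something is listening on that port and to build the arguments in one place. The Python 3 sketch below uses only the standard library. `job_endpoint_reachable` and `wordcount_argv` are hypothetical helpers, the flags are limited to ones that appear in this thread, and `--streaming` is opt-in because a reply in this thread suggests trying the batch wordcount without it. The other flags from the command (`--OPTIONALflink_master`, `--experiments`, `--execution_mode_for_batch`) are deliberately left out and are not verified here.

```python
import shlex
import socket
import sys


def job_endpoint_reachable(endpoint, timeout=2.0):
    """Return True if a TCP connection to 'host:port' succeeds.

    A successful connect only shows that something is listening on the port;
    it does not prove that it is a Beam job server.
    """
    host, _, port = endpoint.rpartition(":")
    try:
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except (OSError, ValueError):  # refused, timed out, or a bad port string
        return False


def wordcount_argv(input_path, output_path, job_endpoint="localhost:8099",
                   parallelism=1, streaming=False):
    """Build argv for the Python wordcount example on the PortableRunner.

    Hypothetical helper: the flag names are copied from the command in this
    thread, and --streaming is opt-in.
    """
    argv = [
        sys.executable, "-m", "apache_beam.examples.wordcount",
        "--input=" + input_path,
        "--output=" + output_path,
        "--runner=PortableRunner",
        "--job_endpoint=" + job_endpoint,
        "--parallelism=%d" % parallelism,
    ]
    if streaming:
        argv.append("--streaming")
    return argv


if __name__ == "__main__":
    endpoint = "localhost:8099"
    argv = wordcount_argv("/etc/profile", "/tmp/py-wordcount-direct", endpoint)
    if job_endpoint_reachable(endpoint):
        print("job endpoint is up; command:")
    else:
        print("nothing is listening on %s yet; start the job server first" % endpoint)
    print(" ".join(shlex.quote(arg) for arg in argv))
```

Passing the resulting list to `subprocess.run(argv, check=True)` would submit the job, assuming apache_beam is installed in that interpreter and the job server is running.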

Jun
 
---- log starts ----
[flink-runner-job-server] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Running remotely at localhost:8081
[flink-runner-job-server] WARN org.apache.flink.configuration.Configuration - Config uses deprecated configuration key 'jobmanager.rpc.address' instead of proper key 'rest.address'
[flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
[flink-runner-job-server] INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 4ecb5e5cfd4718de440f48cbfaf7216a (detached: false).
[flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint.
[flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient - Rest endpoint shutdown complete.
[flink-runner-job-server] ERROR org.apache.beam.runners.flink.FlinkJobInvocation - Error during job invocation BeamApp-jwan-0121211115-328178bb_d2dadedb-6dbf-4c1e-82d4-208a2d3177e9.
org.apache.flink.client.program.ProgramInvocationException: Job 4ecb5e5cfd4718de440f48cbfaf7216a failed.
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
        at org.apache.beam.runners.flink.FlinkExecutionEnvironments$BeamFlinkRemoteStreamEnvironment.executeRemotely(FlinkExecutionEnvironments.java:355)
        at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:179)
        at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator$StreamingTranslationContext.execute(FlinkStreamingPortablePipelineTranslator.java:158)
        at org.apache.beam.runners.flink.FlinkJobInvocation.runPipelineWithTranslator(FlinkJobInvocation.java:142)
        at org.apache.beam.runners.flink.FlinkJobInvocation.runPipeline(FlinkJobInvocation.java:112)
        at org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)
        at org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)
        at org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
        ... 12 more
Caused by: java.lang.RuntimeException: Exception occurred while processing valve output watermark:
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
        ... 1 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:483)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:694)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.emitWatermark(DoFnOperator.java:591)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:581)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:540)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
        ... 7 more
Caused by: java.lang.RuntimeException: Failed to finish remote bundle
        at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:626)
        at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:87)
        at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:677)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.processWatermark(ExecutableStageDoFnOperator.java:471)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:479)
        ... 12 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 21: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 148, in _execute
    response = task()
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 183, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 256, in do_instruction
    request.instruction_id)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 270, in process_bundle
    request.process_bundle_descriptor_reference) as bundle_processor:
  File "/usr/local/lib/python2.7/contextlib.py", line 17, in __enter__
    return self.gen.next()
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 292, in get_bundle_processor
    self.data_channel_factory)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 404, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 448, in create_execution_tree
    descriptor.transforms, key=topological_height, reverse=True)])
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 381, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 431, in get_operation
    in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 430, in <dictcomp>
    for tag, pcoll_id
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 381, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 434, in get_operation
    transform_id, transform_consumers)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 584, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 768, in create
    serialized_fn, parameter)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 806, in _create_pardo_operation
    dofn_data = pickler.loads(serialized_fn)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 247, in loads
    return dill.loads(s)
  File "/usr/local/lib/python2.7/site-packages/dill/_dill.py", line 316, in loads
    return load(file, ignore)
  File "/usr/local/lib/python2.7/site-packages/dill/_dill.py", line 304, in load
    obj = pik.load()
  File "/usr/local/lib/python2.7/pickle.py", line 864, in load
    dispatch[key](self)
  File "/usr/local/lib/python2.7/pickle.py", line 1230, in load_build
    d = inst.__dict__
AttributeError: 'apache_beam.utils.windowed_value.PaneInfo' object has no attribute '__dict__'

        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
        at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:263)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:623)
        ... 17 more
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 21: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 148, in _execute
    response = task()
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 183, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 256, in do_instruction
    request.instruction_id)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 270, in process_bundle
    request.process_bundle_descriptor_reference) as bundle_processor:
  File "/usr/local/lib/python2.7/contextlib.py", line 17, in __enter__
    return self.gen.next()
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 292, in get_bundle_processor
    self.data_channel_factory)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 404, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 448, in create_execution_tree
    descriptor.transforms, key=topological_height, reverse=True)])
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 381, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 431, in get_operation
    in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 430, in <dictcomp>
    for tag, pcoll_id
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 381, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 434, in get_operation
    transform_id, transform_consumers)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 584, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 768, in create
    serialized_fn, parameter)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 806, in _create_pardo_operation
    dofn_data = pickler.loads(serialized_fn)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 247, in loads
    return dill.loads(s)
  File "/usr/local/lib/python2.7/site-packages/dill/_dill.py", line 316, in loads
    return load(file, ignore)
  File "/usr/local/lib/python2.7/site-packages/dill/_dill.py", line 304, in load
    obj = pik.load()
  File "/usr/local/lib/python2.7/pickle.py", line 864, in load
    dispatch[key](self)
  File "/usr/local/lib/python2.7/pickle.py", line 1230, in load_build
    d = inst.__dict__
AttributeError: 'apache_beam.utils.windowed_value.PaneInfo' object has no attribute '__dict__'

        at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
        at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:140)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:263)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:683)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
---- log ends ----
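The innermost error in this log comes from the SDK harness while it unpickles a serialized DoFn: pickle's BUILD step (`load_build`) has a dict of saved instance state, but the freshly created `apache_beam.utils.windowed_value.PaneInfo` object has no `__dict__` to put it in. The fully qualified type name in the message is typical of a compiled extension type, so one plausible reading (an assumption, not something established in this message) is that the pipeline was pickled by a Beam installation where `PaneInfo` is a plain Python class and unpickled by one where it is compiled without a per-instance `__dict__`. The Python 3 sketch below reproduces that kind of mismatch with two stand-in classes registered under a made-up module name; `fake_windowed_value`, `PlainPaneInfo` and `SlottedPaneInfo` are hypothetical and are not Beam code, and `__slots__` stands in for a compiled type.

```python
import pickle
import sys
import types

# Hypothetical stand-in module; pickle records classes by module + name.
MODULE = "fake_windowed_value"
fake_module = types.ModuleType(MODULE)
sys.modules[MODULE] = fake_module


class PlainPaneInfo(object):
    """Stand-in for a pure-Python class: instances carry a __dict__."""

    def __init__(self, index):
        self.index = index


class SlottedPaneInfo(object):
    """Stand-in for a compiled-style class: __slots__, so no __dict__."""

    __slots__ = ("index",)


# Give both classes the same public identity, as two builds of one library would.
for cls in (PlainPaneInfo, SlottedPaneInfo):
    cls.__module__ = MODULE
    cls.__name__ = cls.__qualname__ = "PaneInfo"


def roundtrip_with_skew():
    """Pickle with the __dict__ flavour, unpickle with the __slots__ flavour."""
    fake_module.PaneInfo = PlainPaneInfo       # "submitting" side
    payload = pickle.dumps(PlainPaneInfo(0))   # state is saved as {'index': 0}
    fake_module.PaneInfo = SlottedPaneInfo     # "SDK harness" side
    return pickle.loads(payload)               # BUILD needs inst.__dict__


if __name__ == "__main__":
    try:
        roundtrip_with_skew()
    except AttributeError as exc:
        print("AttributeError:", exc)
```

If version skew is the cause, checking that the Beam version (and the way it was built) on the submitting machine matches the one inside the SDK harness container would be a reasonable next step; this sketch does not confirm that diagnosis.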


On 2018/11/14 19:57:19, Ruoyun Huang <ru...@google.com> wrote: 
> To answer Maximilian's question.
> 
> I am using Linux, debian distribution.
> 
> It probably sounded too much when I used the word 'planned merge'. What I
> really meant entails less change than it sounds. More specifically:
> 
> 1) The default behavior, where PortableRunner starts a flink server.  It is
> confusing to new users.
> 2) All the related docs and inline comments.  Similarly, it could be very
> confusing connecting PortableRunner to Flink server.
> 3) [Probably no longer an issue].   I couldn't make the flink server
> example working.  And I could not make example working on Java-ULR either.
> Both will require debugging for resolutions.  Thus I figured maybe let us
> only focus on one single thing: the java-ULR part, without worrying about
> Flink-server.   Again, looks like this may not be a valid concern, given
> flink part is most likely due to my setup.
> 
> 
> On Wed, Nov 14, 2018 at 3:30 AM Maximilian Michels <mx...@apache.org> wrote:
> 
> > Hi Ruoyun,
> >
> > I just ran the wordcount locally using the instructions on the page.
> > I've tried the local file system and GCS. Both times it ran successfully
> > and produced valid output.
> >
> > I'm assuming there is some problem with your setup. Which platform are
> > you using? I'm on MacOS.
> >
> > Could you expand on the planned merge? From my understanding we will
> > always need PortableRunner in Python to be able to submit against the
> > Beam JobServer.
> >
> > Thanks,
> > Max
> >
> > On 14.11.18 00:39, Ruoyun Huang wrote:
> > > A quick follow-up on using current PortableRunner.
> > >
> > > I followed the exact three steps as Ankur and Maximilian shared in
> > > https://beam.apache.org/roadmap/portability/#python-on-flink  ;   The
> > > wordcount example keeps hanging after 10 minutes.  I also tried
> > > specifying explicit input/output args, either using gcs folder or local
> > > file system, but none of them works.
> > >
> > > Spent some time looking into it but conclusion yet.  At this point
> > > though, I guess it does not matter much any more, given we already have
> > > the plan of merging PortableRunner into using java reference runner
> > > (i.e. :beam-runners-reference-job-server).
> > >
> > > Still appreciated if someone can try out the python-on-flink
> > > <https://beam.apache.org/roadmap/portability/#python-on-flink> instructions
> > > in case it is just due to my local machine setup.  Thanks!
> > >
> > >
> > >
> > > On Thu, Nov 8, 2018 at 5:04 PM Ruoyun Huang <ruoyun@google.com> wrote:
> > >
> > >     Thanks Maximilian!
> > >
> > >     I am working on migrating existing PortableRunner to using java ULR
> > >     (Link to Notes
> > >     <https://docs.google.com/document/d/1S86saZqiDaE_M5wxO0zOQ_rwC6QHv7sp1BmGTm0dLNE/edit#>).
> > >     If this issue is non-trivial to solve, I would vote for removing
> > >     this default behavior as part of the consolidation.
> > >
> > >     On Thu, Nov 8, 2018 at 2:58 AM Maximilian Michels <mxm@apache.org> wrote:
> > >
> > >         In the long run, we should get rid of the Docker-inside-Docker
> > >         approach, which was only intended for testing anyways. It would
> > >         be cleaner to start the SDK harness container alongside with
> > >         JobServer container.
> > >
> > >         Short term, I think it should be easy to either fix the
> > >         permissions of the mounted "docker" executable or use a Docker
> > >         image for the JobServer which comes with Docker pre-installed.
> > >
> > >         JIRA: https://issues.apache.org/jira/browse/BEAM-6020
> > >
> > >         Thanks for reporting this Ruoyun!
> > >
> > >         -Max
> > >
> > >         On 08.11.18 00:10, Ruoyun Huang wrote:
> > >          > Thanks Ankur and Maximilian.
> > >          >
> > >          > Just for reference in case other people encountering the same
> > >          > error message, the "permission denied" error in my original
> > >          > email is exactly due to dockerinsidedocker issue that Ankur
> > >          > mentioned. Thanks Ankur!
> > >          > Didn't make the link when you said it, had to discover that
> > >          > in a hard way (I thought it is due to my docker installation
> > >          > messed up).
> > >          >
> > >          > On Tue, Nov 6, 2018 at 1:53 AM Maximilian Michels <mxm@apache.org> wrote:
> > >          >
> > >          >     Hi,
> > >          >
> > >          >     Please follow
> > >          >     https://beam.apache.org/roadmap/portability/#python-on-flink
> > >          >
> > >          >     Cheers,
> > >          >     Max
> > >          >
> > >          >     On 06.11.18 01:14, Ankur Goenka wrote:
> > >          >      > Hi,
> > >          >      >
> > >          >      > The Portable Runner requires a job server uri to work
> > >          >      > with. The current default job server docker image is
> > >          >      > broken because of docker inside docker issue.
> > >          >      >
> > >          >      > Please refer to
> > >          >      > https://beam.apache.org/roadmap/portability/#python-on-flink
> > >          >      > for how to run a wordcount using Portable Flink Runner.
> > >          >      >
> > >          >      > Thanks,
> > >          >      > Ankur
> > >          >      >
> > >          >      > On Mon, Nov 5, 2018 at 3:41 PM Ruoyun Huang <ruoyun@google.com> wrote:
> > >          >      >
> > >          >      >     Hi, Folks,
> > >          >      >
> > >          >      >           I want to try out Python PortableRunner, by
> > >          >      >     using following command:
> > >          >      >
> > >          >      >     *sdk/python: python -m apache_beam.examples.wordcount
> > >          >      >       --output=/tmp/test_output   --runner PortableRunner*
> > >          >      >
> > >          >      >           It complains with following error message:
> > >          >      >
> > >          >      >     Caused by: java.lang.Exception: The user defined 'open()' method
> > >          >      >     caused an exception: java.io.IOException: Cannot run program
> > >          >      >     "docker": error=13, Permission denied
> > >          >      >     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
> > >          >      >     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> > >          >      >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
> > >          >      >     ... 1 more
> > >          >      >     Caused by:
> > >          >      >     org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.util.concurrent.UncheckedExecutionException:
> > >          >      >     java.io.IOException: Cannot run program "docker": error=13,
> > >          >      >     Permission denied
> > >          >      >     at org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4994)
> > >          >      >
> > >          >      >     ... 7 more
> > >          >      >
> > >          >      >
> > >          >      >
> > >          >      >     My py2 environment is properly configured, because
> > >         DirectRunner
> > >          >      >     works.  Also I tested my docker installation by
> > >         'docker run
> > >          >      >     hello-world ', no issue.
> > >          >      >
> > >          >      >
> > >          >      >     Thanks.
> > >          >      >     --
> > >          >      >     ================
> > >          >      >     Ruoyun  Huang
> > >          >      >
> > >          >
> > >          >
> > >          >
> > >          > --
> > >          > ================
> > >          > Ruoyun  Huang
> > >          >
> > >
> > >
> > >
> > >     --
> > >     ================
> > >     Ruoyun  Huang
> > >
> > >
> > >
> > > --
> > > ================
> > > Ruoyun  Huang
> > >
> >
> 
> 
> -- 
> ================
> Ruoyun  Huang
> 

Re: How to use "PortableRunner" in Python SDK?

Posted by Ruoyun Huang <ru...@google.com>.
To answer Maximilian's question: I am using Linux (Debian).

The phrase 'planned merge' probably made it sound bigger than it is. What I
really meant involves less change than it sounds. More specifically:

1) The default behavior, where PortableRunner starts a Flink job server. It
is confusing to new users.
2) All the related docs and inline comments. Similarly, how PortableRunner
gets connected to a Flink job server can be very confusing.
3) [Probably no longer an issue.] I couldn't get the Flink job server
example working, and I couldn't get the example working on the Java ULR
either. Both would require debugging to resolve, so I figured we could
focus on one single thing: the Java ULR part, without worrying about the
Flink job server. Again, this may not be a valid concern, given that the
Flink problem is most likely due to my setup.
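Points 1 and 2 are about PortableRunner starting a job server on its own when no endpoint is given. A minimal sketch of sidestepping that default, assuming a job server is already running and listening on localhost:8099 (an assumption, not something confirmed for every setup), is to always pass --job_endpoint explicitly. The helper name portable_wordcount_argv is hypothetical; it only assembles the command line for the bundled wordcount example from flags that appear elsewhere in this thread (--input, --output, --runner, --job_endpoint).

```python
import shlex


def portable_wordcount_argv(input_path, output_prefix,
                            job_endpoint="localhost:8099"):
    """Build the argv for running Beam's wordcount example on PortableRunner.

    Hypothetical helper. Passing --job_endpoint points PortableRunner at an
    already running job server (assumed to be at localhost:8099) instead of
    leaving the choice of job server to the runner's default behavior.
    """
    return [
        "python", "-m", "apache_beam.examples.wordcount",
        "--input=" + input_path,
        "--output=" + output_prefix,
        "--runner=PortableRunner",
        "--job_endpoint=" + job_endpoint,
    ]


if __name__ == "__main__":
    argv = portable_wordcount_argv("/etc/profile", "/tmp/py-wordcount")
    # Print a shell-quoted command that can be pasted into a terminal.
    print(" ".join(shlex.quote(arg) for arg in argv))
```

Keeping the endpoint explicit makes it obvious which job server a run talks to, which is exactly the part described above as confusing for new users.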


On Wed, Nov 14, 2018 at 3:30 AM Maximilian Michels <mx...@apache.org> wrote:

> Hi Ruoyun,
>
> I just ran the wordcount locally using the instructions on the page.
> I've tried the local file system and GCS. Both times it ran successfully
> and produced valid output.
>
> I'm assuming there is some problem with your setup. Which platform are
> you using? I'm on macOS.
>
> Could you expand on the planned merge? From my understanding, we will
> always need PortableRunner in Python to be able to submit against the
> Beam JobServer.
>
> Thanks,
> Max


-- 
================
Ruoyun  Huang

Re: How to use "PortableRunner" in Python SDK?

Posted by Maximilian Michels <mx...@apache.org>.
Hi Ruoyun,

I just ran the wordcount locally using the instructions on the page. 
I've tried the local file system and GCS. Both times it ran successfully 
and produced valid output.
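To check that a run produced valid output without eyeballing the files, a small reader can sum the counts across the output shards. This is a sketch for the local file system only, and it assumes the wordcount example writes one "word: count" pair per line into shards named like PREFIX-00000-of-00002 (the text sink's default naming); the function name is hypothetical. Output written to GCS would need a GCS-aware reader instead.

```python
import glob
import os
import tempfile


def read_wordcount_output(output_prefix):
    """Sum word counts across sharded wordcount output files (local FS only).

    Assumes shard names like <prefix>-00000-of-00002 and lines of the form
    "word: count".
    """
    shards = sorted(glob.glob(glob.escape(output_prefix) + "-*-of-*"))
    if not shards:
        raise FileNotFoundError("no output shards found for " + output_prefix)
    counts = {}
    for shard in shards:
        with open(shard) as f:
            for line in f:
                line = line.rstrip("\n")
                if not line:
                    continue
                # Split on the last ": " so the count is always the tail.
                word, _, count = line.rpartition(": ")
                counts[word] = counts.get(word, 0) + int(count)
    return counts


if __name__ == "__main__":
    # Self-contained demo with two fake shards instead of a real pipeline run.
    with tempfile.TemporaryDirectory() as d:
        prefix = os.path.join(d, "py-wordcount")
        with open(prefix + "-00000-of-00002", "w") as f:
            f.write("beam: 2\nflink: 1\n")
        with open(prefix + "-00001-of-00002", "w") as f:
            f.write("beam: 1\n")
        print(read_wordcount_output(prefix))
```

A missing-shards error here is a quick signal that the job never wrote anything, as opposed to writing malformed output.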

I'm assuming there is some problem with your setup. Which platform are
you using? I'm on macOS.

Could you expand on the planned merge? From my understanding, we will
always need PortableRunner in Python to be able to submit against the
Beam JobServer.

Thanks,
Max

On 14.11.18 00:39, Ruoyun Huang wrote:
> A quick follow-up on using the current PortableRunner.
>
> I followed the exact three steps that Ankur and Maximilian shared in
> https://beam.apache.org/roadmap/portability/#python-on-flink. The
> wordcount example is still hanging after 10 minutes. I also tried
> specifying explicit input/output args, using either a GCS folder or the
> local file system, but neither works.
>
> I spent some time looking into it but have no conclusion yet. At this
> point, though, I guess it does not matter much any more, given we already
> plan to merge PortableRunner into using the Java reference runner
> (i.e. :beam-runners-reference-job-server).
>
> I'd still appreciate it if someone could try out the python-on-flink
> instructions <https://beam.apache.org/roadmap/portability/#python-on-flink>
> in case it is just due to my local machine setup. Thanks!

Re: How to use "PortableRunner" in Python SDK?

Posted by Ruoyun Huang <ru...@google.com>.
A quick follow-up on using the current PortableRunner.

I followed the exact three steps that Ankur and Maximilian shared in
https://beam.apache.org/roadmap/portability/#python-on-flink. The
wordcount example is still hanging after 10 minutes. I also tried specifying
explicit input/output args, using either a GCS folder or the local file
system, but neither works.

I spent some time looking into it but have no conclusion yet. At this point,
though, I guess it does not matter much any more, given we already plan to
merge PortableRunner into using the Java reference runner (i.e.
:beam-runners-reference-job-server).

I'd still appreciate it if someone could try out the python-on-flink
instructions <https://beam.apache.org/roadmap/portability/#python-on-flink>
in case it is just due to my local machine setup. Thanks!
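When a run seems to hang, one cheap first check is whether anything is listening on the job endpoint at all. The sketch below assumes the endpoint is given as HOST:PORT (for example localhost:8099, the job endpoint used elsewhere in this thread) and only tests a plain TCP connection. A successful connection does not show that the job server or Flink is healthy, so this can rule out a missing server but cannot explain a hang by itself. The function name is hypothetical.

```python
import socket


def job_endpoint_reachable(endpoint, timeout=5.0):
    """Return True if a TCP connection to HOST:PORT succeeds within timeout.

    Plain host names and IPv4 addresses only; bracketed IPv6 literals are
    not handled by this simple split.
    """
    host, _, port = endpoint.rpartition(":")
    try:
        # The connection is closed again immediately; we only probe the port.
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution errors.
        return False
```

For example, job_endpoint_reachable("localhost:8099") returning False before submission points at a missing or misconfigured job server rather than at the pipeline itself.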



On Thu, Nov 8, 2018 at 5:04 PM Ruoyun Huang <ru...@google.com> wrote:

> Thanks Maximilian!
>
> I am working on migrating the existing PortableRunner to use the Java ULR
> (link to notes
> <https://docs.google.com/document/d/1S86saZqiDaE_M5wxO0zOQ_rwC6QHv7sp1BmGTm0dLNE/edit#>).
> If this issue is non-trivial to solve, I would vote for removing this
> default behavior as part of the consolidation.
>
>
> --
> ================
> Ruoyun  Huang
>
>

-- 
================
Ruoyun  Huang

Re: How to use "PortableRunner" in Python SDK?

Posted by Ruoyun Huang <ru...@google.com>.
Thanks Maximilian!

I am working on migrating the existing PortableRunner to use the Java ULR
(link to notes
<https://docs.google.com/document/d/1S86saZqiDaE_M5wxO0zOQ_rwC6QHv7sp1BmGTm0dLNE/edit#>).
If this issue is non-trivial to solve, I would vote for removing this
default behavior as part of the consolidation.
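A rough sketch of what removing the default could look like from the caller's side: instead of silently starting a job server, fail fast when PortableRunner is requested without an explicit --job_endpoint. This is a hypothetical check written with the standard library's argparse, not code from Beam, and it ignores all other pipeline flags.

```python
import argparse


def require_job_endpoint(argv):
    """Return the --job_endpoint value, or exit if PortableRunner lacks one.

    Hypothetical fail-fast check; unknown pipeline flags are left alone.
    """
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument("--runner")
    parser.add_argument("--job_endpoint")
    # parse_known_args tolerates flags such as --input/--output.
    known, _unused = parser.parse_known_args(argv)
    if known.runner == "PortableRunner" and not known.job_endpoint:
        raise SystemExit(
            "PortableRunner needs --job_endpoint=HOST:PORT of a running job server")
    return known.job_endpoint
```

With the arguments from the original report (--output=/tmp/test_output --runner PortableRunner), this check would exit with a message instead of trying to launch Docker.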

On Thu, Nov 8, 2018 at 2:58 AM Maximilian Michels <mx...@apache.org> wrote:

> In the long run, we should get rid of the Docker-inside-Docker approach,
> which was only intended for testing anyway. It would be cleaner to
> start the SDK harness container alongside the JobServer container.
>
> Short term, I think it should be easy to either fix the permissions of
> the mounted "docker" executable or use a Docker image for the JobServer
> which comes with Docker pre-installed.
>
> JIRA: https://issues.apache.org/jira/browse/BEAM-6020
>
> Thanks for reporting this Ruoyun!
>
> -Max


-- 
================
Ruoyun  Huang

Re: How to use "PortableRunner" in Python SDK?

Posted by Maximilian Michels <mx...@apache.org>.
In the long run, we should get rid of the Docker-inside-Docker approach,
which was only intended for testing anyway. It would be cleaner to
start the SDK harness container alongside the JobServer container.

Short term, I think it should be easy to either fix the permissions of
the mounted "docker" executable or use a Docker image for the JobServer
that comes with Docker pre-installed.
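For the short-term fix, it helps to confirm first whether the docker binary visible inside the JobServer environment is executable at all. The sketch below looks up the first file named docker on PATH (or takes an explicit path) and checks its execute permission with os.access. The function name and status strings are hypothetical, and a missing execute bit is only one possible cause of "error=13, Permission denied" (a filesystem mounted noexec is another).

```python
import os


def docker_executable_status(path=None):
    """Describe whether the docker CLI can be executed by the current user.

    Without an explicit path, search PATH for the first regular file named
    "docker", even one lacking the execute bit (shutil.which would skip it).
    """
    if path is None:
        for directory in os.environ.get("PATH", "").split(os.pathsep):
            candidate = os.path.join(directory, "docker")
            if os.path.isfile(candidate):
                path = candidate
                break
        else:
            return "not found on PATH"
    if not os.path.isfile(path):
        return "missing: " + path
    if not os.access(path, os.X_OK):
        # Executing this file would typically fail with EACCES (errno 13).
        return "not executable: " + path
    return "ok: " + path
```

Running this inside the JobServer container, rather than on the host, is what matters: "docker run hello-world" succeeding on the host says nothing about the binary mounted into the container.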

JIRA: https://issues.apache.org/jira/browse/BEAM-6020

Thanks for reporting this Ruoyun!

-Max

On 08.11.18 00:10, Ruoyun Huang wrote:
> Thanks Ankur and Maximilian.
> 
> Just for reference, in case other people encounter the same error
> message: the "permission denied" error in my original email is exactly
> due to the Docker-inside-Docker issue that Ankur mentioned. Thanks Ankur!
> I didn't make the connection when you said it and had to discover it the
> hard way (I thought my Docker installation was messed up).

Re: How to use "PortableRunner" in Python SDK?

Posted by Ruoyun Huang <ru...@google.com>.
Thanks Ankur and Maximilian.

Just for reference, in case other people encounter the same error
message: the "permission denied" error in my original email is exactly due
to the Docker-inside-Docker issue that Ankur mentioned. Thanks Ankur!
I didn't make the connection when you said it and had to discover it the
hard way (I thought my Docker installation was messed up).
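The "error=13" in the original stack trace is the operating system's EACCES ("Permission denied") code on Linux. A small, self-contained way to see the same failure from Python, assuming a Unix-like system, is to create a fake docker script without any execute bit and try to run it. Nothing here touches a real Docker installation, and the helper name run_errno is hypothetical.

```python
import errno
import os
import subprocess
import tempfile


def run_errno(path):
    """Try to execute `path`; return None if it started, else the OS errno."""
    try:
        subprocess.run([path], check=False)
    except OSError as exc:
        return exc.errno
    return None


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        fake_docker = os.path.join(d, "docker")
        with open(fake_docker, "w") as f:
            f.write("#!/bin/sh\necho 'fake docker'\n")
        os.chmod(fake_docker, 0o644)  # readable, but no execute bit at all
        err = run_errno(fake_docker)
        print(err, errno.errorcode.get(err))  # EACCES, i.e. 13 on Linux
```

The Java side reports the same errno through "Cannot run program "docker": error=13", which is why a working Docker setup on the host can still produce this error inside the job server.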

On Tue, Nov 6, 2018 at 1:53 AM Maximilian Michels <mx...@apache.org> wrote:

> Hi,
>
> Please follow https://beam.apache.org/roadmap/portability/#python-on-flink
>
> Cheers,
> Max
>
> On 06.11.18 01:14, Ankur Goenka wrote:
> > Hi,
> >
> > The PortableRunner requires the URI of a job server to submit pipelines
> > to. The current default job server Docker image is broken because of a
> > Docker-inside-Docker issue.
> >
> > Please refer to
> > https://beam.apache.org/roadmap/portability/#python-on-flink for how to
> > run wordcount using the Portable Flink Runner.
> >
> > Thanks,
> > Ankur
> >
>


-- 
================
Ruoyun  Huang

Re: How to use "PortableRunner" in Python SDK?

Posted by Maximilian Michels <mx...@apache.org>.
Hi,

Please follow https://beam.apache.org/roadmap/portability/#python-on-flink

Cheers,
Max

On 06.11.18 01:14, Ankur Goenka wrote:
> Hi,
> 
> The PortableRunner requires the URI of a job server to submit pipelines
> to. The current default job server Docker image is broken because of a
> Docker-inside-Docker issue.
> 
> Please refer to
> https://beam.apache.org/roadmap/portability/#python-on-flink for how to
> run wordcount using the Portable Flink Runner.
> 
> Thanks,
> Ankur
> 

Re: How to use "PortableRunner" in Python SDK?

Posted by Ankur Goenka <go...@google.com>.
Hi,

The PortableRunner requires the URI of a job server to submit pipelines to.
The current default job server Docker image is broken because of a
Docker-inside-Docker issue.

Please refer to https://beam.apache.org/roadmap/portability/#python-on-flink
for how to run wordcount using the Portable Flink Runner.
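A minimal sketch of what "requires a job server URI" means in practice, assuming a job server (for example, the Flink job server from the guide linked above) is already listening at localhost:8099; that address and the helper names job_endpoint_reachable, wordcount_command and main are assumptions for illustration. It checks that the endpoint accepts connections, then launches wordcount with an explicit --job_endpoint instead of relying on the default job server image.

```python
import socket
import subprocess
import sys


def job_endpoint_reachable(endpoint, timeout=2.0):
    """Hypothetical helper: True if a TCP connection to "host:port" succeeds."""
    host, _, port = endpoint.rpartition(":")
    try:
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except (OSError, ValueError):
        # Connection refused/timed out, or the endpoint was not "host:port".
        return False


def wordcount_command(output, job_endpoint="localhost:8099"):
    """Hypothetical helper: wordcount invocation pointed at an explicit job server."""
    return [
        sys.executable, "-m", "apache_beam.examples.wordcount",
        "--output=" + output,
        "--runner=PortableRunner",
        "--job_endpoint=" + job_endpoint,
    ]


def main(endpoint="localhost:8099", output="/tmp/test_output"):
    # localhost:8099 is an assumed address; use wherever your job server listens.
    if not job_endpoint_reachable(endpoint):
        sys.exit("No job server listening at %s; start one first." % endpoint)
    subprocess.run(wordcount_command(output, endpoint), check=True)
```

Calling main() fails fast with a clear message when no job server is running, rather than surfacing a Docker error from deep inside the runner.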

Thanks,
Ankur

On Mon, Nov 5, 2018 at 3:41 PM Ruoyun Huang <ru...@google.com> wrote:

> Hi, Folks,
>
>      I want to try out the Python PortableRunner using the following command:
>
> From sdk/python:
>
>     python -m apache_beam.examples.wordcount --output=/tmp/test_output --runner PortableRunner
>
>      It fails with the following error message:
>
> Caused by: java.lang.Exception: The user defined 'open()' method caused an
> exception: java.io.IOException: Cannot run program "docker": error=13,
> Permission denied
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
> ... 1 more
> Caused by:
> org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.util.concurrent.UncheckedExecutionException:
> java.io.IOException: Cannot run program "docker": error=13, Permission
> denied
> at org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4994)
>
> ... 7 more
>
>
>
> My py2 environment is properly configured, because the DirectRunner works.
> I also tested my Docker installation with 'docker run hello-world', with no
> issues.
>
>
> Thanks.
> --
> ================
> Ruoyun  Huang
>
>