Posted to dev@beam.apache.org by "Matthew K." <so...@gmx.com> on 2019/11/06 22:24:00 UTC

Command for Beam worker on Spark cluster

Hi all,



I am trying to run a *Python* Beam pipeline on a Spark cluster. Since workers
are running on separate nodes, I am using "`PROCESS`" for "`environment_type`" in
the pipeline options, but I couldn't find any documentation on what "`command`" I
should pass to "`environment_config`" to run on the worker, so that the executor
can communicate with it.



Can someone help me on that?


Re: Command for Beam worker on Spark cluster

Posted by Kyle Weaver <kc...@google.com>.
Not sure what's causing the error. We should be able to see output from the
process if you set the logging level to DEBUG.

> Some debugging of boot.go and running it manually shows it doesn't return
> from the "artifact.Materialize" function.

Running boot.go by itself won't work if there is no artifact server running
(which normally Beam will start automatically):
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/artifact/materialize.go#L43
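
For what it's worth, one way to get more verbose output on the Python driver
side is simply to raise the root logger level before running the pipeline (a
minimal sketch; the output of the worker process itself lands in the Spark
executor logs, whose verbosity is controlled by Spark's log4j configuration
rather than by this):

    # Driver-side sketch only: makes portability/job-service messages visible.
    import logging

    logging.getLogger().setLevel(logging.DEBUG)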

On Thu, Nov 7, 2019 at 10:05 AM Matthew K. <so...@gmx.com> wrote:

> Thanks, but I still have a problem making the remote worker on k8s work
> (important to point out that I had to create a shared volume between nodes so
> that all of them have access to the same /tmp, since the Beam runner creates
> artifact staging files on the machine it is running on and expects workers to
> read them from there).
>
> However, I get this error from executor:
>
>
> INFO AbstractArtifactRetrievalService: GetManifest for
> /tmp/beam-artifact-staging/job_cca3e889-76d9-4c8a-a942-a64ddbd2dd1f/MANIFEST
> INFO AbstractArtifactRetrievalService: GetManifest for
> /tmp/beam-artifact-staging/job_cca3e889-76d9-4c8a-a942-a64ddbd2dd1f/MANIFEST
> -> 0 artifacts
> INFO ProcessEnvironmentFactory: Still waiting for startup of environment
> '/opt/spark/beam/sdks/python/container/build/target/launcher/linux_amd64/boot'
> for worker id 3-1
> ERROR Executor: Exception in task 0.1 in stage 0.0 (TID 2)
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
> java.lang.IllegalStateException: Process died with exit code 1
>         at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
>
> (note that the job manifest has no artifacts in it)
>
> I can see the ports for the endpoints (logging, artifact, ...) are open on the
> worker. Some debugging of boot.go and running it manually shows it doesn't
> return from the "artifact.Materialize" function.
>
> Any idea what could be wrong in the setup?
>
> *Sent:* Wednesday, November 06, 2019 at 5:45 PM
> *From:* "Kyle Weaver" <kc...@google.com>
> *To:* dev <de...@beam.apache.org>
> *Subject:* Re: Command for Beam worker on Spark cluster
> > Where can I extract these parameters from?
>
> These parameters should be passed automatically when the process is run
> (note the use of $* in the example script):
> https://github.com/apache/beam/blob/fbc84b61240a3d83d9c19f7ccc17ba22e5d7e2c9/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentFactory.java#L115-L121
>
> > Also, how can the Spark executor find the port that the gRPC server is
> > running on?
> Not sure which gRPC server you mean here.
>
> On Wed, Nov 6, 2019 at 3:32 PM Matthew K. <so...@gmx.com> wrote:
>
>> Thanks, but I still need to pass parameters to the boot executable, such as
>> the worker id, control endpoint, logging endpoint, etc.
>>
>> Where can I extract these parameters from? (In apache_beam Python code,
>> those can be extracted from StartWorker request parameters)
>>
>> Also, how can the Spark executor find the port that the gRPC server is running on?
>>
>> *Sent:* Wednesday, November 06, 2019 at 5:07 PM
>> *From:* "Kyle Weaver" <kc...@google.com>
>> *To:* dev <de...@beam.apache.org>
>> *Subject:* Re: Command for Beam worker on Spark cluster
>> In Docker mode, most everything's taken care of for you, but in process
>> mode you have to do a lot of setup yourself. The command you're looking for
>> is `sdks/python/container/build/target/launcher/linux_amd64/boot`. You will
>> be required to have both that executable (which you can build from source
>> using `./gradlew :sdks:python:container:build`) and a Python installation
>> including Beam and other dependencies on all of your worker machines.
>>
>> The best example I know of is here:
>> https://github.com/apache/beam/blob/cbf8a900819c52940a0edd90f59bf6aec55c817a/sdks/python/test-suites/portable/py2/build.gradle#L146-L165
>>
>> On Wed, Nov 6, 2019 at 2:24 PM Matthew K. <so...@gmx.com> wrote:
>>
>>> Hi all,
>>>
>>> I am trying to run a *Python* Beam pipeline on a Spark cluster. Since
>>> workers are running on separate nodes, I am using "PROCESS" for
>>> "environment_type" in the pipeline options, but I couldn't find any
>>> documentation on what "command" I should pass to "environment_config"
>>> to run on the worker, so that the executor can communicate with it.
>>>
>>> Can someone help me on that?
>>>
>>

Re: Command for Beam worker on Spark cluster

Posted by "Matthew K." <so...@gmx.com>.
Thanks, but I still have a problem making the remote worker on k8s work
(important to point out that I had to create a shared volume between nodes so
that all of them have access to the same /tmp, since the Beam runner creates
artifact staging files on the machine it is running on and expects workers to
read them from there).



However, I get this error from executor:



  
INFO AbstractArtifactRetrievalService: GetManifest for
/tmp/beam-artifact-staging/job_cca3e889-76d9-4c8a-a942-a64ddbd2dd1f/MANIFEST
INFO AbstractArtifactRetrievalService: GetManifest for
/tmp/beam-artifact-staging/job_cca3e889-76d9-4c8a-a942-a64ddbd2dd1f/MANIFEST
-> 0 artifacts
INFO ProcessEnvironmentFactory: Still waiting for startup of environment
'/opt/spark/beam/sdks/python/container/build/target/launcher/linux_amd64/boot'
for worker id 3-1
ERROR Executor: Exception in task 0.1 in stage 0.0 (TID 2)
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
java.lang.IllegalStateException: Process died with exit code 1
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)



(note that the job manifest has no artifacts in it)



I can see the ports for the endpoints (logging, artifact, ...) are open on the
worker. Some debugging of boot.go and running it manually shows it doesn't
return from the "artifact.Materialize" function.



Any idea what could be wrong in the setup?



*Sent:* Wednesday, November 06, 2019 at 5:45 PM
*From:* "Kyle Weaver" <kcweaver@google.com>
*To:* dev <dev@beam.apache.org>
*Subject:* Re: Command for Beam worker on Spark cluster

> Where can I extract these parameters from?

These parameters should be passed automatically when the process is run (note
the use of $* in the example script):
https://github.com/apache/beam/blob/fbc84b61240a3d83d9c19f7ccc17ba22e5d7e2c9/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentFactory.java#L115-L121

> Also, how can the Spark executor find the port that the gRPC server is
> running on?

Not sure which grpc server you mean here.

On Wed, Nov 6, 2019 at 3:32 PM Matthew K. <softmatt@gmx.com> wrote:

> Thanks, but I still need to pass parameters to the boot executable, such as
> the worker id, control endpoint, logging endpoint, etc.
>
> Where can I extract these parameters from? (In apache_beam Python code,
> those can be extracted from StartWorker request parameters)
>
> Also, how can the Spark executor find the port that the gRPC server is
> running on?
>
> *Sent:* Wednesday, November 06, 2019 at 5:07 PM
> *From:* "Kyle Weaver" <kcweaver@google.com>
> *To:* dev <dev@beam.apache.org>
> *Subject:* Re: Command for Beam worker on Spark cluster
>
> In Docker mode, most everything's taken care of for you, but in process mode
> you have to do a lot of setup yourself. The command you're looking for is
> `sdks/python/container/build/target/launcher/linux_amd64/boot`. You will be
> required to have both that executable (which you can build from source using
> `./gradlew :sdks:python:container:build`) and a Python installation including
> Beam and other dependencies on all of your worker machines.
>
> The best example I know of is here:
> https://github.com/apache/beam/blob/cbf8a900819c52940a0edd90f59bf6aec55c817a/sdks/python/test-suites/portable/py2/build.gradle#L146-L165
>
> On Wed, Nov 6, 2019 at 2:24 PM Matthew K. <softmatt@gmx.com> wrote:
>
>> Hi all,
>>
>> I am trying to run a *Python* Beam pipeline on a Spark cluster. Since
>> workers are running on separate nodes, I am using "`PROCESS`" for
>> "`environment_type`" in the pipeline options, but I couldn't find any
>> documentation on what "`command`" I should pass to "`environment_config`"
>> to run on the worker, so that the executor can communicate with it.
>>
>> Can someone help me on that?

Re: Command for Beam worker on Spark cluster

Posted by Kyle Weaver <kc...@google.com>.
> Where can I extract these parameters from?

These parameters should be passed automatically when the process is run
(note the use of $* in the example script):
https://github.com/apache/beam/blob/fbc84b61240a3d83d9c19f7ccc17ba22e5d7e2c9/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentFactory.java#L115-L121
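
To make that concrete, here is a rough sketch of the argument shape the runner
appends to whatever command you configure (the flag names below are read off
the linked ProcessEnvironmentFactory/boot code and may differ between Beam
versions, so treat them as assumptions to verify rather than a spec):

    # Hypothetical stand-in for the boot executable, only to show the flags the
    # runner appends to the configured command. It does NOT start an SDK harness.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--id")                  # worker id, e.g. "3-1"
    parser.add_argument("--logging_endpoint")    # host:port of the logging service
    parser.add_argument("--artifact_endpoint")   # host:port of the artifact service
    parser.add_argument("--provision_endpoint")  # host:port of the provision service
    parser.add_argument("--control_endpoint")    # host:port of the Fn API control service
    args, _unknown = parser.parse_known_args()
    print(args)  # the real boot binary uses these to dial back into the runner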

> Also, how can the Spark executor find the port that the gRPC server is running on?

Not sure which grpc server you mean here.

On Wed, Nov 6, 2019 at 3:32 PM Matthew K. <so...@gmx.com> wrote:

> Thanks, but I still need to pass parameters to the boot executable, such as
> the worker id, control endpoint, logging endpoint, etc.
>
> Where can I extract these parameters from? (In apache_beam Python code,
> those can be extracted from StartWorker request parameters)
>
> Also, how can the Spark executor find the port that the gRPC server is running on?
>
> *Sent:* Wednesday, November 06, 2019 at 5:07 PM
> *From:* "Kyle Weaver" <kc...@google.com>
> *To:* dev <de...@beam.apache.org>
> *Subject:* Re: Command for Beam worker on Spark cluster
> In Docker mode, most everything's taken care of for you, but in process
> mode you have to do a lot of setup yourself. The command you're looking for
> is `sdks/python/container/build/target/launcher/linux_amd64/boot`. You will
> be required to have both that executable (which you can build from source
> using `./gradlew :sdks:python:container:build`) and a Python installation
> including Beam and other dependencies on all of your worker machines.
>
> The best example I know of is here:
> https://github.com/apache/beam/blob/cbf8a900819c52940a0edd90f59bf6aec55c817a/sdks/python/test-suites/portable/py2/build.gradle#L146-L165
>
> On Wed, Nov 6, 2019 at 2:24 PM Matthew K. <so...@gmx.com> wrote:
>
>> Hi all,
>>
>> I am trying to run a *Python* Beam pipeline on a Spark cluster. Since
>> workers are running on separate nodes, I am using "PROCESS" for
>> "environment_type" in the pipeline options, but I couldn't find any
>> documentation on what "command" I should pass to "environment_config" to
>> run on the worker, so that the executor can communicate with it.
>>
>> Can someone help me on that?
>>
>

Re: Command for Beam worker on Spark cluster

Posted by "Matthew K." <so...@gmx.com>.
Thanks, but I still need to pass parameters to the boot executable, such as
the worker id, control endpoint, logging endpoint, etc.



Where can I extract these parameters from? (In apache_beam Python code, those
can be extracted from StartWorker request parameters)



Also, how can the Spark executor find the port that the gRPC server is running on?



*Sent:* Wednesday, November 06, 2019 at 5:07 PM
*From:* "Kyle Weaver" <kcweaver@google.com>
*To:* dev <dev@beam.apache.org>
*Subject:* Re: Command for Beam worker on Spark cluster

In Docker mode, most everything's taken care of for you, but in process mode
you have to do a lot of setup yourself. The command you're looking for is
`sdks/python/container/build/target/launcher/linux_amd64/boot`. You will be
required to have both that executable (which you can build from source using
`./gradlew :sdks:python:container:build`) and a Python installation including
Beam and other dependencies on all of your worker machines.



The best example I know of is here:
https://github.com/apache/beam/blob/cbf8a900819c52940a0edd90f59bf6aec55c817a/sdks/python/test-suites/portable/py2/build.gradle#L146-L165



On Wed, Nov 6, 2019 at 2:24 PM Matthew K. <softmatt@gmx.com> wrote:

> Hi all,
>
> I am trying to run a *Python* Beam pipeline on a Spark cluster. Since
> workers are running on separate nodes, I am using "`PROCESS`" for
> "`environment_type`" in the pipeline options, but I couldn't find any
> documentation on what "`command`" I should pass to "`environment_config`"
> to run on the worker, so that the executor can communicate with it.
>
> Can someone help me on that?


Re: Command for Beam worker on Spark cluster

Posted by Kyle Weaver <kc...@google.com>.
In Docker mode, most everything's taken care of for you, but in process
mode you have to do a lot of setup yourself. The command you're looking for
is `sdks/python/container/build/target/launcher/linux_amd64/boot`. You will
be required to have both that executable (which you can build from source
using `./gradlew :sdks:python:container:build`) and a Python installation
including Beam and other dependencies on all of your worker machines.

The best example I know of is here:
https://github.com/apache/beam/blob/cbf8a900819c52940a0edd90f59bf6aec55c817a/sdks/python/test-suites/portable/py2/build.gradle#L146-L165
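
For concreteness, a minimal sketch of what the submission side might look like
once the boot binary exists on every worker node (the job endpoint and the path
below are placeholders for illustration, not a tested setup):

    # Sketch only: environment_config tells the runner which command to launch
    # on each executor for the PROCESS environment.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        "--runner=PortableRunner",
        "--job_endpoint=localhost:8099",  # assumed address of the Beam job server
        "--environment_type=PROCESS",
        "--environment_config="
        '{"command": "/path/to/sdks/python/container/build/target/launcher/linux_amd64/boot"}',
    ])

    with beam.Pipeline(options=options) as p:
        p | beam.Create([1, 2, 3]) | beam.Map(print)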

On Wed, Nov 6, 2019 at 2:24 PM Matthew K. <so...@gmx.com> wrote:

> Hi all,
>
> I am trying to run a *Python* Beam pipeline on a Spark cluster. Since
> workers are running on separate nodes, I am using "PROCESS" for
> "environment_type" in the pipeline options, but I couldn't find any
> documentation on what "command" I should pass to "environment_config" to
> run on the worker, so that the executor can communicate with it.
>
> Can someone help me on that?
>