You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Mahan Hosseinzadeh <ma...@gmail.com> on 2021/08/10 10:53:02 UTC

Submit Python Beam on Spark Dataproc

Hi,

I have a Python Beam job that works on Dataflow but we would like to submit
it on a Spark Dataproc cluster with no Flink involvement.
I already spent days but failed to figure out how to use PortableRunner
with the beam_spark_job_server to submit my Python Beam job to Spark
Dataproc. All the Beam docs are about Flink and there is no guideline about
Spark with Dataproc.
Some relevant questions might be:
1- What's spark-master-url in case of a remote cluster on Dataproc? Is 7077
the master url port?
2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh?
3- What's the environment_type? Can we use DOCKER? Then what's the SDK
Harness Configuration?
4- Should we run the job-server outside of the Dataproc cluster or should
we run it in the master node?

Thanks,
Mahan

Re: Submit Python Beam on Spark Dataproc

Posted by Kyle Weaver <kc...@google.com>.
>
> I already spent days but failed to figure out how to use PortableRunner
> with the beam_spark_job_server to submit my Python Beam job to Spark
> Dataproc. All the Beam docs are about Flink and there is no guideline about
> Spark with Dataproc.


I'm not aware of any docs for Beam Python + Spark + Dataproc either. But if
you absolutely must use Spark, the instructions for Beam Python + Flink +
Dataproc should mostly apply to Spark as well.

Since you're getting worker startup errors, that means you have at least
submitted the job successfully, so you've presumably already figured out
the job server questions.

3- What's the environment_type? Can we use DOCKER? Then what's the SDK
> Harness Configuration?
>

I recommend Docker. You would set "--environment_type=DOCKER". No
"--environment_config" option should be necessary. Like Yu Watanabe said,
though, it will require Docker to be installed on all the Spark worker
nodes. You can do this by installing the optional DOCKER component when
setting up your Dataproc cluster. [2]

[1]
https://cloud.google.com/dataproc/docs/concepts/components/flink#portable_beam_jobs
[2] https://cloud.google.com/dataproc/docs/concepts/components/docker

On Sun, Aug 15, 2021 at 7:52 AM Yu Watanabe <yu...@gmail.com> wrote:

> Hello Mahan.
>
> Sorry for the late reply.
>
> > Still waiting for startup of environment from localhost:50000 for worker
> id 1-1
>
> From the message , it seems that something is wrong with connection
> between Worker node in spark cluster and SDK harness.
>
> According to this slide runner worker (in your context spark worker) ,
> should also have connectivity with sdk harness container.
>
>
> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE/edit#slide=id.g42e4c9aad6_1_0
>
> Could you please also try setting ssh tunneling to spark worker node as
> well ?
>
> Thanks,
> Yu
>
> On Thu, Aug 12, 2021 at 9:07 PM Mahan Hosseinzadeh <ma...@gmail.com>
> wrote:
>
>> Thanks Yu for the help and the tips.
>>
>> I ran the following steps but my job is stuck and can't get submitted to
>> Dataproc and I keep getting this message in job-server:
>> Still waiting for startup of environment from localhost:50000 for worker
>> id 1-1
>>
>>
>> ---------------------------------------------------------------------------------------------------------
>> *Beam code:*
>> pipeline_options = PipelineOptions([
>>             "--runner=PortableRunner",
>>             "--job_endpoint=localhost:8099",
>>             "--environment_type=EXTERNAL",
>>             "--environment_config=localhost:50000"
>>         ])
>>
>> ---------------------------------------------------------------------------------------------------------
>> *Job Server:*
>> I couldn't use Docker because host networking doesn't work on Mac OS and
>> I used Gradle instead
>>
>> ./gradlew :runners:spark:3:job-server:runShadow
>>
>> ---------------------------------------------------------------------------------------------------------
>> *Beam Worker Pool:*
>> docker run -p=50000:50000 apache/beam_python3.7_sdk --worker_pool
>>
>> ---------------------------------------------------------------------------------------------------------
>> *SSH tunnel to the master node:*
>> gcloud compute ssh <my-master-node-m> \
>>     --project <my-gcp-project> \
>>     --zone <my-zone>  \
>>     -- -NL 7077:localhost:7077
>>
>> ---------------------------------------------------------------------------------------------------------
>>
>> Thanks,
>> Mahan
>>
>> On Tue, Aug 10, 2021 at 3:53 PM Yu Watanabe <yu...@gmail.com>
>> wrote:
>>
>>> Hello .
>>>
>>> Would this page help ? I hope it helps.
>>>
>>> https://beam.apache.org/documentation/runners/spark/
>>>
>>> > Running on a pre-deployed Spark cluster
>>>
>>> 1- What's spark-master-url in case of a remote cluster on Dataproc? Is
>>> 7077 the master url port?
>>> * Yes.
>>>
>>> 2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh?
>>> * Job server should be able to communicate with Spark master node port
>>> 7077. So I believe it is Yes.
>>>
>>> 3- What's the environment_type? Can we use DOCKER? Then what's the SDK
>>> Harness Configuration?
>>> * This is the configuration of how you want  your harness container to
>>> spin up.
>>>
>>> https://beam.apache.org/documentation/runtime/sdk-harness-config/
>>>
>>> For DOCKER , you will need docker deployed on all spark worker nodes.
>>> > User code is executed within a container started on each worker node
>>>
>>> I used EXTERNAL when I did it with flink cluster before.
>>>
>>> e.g
>>>
>>> https://github.com/yuwtennis/apache-beam/blob/master/flink-session-cluster/docker/samples/src/sample.py#L14
>>>
>>> 4- Should we run the job-server outside of the Dataproc cluster or
>>> should we run it in the master node?
>>> * Depends. It could be inside or outside the master node. But if you are
>>> connecting to full managed service, then outside might be better.
>>>
>>> https://beam.apache.org/documentation/runners/spark/
>>>
>>> > Start JobService that will connect with the Spark master
>>>
>>> Thanks,
>>> Yu
>>>
>>> On Tue, Aug 10, 2021 at 7:53 PM Mahan Hosseinzadeh <ma...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a Python Beam job that works on Dataflow but we would like to
>>>> submit it on a Spark Dataproc cluster with no Flink involvement.
>>>> I already spent days but failed to figure out how to use PortableRunner
>>>> with the beam_spark_job_server to submit my Python Beam job to Spark
>>>> Dataproc. All the Beam docs are about Flink and there is no guideline about
>>>> Spark with Dataproc.
>>>> Some relevant questions might be:
>>>> 1- What's spark-master-url in case of a remote cluster on Dataproc? Is
>>>> 7077 the master url port?
>>>> 2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh?
>>>> 3- What's the environment_type? Can we use DOCKER? Then what's the SDK
>>>> Harness Configuration?
>>>> 4- Should we run the job-server outside of the Dataproc cluster or
>>>> should we run it in the master node?
>>>>
>>>> Thanks,
>>>> Mahan
>>>>
>>>
>>>
>>> --
>>> Yu Watanabe
>>>
>>> linkedin: www.linkedin.com/in/yuwatanabe1/
>>> twitter:   twitter.com/yuwtennis
>>>
>>>
>>
>
> --
> Yu Watanabe
>
> linkedin: www.linkedin.com/in/yuwatanabe1/
> twitter:   twitter.com/yuwtennis
>
>

Re: Submit Python Beam on Spark Dataproc

Posted by Yu Watanabe <yu...@gmail.com>.
Hello Mahan.

Sorry for the late reply.

> Still waiting for startup of environment from localhost:50000 for worker
id 1-1

From the message , it seems that something is wrong with connection between
Worker node in spark cluster and SDK harness.

According to this slide runner worker (in your context spark worker) ,
should also have connectivity with sdk harness container.

https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE/edit#slide=id.g42e4c9aad6_1_0

Could you please also try setting ssh tunneling to spark worker node as
well ?

Thanks,
Yu

On Thu, Aug 12, 2021 at 9:07 PM Mahan Hosseinzadeh <ma...@gmail.com>
wrote:

> Thanks Yu for the help and the tips.
>
> I ran the following steps but my job is stuck and can't get submitted to
> Dataproc and I keep getting this message in job-server:
> Still waiting for startup of environment from localhost:50000 for worker
> id 1-1
>
>
> ---------------------------------------------------------------------------------------------------------
> *Beam code:*
> pipeline_options = PipelineOptions([
>             "--runner=PortableRunner",
>             "--job_endpoint=localhost:8099",
>             "--environment_type=EXTERNAL",
>             "--environment_config=localhost:50000"
>         ])
>
> ---------------------------------------------------------------------------------------------------------
> *Job Server:*
> I couldn't use Docker because host networking doesn't work on Mac OS and I
> used Gradle instead
>
> ./gradlew :runners:spark:3:job-server:runShadow
>
> ---------------------------------------------------------------------------------------------------------
> *Beam Worker Pool:*
> docker run -p=50000:50000 apache/beam_python3.7_sdk --worker_pool
>
> ---------------------------------------------------------------------------------------------------------
> *SSH tunnel to the master node:*
> gcloud compute ssh <my-master-node-m> \
>     --project <my-gcp-project> \
>     --zone <my-zone>  \
>     -- -NL 7077:localhost:7077
>
> ---------------------------------------------------------------------------------------------------------
>
> Thanks,
> Mahan
>
> On Tue, Aug 10, 2021 at 3:53 PM Yu Watanabe <yu...@gmail.com> wrote:
>
>> Hello .
>>
>> Would this page help ? I hope it helps.
>>
>> https://beam.apache.org/documentation/runners/spark/
>>
>> > Running on a pre-deployed Spark cluster
>>
>> 1- What's spark-master-url in case of a remote cluster on Dataproc? Is
>> 7077 the master url port?
>> * Yes.
>>
>> 2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh?
>> * Job server should be able to communicate with Spark master node port
>> 7077. So I believe it is Yes.
>>
>> 3- What's the environment_type? Can we use DOCKER? Then what's the SDK
>> Harness Configuration?
>> * This is the configuration of how you want  your harness container to
>> spin up.
>>
>> https://beam.apache.org/documentation/runtime/sdk-harness-config/
>>
>> For DOCKER , you will need docker deployed on all spark worker nodes.
>> > User code is executed within a container started on each worker node
>>
>> I used EXTERNAL when I did it with flink cluster before.
>>
>> e.g
>>
>> https://github.com/yuwtennis/apache-beam/blob/master/flink-session-cluster/docker/samples/src/sample.py#L14
>>
>> 4- Should we run the job-server outside of the Dataproc cluster or should
>> we run it in the master node?
>> * Depends. It could be inside or outside the master node. But if you are
>> connecting to full managed service, then outside might be better.
>>
>> https://beam.apache.org/documentation/runners/spark/
>>
>> > Start JobService that will connect with the Spark master
>>
>> Thanks,
>> Yu
>>
>> On Tue, Aug 10, 2021 at 7:53 PM Mahan Hosseinzadeh <ma...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I have a Python Beam job that works on Dataflow but we would like to
>>> submit it on a Spark Dataproc cluster with no Flink involvement.
>>> I already spent days but failed to figure out how to use PortableRunner
>>> with the beam_spark_job_server to submit my Python Beam job to Spark
>>> Dataproc. All the Beam docs are about Flink and there is no guideline about
>>> Spark with Dataproc.
>>> Some relevant questions might be:
>>> 1- What's spark-master-url in case of a remote cluster on Dataproc? Is
>>> 7077 the master url port?
>>> 2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh?
>>> 3- What's the environment_type? Can we use DOCKER? Then what's the SDK
>>> Harness Configuration?
>>> 4- Should we run the job-server outside of the Dataproc cluster or
>>> should we run it in the master node?
>>>
>>> Thanks,
>>> Mahan
>>>
>>
>>
>> --
>> Yu Watanabe
>>
>> linkedin: www.linkedin.com/in/yuwatanabe1/
>> twitter:   twitter.com/yuwtennis
>>
>>
>

-- 
Yu Watanabe

linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:   twitter.com/yuwtennis

Re: Submit Python Beam on Spark Dataproc

Posted by Mahan Hosseinzadeh <ma...@gmail.com>.
Thanks Yu for the help and the tips.

I ran the following steps but my job is stuck and can't get submitted to
Dataproc and I keep getting this message in job-server:
Still waiting for startup of environment from localhost:50000 for worker id
1-1

---------------------------------------------------------------------------------------------------------
*Beam code:*
pipeline_options = PipelineOptions([
            "--runner=PortableRunner",
            "--job_endpoint=localhost:8099",
            "--environment_type=EXTERNAL",
            "--environment_config=localhost:50000"
        ])
---------------------------------------------------------------------------------------------------------
*Job Server:*
I couldn't use Docker because host networking doesn't work on Mac OS and I
used Gradle instead

./gradlew :runners:spark:3:job-server:runShadow
---------------------------------------------------------------------------------------------------------
*Beam Worker Pool:*
docker run -p=50000:50000 apache/beam_python3.7_sdk --worker_pool
---------------------------------------------------------------------------------------------------------
*SSH tunnel to the master node:*
gcloud compute ssh <my-master-node-m> \
    --project <my-gcp-project> \
    --zone <my-zone>  \
    -- -NL 7077:localhost:7077
---------------------------------------------------------------------------------------------------------

Thanks,
Mahan

On Tue, Aug 10, 2021 at 3:53 PM Yu Watanabe <yu...@gmail.com> wrote:

> Hello .
>
> Would this page help ? I hope it helps.
>
> https://beam.apache.org/documentation/runners/spark/
>
> > Running on a pre-deployed Spark cluster
>
> 1- What's spark-master-url in case of a remote cluster on Dataproc? Is
> 7077 the master url port?
> * Yes.
>
> 2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh?
> * Job server should be able to communicate with Spark master node port
> 7077. So I believe it is Yes.
>
> 3- What's the environment_type? Can we use DOCKER? Then what's the SDK
> Harness Configuration?
> * This is the configuration of how you want  your harness container to
> spin up.
>
> https://beam.apache.org/documentation/runtime/sdk-harness-config/
>
> For DOCKER , you will need docker deployed on all spark worker nodes.
> > User code is executed within a container started on each worker node
>
> I used EXTERNAL when I did it with flink cluster before.
>
> e.g
>
> https://github.com/yuwtennis/apache-beam/blob/master/flink-session-cluster/docker/samples/src/sample.py#L14
>
> 4- Should we run the job-server outside of the Dataproc cluster or should
> we run it in the master node?
> * Depends. It could be inside or outside the master node. But if you are
> connecting to full managed service, then outside might be better.
>
> https://beam.apache.org/documentation/runners/spark/
>
> > Start JobService that will connect with the Spark master
>
> Thanks,
> Yu
>
> On Tue, Aug 10, 2021 at 7:53 PM Mahan Hosseinzadeh <ma...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I have a Python Beam job that works on Dataflow but we would like to
>> submit it on a Spark Dataproc cluster with no Flink involvement.
>> I already spent days but failed to figure out how to use PortableRunner
>> with the beam_spark_job_server to submit my Python Beam job to Spark
>> Dataproc. All the Beam docs are about Flink and there is no guideline about
>> Spark with Dataproc.
>> Some relevant questions might be:
>> 1- What's spark-master-url in case of a remote cluster on Dataproc? Is
>> 7077 the master url port?
>> 2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh?
>> 3- What's the environment_type? Can we use DOCKER? Then what's the SDK
>> Harness Configuration?
>> 4- Should we run the job-server outside of the Dataproc cluster or should
>> we run it in the master node?
>>
>> Thanks,
>> Mahan
>>
>
>
> --
> Yu Watanabe
>
> linkedin: www.linkedin.com/in/yuwatanabe1/
> twitter:   twitter.com/yuwtennis
>
>

Re: Submit Python Beam on Spark Dataproc

Posted by Yu Watanabe <yu...@gmail.com>.
Hello .

Would this page help ? I hope it helps.

https://beam.apache.org/documentation/runners/spark/

> Running on a pre-deployed Spark cluster

1- What's spark-master-url in case of a remote cluster on Dataproc? Is 7077
the master url port?
* Yes.

2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh?
* Job server should be able to communicate with Spark master node port
7077. So I believe it is Yes.

3- What's the environment_type? Can we use DOCKER? Then what's the SDK
Harness Configuration?
* This is the configuration of how you want  your harness container to spin
up.

https://beam.apache.org/documentation/runtime/sdk-harness-config/

For DOCKER , you will need docker deployed on all spark worker nodes.
> User code is executed within a container started on each worker node

I used EXTERNAL when I did it with flink cluster before.

e.g
https://github.com/yuwtennis/apache-beam/blob/master/flink-session-cluster/docker/samples/src/sample.py#L14

4- Should we run the job-server outside of the Dataproc cluster or should
we run it in the master node?
* Depends. It could be inside or outside the master node. But if you are
connecting to full managed service, then outside might be better.

https://beam.apache.org/documentation/runners/spark/

> Start JobService that will connect with the Spark master

Thanks,
Yu

On Tue, Aug 10, 2021 at 7:53 PM Mahan Hosseinzadeh <ma...@gmail.com>
wrote:

> Hi,
>
> I have a Python Beam job that works on Dataflow but we would like to
> submit it on a Spark Dataproc cluster with no Flink involvement.
> I already spent days but failed to figure out how to use PortableRunner
> with the beam_spark_job_server to submit my Python Beam job to Spark
> Dataproc. All the Beam docs are about Flink and there is no guideline about
> Spark with Dataproc.
> Some relevant questions might be:
> 1- What's spark-master-url in case of a remote cluster on Dataproc? Is
> 7077 the master url port?
> 2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh?
> 3- What's the environment_type? Can we use DOCKER? Then what's the SDK
> Harness Configuration?
> 4- Should we run the job-server outside of the Dataproc cluster or should
> we run it in the master node?
>
> Thanks,
> Mahan
>


-- 
Yu Watanabe

linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:   twitter.com/yuwtennis