Posted to user@beam.apache.org by "Ramanan, Buvana (Nokia - US/Murray Hill)" <bu...@nokia-bell-labs.com> on 2020/04/10 23:08:37 UTC

SparkRunner on k8s

Hello,

I recently joined this group and went through the archive to see whether any discussion exists on submitting Beam pipelines to the SparkRunner on k8s.

I run my Spark jobs on a k8s cluster in cluster mode, and I would like to deploy my Beam pipeline on the SparkRunner with k8s underneath.

The Beam documentation:
https://beam.apache.org/documentation/runners/spark/
does not discuss k8s (though it does mention Mesos and YARN).

Can someone please point me to relevant material? Or provide the steps for running my Beam pipeline in this configuration?

Thank you,
Regards,
Buvana

Re: SparkRunner on k8s

Posted by "Ramanan, Buvana (Nokia - US/Murray Hill)" <bu...@nokia-bell-labs.com>.
Kyle,

Thank you for the response. My simple Python code (with the IP masked out) is pasted below my signature; it works fine with the default runner.

The pipeline options are:
    "--runner=PortableRunner",
    "--job_endpoint=XXXXXXXXXX:8099"
(Replace XXXXXXXXXX with the IP of the job runner.)

I use ParDo

-Buvana

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

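class Printer(beam.DoFn):
    # Prints each element; returns nothing, so the output PCollection is empty.
    def process(self, data_item):
        print(data_item)

class DateExtractor(beam.DoFn):
    # Emits the first comma-separated field of each input line.
    def process(self, data_item):
        return [data_item.split(',')[0]]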

# instantiate the pipeline
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=XXXXXXXXXX:8099"
])
with beam.Pipeline(options=options) as p:
    data_from_source = (p
                    | 'ReadMyFile 01' >> beam.io.ReadFromText('apache-beam-tutorial/data/sp500.csv')
                    | 'Splitter using beam.ParDo 01' >> beam.ParDo(DateExtractor())
                    | 'Printer the data 02' >> beam.ParDo(Printer())
                    )
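
Editor's note: the option list above could also be produced by a small helper, so the masked address is supplied in one place. This is only a sketch; `portable_runner_args` and the host name `job-runner.example.internal` are hypothetical and do not come from the thread. Only the two flags and port 8099 are taken from the message above.

```python
def portable_runner_args(job_host, job_port=8099):
    """Build the two pipeline flags used in the message above.

    job_host is a placeholder for the masked job runner address.
    """
    return [
        "--runner=PortableRunner",
        "--job_endpoint=%s:%d" % (job_host, job_port),
    ]


# Hypothetical host name, used purely for illustration.
args = portable_runner_args("job-runner.example.internal")
print(args)
```

The resulting list would be passed to `PipelineOptions(args)` in place of the literal list in the pipeline code.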

From: Kyle Weaver <kc...@google.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Thursday, April 16, 2020 at 3:16 PM
To: "user@beam.apache.org" <us...@beam.apache.org>
Subject: Re: SparkRunner on k8s

Hi Buvana,

After looking a bit closer, it seems this might be a bug in Beam. If you could share a) the pipeline options you are using and b) which source (read) transform you are using in your pipeline, that would be helpful for debugging.

Thanks,
Kyle

On Wed, Apr 15, 2020 at 11:21 PM Ramanan, Buvana (Nokia - US/Murray Hill) <bu...@nokia-bell-labs.com> wrote:
Kyle,

I also built the Python SDK from source, from the same branch (release-2.19.0) that the job runner uses. The same error (INVALID_ARGUMENT) still occurs.

This has been a very tedious venture with no luck so far. I hope to get something working soon.

-Buvana

From: "Ramanan, Buvana (Nokia - US/Murray Hill)" <bu...@nokia-bell-labs.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Wednesday, April 15, 2020 at 9:05 PM
To: "user@beam.apache.org" <us...@beam.apache.org>
Subject: Re: SparkRunner on k8s

Hi Kyle,

Thanks a lot for pointing that out. I am using Python SDK version 2.19.0; as per your email, I checked out the release-2.19.0 branch and ran the portable job runner from it, and I still encounter this error. ☹

-Buvana

From: Kyle Weaver <kc...@google.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Wednesday, April 15, 2020 at 2:48 PM
To: "user@beam.apache.org" <us...@beam.apache.org>
Subject: Re: SparkRunner on k8s

Hi Buvana,

The usual cause of errors like this is a mismatch between the Python SDK and the Spark job server. Since you are building the job server from source, I would make sure you have checked out the same version as the Python SDK you are using.
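
Editor's note: the version check described above can be sketched in Python. This is a minimal sketch, assuming the branch naming follows the `release-2.19.0` pattern mentioned elsewhere in this thread; `release_branch_for` is a hypothetical helper, not part of Beam. `apache_beam.__version__` is the installed SDK's version string.

```python
def release_branch_for(sdk_version):
    """Map an SDK version such as '2.19.0' to the branch name 'release-2.19.0'.

    The naming pattern is an assumption based on the branch named in this thread.
    """
    parts = sdk_version.split(".")
    if len(parts) != 3 or not all(p.isdigit() for p in parts):
        raise ValueError("expected a released version like '2.19.0', got %r" % sdk_version)
    return "release-" + sdk_version


if __name__ == "__main__":
    try:
        import apache_beam
        # Print the checkout command for the job server source tree.
        print("git checkout " + release_branch_for(apache_beam.__version__))
    except ImportError:
        print("apache_beam is not installed in this environment")
    except ValueError as err:
        print(err)
```

Running this in the same virtualenv that submits the pipeline shows which branch the job server source would need to match.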

Hope that helps.
Kyle

On Mon, Apr 13, 2020 at 7:41 PM Ramanan, Buvana (Nokia - US/Murray Hill) <bu...@nokia-bell-labs.com> wrote:
Hello,

I am trying to test a Beam Python pipeline on the SparkRunner with Spark on Mesos. I followed the instructions here:
https://beam.apache.org/documentation/runners/spark/

The portable job runner is up and running and is pointing to a valid Spark Master URL (Spark on Mesos).

However, a simple Beam Python pipeline (which works fine on the local runner) fails when submitted to this job runner, with the error displayed below my signature. It appears that the job runner finds issues with the pipeline syntax; maybe it's looking for JVM pipelines and got a Python pipeline?

I would appreciate any pointers that you can provide.

Thank you,
Regards,
Buvana

Client Side:
=========
$ python test-beam.py

WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
Traceback (most recent call last):
  File "test-beam.py", line 117, in <module>
    beam.io.WriteToText(output_filename)
  File "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/pipeline.py", line 481, in __exit__
    self.run().wait_until_finish()
  File "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/pipeline.py", line 461, in run
    self._options).run(False)
  File "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/pipeline.py", line 474, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/runners/portability/portable_runner.py", line 317, in run_pipeline
    retrieval_token=retrieval_token))
  File "/home/tfs/venv_beam3/lib/python3.6/site-packages/grpc/_channel.py", line 826, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/home/tfs/venv_beam3/lib/python3.6/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
                status = StatusCode.INVALID_ARGUMENT
                details = ""
                debug_error_string = "{"created":"@1586818954.316495237","description":"Error received from peer ipv4:XXXXXXXXXX:8000","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"","grpc_status":3}"
-----------------------------------------------------------------------
Job Runner:
=========
tfs@datamon4:/nas2/tfs/beam$ ./gradlew :runners:spark:job-server:runShadow -PsparkMasterUrl=spark://$SPARK_MASTER:7077
Starting a Gradle Daemon (subsequent builds will be faster)
Configuration on demand is an incubating feature.

> Task :runners:spark:job-server:runShadow
Listening for transport dt_socket at address: 5005
20/04/13 19:02:04 INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver: LegacyArtifactStagingService started on localhost:8098
20/04/13 19:02:04 INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver: Java ExpansionService started on localhost:8097
20/04/13 19:02:04 INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver: JobService started on localhost:8099
20/04/13 19:02:34 WARN org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService: Encountered Unexpected Exception during validation
java.lang.RuntimeException: Failed to validate transform ref_AppliedPTransform_ReadFromText/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)_6
        at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateTransform(PipelineValidator.java:215)
        at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateComponents(PipelineValidator.java:123)
        at org.apache.beam.runners.core.construction.graph.PipelineValidator.validate(PipelineValidator.java:103)
        at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.run(InMemoryJobService.java:223)
        at org.apache.beam.model.jobmanagement.v1.JobServiceGrpc$MethodHandlers.invoke(JobServiceGrpc.java:961)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:127)
        at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateParDo(PipelineValidator.java:238)
        at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateTransform(PipelineValidator.java:213)
        ... 16 more
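
Editor's note: the client-side error above carries almost no detail, but its `debug_error_string` is JSON and can be decoded. The following is a minimal sketch, assuming the string has the shape shown in the client traceback; the sample is copied from that log, and `GRPC_STATUS_NAMES` is a hand-written partial table of standard gRPC status codes, not something provided by the grpc library.

```python
import json

# Partial table of standard gRPC status codes (hand-written for this sketch).
GRPC_STATUS_NAMES = {
    0: "OK",
    1: "CANCELLED",
    2: "UNKNOWN",
    3: "INVALID_ARGUMENT",
    4: "DEADLINE_EXCEEDED",
    5: "NOT_FOUND",
}


def summarize_debug_error(debug_error_string):
    """Extract the status name, message, and description from the JSON payload."""
    info = json.loads(debug_error_string)
    code = info.get("grpc_status")
    return {
        "status": GRPC_STATUS_NAMES.get(code, str(code)),
        "message": info.get("grpc_message", ""),
        "description": info.get("description", ""),
    }


# Sample copied from the client-side traceback above (address already masked).
sample = (
    '{"created":"@1586818954.316495237",'
    '"description":"Error received from peer ipv4:XXXXXXXXXX:8000",'
    '"file":"src/core/lib/surface/call.cc","file_line":1056,'
    '"grpc_message":"","grpc_status":3}'
)
summary = summarize_debug_error(sample)
print(summary["status"], repr(summary["message"]))
```

Here the message field is empty, so the only detail available is on the job runner side: the PipelineValidator exception for the ReadFromText transform in the log above.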


From: "Ramanan, Buvana (Nokia - US/Murray Hill)" <bu...@nokia-bell-labs.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Monday, April 13, 2020 at 6:55 PM
To: "user@beam.apache.org" <us...@beam.apache.org>
Subject: Re: SparkRunner on k8s

Kyle,

Thanks a lot for the pointers. I became interested in running my Beam pipeline on the FlinkRunner, so I set up a local Flink cluster and verified that a sample job works fine on it.

I started the Beam job runner with:
docker run --net=host apachebeam/flink1.8_job_server:latest --flink-master $IP:8081 --job-host $IP --job-port 8099

I then submitted a Beam pipeline that works fine when run with the LocalRunner. The last stage of the pipeline code looks as follows:
. . .
. . .
. . .
    output= (
        {
            'Mean Open': mean_open,
            'Mean Close': mean_close
        } |
        beam.CoGroupByKey() |
        beam.io.WriteToText(args.output)
    )

So the pipeline ends with an io.WriteToText().

Now, when I supply an output filename, whether on local disk (/tmp) or on a network-mounted disk (e.g. /nas2), I get the following error:
python test-beam.py --input data/sp500.csv --output /tmp/result.txt

WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
ERROR:root:java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 667, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 748, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1095, in _finalize_write
    writer = sink.open_writer(init_result, str(uuid.uuid4()))
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 140, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 191, in open_writer
    return FileBasedSinkWriter(self, writer_path)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 395, in __init__
    self.temp_handle = self.sink.open(temp_shard_path)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/textio.py", line 397, in open
    file_handle = super(_TextSink, self).open(temp_path)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 140, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 134, in open
    return FileSystems.create(temp_path, self.mime_type, self.compression_type)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 217, in create
    return filesystem.create(path, mime_type, compression_type)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/localfilesystem.py", line 155, in create
    return self._path_open(path, 'wb', mime_type, compression_type)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/localfilesystem.py", line 137, in _path_open
    raw_file = open(path, mode)
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/beam-temp-result.txt-43eab4947dd811eab6a2002590f97cb6/dbc67656-ad7a-4b8b-97f1-6a223bb7afde.result.txt'


It appears that the filesystem in the client side is not the same as the environment that Flink creates to run the Beam pipeline (I think Flink does a docker run of the python sdk to run the Beam pipeline? In that case, how would the container know where to write the file?)
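
Editor's note: the traceback above ends in Beam's `localfilesystem.py`, which suggests that a path without a scheme is opened on whatever machine or container executes the write step rather than on the client. A minimal sketch of that distinction follows; `has_filesystem_scheme` and `describe_output_path` are hypothetical helpers written for illustration, and the `hdfs://` path is an invented example, not a location from this thread.

```python
from urllib.parse import urlparse


def has_filesystem_scheme(path):
    """Return True if the path names an explicit filesystem, e.g. hdfs:// or gs://."""
    return "://" in path and urlparse(path).scheme != ""


def describe_output_path(path):
    """Say where a path is likely to be resolved (an assumption, not Beam's API)."""
    if has_filesystem_scheme(path):
        return "shared: resolved through the '%s' filesystem" % urlparse(path).scheme
    return "worker-local: resolved on the machine or container that runs the write"


# The first two paths come from the message; the third is a hypothetical example.
for candidate in ["/tmp/result.txt", "/nas2/result.txt", "hdfs://namenode/out/result.txt"]:
    print(candidate, "->", describe_output_path(candidate))
```

Under this reading, both /tmp and /nas2 would only work if the same directory is visible inside the environment where the SDK harness runs.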

Please help me debug. The Flink monitoring dashboard shows the several stages of the job (Map, Reduce, and so on); in the end, the status is FAILED.

-Buvana

From: Kyle Weaver <kc...@google.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Monday, April 13, 2020 at 11:57 AM
To: "user@beam.apache.org" <us...@beam.apache.org>
Subject: Re: SparkRunner on k8s

Hi Buvana,

Running Beam Python on Spark on Kubernetes is more complicated, because Beam has its own solution for running Python code [1]. Unfortunately, there's no guide that I know of for Spark yet; however, we do have instructions for Flink [2]. Beam's Flink and Spark runners, and I assume GCP's (unofficial) Flink and Spark [3] operators, are probably similar enough that it shouldn't be too hard to port the YAML from the Flink operator to the Spark operator. I filed an issue for it [4], but I probably won't have the bandwidth to work on it myself for a while.

- Kyle

[1] https://beam.apache.org/roadmap/portability/
[2] https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md
[3] https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/
[4] https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/issues/870

On Sat, Apr 11, 2020 at 4:33 PM Ramanan, Buvana (Nokia - US/Murray Hill) <bu...@nokia-bell-labs.com> wrote:
Thank you, Rahul, for your very useful response. Could you please extend it by commenting on the procedure for a Beam Python pipeline?

From: rahul patwari <ra...@gmail.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Friday, April 10, 2020 at 10:57 PM
To: user <us...@beam.apache.org>
Subject: Re: SparkRunner on k8s

Hi Buvana,

You can submit a Beam Pipeline to Spark on k8s like any other Spark Pipeline using the spark-submit script.

Create an Uber Jar of your Beam code and provide it as the primary resource to spark-submit. Provide the k8s master and the container image to use as arguments to spark-submit.
Refer to https://spark.apache.org/docs/latest/running-on-kubernetes.html to learn more about running Spark on k8s.

The Beam pipeline will be translated to a Spark pipeline using Spark APIs at runtime.
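
Editor's note: the spark-submit invocation described above, for a Java pipeline packaged as an uber jar, could be assembled roughly as follows. This is a sketch only. The API server address, image name, main class, and jar path are hypothetical placeholders; the flags themselves (`--master k8s://...`, `--deploy-mode cluster`, `--class`, and `--conf spark.kubernetes.container.image=...`) are standard spark-submit options from the Spark on Kubernetes documentation, and `--runner=SparkRunner` is passed as an argument to the Beam program.

```python
import shlex


def build_spark_submit_command(k8s_api_server, container_image, main_class, uber_jar):
    """Assemble a spark-submit argument list for a Beam uber jar on k8s."""
    return [
        "spark-submit",
        "--master", "k8s://" + k8s_api_server,
        "--deploy-mode", "cluster",
        "--class", main_class,
        "--conf", "spark.kubernetes.container.image=" + container_image,
        uber_jar,                 # primary resource: the Beam uber jar
        "--runner=SparkRunner",   # argument for the Beam pipeline itself
    ]


# All four values below are hypothetical placeholders.
command = build_spark_submit_command(
    k8s_api_server="https://k8s-apiserver.example.com:6443",
    container_image="registry.example.com/spark:latest",
    main_class="com.example.MyBeamPipeline",
    uber_jar="local:///opt/spark/jars/my-beam-pipeline-bundled.jar",
)
print(" ".join(shlex.quote(part) for part in command))
```

The list form can be handed to `subprocess.run(command)` once the placeholders are replaced with real values.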

Regards,
Rahul


Re: SparkRunner on k8s

Posted by Kyle Weaver <kc...@google.com>.
Hi Buvana,

After looking a bit closer, it seems this might be a bug in Beam. If you
could share a) the pipeline options you are using and b) which source
(read) transform you are using in your pipeline, that would be helpful for
debugging.

Thanks,
Kyle


Re: SparkRunner on k8s

Posted by "Ramanan, Buvana (Nokia - US/Murray Hill)" <bu...@nokia-bell-labs.com>.
Kyle,

I also built the Python SDK from source, from the same branch (release-2.19.0) that the job runner is using. The same error (INVALID_ARGUMENT) still occurs.

This has been a very tedious venture with no luck so far. Hope to get something working soon.

-Buvana

From: "Ramanan, Buvana (Nokia - US/Murray Hill)" <bu...@nokia-bell-labs.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Wednesday, April 15, 2020 at 9:05 PM
To: "user@beam.apache.org" <us...@beam.apache.org>
Subject: Re: SparkRunner on k8s

Hi Kyle,

Thanks a lot for pointing that out. I am using Python SDK version 2.19.0. As per your email, I checked out the release-2.19.0 branch and ran the portable job runner from it, but I still encounter this error.  ☹

-Buvana

From: Kyle Weaver <kc...@google.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Wednesday, April 15, 2020 at 2:48 PM
To: "user@beam.apache.org" <us...@beam.apache.org>
Subject: Re: SparkRunner on k8s

Hi Buvana,

The usual cause of errors like this is a mismatch between the Python SDK and the Spark job server. Since you are building the job server from source, I would make sure you have checked out the same version as the Python SDK you are using.

Hope that helps.
Kyle

On Mon, Apr 13, 2020 at 7:41 PM Ramanan, Buvana (Nokia - US/Murray Hill) <bu...@nokia-bell-labs.com> wrote:
Hello,

I am trying to test the Beam Python pipeline on SparkRunner with Spark on Mesos. Followed the instructions here:
https://beam.apache.org/documentation/runners/spark/

The portable job runner is up and running and is pointing to a valid Spark Master URL (Spark on Mesos).

However, a simple Beam Python pipeline (which works fine on the local runner) submitted to this job runner fails with the error shown below my signature. It appears that the job runner finds issues with the pipeline syntax; maybe it's looking for JVM pipelines and got a Python pipeline?

I would appreciate any pointers that you can provide.

Thank you,
Regards,
Buvana

Client Side:
=========
$ python test-beam.py

WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
Traceback (most recent call last):
  File "test-beam.py", line 117, in <module>
    beam.io.WriteToText(output_filename)
  File "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/pipeline.py", line 481, in __exit__
    self.run().wait_until_finish()
  File "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/pipeline.py", line 461, in run
    self._options).run(False)
  File "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/pipeline.py", line 474, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/runners/portability/portable_runner.py", line 317, in run_pipeline
    retrieval_token=retrieval_token))
  File "/home/tfs/venv_beam3/lib/python3.6/site-packages/grpc/_channel.py", line 826, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/home/tfs/venv_beam3/lib/python3.6/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
                status = StatusCode.INVALID_ARGUMENT
                details = ""
                debug_error_string = "{"created":"@1586818954.316495237","description":"Error received from peer ipv4:XXXXXXXXXX:8000","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"","grpc_status":3}"
 -----------------------------------------------------------------------
job Runner:
 =========
tfs@datamon4:/nas2/tfs/beam$ ./gradlew :runners:spark:job-server:runShadow -PsparkMasterUrl=spark://$SPARK_MASTER:7077
Starting a Gradle Daemon (subsequent builds will be faster)
Configuration on demand is an incubating feature.

> Task :runners:spark:job-server:runShadow
Listening for transport dt_socket at address: 5005
20/04/13 19:02:04 INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver: LegacyArtifactStagingService started on localhost:8098
20/04/13 19:02:04 INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver: Java ExpansionService started on localhost:8097
20/04/13 19:02:04 INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver: JobService started on localhost:8099
20/04/13 19:02:34 WARN org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService: Encountered Unexpected Exception during validation
java.lang.RuntimeException: Failed to validate transform ref_AppliedPTransform_ReadFromText/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)_6
        at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateTransform(PipelineValidator.java:215)
        at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateComponents(PipelineValidator.java:123)
        at org.apache.beam.runners.core.construction.graph.PipelineValidator.validate(PipelineValidator.java:103)
        at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.run(InMemoryJobService.java:223)
        at org.apache.beam.model.jobmanagement.v1.JobServiceGrpc$MethodHandlers.invoke(JobServiceGrpc.java:961)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:127)
        at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateParDo(PipelineValidator.java:238)
        at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateTransform(PipelineValidator.java:213)
        ... 16 more


From: "Ramanan, Buvana (Nokia - US/Murray Hill)" <bu...@nokia-bell-labs.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Monday, April 13, 2020 at 6:55 PM
To: "user@beam.apache.org" <us...@beam.apache.org>
Subject: Re: SparkRunner on k8s

Kyle,

Thanks a lot for the pointers. I became interested in running my Beam pipeline on the FlinkRunner, so I set up a local Flink cluster and verified that a sample job runs fine on it.

I started the Beam job server as follows:
docker run --net=host apachebeam/flink1.8_job_server:latest --flink-master $IP:8081 --job-host $IP  --job-port 8099

I then submitted a Beam pipeline that works fine when run with the LocalRunner. The last stage of the pipeline code looks as follows:
. . .
. . .
. . .
    output= (
        {
            'Mean Open': mean_open,
            'Mean Close': mean_close
        } |
        beam.CoGroupByKey() |
        beam.io.WriteToText(args.output)
    )

So, the pipeline ends with io.WriteToText().

Now, when I supply an output filename, whether on local disk (/tmp) or on a network-mounted disk (e.g. /nas2), I get the following error:
python test-beam.py --input data/sp500.csv --output /tmp/result.txt

WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
ERROR:root:java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 667, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 748, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1095, in _finalize_write
    writer = sink.open_writer(init_result, str(uuid.uuid4()))
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 140, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 191, in open_writer
    return FileBasedSinkWriter(self, writer_path)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 395, in __init__
    self.temp_handle = self.sink.open(temp_shard_path)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/textio.py", line 397, in open
    file_handle = super(_TextSink, self).open(temp_path)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 140, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 134, in open
    return FileSystems.create(temp_path, self.mime_type, self.compression_type)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 217, in create
    return filesystem.create(path, mime_type, compression_type)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/localfilesystem.py", line 155, in create
    return self._path_open(path, 'wb', mime_type, compression_type)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/localfilesystem.py", line 137, in _path_open
    raw_file = open(path, mode)
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/beam-temp-result.txt-43eab4947dd811eab6a2002590f97cb6/dbc67656-ad7a-4b8b-97f1-6a223bb7afde.result.txt'


It appears that the filesystem on the client side is not the same as the one in the environment that Flink creates to run the Beam pipeline. (I think Flink does a docker run of the Python SDK to run the Beam pipeline? In that case, how would the container know where to write the file?)
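
One way to test this hypothesis, sketched under assumptions: Beam's portable runner accepts an --environment_type pipeline option, and in releases that support the LOOPBACK value the Python SDK harness runs inside the submitting process instead of a Docker container, so a local path such as /tmp resolves on the client machine. This is only meant for local testing, and it requires that the runner can connect back to the client. The helper name portable_pipeline_args and the endpoint localhost:8099 are hypothetical.

```python
def portable_pipeline_args(job_endpoint, environment_type="DOCKER"):
    """Return pipeline flags for submitting to a portable job server.

    environment_type selects where the Python SDK harness runs:
    "DOCKER" starts it in a container (separate filesystem),
    "LOOPBACK" runs it in the submitting process (client filesystem).
    """
    return [
        "--runner=PortableRunner",
        f"--job_endpoint={job_endpoint}",
        f"--environment_type={environment_type}",
    ]


# Flags for a local test in which output paths refer to the client's disk.
# The endpoint value is a placeholder, not taken from the thread.
args = portable_pipeline_args("localhost:8099", "LOOPBACK")
print(args)
```

The resulting list can be passed to PipelineOptions(...) in place of the hard-coded flags. For a real cluster, writing to a filesystem that every worker can reach is likely the more robust choice.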

Please help me debug. The Flink monitoring dashboard shows the several stages of the job (Map, Reduce, and so on), and in the end the status is FAILED.

-Buvana

From: Kyle Weaver <kc...@google.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Monday, April 13, 2020 at 11:57 AM
To: "user@beam.apache.org" <us...@beam.apache.org>
Subject: Re: SparkRunner on k8s

Hi Buvana,

Running Beam Python on Spark on Kubernetes is more complicated, because Beam has its own solution for running Python code [1]. Unfortunately there's no guide that I know of for Spark yet, however we do have instructions for Flink [2]. Beam's Flink and Spark runners, and I assume GCP's (unofficial) Flink and Spark [3] operators, are probably similar enough that it shouldn't be too hard to port the YAML from the Flink operator to the Spark operator. I filed an issue for it [4], but I probably won't have the bandwidth to work on it myself for a while.

- Kyle

[1] https://beam.apache.org/roadmap/portability/
[2] https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md
[3] https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/
[4] https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/issues/870

On Sat, Apr 11, 2020 at 4:33 PM Ramanan, Buvana (Nokia - US/Murray Hill) <bu...@nokia-bell-labs.com> wrote:
Thank you, Rahul, for your very useful response. Could you please extend it by describing the procedure for a Beam Python pipeline?

From: rahul patwari <ra...@gmail.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Friday, April 10, 2020 at 10:57 PM
To: user <us...@beam.apache.org>
Subject: Re: SparkRunner on k8s

Hi Buvana,

You can submit a Beam Pipeline to Spark on k8s like any other Spark Pipeline using the spark-submit script.

Create an Uber Jar of your Beam code and provide it as the primary resource to spark-submit. Provide the k8s master and the container image to use as arguments to spark-submit.
Refer to https://spark.apache.org/docs/latest/running-on-kubernetes.html for more about running Spark on k8s.

The Beam pipeline will be translated to a Spark pipeline using Spark APIs at runtime.
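
A minimal sketch of the spark-submit invocation described above, assuming a Java Beam pipeline packaged as an uber jar. The --master k8s://..., --deploy-mode, --class and spark.kubernetes.container.image settings come from the Spark on Kubernetes documentation; the API server URL, image name, main class and jar path are hypothetical placeholders, as is the helper name.

```python
import shlex


def spark_submit_command(k8s_api_url, container_image, main_class, uber_jar):
    """Build the spark-submit argument list for a Beam uber jar on k8s."""
    return [
        "spark-submit",
        "--master", f"k8s://{k8s_api_url}",          # k8s master (API server)
        "--deploy-mode", "cluster",
        "--class", main_class,                       # pipeline's main class
        "--conf", f"spark.kubernetes.container.image={container_image}",
        uber_jar,                                    # primary resource
        "--runner=SparkRunner",                      # Beam pipeline option
    ]


# All four values below are placeholders for illustration only.
cmd = spark_submit_command(
    "https://k8s-apiserver.example:6443",
    "example.registry/spark-beam:latest",
    "com.example.MyBeamPipeline",
    "local:///opt/app/my-beam-pipeline-bundled.jar",
)
print(" ".join(shlex.quote(arg) for arg in cmd))
```

Building the command as a list keeps each argument separate, so it can be passed to subprocess.run(cmd) without shell quoting issues.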

Regards,
Rahul

On Sat, Apr 11, 2020 at 4:38 AM Ramanan, Buvana (Nokia - US/Murray Hill) <bu...@nokia-bell-labs.com> wrote:
Hello,

I recently joined this group and went through the archive to see whether any discussion exists on submitting Beam pipelines to a SparkRunner on k8s.

I run my Spark jobs on a k8s cluster in cluster mode. I would like to deploy my Beam pipeline on a SparkRunner with k8s underneath.

The Beam documentation:
https://beam.apache.org/documentation/runners/spark/
does not discuss k8s (though there is mention of Mesos and YARN).

Can someone please point me to relevant material in this regard? Or, provide the steps for running my beam pipeline in this configuration?

Thank you,
Regards,
Buvana

Re: SparkRunner on k8s

Posted by "Ramanan, Buvana (Nokia - US/Murray Hill)" <bu...@nokia-bell-labs.com>.
Hi Kyle,

Thanks a lot for pointing that out. I am using Python SDK version 2.19.0. As per your email, I checked out the release-2.19.0 branch and ran the portable job runner from it, but I still encounter this error.  ☹

-Buvana


Re: SparkRunner on k8s

Posted by Kyle Weaver <kc...@google.com>.
Hi Buvana,

The usual cause of errors like this is a mismatch between the Python SDK
and the Spark job server. Since you are building the job server from
source, I would make sure you have checked out the same version as the
Python SDK you are using.
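
A minimal sketch of that check, assuming the job server is built from a Beam git checkout whose release branches are named release-<version> (for example release-2.19.0). The helper names are hypothetical; the installed SDK version itself can be read from apache_beam.__version__.

```python
def release_branch_for(sdk_version):
    """Return the release branch name expected for an SDK version string."""
    return f"release-{sdk_version.strip()}"


def versions_match(sdk_version, checked_out_branch):
    """True if the job server checkout matches the Python SDK version."""
    return checked_out_branch.strip() == release_branch_for(sdk_version)


# Example values: an SDK at 2.19.0 against two possible checkouts.
print(versions_match("2.19.0", "release-2.19.0"))
print(versions_match("2.19.0", "master"))
```

The branch name to compare against can be obtained in the Beam checkout with git rev-parse --abbrev-ref HEAD.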

Hope that helps.
Kyle

On Mon, Apr 13, 2020 at 7:41 PM Ramanan, Buvana (Nokia - US/Murray Hill) <
buvana.ramanan@nokia-bell-labs.com> wrote:

> Hello,
>
>
>
> I am trying to test the Beam Python pipeline on SparkRunner with Spark on
> Mesos. Followed the instructions here:
>
> https://beam.apache.org/documentation/runners/spark/
>
>
>
> The portable job runner is up and running and is pointing to a valid
> Spark Master URL (Spark on Mesos).
>
>
>
> However, a simple Beam Python pipeline (which works fine on the local
> runner) submitted to this job runner fails with the error shown below
> my signature. It appears that the job runner finds issues with the pipeline
> syntax; maybe it's looking for JVM pipelines and got a Python pipeline?
>
>
>
> I would appreciate any pointers that you can provide.
>
>
>
> Thank you,
>
> Regards,
>
> Buvana
>
>
>
> Client Side:
>
> =========
>
> $ python test-beam.py
>
>
>
> WARNING:root:Make sure that locally built Python SDK docker image has
> Python 3.6 interpreter.
>
> Traceback (most recent call last):
>
>   File "test-beam.py", line 117, in <module>
>
>     beam.io.WriteToText(output_filename)
>
>   File
> "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/pipeline.py",
> line 481, in __exit__
>
>     self.run().wait_until_finish()
>
>   File
> "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/pipeline.py",
> line 461, in run
>
>     self._options).run(False)
>
>   File
> "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/pipeline.py",
> line 474, in run
>
>     return self.runner.run_pipeline(self, self._options)
>
>   File
> "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/runners/portability/portable_runner.py",
> line 317, in run_pipeline
>
>     retrieval_token=retrieval_token))
>
>   File
> "/home/tfs/venv_beam3/lib/python3.6/site-packages/grpc/_channel.py", line
> 826, in __call__
>
>     return _end_unary_response_blocking(state, call, False, None)
>
>   File
> "/home/tfs/venv_beam3/lib/python3.6/site-packages/grpc/_channel.py", line
> 729, in _end_unary_response_blocking
>
>     raise _InactiveRpcError(state)
>
> grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated
> with:
>
>                 status = StatusCode.INVALID_ARGUMENT
>
>                 details = ""
>
>                 debug_error_string =
> "{"created":"@1586818954.316495237","description":"Error received from peer
> ipv4:XXXXXXXXXX:8000","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"","grpc_status":3}"
>
>  -----------------------------------------------------------------------
>
> job Runner:
>
>  =========
>
> tfs@datamon4:/nas2/tfs/beam$ ./gradlew
> :runners:spark:job-server:runShadow
> -PsparkMasterUrl=spark://$SPARK_MASTER:7077
>
> Starting a Gradle Daemon (subsequent builds will be faster)
>
> Configuration on demand is an incubating feature.
>
>
>
> > Task :runners:spark:job-server:runShadow
>
> Listening for transport dt_socket at address: 5005
>
> 20/04/13 19:02:04 INFO
> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver:
> LegacyArtifactStagingService started on localhost:8098
>
> 20/04/13 19:02:04 INFO
> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver: Java
> ExpansionService started on localhost:8097
>
> 20/04/13 19:02:04 INFO
> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver:
> JobService started on localhost:8099
>
> 20/04/13 19:02:34 WARN
> org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService:
> Encountered Unexpected Exception during validation
>
> java.lang.RuntimeException: Failed to validate transform
> ref_AppliedPTransform_ReadFromText/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn
> )_6
>
>         at
> org.apache.beam.runners.core.construction.graph.PipelineValidator.validateTransform(PipelineValidator.java:215)
>
>         at
> org.apache.beam.runners.core.construction.graph.PipelineValidator.validateComponents(PipelineValidator.java:123)
>
>         at
> org.apache.beam.runners.core.construction.graph.PipelineValidator.validate(PipelineValidator.java:103)
>
>         at
> org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.run(InMemoryJobService.java:223)
>
>         at
> org.apache.beam.model.jobmanagement.v1.JobServiceGrpc$MethodHandlers.invoke(JobServiceGrpc.java:961)
>
>         at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
>
>         at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
>
>         at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
>
>         at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
>
>         at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
>
>         at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
>
>         at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
>
>         at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>
>         at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
>         at java.lang.Thread.run(Thread.java:748)
>
> Caused by: java.lang.IllegalArgumentException
>
>         at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:127)
>
>         at
> org.apache.beam.runners.core.construction.graph.PipelineValidator.validateParDo(PipelineValidator.java:238)
>
>         at
> org.apache.beam.runners.core.construction.graph.PipelineValidator.validateTransform(PipelineValidator.java:213)
>
>         ... 16 more
>
>
>
>
>

Re: SparkRunner on k8s

Posted by "Ramanan, Buvana (Nokia - US/Murray Hill)" <bu...@nokia-bell-labs.com>.
Hello,

I am trying to test a Beam Python pipeline on the SparkRunner with Spark on Mesos. I followed the instructions here:
https://beam.apache.org/documentation/runners/spark/

The portable job runner is up and running and points to a valid Spark master URL (Spark on Mesos).

However, a simple Beam Python pipeline (which works fine on the local runner) fails when submitted to this job runner, with the error shown below my signature. It appears that the job runner finds issues with the pipeline syntax. Maybe it is looking for JVM pipelines and got a Python pipeline?

I would appreciate any pointers that you can provide.

Thank you,
Regards,
Buvana

Client Side:
=========
$ python test-beam.py

WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
Traceback (most recent call last):
  File "test-beam.py", line 117, in <module>
    beam.io.WriteToText(output_filename)
  File "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/pipeline.py", line 481, in __exit__
    self.run().wait_until_finish()
  File "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/pipeline.py", line 461, in run
    self._options).run(False)
  File "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/pipeline.py", line 474, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/runners/portability/portable_runner.py", line 317, in run_pipeline
    retrieval_token=retrieval_token))
  File "/home/tfs/venv_beam3/lib/python3.6/site-packages/grpc/_channel.py", line 826, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/home/tfs/venv_beam3/lib/python3.6/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
                status = StatusCode.INVALID_ARGUMENT
                details = ""
                debug_error_string = "{"created":"@1586818954.316495237","description":"Error received from peer ipv4:XXXXXXXXXX:8000","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"","grpc_status":3}"
 -----------------------------------------------------------------------
job Runner:
 =========
tfs@datamon4:/nas2/tfs/beam$ ./gradlew :runners:spark:job-server:runShadow -PsparkMasterUrl=spark://$SPARK_MASTER:7077
Starting a Gradle Daemon (subsequent builds will be faster)
Configuration on demand is an incubating feature.

> Task :runners:spark:job-server:runShadow
Listening for transport dt_socket at address: 5005
20/04/13 19:02:04 INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver: LegacyArtifactStagingService started on localhost:8098
20/04/13 19:02:04 INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver: Java ExpansionService started on localhost:8097
20/04/13 19:02:04 INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver: JobService started on localhost:8099
20/04/13 19:02:34 WARN org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService: Encountered Unexpected Exception during validation
java.lang.RuntimeException: Failed to validate transform ref_AppliedPTransform_ReadFromText/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)_6
        at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateTransform(PipelineValidator.java:215)
        at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateComponents(PipelineValidator.java:123)
        at org.apache.beam.runners.core.construction.graph.PipelineValidator.validate(PipelineValidator.java:103)
        at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.run(InMemoryJobService.java:223)
        at org.apache.beam.model.jobmanagement.v1.JobServiceGrpc$MethodHandlers.invoke(JobServiceGrpc.java:961)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:127)
        at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateParDo(PipelineValidator.java:238)
        at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateTransform(PipelineValidator.java:213)
        ... 16 more


From: "Ramanan, Buvana (Nokia - US/Murray Hill)" <bu...@nokia-bell-labs.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Monday, April 13, 2020 at 6:55 PM
To: "user@beam.apache.org" <us...@beam.apache.org>
Subject: Re: SparkRunner on k8s

Kyle,

Thanks a lot for the pointers. I became interested in running my Beam pipeline on the FlinkRunner, set up a local Flink cluster, and verified that sample code works on it.

I started the Beam job runner with:
docker run --net=host apachebeam/flink1.8_job_server:latest --flink-master $IP:8081 --job-host $IP  --job-port 8099

I submitted a Beam pipeline that works fine when run with the LocalRunner. The last stage of the pipeline code looks as follows:
. . .
. . .
. . .
    output= (
        {
            'Mean Open': mean_open,
            'Mean Close': mean_close
        } |
        beam.CoGroupByKey() |
        beam.io.WriteToText(args.output)
    )

So the pipeline ends with an io.WriteToText().

Now, when I supply a filename, whether on a local disk (/tmp) or on a network-mounted disk (e.g. /nas2), I get the following error:
python test-beam.py --input data/sp500.csv --output /tmp/result.txt
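
The full script is not shown in this thread. As a minimal sketch, assuming it uses argparse (an assumption; parse_args is a hypothetical helper), the --input and --output flags could be separated from Beam's own flags like this:

```python
import argparse


def parse_args(argv=None):
    """Split script-specific flags from the flags meant for Beam.

    Hypothetical helper: the real test-beam.py is not shown in the thread.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True, help="CSV file to read")
    parser.add_argument("--output", required=True, help="output path prefix")
    # parse_known_args leaves unrecognized flags (for example --runner)
    # in a second list, which could then be handed to PipelineOptions.
    args, beam_args = parser.parse_known_args(argv)
    return args, beam_args


args, beam_args = parse_args([
    "--input", "data/sp500.csv",
    "--output", "/tmp/result.txt",
    "--runner=PortableRunner",
])
print(args.input, args.output, beam_args)
```

Here args.output is the value that would reach beam.io.WriteToText(args.output), and beam_args holds whatever is left over for the runner.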

WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
ERROR:root:java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 667, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 748, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1095, in _finalize_write
    writer = sink.open_writer(init_result, str(uuid.uuid4()))
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 140, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 191, in open_writer
    return FileBasedSinkWriter(self, writer_path)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 395, in __init__
    self.temp_handle = self.sink.open(temp_shard_path)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/textio.py", line 397, in open
    file_handle = super(_TextSink, self).open(temp_path)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 140, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 134, in open
    return FileSystems.create(temp_path, self.mime_type, self.compression_type)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 217, in create
    return filesystem.create(path, mime_type, compression_type)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/localfilesystem.py", line 155, in create
    return self._path_open(path, 'wb', mime_type, compression_type)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/localfilesystem.py", line 137, in _path_open
    raw_file = open(path, mode)
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/beam-temp-result.txt-43eab4947dd811eab6a2002590f97cb6/dbc67656-ad7a-4b8b-97f1-6a223bb7afde.result.txt'


It appears that the filesystem on the client side is not the same as the environment that Flink creates to run the Beam pipeline. (I think Flink does a docker run of the Python SDK to run the Beam pipeline? In that case, how would the container know where to write the file?)
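
A sketch of the distinction suspected here, using only the standard library: a path with no URI scheme is opened on the local filesystem of whichever process handles it, which for a containerized SDK worker would be the container rather than the client. The helper name is_worker_local is hypothetical and is not part of Beam.

```python
from urllib.parse import urlparse


def is_worker_local(path):
    """Return True if `path` has no URI scheme (or uses file://).

    Such a path is resolved on the machine or container that opens it,
    so the client and the workers may each see a different /tmp.
    Hypothetical helper for illustration only.
    """
    scheme = urlparse(path).scheme
    return scheme in ("", "file")


for candidate in ("/tmp/result.txt", "hdfs://namenode/out/result", "gs://bucket/out/result"):
    print(candidate, is_worker_local(candidate))
```

If this is what is happening, an output path under /tmp or /nas2 would only be shared when every worker container has the same directory mounted.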

Please help me debug. The Flink monitoring dashboard shows the several stages of the job (Map, Reduce, and so on), and in the end the status is FAILED.

-Buvana

From: Kyle Weaver <kc...@google.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Monday, April 13, 2020 at 11:57 AM
To: "user@beam.apache.org" <us...@beam.apache.org>
Subject: Re: SparkRunner on k8s

Hi Buvana,

Running Beam Python on Spark on Kubernetes is more complicated, because Beam has its own solution for running Python code [1]. Unfortunately there's no guide that I know of for Spark yet, however we do have instructions for Flink [2]. Beam's Flink and Spark runners, and I assume GCP's (unofficial) Flink and Spark [3] operators, are probably similar enough that it shouldn't be too hard to port the YAML from the Flink operator to the Spark operator. I filed an issue for it [4], but I probably won't have the bandwidth to work on it myself for a while.

- Kyle

[1] https://beam.apache.org/roadmap/portability/
[2] https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md
[3] https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/
[4] https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/issues/870

On Sat, Apr 11, 2020 at 4:33 PM Ramanan, Buvana (Nokia - US/Murray Hill) <bu...@nokia-bell-labs.com> wrote:
Thank you, Rahul, for your very useful response. Can you please extend it by commenting on the procedure for a Beam Python pipeline?

From: rahul patwari <ra...@gmail.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Friday, April 10, 2020 at 10:57 PM
To: user <us...@beam.apache.org>
Subject: Re: SparkRunner on k8s

Hi Buvana,

You can submit a Beam Pipeline to Spark on k8s like any other Spark Pipeline using the spark-submit script.

Create an uber JAR of your Beam code and provide it as the primary resource to spark-submit. Provide the k8s master and the container image to use as arguments to spark-submit.
Refer to https://spark.apache.org/docs/latest/running-on-kubernetes.html to learn more about how to run Spark on k8s.

The Beam pipeline will be translated to a Spark pipeline using Spark APIs at runtime.
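
The steps above can be sketched as a spark-submit invocation, assembled here in Python to match the rest of the thread. This applies to a Java Beam pipeline packaged as an uber JAR. The API server address, image name, main class, and JAR path are all placeholders (assumptions), and spark_submit_command is a hypothetical helper; the flags themselves are the ones described in the Spark on Kubernetes documentation.

```python
import shlex


def spark_submit_command(k8s_api_server, image, main_class, jar_path):
    """Build a spark-submit argv for a Beam uber JAR on Spark on k8s.

    Hypothetical helper; every argument value is a placeholder.
    """
    return [
        "spark-submit",
        "--master", f"k8s://{k8s_api_server}",
        "--deploy-mode", "cluster",
        "--class", main_class,
        "--conf", f"spark.kubernetes.container.image={image}",
        jar_path,
        # Application argument read by the Beam pipeline itself.
        "--runner=SparkRunner",
    ]


cmd = spark_submit_command(
    "https://k8s-apiserver.example:6443",   # placeholder API server
    "example/spark-beam:latest",            # placeholder container image
    "com.example.MyBeamPipeline",           # placeholder main class
    "local:///opt/app/my-beam-pipeline.jar",  # placeholder JAR inside the image
)
print(" ".join(shlex.quote(part) for part in cmd))
```

The list form can be passed to subprocess.run as-is; the printed line is only for copying into a shell.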

Regards,
Rahul

On Sat, Apr 11, 2020 at 4:38 AM Ramanan, Buvana (Nokia - US/Murray Hill) <bu...@nokia-bell-labs.com> wrote:
Hello,

I recently joined this group and went through the archive to see if any discussion exists on submitting Beam pipelines to a SparkRunner on k8s.

I run my Spark jobs on a k8s cluster in cluster mode. I would like to deploy my Beam pipeline on a SparkRunner with k8s underneath.

The Beam documentation:
https://beam.apache.org/documentation/runners/spark/
does not discuss k8s (though it mentions Mesos and YARN).

Can someone please point me to relevant material in this regard? Or, provide the steps for running my beam pipeline in this configuration?

Thank you,
Regards,
Buvana

Re: SparkRunner on k8s

Posted by Kyle Weaver <kc...@google.com>.
> In other words, are there options to the job runner that would eventually
> translate to ' --volume /storage1:/storage1 ' while the docker container is
> being run by Flink? Even if it means code changes and building from source,
> it's fine. Please point me in the right direction.

I found an open feature request for this, but unfortunately it looks like
neither of two attempted implementations ended up being merged:
https://issues.apache.org/jira/browse/BEAM-5440
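
One possible way around this for local testing, assuming a Beam version that supports the --environment_type pipeline option: with LOOPBACK, the Python SDK harness runs inside the submitting process instead of a Docker container, so local paths resolve on the client machine. The Flink workers must be able to connect back to the client for this to work, so it may not suit a multi-node benchmark. The helper portable_runner_flags is hypothetical, and the endpoint below is a placeholder.

```python
def portable_runner_flags(job_endpoint, environment_type="LOOPBACK"):
    """Return pipeline flags for the portable runner.

    Hypothetical helper. DOCKER is the usual default environment;
    LOOPBACK keeps the SDK harness in the client process.
    """
    return [
        "--runner=PortableRunner",
        f"--job_endpoint={job_endpoint}",
        f"--environment_type={environment_type}",
    ]


flags = portable_runner_flags("localhost:8099")  # placeholder endpoint
print(flags)
# These flags could then be passed to
# apache_beam.options.pipeline_options.PipelineOptions(flags).
```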

Sorry I haven't had much time to look into your issue with the Spark
runner. If you are still interested in trying it, you might try using a
different Beam version and see if the problem persists.

On Wed, Apr 22, 2020 at 7:56 PM Ramanan, Buvana (Nokia - US/Murray Hill) <
buvana.ramanan@nokia-bell-labs.com> wrote:

> Hi Kyle,
>
> About FlinkRunner:
>
> "One is you can mount a directory from the Docker host inside the
> container(s). But the more scalable solution is to use a distributed file
> system, such as HDFS, Google Cloud Storage, or Amazon S3"
>
> I am running some benchmarking tests and so I prefer not to use GCS or S3
> (as the network delay can kill the performance).
>
> I would like to focus on the option of the host mounting the volume into
> the containers, but I have not come across a docker command where a host
> can mount volumes into running containers. I do not think 'docker create'
> volume will help here, please correct if I am wrong.
>
> Is there a way the job runner can tell the Flink cluster to mount certain
> volumes before running the sdk container? And if so, is there a way I can
> tell the job runner to tell Flink to mount these volumes?
>
> In other words, are there options to the job runner that would eventually
> translate to ' --volume /storage1:/storage1 ' while the docker container is
> being run by Flink? Even if it means code changes and building from source,
> its fine. Please point me in the right direction.
>
> Thanks,
> Buvana
> ------------------------------
> *From:* Kyle Weaver <kc...@google.com>
> *Sent:* Monday, April 13, 2020 7:34 PM
> *To:* user@beam.apache.org <us...@beam.apache.org>
> *Subject:* Re: SparkRunner on k8s
>
> > It appears that the filesystem in the client side is not the same as the
> environment that Flink creates to run the Beam pipeline (I think Flink does
> a docker run of the python sdk to run the Beam pipeline? In that case, how
> would the container know where to write the file?)
>
>
> You are correct. Beam Python execution takes place within a Docker
> container, or often multiple containers, depending on your pipeline and
> configuration. Multiple containers is probably the cause of the error here.
> The Python SDK doesn't do anything special with local file paths; it just
> writes them to the local file system of the container. So in order to get a
> persistent, shared file system, you have a couple options. One is you can
> mount a directory from the Docker host inside the container(s). But the
> more scalable solution is to use a distributed file system, such as HDFS,
> Google Cloud Storage, or Amazon S3. Check out the Beam programming guide
> for more info:
> https://beam.apache.org/documentation/programming-guide/#pipeline-io
>

Re: SparkRunner on k8s

Posted by "Ramanan, Buvana (Nokia - US/Murray Hill)" <bu...@nokia-bell-labs.com>.
Hi Kyle,

About FlinkRunner:

"One is you can mount a directory from the Docker host inside the container(s). But the more scalable solution is to use a distributed file system, such as HDFS, Google Cloud Storage, or Amazon S3"

I am running some benchmarking tests and so I prefer not to use GCS or S3 (as the network delay can kill the performance).

I would like to focus on the option of mounting a host volume into the containers, but I have not come across a docker command with which a host can mount volumes into running containers. I do not think 'docker volume create' will help here; please correct me if I am wrong.

Is there a way the job server can tell the Flink cluster to mount certain volumes before running the SDK container? And if so, is there a way I can tell the job server to tell Flink to mount these volumes?

In other words, are there options to the job server that would eventually translate to ' --volume /storage1:/storage1 ' when the Docker container is run by Flink? Even if it means code changes and building from source, it's fine. Please point me in the right direction.
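
For illustration, Docker bind mounts are fixed when a container is created, so the mount would have to be part of the `docker run` invocation that starts the SDK container. The sketch below only shows what such a command line looks like. The helper and the image name are hypothetical, and nothing here is a Beam or Flink option.

```python
import shlex


def docker_run_command(image, volumes, extra_args=()):
    """Build a `docker run` command with one --volume flag per bind mount.

    `volumes` maps host paths to container paths. This helper and the image
    name used below are hypothetical; only the docker flags are real.
    """
    cmd = ["docker", "run"]
    for host_path, container_path in volumes.items():
        cmd += ["--volume", f"{host_path}:{container_path}"]
    cmd.append(image)
    cmd.extend(extra_args)
    return cmd


cmd = docker_run_command(
    "example/beam-python-sdk:latest",  # placeholder image name
    {"/storage1": "/storage1"},
)
print(" ".join(shlex.quote(part) for part in cmd))
```

Whether the runner exposes a way to add such flags is exactly the open question here.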

Thanks,
Buvana
________________________________
From: Kyle Weaver <kc...@google.com>
Sent: Monday, April 13, 2020 7:34 PM
To: user@beam.apache.org <us...@beam.apache.org>
Subject: Re: SparkRunner on k8s

> It appears that the filesystem in the client side is not the same as the environment that Flink creates to run the Beam pipeline (I think Flink does a docker run of the python sdk to run the Beam pipeline? In that case, how would the container know where to write the file?)

You are correct. Beam Python execution takes place within a Docker container, or often multiple containers, depending on your pipeline and configuration. Multiple containers is probably the cause of the error here. The Python SDK doesn't do anything special with local file paths; it just writes them to the local file system of the container. So in order to get a persistent, shared file system, you have a couple options. One is you can mount a directory from the Docker host inside the container(s). But the more scalable solution is to use a distributed file system, such as HDFS, Google Cloud Storage, or Amazon S3. Check out the Beam programming guide for more info: https://beam.apache.org/documentation/programming-guide/#pipeline-io

Re: SparkRunner on k8s

Posted by Kyle Weaver <kc...@google.com>.
> It appears that the filesystem in the client side is not the same as the
> environment that Flink creates to run the Beam pipeline (I think Flink does
> a docker run of the python sdk to run the Beam pipeline? In that case, how
> would the container know where to write the file?)


You are correct. Beam Python execution takes place within a Docker
container, or often multiple containers, depending on your pipeline and
configuration. Having multiple containers is probably the cause of the error
here. The Python SDK doesn't do anything special with local file paths; it
just writes them to the local file system of the container. So in order to
get a persistent, shared file system, you have a couple of options. One is
you can mount a directory from the Docker host inside the container(s). But
the more scalable solution is to use a distributed file system, such as
HDFS, Google Cloud Storage, or Amazon S3. Check out the Beam programming
guide for more info:
https://beam.apache.org/documentation/programming-guide/#pipeline-io
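
As a rough illustration of the distinction above, the sketch below classifies an output path by its URL scheme. The helper name and the set of schemes are assumptions made for this example, not part of the Beam API. A path without one of these schemes is treated as local to whichever container happens to run the write.

```python
from urllib.parse import urlparse

# Schemes treated as shared storage in this sketch (an assumption; extend as needed).
DISTRIBUTED_SCHEMES = {"gs", "s3", "hdfs"}


def is_worker_local(path):
    """Return True if `path` would resolve on each worker's own file system."""
    return urlparse(path).scheme not in DISTRIBUTED_SCHEMES


for path in [
    "/tmp/result.txt",
    "gs://my-bucket/result.txt",       # hypothetical bucket
    "hdfs://namenode/out/result.txt",  # hypothetical namenode
]:
    kind = "worker-local" if is_worker_local(path) else "shared"
    print(f"{path}: {kind}")
```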

On Mon, Apr 13, 2020 at 6:55 PM Ramanan, Buvana (Nokia - US/Murray Hill) <
buvana.ramanan@nokia-bell-labs.com> wrote:

> Kyle,
>
> Thanks a lot for the pointers. I became interested in running my Beam
> pipeline on the FlinkRunner, set up a local Flink cluster, and tested that
> a sample job works fine.
>
> I started the Beam job server with:
>
> docker run --net=host apachebeam/flink1.8_job_server:latest --flink-master $IP:8081 --job-host $IP  --job-port 8099
>
> I then submitted a Beam pipeline that works fine when run with the default
> local runner (DirectRunner). The last stage of the pipeline code looks as
> follows:
>
> . . .
> . . .
> . . .
>     output = (
>         {
>             'Mean Open': mean_open,
>             'Mean Close': mean_close
>         } |
>         beam.CoGroupByKey() |
>         beam.io.WriteToText(args.output)
>     )
>
> So the pipeline ends with an io.WriteToText().
>
> Now, when I supply an output filename, whether on a local disk (/tmp) or a
> network-mounted disk (e.g. /nas2), I get the following error:
>
> python test-beam.py --input data/sp500.csv --output /tmp/result.txt
>
> WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
> ERROR:root:java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 667, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 748, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1095, in _finalize_write
>     writer = sink.open_writer(init_result, str(uuid.uuid4()))
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 140, in _f
>     return fnc(self, *args, **kwargs)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 191, in open_writer
>     return FileBasedSinkWriter(self, writer_path)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 395, in __init__
>     self.temp_handle = self.sink.open(temp_shard_path)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/textio.py", line 397, in open
>     file_handle = super(_TextSink, self).open(temp_path)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 140, in _f
>     return fnc(self, *args, **kwargs)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 134, in open
>     return FileSystems.create(temp_path, self.mime_type, self.compression_type)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 217, in create
>     return filesystem.create(path, mime_type, compression_type)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/localfilesystem.py", line 155, in create
>     return self._path_open(path, 'wb', mime_type, compression_type)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/localfilesystem.py", line 137, in _path_open
>     raw_file = open(path, mode)
> FileNotFoundError: [Errno 2] No such file or directory: '/tmp/beam-temp-result.txt-43eab4947dd811eab6a2002590f97cb6/dbc67656-ad7a-4b8b-97f1-6a223bb7afde.result.txt'
>
> It appears that the filesystem in the client side is not the same as the
> environment that Flink creates to run the Beam pipeline (I think Flink does
> a docker run of the python sdk to run the Beam pipeline? In that case, how
> would the container know where to write the file?)
>
> Please help me debug. The Flink monitoring dashboard shows the several
> stages of the job (Map, Reduce, and so on), and in the end the status is
> FAILED.
>
> -Buvana
>

Re: SparkRunner on k8s

Posted by "Ramanan, Buvana (Nokia - US/Murray Hill)" <bu...@nokia-bell-labs.com>.
Kyle,

Thanks a lot for the pointers. I became interested in running my Beam pipeline on the FlinkRunner, set up a local Flink cluster, and tested that a sample job works fine.

I started the Beam job server with:
docker run --net=host apachebeam/flink1.8_job_server:latest --flink-master $IP:8081 --job-host $IP  --job-port 8099
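
For reference, the client side of this setup points the Python pipeline at that job server through two pipeline flags. The sketch below only assembles the flag strings, so it runs without Beam installed. The helper name is hypothetical, and the default port is the one used in the command above.

```python
def portable_runner_args(job_host, job_port=8099):
    """Return the pipeline flags that point a Python pipeline at a Beam job server."""
    return [
        "--runner=PortableRunner",
        f"--job_endpoint={job_host}:{job_port}",
    ]


args = portable_runner_args("localhost")
print(args)
# In a real pipeline these strings would be passed to
# apache_beam.options.pipeline_options.PipelineOptions(args).
```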

I then submitted a Beam pipeline that works fine when run with the default local runner (DirectRunner). The last stage of the pipeline code looks as follows:
. . .
. . .
. . .
    output = (
        {
            'Mean Open': mean_open,
            'Mean Close': mean_close
        } |
        beam.CoGroupByKey() |
        beam.io.WriteToText(args.output)
    )

So the pipeline ends with an io.WriteToText().
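
To make the shape of the data reaching WriteToText concrete, here is a plain-Python emulation of what CoGroupByKey produces for a dict of tagged, keyed inputs: one entry per key, holding a dict from tag to the list of values seen under that tag. This is an in-memory sketch, not Beam code, and the sample keys and numbers are made up.

```python
from collections import defaultdict


def co_group_by_key(tagged_inputs):
    """Emulate CoGroupByKey on in-memory lists of (key, value) pairs."""
    tags = list(tagged_inputs)
    # Every key gets an entry for every tag, even if that tag has no values.
    grouped = defaultdict(lambda: {tag: [] for tag in tags})
    for tag, pairs in tagged_inputs.items():
        for key, value in pairs:
            grouped[key][tag].append(value)
    return dict(grouped)


# Made-up sample values, for illustration only.
mean_open = [("2020-04", 2700.5)]
mean_close = [("2020-04", 2710.25), ("2020-05", 2900.0)]

result = co_group_by_key({"Mean Open": mean_open, "Mean Close": mean_close})
for key, values in sorted(result.items()):
    print(key, values)
```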

Now, when I supply an output filename, whether on a local disk (/tmp) or a network-mounted disk (e.g. /nas2), I get the following error:
python test-beam.py --input data/sp500.csv --output /tmp/result.txt

WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
ERROR:root:java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 667, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 748, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1095, in _finalize_write
    writer = sink.open_writer(init_result, str(uuid.uuid4()))
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 140, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 191, in open_writer
    return FileBasedSinkWriter(self, writer_path)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 395, in __init__
    self.temp_handle = self.sink.open(temp_shard_path)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/textio.py", line 397, in open
    file_handle = super(_TextSink, self).open(temp_path)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 140, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 134, in open
    return FileSystems.create(temp_path, self.mime_type, self.compression_type)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 217, in create
    return filesystem.create(path, mime_type, compression_type)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/localfilesystem.py", line 155, in create
    return self._path_open(path, 'wb', mime_type, compression_type)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/localfilesystem.py", line 137, in _path_open
    raw_file = open(path, mode)
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/beam-temp-result.txt-43eab4947dd811eab6a2002590f97cb6/dbc67656-ad7a-4b8b-97f1-6a223bb7afde.result.txt'


It appears that the filesystem in the client side is not the same as the environment that Flink creates to run the Beam pipeline (I think Flink does a docker run of the python sdk to run the Beam pipeline? In that case, how would the container know where to write the file?)
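
One plausible reading of the traceback is that the sink's temp directory was created on one file system and the shard was then opened on another. The simulation below uses two temporary directories as stand-ins for two container file systems and reproduces the same exception type. It involves neither Beam nor Flink, the directory and file names are placeholders, and it is not a diagnosis of the actual job.

```python
import os
import tempfile


def write_shard(container_root, relative_path):
    """Open a shard for writing inside one simulated container file system."""
    full_path = os.path.join(container_root, relative_path)
    with open(full_path, "wb") as handle:
        handle.write(b"data\n")
    return full_path


temp_dir = "beam-temp-result"  # stand-in for the sink's temp directory
shard = os.path.join(temp_dir, "shard-0.result.txt")
failed_in_b = False

with tempfile.TemporaryDirectory() as container_a, \
        tempfile.TemporaryDirectory() as container_b:
    # "Container A" initializes the sink by creating the temp directory.
    os.makedirs(os.path.join(container_a, temp_dir))
    write_shard(container_a, shard)  # works: the directory exists here

    try:
        write_shard(container_b, shard)  # B never saw A's mkdir
    except FileNotFoundError as err:
        failed_in_b = True
        print("container B:", type(err).__name__)
```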

Please help me debug. The Flink monitoring dashboard shows the several stages of the job (Map, Reduce, and so on), and in the end the status is FAILED.

-Buvana

From: Kyle Weaver <kc...@google.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Monday, April 13, 2020 at 11:57 AM
To: "user@beam.apache.org" <us...@beam.apache.org>
Subject: Re: SparkRunner on k8s

Hi Buvana,

Running Beam Python on Spark on Kubernetes is more complicated, because Beam has its own solution for running Python code [1]. Unfortunately there's no guide that I know of for Spark yet, however we do have instructions for Flink [2]. Beam's Flink and Spark runners, and I assume GCP's (unofficial) Flink and Spark [3] operators, are probably similar enough that it shouldn't be too hard to port the YAML from the Flink operator to the Spark operator. I filed an issue for it [4], but I probably won't have the bandwidth to work on it myself for a while.

- Kyle

[1] https://beam.apache.org/roadmap/portability/
[2] https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md
[3] https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/
[4] https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/issues/870

Re: SparkRunner on k8s

Posted by Kyle Weaver <kc...@google.com>.
Hi Buvana,

Running Beam Python on Spark on Kubernetes is more complicated, because
Beam has its own solution for running Python code [1]. Unfortunately
there's no guide that I know of for Spark yet, however we do have
instructions for Flink [2]. Beam's Flink and Spark runners, and I assume
GCP's (unofficial) Flink and Spark [3] operators, are probably similar
enough that it shouldn't be too hard to port the YAML from the Flink
operator to the Spark operator. I filed an issue for it [4], but I probably
won't have the bandwidth to work on it myself for a while.

- Kyle

[1] https://beam.apache.org/roadmap/portability/
[2]
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md
[3] https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/
[4] https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/issues/870

On Sat, Apr 11, 2020 at 4:33 PM Ramanan, Buvana (Nokia - US/Murray Hill) <
buvana.ramanan@nokia-bell-labs.com> wrote:

> Thank you, Rahul, for your very useful response. Can you please extend it
> by commenting on the procedure for a Beam Python pipeline?

Re: SparkRunner on k8s

Posted by "Ramanan, Buvana (Nokia - US/Murray Hill)" <bu...@nokia-bell-labs.com>.
Thank you, Rahul, for your very useful response. Can you please extend it by commenting on the procedure for a Beam Python pipeline?

From: rahul patwari <ra...@gmail.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Friday, April 10, 2020 at 10:57 PM
To: user <us...@beam.apache.org>
Subject: Re: SparkRunner on k8s

Hi Buvana,

You can submit a Beam pipeline to Spark on k8s like any other Spark pipeline, using the spark-submit script.

Create an uber JAR of your Beam code and provide it as the primary resource to spark-submit. Provide the k8s master and the container image to use as arguments to spark-submit.
Refer to https://spark.apache.org/docs/latest/running-on-kubernetes.html to learn more about how to run Spark on k8s.

The Beam pipeline will be translated to a Spark pipeline using Spark APIs at runtime.

Regards,
Rahul

Re: SparkRunner on k8s

Posted by rahul patwari <ra...@gmail.com>.
Hi Buvana,

You can submit a Beam pipeline to Spark on k8s like any other Spark
pipeline, using the spark-submit script.

Create an uber JAR of your Beam code and provide it as the primary resource
to spark-submit. Provide the k8s master and the container image to use as
arguments to spark-submit.
Refer to https://spark.apache.org/docs/latest/running-on-kubernetes.html to
learn more about how to run Spark on k8s.

The Beam pipeline will be translated to a Spark pipeline using Spark APIs
at runtime.
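
The steps above can be sketched as a command line. The snippet below only assembles and prints a spark-submit invocation. The API server URL, image name, main class, and JAR path are placeholders to replace with your own values, and it assumes a Java pipeline whose main class reads `--runner=SparkRunner` from its arguments.

```python
import shlex


def spark_submit_command(k8s_api_url, image, main_class, uber_jar, pipeline_args=()):
    """Assemble a spark-submit command for Spark on Kubernetes in cluster mode."""
    cmd = [
        "spark-submit",
        "--master", f"k8s://{k8s_api_url}",
        "--deploy-mode", "cluster",
        "--class", main_class,
        "--conf", f"spark.kubernetes.container.image={image}",
        uber_jar,                 # primary resource: the uber JAR
        "--runner=SparkRunner",   # application argument read by the Beam pipeline
    ]
    cmd.extend(pipeline_args)
    return cmd


# All values below are placeholders, not real endpoints or artifacts.
uber_jar = "local:///opt/app/my-pipeline-bundled.jar"
cmd = spark_submit_command(
    "https://k8s.example.com:6443",
    "example/spark-beam:latest",
    "com.example.MyBeamPipeline",
    uber_jar,
)
print(" ".join(shlex.quote(part) for part in cmd))
```

Everything after the JAR path is passed to the pipeline's main class rather than to Spark, which is why the runner flag comes last.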

Regards,
Rahul

On Sat, Apr 11, 2020 at 4:38 AM Ramanan, Buvana (Nokia - US/Murray Hill) <
buvana.ramanan@nokia-bell-labs.com> wrote:

> Hello,
>
> I newly joined this group and I went through the archive to see if any
> discussion exists on submitting Beam pipelines to a SparkRunner on k8s.
>
> I run my Spark jobs on a k8s cluster in the cluster mode. Would like to
> deploy my beam pipeline on a SparkRunner with k8s underneath.
>
> The Beam documentation:
> https://beam.apache.org/documentation/runners/spark/
> does not discuss about k8s (though there is mention of Mesos and YARN).
>
> Can someone please point me to relevant material in this regard? Or,
> provide the steps for running my beam pipeline in this configuration?
>
> Thank you,
> Regards,
> Buvana