Posted to user@beam.apache.org by Amit Sela <am...@gmail.com> on 2017/02/01 08:01:36 UTC

Re: Beam Spark/Flink runner with DC/OS

Not sure what you mean, Davor; the only runner I see registering IOs is
DataflowRunner here
<https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L228>.
I'll look into this.
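
For reference, that registration boils down to a single call, something like
the following (a sketch; registerIOFactories is the method name that comes up
later in this thread):

    // Register every discovered IOChannelFactory (file://, gs://, ...) with
    // the process-wide registry, so scheme lookups succeed in this JVM:
    IOChannelUtils.registerIOFactories(options);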

On Tue, Jan 31, 2017 at 11:48 PM Davor Bonaci <da...@apache.org> wrote:

> The file systems are automatically registered locally, so that's why you'd
> be getting an error saying 'already registered'. We need to get it
> registered on your cluster, in the remote execution environment, where you
> are getting an error saying 'Unable to find handler for gs://'.
>
> Amit, any idea whether IOChannelFactories get staged in the Spark runner?
> Would you expect AutoService to register them automatically, or does
> something manual need to happen?
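>
> For reference, the AutoService pattern in question looks roughly like this
> (a sketch; the registrar interface below is illustrative, not an actual
> Beam type):
>
>     import com.google.auto.service.AutoService;
>
>     // Illustrative SPI, a stand-in for whatever registrar interface
>     // Beam uses for IO factories:
>     interface IoFactoryRegistrar {
>       String getScheme();
>     }
>
>     // @AutoService generates the META-INF/services entry at compile time,
>     // so ServiceLoader.load(IoFactoryRegistrar.class) discovers this class
>     // on any JVM whose classpath contains the jar, which is why staging
>     // the right jars on the cluster should, in principle, be enough.
>     @AutoService(IoFactoryRegistrar.class)
>     public class GcsFactoryRegistrar implements IoFactoryRegistrar {
>       @Override
>       public String getScheme() {
>         return "gs";
>       }
>     }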
>
> On Mon, Jan 30, 2017 at 10:11 PM, Chaoran Yu <ch...@lightbend.com>
> wrote:
>
> Thank you Davor for the reply!
>
> Your understanding of my problem is exactly right.
>
> I thought about the issue you mentioned and looked at the Beam source
> code. It looks to me like IO is done via the IOChannelFactory class, which
> has two subclasses: FileIOChannelFactory and GcsIOChannelFactory. I figured
> the wrong class probably got registered; this link points out the same
> registration problem: http://markmail.org/message/mrv4cg4y6bjtdssy
> So I tried registering GcsIOChannelFactory, but got the following error:
>
> Scheme: [file] is already registered with class
> org.apache.beam.sdk.util.FileIOChannelFactory
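>
> A sketch of what seems to be happening, assuming registration goes through
> a bulk call like the IOChannelUtils.registerIOFactories mentioned later in
> this thread:
>
>     // The local driver registers all discovered factories automatically:
>     IOChannelUtils.registerIOFactories(options);
>     // Registering again by hand in the same JVM then trips the duplicate
>     // check, producing the "Scheme: [file] is already registered" error:
>     IOChannelUtils.registerIOFactories(options);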
>
> Now I’m not sure what to do...
>
> Thanks for the help!
>
> Chaoran
>
>
>
> On Jan 30, 2017, at 12:05 PM, Davor Bonaci <da...@apache.org> wrote:
>
> Sorry for the delay here.
>
> Am I correct in summarizing that "gs://bucket/file" doesn't work on a
> Spark cluster, but does with Spark runner locally? Beam file systems
> utilize AutoService functionality and, generally speaking, all filesystems
> should be available and automatically registered on all runners. This is
> probably just a simple matter of staging the right classes on the cluster.
>
> Pei, any additional thoughts here?
>
> On Mon, Jan 23, 2017 at 1:58 PM, Chaoran Yu <ch...@lightbend.com>
> wrote:
>
> Sorry for the spam. But to clarify, I didn’t write the code. I’m using the
> code described here: https://beam.apache.org/get-started/wordcount-example/
> So the file already exists in GS.
>
> On Jan 23, 2017, at 4:55 PM, Chaoran Yu <ch...@lightbend.com> wrote:
>
> I didn’t upload the file. But since the identical Beam code, when running
> in Spark local mode, was able to fetch the file and process it, the file
> does exist.
> It’s just that somehow Spark standalone mode can’t find the file.
>
>
> On Jan 23, 2017, at 4:50 PM, Amit Sela <am...@gmail.com> wrote:
>
> I think "external" is the key here, you're cluster is running all it's
> components on your local machine so you're good.
>
> As for GS, it's like Amazon's S3, or sort of a cloud-hosted HDFS offered
> by Google. You need to upload your file to GS. Have you?
>
> On Mon, Jan 23, 2017 at 11:47 PM Chaoran Yu <ch...@lightbend.com>
> wrote:
>
> Well, my file is not in my local filesystem. It’s in GS.
> This is the line of code that reads the input file:
> p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
>
> And this page https://beam.apache.org/get-started/quickstart/ says the
> following:
> "you can’t access a local file if you are running the pipeline on an
> external cluster”.
> I’m indeed trying to run a pipeline on a standalone Spark cluster running
> on my local machine. So local files are not an option.
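>
> For completeness, a minimal self-contained version of that read (pre-2.0
> Beam API; the SparkPipelineOptions setup here is a sketch):
>
>     import org.apache.beam.runners.spark.SparkPipelineOptions;
>     import org.apache.beam.runners.spark.SparkRunner;
>     import org.apache.beam.sdk.Pipeline;
>     import org.apache.beam.sdk.io.TextIO;
>     import org.apache.beam.sdk.options.PipelineOptionsFactory;
>
>     public class ReadFromGcs {
>       public static void main(String[] args) {
>         SparkPipelineOptions options =
>             PipelineOptionsFactory.fromArgs(args).as(SparkPipelineOptions.class);
>         options.setRunner(SparkRunner.class);
>
>         Pipeline p = Pipeline.create(options);
>         // Resolving the gs:// scheme below is exactly what fails on the
>         // remote executors:
>         p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"));
>         p.run();
>       }
>     }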
>
>
> On Jan 23, 2017, at 4:41 PM, Amit Sela <am...@gmail.com> wrote:
>
> Why not try file:// instead? It doesn't seem like you're using Google
> Storage, right? I mean, the input file is on your local FS.
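>
> That is, something like this (the local path is illustrative):
>
>     p.apply(TextIO.Read.from("file:///tmp/kinglear.txt"))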
>
> On Mon, Jan 23, 2017 at 11:34 PM Chaoran Yu <ch...@lightbend.com>
> wrote:
>
> No, I’m not using Dataproc.
> I’m simply running on my local machine. I started a local Spark cluster
> with sbin/start-master.sh and sbin/start-slave.sh. Then I submitted my Beam
> job to that cluster.
> The gs file is the kinglear.txt from Beam’s example code and it should be
> public.
>
> My full stack trace is attached.
>
> Thanks,
> Chaoran
>
>
>
> On Jan 23, 2017, at 4:23 PM, Amit Sela <am...@gmail.com> wrote:
>
> Maybe. Are you running on Dataproc? Are you using YARN/Mesos? Do the
> machines hosting the executor processes have access to GS? Could you paste
> the entire stack trace?
>
> On Mon, Jan 23, 2017 at 11:21 PM Chaoran Yu <ch...@lightbend.com>
> wrote:
>
> Thank you, Amit, for the reply.
>
> I just tried two more runners and below is a summary:
>
> DirectRunner: works
> FlinkRunner: works in local mode. I got an error “Communication with
> JobManager failed: lost connection to the JobManager” when running in
> cluster mode.
> SparkRunner: works in local mode (mvn exec command) but fails in cluster
> mode (spark-submit) with the error I pasted in the previous email.
>
> In SparkRunner’s case, could it be that the Spark executors can’t access
> the gs file in Google Storage?
>
> Thank you,
>
>
>
> On Jan 23, 2017, at 3:28 PM, Amit Sela <am...@gmail.com> wrote:
>
> Is this working for you with other runners? Judging by the stack trace,
> it seems like IOChannelUtils fails to find a handler, so it doesn't seem
> like a Spark-specific problem.
>
> On Mon, Jan 23, 2017 at 8:50 PM Chaoran Yu <ch...@lightbend.com>
> wrote:
>
> Thank you Amit and JB!
>
> This is not related to DC/OS itself, but I ran into a problem when
> launching a Spark job on a cluster with spark-submit. My Spark job written
> in Beam can’t read the specified gs file. I got the following error:
>
> Caused by: java.io.IOException: Unable to find handler for gs://beam-samples/sample.txt
>   at org.apache.beam.sdk.util.IOChannelUtils.getFactory(IOChannelUtils.java:307)
>   at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:528)
>   at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:271)
>   at org.apache.beam.runners.spark.io.SourceRDD$Bounded$1.hasNext(SourceRDD.java:125)
>
> Then I thought about switching to reading from another source, but I saw
> in Beam’s documentation that TextIO can only read from files in Google
> Cloud Storage (prefixed with gs://) when running in cluster mode. How do
> you guys do file IO in Beam when using the SparkRunner?
>
>
> Thank you,
> Chaoran
>
>
> On Jan 22, 2017, at 4:32 AM, Amit Sela <am...@gmail.com> wrote:
>
> I'll second JB's comment on the Spark runner: submitting Beam
> pipelines using the Spark runner can be done with Spark's spark-submit
> script; find out more in the Spark runner documentation
> <https://beam.apache.org/documentation/runners/spark/>.
>
> Amit.
>
> On Sun, Jan 22, 2017 at 8:03 AM Jean-Baptiste Onofré <jb...@nanthrax.net>
> wrote:
>
> Hi,
>
> Not directly DC/OS (I think Stephen did some tests on it), but I have a
> platform running Spark and Flink with Beam on Mesos + Marathon.
>
> It basically doesn't have anything special: running pipelines uses
> spark-submit (as you would in Spark "natively").
>
> Regards
> JB
>
> On 01/22/2017 12:56 AM, Chaoran Yu wrote:
> > Hello all,
> >
> > Has anyone had experience using Beam on DC/OS? I want to run Beam code
> > executed with the Spark runner on DC/OS. As a next step, I would like to
> > run the Flink runner as well. There doesn't seem to be any information
> > about running Beam on DC/OS that I can find on the web, so some pointers
> > are greatly appreciated.
> >
> > Thank you,
> >
> > Chaoran Yu
> >
>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>

Re: Beam Spark/Flink runner with DC/OS

Posted by Frances Perry <fr...@apache.org>.
I'm also hitting the issue with the missing file handler when trying to run
WordCount with the default GCS input file on the Spark runner.

The Dataflow worker harness (non-Beam code) and prototype fn harness [1]
are both calling IOChannelUtils.registerIOFactories(options) in their main
methods, which will ensure this gets called on each machine. My guess is
this needs to happen on the Spark executors as well. Filed BEAM-1556 [2] to
track.

I'm not familiar enough with the Spark runner (or Spark itself) to know
where this call should go for real. But I can verify that hacking it
into SparkRuntimeContext.deserializePipelineOptions solves the problem and
allows me to run WordCount...
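
A sketch of that hack (SparkRuntimeContext.deserializePipelineOptions and
IOChannelUtils.registerIOFactories are the real names discussed in this
thread; the method body below is illustrative, not the actual Beam source):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.io.IOException;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.util.IOChannelUtils;

    // In SparkRuntimeContext: options are deserialized on each executor,
    // which makes this a convenient per-JVM registration hook.
    private static PipelineOptions deserializePipelineOptions(String json)
        throws IOException {
      PipelineOptions options =
          new ObjectMapper().readValue(json, PipelineOptions.class);
      // Mirror what the worker harnesses do in their main methods, so that
      // gs:// (and other schemes) resolve in this executor's JVM:
      IOChannelUtils.registerIOFactories(options);
      return options;
    }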

[1] https://github.com/apache/beam/blob/5d6dafa26a29020afa3db2bac252ceadddbdeeac/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java#L90
[2] https://issues.apache.org/jira/browse/BEAM-1556


