Posted to dev@beam.apache.org by Dan Halperin <dh...@google.com.INVALID> on 2016/10/03 16:34:22 UTC

Re: Preferred locations (or data locality) for batch pipelines.

See if this is the right interpretation:

* Hadoop's InputSplit has a getLocations
<https://hadoop.apache.org/docs/r2.7.3/api/org/apache/hadoop/mapred/InputSplit.html#getLocations()>
method that in some cases exposes useful information about the underlying
data locality.
* Beam jobs may run on the same cluster as the HDFS storage nodes (e.g.),
in which case it's useful to expose the locality to runners to assign
mappers (e.g.) to be near the data.

In that case, I think it makes perfect sense to expose the `getLocations`
on the actual HDFS sources. To do this, we would need to make the HDFS
Source an actual BoundedSource with a getter for the locations -- rather
than an anonymous inner class. See here:
https://github.com/apache/incubator-beam/blob/master/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java#L180
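
A minimal sketch of that idea, written in Python for brevity rather than
against the real Beam or Hadoop classes. Every name here (`InputSplit` as a
stand-in, `SplitBackedSource`, `pick_worker`) is hypothetical: it only
illustrates a named source that exposes the locations of its underlying
split, and one way a runner might use them.

```python
from dataclasses import dataclass
from typing import List, Sequence, Tuple


@dataclass(frozen=True)
class InputSplit:
    """Stand-in for a Hadoop InputSplit (hypothetical, not the real class)."""
    path: str
    start: int
    length: int
    hosts: Tuple[str, ...]  # hostnames assumed to hold this byte range

    def get_locations(self) -> List[str]:
        return list(self.hosts)


class SplitBackedSource:
    """A named source wrapping one split, with a public locations getter."""

    def __init__(self, split: InputSplit):
        self._split = split

    def get_locations(self) -> List[str]:
        # Delegate to the underlying split; the list may be empty.
        return self._split.get_locations()


def pick_worker(source: SplitBackedSource, workers: Sequence[str]) -> str:
    """Prefer a worker co-located with the data, else use the first one."""
    if not workers:
        raise ValueError("no workers available")
    preferred = set(source.get_locations())
    for worker in workers:
        if worker in preferred:
            return worker
    return workers[0]


split = InputSplit("/data/part-0", 0, 128, ("node-2", "node-5"))
source = SplitBackedSource(split)
chosen = pick_worker(source, ["node-1", "node-2", "node-3"])  # "node-2"
```

The locations are only a hint: when no worker matches, the sketch falls back
to any available worker instead of failing.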

If that's right, makes sense to me!

Dan

On Mon, Sep 26, 2016 at 12:55 PM, Amit Sela <am...@gmail.com> wrote:

> Thanks for the thorough response Dan, what you mentioned is very interesting
> and would clearly benefit runners.
>
> I was actually talking about something more "old-school", and specific to
> batch.
> If running a job on YARN - via MapReduce, Spark, etc. - you'd prefer that
> YARN would assign tasks working on splits locally.
>
> Spark does this for HDFS/HBase/S3:
> https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L241
>
> Since for most(?) open-source runners YARN is the preferred/popular
> resource manager, and HDFS is the preferred filesystem, I was wondering if
> that's something that could be shared across runners rather than being
> rewritten per runner.
> I'm talking about obtaining the locations of the input splits, and passing
> them to the runners to choose how to use them.
>
> I wonder if there's a need for that besides the Spark runner, though, since
> it's only for batch. I opened https://issues.apache.org/jira/browse/BEAM-673
> as a "runner-spark" component for now.
>
> Thanks,
> Amit
>
>
> On Mon, Sep 26, 2016 at 10:39 PM Dan Halperin <dhalperi@google.com.invalid>
> wrote:
>
> > Hi Amit,
> >
> > Sorry to be late to the thread, but I've been traveling. I'm not sure I
> > fully grokked the question, but here's one attempt at an answer:
> >
> > In general, any options on where a pipeline is executed should be
> > runner-specific. One example: for Dataflow, we have the zone
> > <https://github.com/apache/incubator-beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java#L167>
> > option, which can be used to control what GCE zone VMs are launched in. I
> > could imagine similar things for Spark/Yarn, etc.
> >
> > I think your question may be a bit deeper: given a pipeline without such
> > explicit configuration from the user, can a runner do something smart? I
> > think the answer to that is also yes. Today, we have DisplayData and soon
> > we will have the Runner API -- these expose in a standard way information
> > about file paths, BigQuery tables, Bigtable clusters, Kafka clusters, etc.,
> > that may be used by the pipeline. Once the Runner API is standardized and
> > implemented, a runner ought to be able to inspect the metadata and say
> > "hey, I see you're reading from this Kafka cluster, let's try to be near
> > it". For example.
> >
> > Does that answer the question / did I miss something?
> >
> > Thanks,
> > Dan
> >
> > On Thu, Sep 22, 2016 at 8:29 AM, Amit Sela <am...@gmail.com> wrote:
> >
> > > Generally this makes sense, though I thought that this is what
> > > IOChannelFactory was (also) about, and eventually the runner needs to
> > > facilitate the splitting/partitioning of the source, so I was wondering
> > > if the source could have a generic mechanism for locality as well.
> > >
> > > On Thu, Sep 22, 2016 at 6:11 PM Jesse Anderson <je...@smokinghand.com>
> > > wrote:
> > >
> > > > I think the runners should. Each framework has put far more effort
> > > > into data locality than Beam should. Beam should just take advantage
> > > > of it.
> > > >
> > > > On Thu, Sep 22, 2016, 7:57 AM Amit Sela <am...@gmail.com> wrote:
> > > >
> > > > > Not where in the file, where in the cluster.
> > > > >
> > > > > Like you said - mapper - in MapReduce the mapper instance will
> > > > > *prefer* to start on the same machine as the Node hosting it (unless
> > > > > that's changed, I've been out of touch with MR for a while...).
> > > > >
> > > > > And for Spark -
> > > > >
> > > > > https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/performance_optimization/data_locality.html
> > > > >
> > > > > As for Flink, it's a streaming-first engine (sort of the opposite of
> > > > > Spark, being a batch-first engine) so I *assume* they don't have this
> > > > > notion and simply "stream" input.
> > > > >
> > > > > Dataflow - no idea...
> > > > >
> > > > > On Thu, Sep 22, 2016 at 5:45 PM Jesse Anderson <jesse@smokinghand.com>
> > > > > wrote:
> > > > >
> > > > > > I've only ever seen that being used to figure out which file the
> > > > > > runner/mapper/operation is working on. Otherwise, I haven't seen
> > > > > > those operations care where in the file they're working.
> > > > > >
> > > > > > On Thu, Sep 22, 2016 at 5:57 AM Amit Sela <am...@gmail.com> wrote:
> > > > > >
> > > > > > > Wouldn't it force all runners to implement this for all
> > > > > > > distributed filesystems? It's true that each runner has its own
> > > > > > > "partitioning" mechanism, but I assume (maybe I'm wrong) that
> > > > > > > open-source runners use the Hadoop InputFormat/InputSplit for
> > > > > > > that, and the proper connectors for that to run on top of s3/gs.
> > > > > > >
> > > > > > > If this is wrong, each runner should take care of its own, but if
> > > > > > > not, we could have a generic solution for runners, no?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Amit
> > > > > > >
> > > > > > > On Thu, Sep 22, 2016 at 3:30 PM Jean-Baptiste Onofré <jb@nanthrax.net>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Amit,
> > > > > > > >
> > > > > > > > as the purpose is to remove IOChannelFactory, then I would
> > > > > > > > suggest it's a runner concern. The Read.Bounded should "locate"
> > > > > > > > the bundles on an executor close to the read data (even if it's
> > > > > > > > not always possible, depending on the source).
> > > > > > > >
> > > > > > > > My $0.01
> > > > > > > >
> > > > > > > > Regards
> > > > > > > > JB
> > > > > > > >
> > > > > > > > On 09/22/2016 02:26 PM, Amit Sela wrote:
> > > > > > > > > It's not new that batch pipelines can optimize on data
> > > > > > > > > locality; my question is regarding this responsibility in Beam.
> > > > > > > > > If runners should implement generic Read.Bounded support,
> > > > > > > > > should they also implement locating the input blocks? Or should
> > > > > > > > > it be a part of IOChannelFactory implementations? Or is there
> > > > > > > > > another way to go at it that I'm missing?
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Amit.
> > > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > Jean-Baptiste Onofré
> > > > > > > > jbonofre@apache.org
> > > > > > > > http://blog.nanthrax.net
> > > > > > > > Talend - http://www.talend.com
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Preferred locations (or data locality) for batch pipelines.

Posted by Amit Sela <am...@gmail.com>.
You're right on the money, Dan!

I'll just add a couple more things:

   1. HDFS API can also help out with HBase locality (if the RegionServer
   is running on the same node etc.)
   2. Other filesystems such as S3 and GS have "connectors" that allow
   users to use the Hadoop API with the "s3/gs" protocol.

As for [1], it seems that since HBase is tightly coupled with HDFS, any
HBaseIO could share this implementation with the HDFS source.
Concerning [2], I'm not sure where we are with IOChannelFactory, but one way
to go would be to use those connectors with the HDFS source, at least as a
fallback until a specific IO is written for the filesystem (assuming gs is
covered, I'm mostly talking s3 here).
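
To make the fallback idea concrete, here is a rough sketch in Python. It is
not Beam code: the registry, the returned labels, and `choose_source` are all
made up for illustration. It assumes a dedicated IO exists for gs and that
s3 is reachable only through a Hadoop-compatible connector.

```python
from urllib.parse import urlparse

# Hypothetical registry: URI scheme -> label of a dedicated IO, if one exists.
DEDICATED_IO = {"gs": "dedicated-gs-io"}

# Schemes assumed to be reachable through a Hadoop-compatible connector.
HADOOP_CONNECTOR_SCHEMES = {"hdfs", "s3", "gs"}


def choose_source(uri: str) -> str:
    """Pick a dedicated IO when available, else fall back to the HDFS source."""
    scheme = urlparse(uri).scheme
    if scheme in DEDICATED_IO:
        return DEDICATED_IO[scheme]
    if scheme in HADOOP_CONNECTOR_SCHEMES:
        # Fallback: read through the HDFS source using the connector.
        return "hdfs-source-via-connector"
    raise ValueError(f"no source available for scheme {scheme!r}")


s3_choice = choose_source("s3://bucket/key")   # falls back to the connector
gs_choice = choose_source("gs://bucket/obj")   # uses the dedicated IO
```

Once a specific IO is written for a filesystem, adding its scheme to the
registry would take it off the fallback path without touching callers.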

I opened https://issues.apache.org/jira/browse/BEAM-673 a week ago, but
I still think it makes more sense as part of the runner API or the HDFS
source.
Do we have a ticket for an HDFS source?
