You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Reynold Xin <rx...@databricks.com> on 2015/04/21 22:34:23 UTC

[discuss] new Java friendly InputSource API

I created a pull request last night for a new InputSource API that is
essentially a stripped down version of the RDD API for providing data into
Spark. Would be great to hear the community's feedback.

Spark currently has two de facto input source API:
1. RDD
2. Hadoop MapReduce InputFormat

Neither of the above is ideal:

1. RDD: It is hard for Java developers to implement RDD, given the implicit
class tags. In addition, the RDD API depends on Scala's runtime library,
which does not preserve binary compatibility across Scala versions. If a
developer chooses Java to implement an input source, it would be great if
that input source can be binary compatible in years to come.

2. Hadoop InputFormat: The Hadoop InputFormat API is overly restrictive.
For example, it forces key-value semantics, and does not support running
arbitrary code on the driver side (an example of why this is useful is
broadcast). In addition, it is somewhat awkward to tell developers that in
order to implement an input source for Spark, they should learn the Hadoop
MapReduce API first.


My patch creates a new InputSource interface, described by:

- an array of InputPartition that specifies the data partitioning
- a RecordReader that specifies how data on each partition can be read

This interface is similar to Hadoop's InputFormat, except that there is no
explicit key/value separation.


JIRA ticket: https://issues.apache.org/jira/browse/SPARK-7025
Pull request: https://github.com/apache/spark/pull/5603

Re: [discuss] new Java friendly InputSource API

Posted by Mingyu Kim <mk...@palantir.com>.
I see. So, the difference is that the InputSource is instantiated on the driver side and gets sent to the executors, whereas Hadoop’s InputFormats are instantiated via reflection on the executors. That makes sense. Thanks for the clarification!

Mingyu

From: Reynold Xin <rx...@databricks.com>>
Date: Thursday, April 23, 2015 at 11:09 AM
To: Mingyu Kim <mk...@palantir.com>>
Cc: Soren Macbeth <so...@yieldbot.com>>, Punyashloka Biswal <pu...@gmail.com>>, "dev@spark.apache.org<ma...@spark.apache.org>" <de...@spark.apache.org>>
Subject: Re: [discuss] new Java friendly InputSource API

In the ctor of InputSource (I'm also considering adding an explicit initialize call), the implementation of InputSource can execute arbitrary code. The state in it will also be serialized and passed onto the executors.

Yes - technically you can hijack getSplits in Hadoop InputFormat to do the same thing, and then put a reference of the state into every Split. But that's kind of awkward. Hadoop relies on the giant Configuration object to pass state over.



On Thu, Apr 23, 2015 at 11:02 AM, Mingyu Kim <mk...@palantir.com>> wrote:
Hi Reynold,

You mentioned that the new API allows arbitrary code to be run on the
driver side, but it¹s not very clear to me how this is different from what
Hadoop API provides. In your example of using broadcast, did you mean
broadcasting something in InputSource.getPartitions() and having
InputPartitions use the broadcast variables? Isn¹t that already possible
with Hadoop's InputFormat.getSplits()?

Thanks,
Mingyu





On 4/21/15, 4:33 PM, "Soren Macbeth" <so...@yieldbot.com>> wrote:

>I'm also super interested in this. Flambo (our clojure DSL) wraps the java
>api and it would be great to have this.
>
>On Tue, Apr 21, 2015 at 4:10 PM, Reynold Xin <rx...@databricks.com>> wrote:
>
>> It can reuse. That's a good point and we should document it in the API
>> contract.
>>
>>
>> On Tue, Apr 21, 2015 at 4:06 PM, Punyashloka Biswal <
>> punya.biswal@gmail.com<ma...@gmail.com>>
>> wrote:
>>
>> > Reynold, thanks for this! At Palantir we're heavy users of the Java
>>APIs
>> > and appreciate being able to stop hacking around with fake ClassTags
>>:)
>> >
>> > Regarding this specific proposal, is the contract of RecordReader#get
>> > intended to be that it returns a fresh object each time? Or is it
>>allowed
>> > to mutate a fixed object and return a pointer to it each time?
>> >
>> > Put another way, is a caller supposed to clone the output of get() if
>> they
>> > want to use it later?
>> >
>> > Punya
>> >
>> > On Tue, Apr 21, 2015 at 4:35 PM Reynold Xin <rx...@databricks.com>>
>>wrote:
>> >
>> >> I created a pull request last night for a new InputSource API that is
>> >> essentially a stripped down version of the RDD API for providing data
>> into
>> >> Spark. Would be great to hear the community's feedback.
>> >>
>> >> Spark currently has two de facto input source API:
>> >> 1. RDD
>> >> 2. Hadoop MapReduce InputFormat
>> >>
>> >> Neither of the above is ideal:
>> >>
>> >> 1. RDD: It is hard for Java developers to implement RDD, given the
>> >> implicit
>> >> class tags. In addition, the RDD API depends on Scala's runtime
>>library,
>> >> which does not preserve binary compatibility across Scala versions.
>>If a
>> >> developer chooses Java to implement an input source, it would be
>>great
>> if
>> >> that input source can be binary compatible in years to come.
>> >>
>> >> 2. Hadoop InputFormat: The Hadoop InputFormat API is overly
>>restrictive.
>> >> For example, it forces key-value semantics, and does not support
>>running
>> >> arbitrary code on the driver side (an example of why this is useful
>>is
>> >> broadcast). In addition, it is somewhat awkward to tell developers
>>that
>> in
>> >> order to implement an input source for Spark, they should learn the
>> Hadoop
>> >> MapReduce API first.
>> >>
>> >>
>> >> My patch creates a new InputSource interface, described by:
>> >>
>> >> - an array of InputPartition that specifies the data partitioning
>> >> - a RecordReader that specifies how data on each partition can be
>>read
>> >>
>> >> This interface is similar to Hadoop's InputFormat, except that there
>>is
>> no
>> >> explicit key/value separation.
>> >>
>> >>
>> >> JIRA ticket:
>>https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_ji
>>ra_browse_SPARK-2D7025&d=AwIBaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oO
>>nmz8&r=ennQJq47pNnObsDh-88a9YUrUulcYQoV8giPASqXB84&m=WO8We1dUoSercyCHNAKk
>>tWH_nMrqD5TUhek8mTSCfFs&s=xUHYpQoU3NlV__I37IUkVwf94zzgAvtIj6N6uy2vwnc&e=
>> >> Pull request:
>>https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_sp
>>ark_pull_5603&d=AwIBaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oOnmz8&r=en
>>nQJq47pNnObsDh-88a9YUrUulcYQoV8giPASqXB84&m=WO8We1dUoSercyCHNAKktWH_nMrqD
>>5TUhek8mTSCfFs&s=qoAlpURPOSkRgXxtlXHChqVHjm3yiPFgERk4LwKHLpg&e=
>> >>
>> >
>>



Re: [discuss] new Java friendly InputSource API

Posted by Reynold Xin <rx...@databricks.com>.
In the ctor of InputSource (I'm also considering adding an explicit
initialize call), the implementation of InputSource can execute arbitrary
code. The state in it will also be serialized and passed onto the executors.

Yes - technically you can hijack getSplits in Hadoop InputFormat to do the
same thing, and then put a reference of the state into every Split. But
that's kind of awkward. Hadoop relies on the giant Configuration object to
pass state over.



On Thu, Apr 23, 2015 at 11:02 AM, Mingyu Kim <mk...@palantir.com> wrote:

> Hi Reynold,
>
> You mentioned that the new API allows arbitrary code to be run on the
> driver side, but it¹s not very clear to me how this is different from what
> Hadoop API provides. In your example of using broadcast, did you mean
> broadcasting something in InputSource.getPartitions() and having
> InputPartitions use the broadcast variables? Isn¹t that already possible
> with Hadoop's InputFormat.getSplits()?
>
> Thanks,
> Mingyu
>
>
>
>
>
> On 4/21/15, 4:33 PM, "Soren Macbeth" <so...@yieldbot.com> wrote:
>
> >I'm also super interested in this. Flambo (our clojure DSL) wraps the java
> >api and it would be great to have this.
> >
> >On Tue, Apr 21, 2015 at 4:10 PM, Reynold Xin <rx...@databricks.com> wrote:
> >
> >> It can reuse. That's a good point and we should document it in the API
> >> contract.
> >>
> >>
> >> On Tue, Apr 21, 2015 at 4:06 PM, Punyashloka Biswal <
> >> punya.biswal@gmail.com>
> >> wrote:
> >>
> >> > Reynold, thanks for this! At Palantir we're heavy users of the Java
> >>APIs
> >> > and appreciate being able to stop hacking around with fake ClassTags
> >>:)
> >> >
> >> > Regarding this specific proposal, is the contract of RecordReader#get
> >> > intended to be that it returns a fresh object each time? Or is it
> >>allowed
> >> > to mutate a fixed object and return a pointer to it each time?
> >> >
> >> > Put another way, is a caller supposed to clone the output of get() if
> >> they
> >> > want to use it later?
> >> >
> >> > Punya
> >> >
> >> > On Tue, Apr 21, 2015 at 4:35 PM Reynold Xin <rx...@databricks.com>
> >>wrote:
> >> >
> >> >> I created a pull request last night for a new InputSource API that is
> >> >> essentially a stripped down version of the RDD API for providing data
> >> into
> >> >> Spark. Would be great to hear the community's feedback.
> >> >>
> >> >> Spark currently has two de facto input source API:
> >> >> 1. RDD
> >> >> 2. Hadoop MapReduce InputFormat
> >> >>
> >> >> Neither of the above is ideal:
> >> >>
> >> >> 1. RDD: It is hard for Java developers to implement RDD, given the
> >> >> implicit
> >> >> class tags. In addition, the RDD API depends on Scala's runtime
> >>library,
> >> >> which does not preserve binary compatibility across Scala versions.
> >>If a
> >> >> developer chooses Java to implement an input source, it would be
> >>great
> >> if
> >> >> that input source can be binary compatible in years to come.
> >> >>
> >> >> 2. Hadoop InputFormat: The Hadoop InputFormat API is overly
> >>restrictive.
> >> >> For example, it forces key-value semantics, and does not support
> >>running
> >> >> arbitrary code on the driver side (an example of why this is useful
> >>is
> >> >> broadcast). In addition, it is somewhat awkward to tell developers
> >>that
> >> in
> >> >> order to implement an input source for Spark, they should learn the
> >> Hadoop
> >> >> MapReduce API first.
> >> >>
> >> >>
> >> >> My patch creates a new InputSource interface, described by:
> >> >>
> >> >> - an array of InputPartition that specifies the data partitioning
> >> >> - a RecordReader that specifies how data on each partition can be
> >>read
> >> >>
> >> >> This interface is similar to Hadoop's InputFormat, except that there
> >>is
> >> no
> >> >> explicit key/value separation.
> >> >>
> >> >>
> >> >> JIRA ticket:
> >>
> https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_ji
> >>ra_browse_SPARK-2D7025&d=AwIBaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oO
> >>nmz8&r=ennQJq47pNnObsDh-88a9YUrUulcYQoV8giPASqXB84&m=WO8We1dUoSercyCHNAKk
> >>tWH_nMrqD5TUhek8mTSCfFs&s=xUHYpQoU3NlV__I37IUkVwf94zzgAvtIj6N6uy2vwnc&e=
> >> >> Pull request:
> >>
> https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_sp
> >>ark_pull_5603&d=AwIBaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oOnmz8&r=en
> >>nQJq47pNnObsDh-88a9YUrUulcYQoV8giPASqXB84&m=WO8We1dUoSercyCHNAKktWH_nMrqD
> >>5TUhek8mTSCfFs&s=qoAlpURPOSkRgXxtlXHChqVHjm3yiPFgERk4LwKHLpg&e=
> >> >>
> >> >
> >>
>
>

Re: [discuss] new Java friendly InputSource API

Posted by Mingyu Kim <mk...@palantir.com>.
Hi Reynold,

You mentioned that the new API allows arbitrary code to be run on the
driver side, but it¹s not very clear to me how this is different from what
Hadoop API provides. In your example of using broadcast, did you mean
broadcasting something in InputSource.getPartitions() and having
InputPartitions use the broadcast variables? Isn¹t that already possible
with Hadoop's InputFormat.getSplits()?

Thanks,
Mingyu





On 4/21/15, 4:33 PM, "Soren Macbeth" <so...@yieldbot.com> wrote:

>I'm also super interested in this. Flambo (our clojure DSL) wraps the java
>api and it would be great to have this.
>
>On Tue, Apr 21, 2015 at 4:10 PM, Reynold Xin <rx...@databricks.com> wrote:
>
>> It can reuse. That's a good point and we should document it in the API
>> contract.
>>
>>
>> On Tue, Apr 21, 2015 at 4:06 PM, Punyashloka Biswal <
>> punya.biswal@gmail.com>
>> wrote:
>>
>> > Reynold, thanks for this! At Palantir we're heavy users of the Java
>>APIs
>> > and appreciate being able to stop hacking around with fake ClassTags
>>:)
>> >
>> > Regarding this specific proposal, is the contract of RecordReader#get
>> > intended to be that it returns a fresh object each time? Or is it
>>allowed
>> > to mutate a fixed object and return a pointer to it each time?
>> >
>> > Put another way, is a caller supposed to clone the output of get() if
>> they
>> > want to use it later?
>> >
>> > Punya
>> >
>> > On Tue, Apr 21, 2015 at 4:35 PM Reynold Xin <rx...@databricks.com>
>>wrote:
>> >
>> >> I created a pull request last night for a new InputSource API that is
>> >> essentially a stripped down version of the RDD API for providing data
>> into
>> >> Spark. Would be great to hear the community's feedback.
>> >>
>> >> Spark currently has two de facto input source API:
>> >> 1. RDD
>> >> 2. Hadoop MapReduce InputFormat
>> >>
>> >> Neither of the above is ideal:
>> >>
>> >> 1. RDD: It is hard for Java developers to implement RDD, given the
>> >> implicit
>> >> class tags. In addition, the RDD API depends on Scala's runtime
>>library,
>> >> which does not preserve binary compatibility across Scala versions.
>>If a
>> >> developer chooses Java to implement an input source, it would be
>>great
>> if
>> >> that input source can be binary compatible in years to come.
>> >>
>> >> 2. Hadoop InputFormat: The Hadoop InputFormat API is overly
>>restrictive.
>> >> For example, it forces key-value semantics, and does not support
>>running
>> >> arbitrary code on the driver side (an example of why this is useful
>>is
>> >> broadcast). In addition, it is somewhat awkward to tell developers
>>that
>> in
>> >> order to implement an input source for Spark, they should learn the
>> Hadoop
>> >> MapReduce API first.
>> >>
>> >>
>> >> My patch creates a new InputSource interface, described by:
>> >>
>> >> - an array of InputPartition that specifies the data partitioning
>> >> - a RecordReader that specifies how data on each partition can be
>>read
>> >>
>> >> This interface is similar to Hadoop's InputFormat, except that there
>>is
>> no
>> >> explicit key/value separation.
>> >>
>> >>
>> >> JIRA ticket: 
>>https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_ji
>>ra_browse_SPARK-2D7025&d=AwIBaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oO
>>nmz8&r=ennQJq47pNnObsDh-88a9YUrUulcYQoV8giPASqXB84&m=WO8We1dUoSercyCHNAKk
>>tWH_nMrqD5TUhek8mTSCfFs&s=xUHYpQoU3NlV__I37IUkVwf94zzgAvtIj6N6uy2vwnc&e=
>> >> Pull request:
>>https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_sp
>>ark_pull_5603&d=AwIBaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oOnmz8&r=en
>>nQJq47pNnObsDh-88a9YUrUulcYQoV8giPASqXB84&m=WO8We1dUoSercyCHNAKktWH_nMrqD
>>5TUhek8mTSCfFs&s=qoAlpURPOSkRgXxtlXHChqVHjm3yiPFgERk4LwKHLpg&e=
>> >>
>> >
>>


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org


Re: [discuss] new Java friendly InputSource API

Posted by Soren Macbeth <so...@yieldbot.com>.
I'm also super interested in this. Flambo (our clojure DSL) wraps the java
api and it would be great to have this.

On Tue, Apr 21, 2015 at 4:10 PM, Reynold Xin <rx...@databricks.com> wrote:

> It can reuse. That's a good point and we should document it in the API
> contract.
>
>
> On Tue, Apr 21, 2015 at 4:06 PM, Punyashloka Biswal <
> punya.biswal@gmail.com>
> wrote:
>
> > Reynold, thanks for this! At Palantir we're heavy users of the Java APIs
> > and appreciate being able to stop hacking around with fake ClassTags :)
> >
> > Regarding this specific proposal, is the contract of RecordReader#get
> > intended to be that it returns a fresh object each time? Or is it allowed
> > to mutate a fixed object and return a pointer to it each time?
> >
> > Put another way, is a caller supposed to clone the output of get() if
> they
> > want to use it later?
> >
> > Punya
> >
> > On Tue, Apr 21, 2015 at 4:35 PM Reynold Xin <rx...@databricks.com> wrote:
> >
> >> I created a pull request last night for a new InputSource API that is
> >> essentially a stripped down version of the RDD API for providing data
> into
> >> Spark. Would be great to hear the community's feedback.
> >>
> >> Spark currently has two de facto input source API:
> >> 1. RDD
> >> 2. Hadoop MapReduce InputFormat
> >>
> >> Neither of the above is ideal:
> >>
> >> 1. RDD: It is hard for Java developers to implement RDD, given the
> >> implicit
> >> class tags. In addition, the RDD API depends on Scala's runtime library,
> >> which does not preserve binary compatibility across Scala versions. If a
> >> developer chooses Java to implement an input source, it would be great
> if
> >> that input source can be binary compatible in years to come.
> >>
> >> 2. Hadoop InputFormat: The Hadoop InputFormat API is overly restrictive.
> >> For example, it forces key-value semantics, and does not support running
> >> arbitrary code on the driver side (an example of why this is useful is
> >> broadcast). In addition, it is somewhat awkward to tell developers that
> in
> >> order to implement an input source for Spark, they should learn the
> Hadoop
> >> MapReduce API first.
> >>
> >>
> >> My patch creates a new InputSource interface, described by:
> >>
> >> - an array of InputPartition that specifies the data partitioning
> >> - a RecordReader that specifies how data on each partition can be read
> >>
> >> This interface is similar to Hadoop's InputFormat, except that there is
> no
> >> explicit key/value separation.
> >>
> >>
> >> JIRA ticket: https://issues.apache.org/jira/browse/SPARK-7025
> >> Pull request: https://github.com/apache/spark/pull/5603
> >>
> >
>

Re: [discuss] new Java friendly InputSource API

Posted by Reynold Xin <rx...@databricks.com>.
It can reuse. That's a good point and we should document it in the API
contract.


On Tue, Apr 21, 2015 at 4:06 PM, Punyashloka Biswal <pu...@gmail.com>
wrote:

> Reynold, thanks for this! At Palantir we're heavy users of the Java APIs
> and appreciate being able to stop hacking around with fake ClassTags :)
>
> Regarding this specific proposal, is the contract of RecordReader#get
> intended to be that it returns a fresh object each time? Or is it allowed
> to mutate a fixed object and return a pointer to it each time?
>
> Put another way, is a caller supposed to clone the output of get() if they
> want to use it later?
>
> Punya
>
> On Tue, Apr 21, 2015 at 4:35 PM Reynold Xin <rx...@databricks.com> wrote:
>
>> I created a pull request last night for a new InputSource API that is
>> essentially a stripped down version of the RDD API for providing data into
>> Spark. Would be great to hear the community's feedback.
>>
>> Spark currently has two de facto input source API:
>> 1. RDD
>> 2. Hadoop MapReduce InputFormat
>>
>> Neither of the above is ideal:
>>
>> 1. RDD: It is hard for Java developers to implement RDD, given the
>> implicit
>> class tags. In addition, the RDD API depends on Scala's runtime library,
>> which does not preserve binary compatibility across Scala versions. If a
>> developer chooses Java to implement an input source, it would be great if
>> that input source can be binary compatible in years to come.
>>
>> 2. Hadoop InputFormat: The Hadoop InputFormat API is overly restrictive.
>> For example, it forces key-value semantics, and does not support running
>> arbitrary code on the driver side (an example of why this is useful is
>> broadcast). In addition, it is somewhat awkward to tell developers that in
>> order to implement an input source for Spark, they should learn the Hadoop
>> MapReduce API first.
>>
>>
>> My patch creates a new InputSource interface, described by:
>>
>> - an array of InputPartition that specifies the data partitioning
>> - a RecordReader that specifies how data on each partition can be read
>>
>> This interface is similar to Hadoop's InputFormat, except that there is no
>> explicit key/value separation.
>>
>>
>> JIRA ticket: https://issues.apache.org/jira/browse/SPARK-7025
>> Pull request: https://github.com/apache/spark/pull/5603
>>
>

Re: [discuss] new Java friendly InputSource API

Posted by Punyashloka Biswal <pu...@gmail.com>.
Reynold, thanks for this! At Palantir we're heavy users of the Java APIs
and appreciate being able to stop hacking around with fake ClassTags :)

Regarding this specific proposal, is the contract of RecordReader#get
intended to be that it returns a fresh object each time? Or is it allowed
to mutate a fixed object and return a pointer to it each time?

Put another way, is a caller supposed to clone the output of get() if they
want to use it later?

Punya
On Tue, Apr 21, 2015 at 4:35 PM Reynold Xin <rx...@databricks.com> wrote:

> I created a pull request last night for a new InputSource API that is
> essentially a stripped down version of the RDD API for providing data into
> Spark. Would be great to hear the community's feedback.
>
> Spark currently has two de facto input source API:
> 1. RDD
> 2. Hadoop MapReduce InputFormat
>
> Neither of the above is ideal:
>
> 1. RDD: It is hard for Java developers to implement RDD, given the implicit
> class tags. In addition, the RDD API depends on Scala's runtime library,
> which does not preserve binary compatibility across Scala versions. If a
> developer chooses Java to implement an input source, it would be great if
> that input source can be binary compatible in years to come.
>
> 2. Hadoop InputFormat: The Hadoop InputFormat API is overly restrictive.
> For example, it forces key-value semantics, and does not support running
> arbitrary code on the driver side (an example of why this is useful is
> broadcast). In addition, it is somewhat awkward to tell developers that in
> order to implement an input source for Spark, they should learn the Hadoop
> MapReduce API first.
>
>
> My patch creates a new InputSource interface, described by:
>
> - an array of InputPartition that specifies the data partitioning
> - a RecordReader that specifies how data on each partition can be read
>
> This interface is similar to Hadoop's InputFormat, except that there is no
> explicit key/value separation.
>
>
> JIRA ticket: https://issues.apache.org/jira/browse/SPARK-7025
> Pull request: https://github.com/apache/spark/pull/5603
>