Posted to user@spark.apache.org by Toby Douglass <to...@avocet.io> on 2014/06/12 11:24:46 UTC

initial basic question from new user

Gents,

I am investigating Spark with a view to reporting on a large data
set, which receives additional data in the form of log
files on an hourly basis.

Where the data set is large there is a possibility we will create a range
of aggregate tables, to reduce the volume of data which has to be processed.

Having spent a little while reading up about Spark, my thought was that I
could create an RDD which is an agg, persist this to disk, have reporting
queries run against that RDD and when new data arrives, convert the new log
file into an agg and add that to the agg RDD.

However, I begin now to get the impression that RDDs cannot be persisted
across jobs - I can generate an RDD, I can persist it, but I can see no way
for a later job to load a persisted RDD (and I begin to think it will have
been GCed anyway, at the end of the first job).  Is this correct?
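
The incremental step described above, turning a new log file into an aggregate and folding it into the existing aggregate, might look roughly like this in PySpark. This is a minimal sketch, not a tested pipeline: the function names, the (key, count) record shape and the `parse_line` helper are assumptions made for illustration; `map`, `union` and `reduceByKey` are standard RDD methods.

```python
from operator import add


def aggregate_log(log_rdd, parse_line):
    """Turn raw log lines into (key, count) pairs, one pair per key.

    `parse_line` is a hypothetical helper that maps one log line to a
    (key, count) tuple; the real key would depend on the report.
    """
    return log_rdd.map(parse_line).reduceByKey(add)


def merge_aggregates(existing_agg, new_agg):
    """Fold a new hourly aggregate into the existing one by summing counts."""
    return existing_agg.union(new_agg).reduceByKey(add)


# Usage, assuming a live SparkContext `sc` and hypothetical paths:
#   hourly = aggregate_log(sc.textFile("logs/2014-06-12-11.log"), parse_line)
#   merged = merge_aggregates(existing_agg, hourly)
```

The merged RDD still lives only as long as the application that built it; keeping it for a later run means writing it out to storage.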

Re: initial basic question from new user

Posted by Toby Douglass <to...@avocet.io>.
On Thu, Jun 12, 2014 at 3:03 PM, Christopher Nguyen <ct...@adatao.com> wrote:

> Toby, #saveAsTextFile() and #saveAsObjectFile() are probably what you want
> for your use case.
>

Yes.  Thank you.  I'm about to see if they exist for Python.


> As for Parquet support, that's newly arrived in Spark 1.0.0 together with
> SparkSQL so continue to watch this space.
>

Okay.


>  Gerard's suggestion to look at JobServer, which you can generalize as
> "building a long-running application which allows multiple clients to
> load/share/persist/save/collaborate-on RDDs" satisfies a larger, more
> complex use case. That is indeed the job of a higher-level application,
> subject to a wide variety of higher-level design choices. A number of us
> have successfully built Spark-based apps around that model.
>

To my eyes, where I'm new to Spark, it seems like a sledgehammer being used
to crack a nut.  If RDDs persisted across jobs (a seemingly tiny change), I
wouldn't need JobServer (a whole new application).  There's a ton of
functionality in JobServer which as yet I think I have no use for, except
for that one feature, of persisting RDDs across jobs.

Re: initial basic question from new user

Posted by Toby Douglass <to...@avocet.io>.
On Thu, Jun 12, 2014 at 4:48 PM, Andre Schumacher <schumach@icsi.berkeley.edu> wrote:

> On 06/12/2014 05:47 PM, Toby Douglass wrote:
>
> > In these future jobs, when I come to load the aggregated RDD, will Spark
> > load and only load the columns being accessed by the query?  or will Spark
> > load everything, to convert it into an internal representation, and then
> > execute the query?
>
> The aforementioned native Parquet support in Spark 1.0 supports column
> projections which means only the columns that appear in the query will
> be loaded.
>

[snip]

Thank you!

Re: initial basic question from new user

Posted by Andre Schumacher <sc...@icsi.berkeley.edu>.
Hi,

On 06/12/2014 05:47 PM, Toby Douglass wrote:

> In these future jobs, when I come to load the aggregated RDD, will Spark
> load and only load the columns being accessed by the query?  or will Spark
> load everything, to convert it into an internal representation, and then
> execute the query?

The native Parquet support in Spark 1.0 mentioned earlier handles column
projection, which means only the columns that appear in the query will
be loaded. The next release will also support record filters for simple
pruning predicates (for example, "integer column less than a value").
This is different from using a Hadoop Input/Output format and requires no
additional setup (such as jars on the classpath).

For more details see:

http://spark.apache.org/docs/latest/sql-programming-guide.html#using-parquet

Andre
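
As a rough illustration of column projection from Python: the sketch below uses the DataFrame reader API of later Spark releases (a `SparkSession` named `spark`), not the Spark 1.0 API this thread refers to. The function name, path and column names are hypothetical.

```python
def load_report_columns(spark, path, columns):
    """Read a Parquet dataset, keeping only the columns a report needs.

    Selecting columns up front lets the Parquet reader skip the others
    instead of loading every column of every row.
    """
    return spark.read.parquet(path).select(*columns)


# Usage, assuming a SparkSession `spark` and a hypothetical dataset:
#   df = load_report_columns(spark, "hdfs:///aggs/hourly", ["site", "hits"])
#   df.groupBy("site").sum("hits").show()
```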

Re: initial basic question from new user

Posted by Toby Douglass <to...@avocet.io>.
On Thu, Jun 12, 2014 at 3:15 PM, FRANK AUSTIN NOTHAFT <fnothaft@berkeley.edu> wrote:

> RE:
>
> > Given that our agg sizes will exceed memory, we expect to cache them to
> > disk, so save-as-object (assuming there are no out of the ordinary
> > performance issues) may solve the problem, but I was hoping to store data
> > in a column-oriented format.  However I think this in general is not
> > possible - Spark can *read* Parquet, but I think it cannot write Parquet as
> > a disk-based RDD format.
>
> Spark can write Parquet, via the ParquetOutputFormat which is distributed
> from Parquet. If you'd like example code for writing out to Parquet, please
> see the adamSave function in
> https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMRDDFunctions.scala,
> starting at line 62. There is a bit of setup necessary for the Parquet
> write codec, but otherwise it is fairly straightforward.
>

Thank you, Frank.

My thought is to generate an aggregated RDD from our full data set, where
the aggregated RDD will be about 10% of the size of the full data set, and
will be stored to disk in column store, to be loaded by future jobs.

In these future jobs, when I come to load the aggregated RDD, will Spark
load and only load the columns being accessed by the query?  or will Spark
load everything, to convert it into an internal representation, and then
execute the query?

Re: initial basic question from new user

Posted by FRANK AUSTIN NOTHAFT <fn...@berkeley.edu>.
RE:

> Given that our agg sizes will exceed memory, we expect to cache them to
> disk, so save-as-object (assuming there are no out of the ordinary
> performance issues) may solve the problem, but I was hoping to store data
> in a column-oriented format.  However I think this in general is not
> possible - Spark can *read* Parquet, but I think it cannot write Parquet as
> a disk-based RDD format.

Spark can write Parquet, via the ParquetOutputFormat which is distributed
with Parquet. If you'd like example code for writing out to Parquet, please
see the adamSave function in
https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMRDDFunctions.scala,
starting at line 62. There is a bit of setup necessary for the Parquet
write codec, but otherwise it is fairly straightforward.

Frank Austin Nothaft
fnothaft@berkeley.edu
fnothaft@eecs.berkeley.edu
202-340-0466
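
For comparison, later Spark releases let a DataFrame be written as Parquet directly from Python, without the ParquetOutputFormat setup described above. A minimal sketch under that assumption; the function name and path are hypothetical, and `mode("append")` is chosen on the guess that each hourly aggregate is added alongside the earlier ones.

```python
def append_hourly_aggregate(agg_df, path):
    """Append one hourly aggregate DataFrame to a Parquet dataset at `path`.

    `agg_df` is assumed to be a Spark DataFrame whose schema matches the
    data already stored at `path`.
    """
    agg_df.write.mode("append").parquet(path)


# Usage, assuming a DataFrame `hourly_df` built elsewhere:
#   append_hourly_aggregate(hourly_df, "hdfs:///aggs/hourly")
```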



Re: initial basic question from new user

Posted by Christopher Nguyen <ct...@adatao.com>.
Toby, #saveAsTextFile() and #saveAsObjectFile() are probably what you want
for your use case. As for Parquet support, that's newly arrived in Spark
1.0.0 together with SparkSQL so continue to watch this space.

Gerard's suggestion to look at JobServer, which you can generalize as
"building a long-running application which allows multiple clients to
load/share/persist/save/collaborate-on RDDs" satisfies a larger, more
complex use case. That is indeed the job of a higher-level application,
subject to a wide variety of higher-level design choices. A number of us
have successfully built Spark-based apps around that model.
--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen




Re: initial basic question from new user

Posted by Toby Douglass <to...@avocet.io>.
On Thu, Jun 12, 2014 at 11:36 AM, Gerard Maas <ge...@gmail.com> wrote:

> The goal of rdd.persist is to create a cached rdd that breaks the DAG
> lineage. Therefore, computations *in the same job* that use that RDD can
> re-use that intermediate result, but it's not meant to survive between job
> runs.
>

As I understand it, Spark is designed for interactive querying, in the
sense that the caching of intermediate results eliminates the need to
recompute those results.

However, if intermediate results last only for the duration of a job (say,
a Python script), how exactly is interactive querying actually
performed?  A script is not an interactive medium.  Is the shell the only
medium for interactive querying?

Consider a common use case: a web site which offers reporting upon a
large data set.  Users issue arbitrary queries.  A few queries (just with
different arguments) dominate the query load, so we thought to create
intermediate RDDs to service those queries, so that only those RDDs, an
order of magnitude or more smaller, would need to be processed.  Where this
is not possible, we can only use Spark for reporting by issuing each query
over the whole data set - i.e. Spark is just like Impala is just like Presto
is just like [nnn].  The enormous benefit of RDDs - the entire point of
Spark - so profoundly useful here - is not available.  What a huge and
unexpected loss!  Spark seemingly renders itself ordinary.  It is for this
reason I am surprised to find this functionality is not available.


> If you need to ad-hoc persist to files, you can save RDDs using
> rdd.saveAsObjectFile(...) [1] and load them afterwards using
> sparkContext.objectFile(...)
>

I've been using this site for docs;

http://spark.apache.org

Here we find through the top-of-the-page menus the link "API Docs" ->
"Python API" which brings us to;

http://spark.apache.org/docs/latest/api/python/pyspark.rdd.RDD-class.html

Where this page does not show the function saveAsObjectFile().

I find now from your link here;

https://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.rdd.RDD

What appears to be a second and more complete set of the same
documentation, using a different web-interface to boot.

It appears at least that there are two sets of documentation for the same
APIs, where one set is out of date and the other not, and the out of
date set is that which is linked to from the main site?

Given that our agg sizes will exceed memory, we expect to cache them to
disk, so save-as-object (assuming there are no out of the ordinary
performance issues) may solve the problem, but I was hoping to store data
in a column-oriented format.  However I think this in general is not
possible - Spark can *read* Parquet, but I think it cannot write Parquet as
a disk-based RDD format.

> If you want to preserve the RDDs in memory between job runs, you should
> look at the Spark-JobServer [3]
>

Thank you.

I view this with some trepidation.  It took two man-days to get Spark
running (and I've spent another man-day now trying to get a map/reduce to
run; I'm getting there, but not there yet) - the bring-up/config experience
for end-users is not tested or accurately documented (although to be clear,
no better and no worse than is normal for open source; Spark is not
exceptional).  Having to bring up another open source project is a
significant barrier to entry; it's always such a headache.

The save-to-disk function you mentioned earlier will allow intermediate
RDDs to go to disk, but we do in fact have a use case where in-memory would
be useful; it might allow us to ditch Cassandra, which would be wonderful,
since it would reduce the system count by one.

I have to say, having to install JobServer to achieve this one end seems an
extraordinarily heavyweight solution - a whole new application, when all
that is wished for is that Spark persists RDDs across jobs, where so small
a feature seems to open the door to so much functionality.

Re: initial basic question from new user

Posted by Toby Douglass <to...@avocet.io>.
On Thu, Jun 12, 2014 at 11:36 AM, Gerard Maas <ge...@gmail.com> wrote:

> If you need to ad-hoc persist to files, you can save RDDs using
> rdd.saveAsObjectFile(...) [1] and load them afterwards using
> sparkContext.objectFile(...)
>

Appears not available from Python.
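
PySpark did later gain a counterpart: `RDD.saveAsPickleFile` together with `SparkContext.pickleFile`, both added in a release after the 1.0.0 used in this thread. A minimal sketch, assuming a release that has them; the wrapper function names and the paths are hypothetical.

```python
def save_aggregate(agg_rdd, path):
    """Write an aggregate RDD to storage as pickled Python objects.

    Relies on RDD.saveAsPickleFile, which is not present in Spark 1.0.0.
    """
    agg_rdd.saveAsPickleFile(path)


def load_aggregate(sc, path):
    """Load, in a later run, an RDD previously written by save_aggregate."""
    return sc.pickleFile(path)


# Usage in a first run, assuming a live SparkContext `sc`:
#   agg = sc.parallelize([("page_a", 3), ("page_b", 5)])
#   save_aggregate(agg, "hdfs:///aggs/2014-06-12")
# Usage in a later, separate run:
#   agg = load_aggregate(sc, "hdfs:///aggs/2014-06-12")
```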

Re: initial basic question from new user

Posted by Gerard Maas <ge...@gmail.com>.
The goal of rdd.persist is to create a cached RDD, so that its DAG
lineage does not have to be recomputed. Therefore, computations *in the
same job* (that is, the same running application) that use that RDD can
re-use that intermediate result, but it's not meant to survive between job
runs.

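For example:

// Cache the shared intermediate result the first time it is computed.
val baseData = rawDataRdd.map(...).flatMap(...).reduceByKey(...).persist()
// Both metrics below re-use the cached baseData.
val metric1 = baseData.flatMap(op1).reduceByKey(...).collect()
val metric2 = baseData.flatMap(op2).reduceByKey(...).collect()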

Without persist, computing metric1 and metric2 would each trigger the
computation starting from rawDataRdd. With persist, both metric1 and metric2
will start from the intermediate result (baseData).
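
For readers working from Python, a PySpark rendering of the same pattern might look like this. It is a sketch only: `compute_metrics` is a hypothetical name, and `parse`, `op1`, `op2` and `combine` are placeholder functions standing in for the elided arguments above.

```python
def compute_metrics(raw_rdd, parse, op1, op2, combine):
    """Compute two metrics from one shared, cached intermediate RDD."""
    # persist() marks base_data for caching; it is materialised by the
    # first action and re-used by the second.
    base_data = raw_rdd.map(parse).reduceByKey(combine).persist()
    metric1 = base_data.flatMap(op1).reduceByKey(combine).collect()
    metric2 = base_data.flatMap(op2).reduceByKey(combine).collect()
    # Release the cached partitions once both metrics are computed.
    base_data.unpersist()
    return metric1, metric2


# Usage, assuming a live SparkContext `sc` and the placeholder functions:
#   m1, m2 = compute_metrics(sc.textFile("logs/"), parse, op1, op2, combine)
```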

If you need to ad-hoc persist to files, you can save RDDs using
rdd.saveAsObjectFile(...) [1] and load them afterwards using
sparkContext.objectFile(...) [2].

If you want to preserve the RDDs in memory between job runs, you should
look at the Spark-JobServer [3].

[1]
https://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.rdd.RDD

[2]
https://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.SparkContext

[3] https://github.com/ooyala/spark-jobserver


