Posted to user@spark.apache.org by raggy <ra...@gmail.com> on 2015/03/05 15:45:59 UTC

Partitioning Dataset and Using Reduce in Apache Spark

I am trying to use Apache Spark to load a file, distribute it across
several nodes in my cluster, and then aggregate the results and obtain them.
I don't quite understand how to do this.

From my understanding the reduce action enables Spark to combine the results
from different nodes and aggregate them together. Am I understanding this
correctly?

From a programming perspective, I don't understand how I would code this
reduce function.

How exactly do I partition the main dataset into N pieces and have them
processed in parallel by a list of transformations?

reduce is supposed to take in two elements and a function for combining
them. Are these 2 elements supposed to be RDDs from the context of Spark or
can they be any type of element? Also, if you have N different partitions
running in parallel, how would reduce aggregate all their results into one
final result (since the reduce function aggregates only 2 elements)?

Also, I don't understand this example. The example from the Spark website
uses reduce, but I don't see the data being processed in parallel. So, what
is the point of the reduce? If I could get a detailed explanation of the
loop in this example, I think that would clear up most of my questions.

class ComputeGradient extends Function<DataPoint, Vector> {
  private Vector w;
  ComputeGradient(Vector w) { this.w = w; }
  public Vector call(DataPoint p) {
    return p.x.times(p.y * (1 / (1 + Math.exp(w.dot(p.x))) - 1));
  }
}

JavaRDD<DataPoint> points = spark.textFile(...).map(new ParsePoint()).cache();
Vector w = Vector.random(D); // current separating plane
for (int i = 0; i < ITERATIONS; i++) {
  Vector gradient = points.map(new ComputeGradient(w)).reduce(new AddVectors());
  w = w.subtract(gradient);
}
System.out.println("Final separating plane: " + w);

Also, I have been trying to find the source code for reduce in the Apache
Spark GitHub repository, but the source is pretty huge and I haven't been able to
pinpoint it. Could someone please direct me towards which file I could find
it in?



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Partitioning-Dataset-and-Using-Reduce-in-Apache-Spark-tp21933.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Partitioning Dataset and Using Reduce in Apache Spark

Posted by Daniel Siegmann <da...@teamaol.com>.
On Thu, Mar 12, 2015 at 1:45 AM, <ra...@gmail.com> wrote:

>
> In your response you say “When you call reduce and *similar* methods,
> each partition can be reduced in parallel. Then the results of that can be
> transferred across the network and reduced to the final result”. By similar
> methods do you mean all actions within Spark? Does transfer of data from
> worker nodes to driver nodes happen only when an action is performed?
>

There is reduceByKey, and some others that take a more generalized form.
Other transformations and actions may work somewhat differently, but they
will generally be parallelized as much as possible.

If you look at the UI when the job is running, you will see some number of
tasks. Each task corresponds to a single partition.

Not all actions cause data to be transferred from the worker nodes to the
driver (there is only one driver node); saveAsTextFile, for example, does
not. In any case, no processing is done and no data is transferred anywhere
until an action is invoked, since transformations are lazy.
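
To make the laziness concrete, here is a small sketch using plain Java 8
streams rather than Spark (an analogy I am assuming is close enough to be
useful; the class name LazyPipelineSketch and the method run are made up for
illustration). Intermediate stream operations behave like transformations, and
the terminal operation behaves like an action: the counter shows that the map
function is not called until the terminal operation runs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

// Plain Java streams used as an analogy for lazy transformations; this is NOT Spark code.
class LazyPipelineSketch {

    // Returns {map calls before the terminal operation, map calls after it, the sum}.
    static int[] run() {
        AtomicInteger mapCalls = new AtomicInteger();
        List<Integer> data = Arrays.asList(1, 2, 3, 4);

        // Building the pipeline only records what to do; nothing is computed yet.
        Stream<Integer> doubled = data.stream().map(x -> {
            mapCalls.incrementAndGet();
            return x * 2;
        });
        int before = mapCalls.get();

        // The terminal operation plays the role of an action and triggers the work.
        int sum = doubled.reduce(0, Integer::sum);
        int after = mapCalls.get();

        return new int[] { before, after, sum };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("map calls before terminal operation: " + r[0]); // 0
        System.out.println("map calls after terminal operation: " + r[1]);  // 4
        System.out.println("sum: " + r[2]);                                  // 20
    }
}
```

In Spark the same idea applies across the cluster: chaining map and filter
only builds up a description of the work, and the tasks are scheduled when an
action is called.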


> I am assuming that in Spark, you typically have a set of transformations
> followed by some sort of action. The RDD is partitioned and sent to
> different worker nodes (assuming this is a cluster setup), the transformations
> are applied to the RDD partitions at the various worker nodes, and then
> when an action is performed, you perform the action on the worker nodes and
> then aggregate the partial results at the driver and then perform another
> reduction at the driver to obtain the overall results. I would also assume
> that deciding whether the action should be done on a worker node, depends
> on the type of action. For example, performing reduce at the worker node
> makes sense, while it doesn't make sense to save the file at the worker
> node.  Does that sound correct, or am I misinterpreting something?
>

On the contrary, saving of files would typically be done at the worker
node. If you are handling anything approaching "Big Data" you will be
keeping it in a distributed store (typically HDFS, though it depends on
your use case), and each worker will write into this store. For example, if
you call saveAsTextFile you will see multiple "part-*" files in the output
directory from separate partitions (don't worry about combining them for
future processing, sc.textFile can read them all in as a single RDD with
the data appropriately partitioned).

Some actions do pull data back to the driver - collect, for example. You
need to be careful when using such methods that you can be sure the amount
of data won't be too large for your driver.

In general you should avoid pulling any data back to the driver or doing
any processing on the driver as that will not scale. In some cases it may
be useful; for example if you wanted to join a large data set with a small
data set, it might perform better to collect the small data set and
broadcast it.

Take this for example:

sc.textFile(inputPath).map(...).filter(...).saveAsTextFile(outputPath)

None of that will execute on the driver. The map and filter will be
collapsed into a single stage (which you will see in the UI). Each worker will
prefer to read its local data (but data may be transferred if there's none
locally to work on), transform the data, and write it out.

If you had this for example:

sc.textFile(inputPath).map(...).reduceByKey(...).saveAsTextFile(outputPath)


You will have two stages, since reduceByKey causes a shuffle - data will be
transferred across the network and formed into a new set of partitions. But
after the reduce, each worker will still save the data it is responsible
for. (You can provide a custom partitioner, but don't do that unless you
feel you have a good reason to do so.)
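
To illustrate what the shuffle does, here is a rough plain-Java model of a
reduceByKey-style word count. It is not Spark code and not how Spark is
implemented; the names ShuffleSketch, partitionFor and reduceByKey are made up
for this sketch. The one assumption borrowed from Spark is hash partitioning:
a key goes to the output partition given by its hash code modulo the number of
partitions, so all values for a key end up in the same place.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

// Plain-Java model of a reduceByKey-style shuffle; this is NOT Spark code.
class ShuffleSketch {

    // Hash partitioning: every record with a given key goes to the same output partition.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    static List<Map<String, Integer>> reduceByKey(
            List<List<String>> inputPartitions, int numPartitions, BinaryOperator<Integer> f) {
        List<Map<String, Integer>> output = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            output.add(new TreeMap<>());
        }
        // First stage: each input partition turns its words into (word, 1) pairs.
        for (List<String> partition : inputPartitions) {
            for (String word : partition) {
                // "Shuffle": route the pair to the partition that owns the key.
                // Second stage: merge it into the running value for that key using f.
                output.get(partitionFor(word, numPartitions)).merge(word, 1, f);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<List<String>> input = Arrays.asList(
                Arrays.asList("a", "b", "a"),
                Arrays.asList("b", "c", "a"));
        // Two output partitions, each holding the complete counts for its own keys.
        System.out.println(reduceByKey(input, 2, Integer::sum)); // [{b=2}, {a=3, c=1}]
    }
}
```

Each map in the result stands for one partition after the shuffle, which is
what a worker would then write out as its own part file.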

Hope that helps.

Re: Partitioning Dataset and Using Reduce in Apache Spark

Posted by ra...@gmail.com.

Thank you very much for your detailed response; it was very informative and
cleared up some of my misconceptions. After your explanation, I understand
that the distribution of the data and the parallelism are meant to be
abstracted away from the developer.

In your response you say “When you call reduce and similar methods, each
partition can be reduced in parallel. Then the results of that can be
transferred across the network and reduced to the final result”. By similar
methods do you mean all actions within Spark? Does transfer of data from
worker nodes to driver nodes happen only when an action is performed?

I am assuming that in Spark, you typically have a set of transformations
followed by some sort of action. The RDD is partitioned and sent to
different worker nodes (assuming this is a cluster setup), the transformations
are applied to the RDD partitions at the various worker nodes, and then
when an action is performed, you perform the action on the worker nodes and
then aggregate the partial results at the driver and then perform another
reduction at the driver to obtain the overall results. I would also assume
that deciding whether the action should be done on a worker node, depends
on the type of action. For example, performing reduce at the worker node
makes sense, while it doesn't make sense to save the file at the worker
node. Does that sound correct, or am I misinterpreting something?

Thanks,
Raghav





From: Daniel Siegmann
Sent: Thursday, March 5, 2015 2:01 PM
To: Raghav Shankar
Cc: user@spark.apache.org

An RDD is a Resilient Distributed Dataset. The partitioning and distribution of the data happens in the background. You'll occasionally need to concern yourself with it (especially to get good performance), but from an API perspective it's mostly invisible (some methods do allow you to specify a number of partitions).


When you call sc.textFile(myPath) or similar, you get an RDD. That RDD will be composed of a bunch of partitions, but you don't really need to worry about that. The partitioning will be based on how the data is stored. When you call a method that causes a shuffle (such as reduceByKey), the data is repartitioned into a number of partitions based on your default parallelism setting (which IIRC is based on your number of cores if you haven't set it explicitly).

When you call reduce and similar methods, each partition can be reduced in parallel. Then the results of that can be transferred across the network and reduced to the final result. You supply the function and Spark handles the parallel execution of that function.

I hope this helps clear up your misconceptions. You might also want to familiarize yourself with the collections API in Java 8 (or Scala, or Python, or pretty much any other language with lambda expressions), since RDDs are meant to have an API that feels similar.




Re: Partitioning Dataset and Using Reduce in Apache Spark

Posted by Daniel Siegmann <da...@teamaol.com>.
An RDD is a Resilient *Distributed* Dataset. The partitioning and
distribution of the data happens in the background. You'll occasionally
need to concern yourself with it (especially to get good performance), but
from an API perspective it's mostly invisible (some methods do allow you to
specify a number of partitions).

When you call sc.textFile(myPath) or similar, you get an RDD. That RDD will
be composed of a bunch of partitions, but you don't really need to worry
about that. The partitioning will be based on how the data is stored. When
you call a method that causes a shuffle (such as reduceByKey), the data is
repartitioned into a number of partitions based on your default parallelism
setting (which IIRC is based on your number of cores if you haven't set it
explicitly).

When you call reduce and similar methods, each partition can be reduced in
parallel. Then the results of that can be transferred across the network
and reduced to the final result. *You supply the function and Spark handles
the parallel execution of that function*.

I hope this helps clear up your misconceptions. You might also want to
familiarize yourself with the collections API in Java 8 (or Scala, or
Python, or pretty much any other language with lambda expressions), since
RDDs are meant to have an API that feels similar.
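
As a rough illustration of how a two-argument function can combine the
results of N partitions, here is a plain Java 8 sketch. It is a model of the
idea only, not Spark code and not Spark's implementation; the class
PartitionedReduceSketch and its helper methods are made up for this example,
and it assumes the data is non-empty. Each partition is reduced on its own (in
parallel here), and the same function is then applied to the partial results.
Note that the function only ever sees two elements of the data's element type,
never RDDs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

// Plain-Java model of a partitioned reduce; this is NOT Spark code.
class PartitionedReduceSketch {

    // Split the data into at most numPartitions contiguous chunks (hypothetical helper).
    static <T> List<List<T>> partition(List<T> data, int numPartitions) {
        List<List<T>> parts = new ArrayList<>();
        int size = (data.size() + numPartitions - 1) / numPartitions; // ceiling division
        for (int start = 0; start < data.size(); start += size) {
            parts.add(data.subList(start, Math.min(start + size, data.size())));
        }
        return parts;
    }

    // Reduce each partition independently, then reduce the partial results with the same function.
    // Assumes there is at least one partition and that no partition is empty.
    static <T> T reduce(List<List<T>> partitions, BinaryOperator<T> f) {
        List<T> partials = partitions.parallelStream()
                .map(p -> p.stream().reduce(f).get())   // one partial result per partition
                .collect(Collectors.toList());
        return partials.stream().reduce(f).get();        // final combine of the partial results
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        List<List<Integer>> parts = partition(data, 3);
        System.out.println(parts);                        // [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10]]
        System.out.println(reduce(parts, Integer::sum));  // 55
    }
}
```

Because the partial results can be combined in any grouping and order, the
function you pass to reduce should be associative and commutative (addition,
max, vector addition and so on).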
