Posted to user@spark.apache.org by Nathan Kronenfeld <nk...@oculusinfo.com> on 2014/07/11 15:54:18 UTC

Iteration question

Hi, folks.

We're having a problem with iteration that I don't understand.

We have the following test code:

org.apache.log4j.Logger.getLogger("org").setLevel(org.apache.log4j.Level.WARN)
org.apache.log4j.Logger.getLogger("akka").setLevel(org.apache.log4j.Level.WARN)

def test (caching: Boolean, points: Int, iterations: Int) {
  var coords = sc.parallelize(Array.fill(points)(0.0, 0.0).zipWithIndex.map(_.swap))
  if (caching) coords.cache
  coords.count

  var iteration = 0
  val times = new Array[Double](iterations)

  do {
    val start = System.currentTimeMillis
    val thisIteration = iteration
    val increments = sc.parallelize(for (i <- 1 to points) yield (math.random, math.random))
    val newcoords = coords.zip(increments).map(p =>
      {
        if (0 == p._1._1) println("Processing iteration "+thisIteration)
        (p._1._1,
         (p._1._2._1 + p._2._1,
          p._1._2._2 + p._2._2))
      }
    )
    if (caching) newcoords.cache
    newcoords.count
    if (caching) coords.unpersist(false)
    coords = newcoords
    val end = System.currentTimeMillis

    times(iteration) = (end-start)/1000.0
    println("Done iteration "+iteration+" in "+times(iteration)+" seconds")
    iteration = iteration + 1
  } while (iteration < iterations)

  for (i <- 0 until iterations) {
    println("Iteration "+i+": "+times(i))
  }
}

If you run this on a local server with caching on and off, it appears that
the caching does what it is supposed to do - only the latest iteration is
processed each time through the loop.

However, despite this, the time for each iteration still gets slower and
slower.
For example, calling test(true, 5000, 100), I get the following times
(weeding out a few for brevity):
Iteration 0: 0.084
Iteration 10: 0.381
Iteration 20: 0.674
Iteration 30: 0.975
Iteration 40: 1.254
Iteration 50: 1.544
Iteration 60: 1.802
Iteration 70: 2.147
Iteration 80: 2.469
Iteration 90: 2.715
Iteration 99: 2.962

That's a 35x increase between the first and last iteration, when it should
be doing the same thing each time!

Without caching, the numbers are
Iteration 0: 0.642
Iteration 10: 0.516
Iteration 20: 0.823
Iteration 30: 1.17
Iteration 40: 1.514
Iteration 50: 1.655
Iteration 60: 1.992
Iteration 70: 2.177
Iteration 80: 2.472
Iteration 90: 2.814
Iteration 99: 3.018

slightly slower - but not significantly.

Does anyone know, if the caching is working, why is iteration 100 slower
than iteration 1?  And why is caching making so little difference?


Thanks,
            -Nathan Kronenfeld

-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenfeld@oculusinfo.com

Re: Iteration question

Posted by Matei Zaharia <ma...@gmail.com>.
Hi Nathan,

I think there are two possible reasons for this. One is that even though you are caching RDDs, their lineage chain gets longer and longer, and thus serializing each RDD takes more time. You can cut off the chain by calling RDD.checkpoint() periodically, say every 5-10 iterations. The second reason may just be garbage accumulating in the JVM, causing more time to be spent in collection as the loop goes on.

Matei
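
The periodic-checkpoint suggestion above could look roughly like the sketch below. It is not code from the thread: the object name CheckpointSketch, the run function, the checkpointEvery parameter, and the checkpoint directory /tmp/spark-checkpoints are all hypothetical, and it assumes a local Spark installation on the classpath. It relies on SparkContext.setCheckpointDir being called first and on RDD.checkpoint() being called before the first action on the RDD.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.util.Random

object CheckpointSketch {
  // Hypothetical helper: same loop as the test above, always caching, with a
  // checkpoint every `checkpointEvery` iterations to truncate the lineage.
  // Returns the wall-clock time of each iteration in seconds.
  def run(sc: SparkContext, points: Int, iterations: Int, checkpointEvery: Int): Array[Double] = {
    var coords: RDD[(Int, (Double, Double))] =
      sc.parallelize(Seq.tabulate(points)(i => (i, (0.0, 0.0))))
    coords.cache()
    coords.count()

    val times = new Array[Double](iterations)
    for (iteration <- 0 until iterations) {
      val start = System.currentTimeMillis
      val increments =
        sc.parallelize(Seq.fill(points)((Random.nextDouble(), Random.nextDouble())))
      val newCoords = coords.zip(increments).map {
        case ((id, (x, y)), (dx, dy)) => (id, (x + dx, y + dy))
      }
      newCoords.cache()
      // Mark for checkpointing before the first action; the count below then
      // writes the data to the checkpoint directory and drops the parent lineage.
      if ((iteration + 1) % checkpointEvery == 0) newCoords.checkpoint()
      newCoords.count()
      coords.unpersist(false)
      coords = newCoords
      times(iteration) = (System.currentTimeMillis - start) / 1000.0
    }
    times
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("checkpoint-sketch")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("/tmp/spark-checkpoints") // hypothetical location
    try {
      run(sc, points = 5000, iterations = 100, checkpointEvery = 10)
        .zipWithIndex
        .foreach { case (t, i) => println("Iteration " + i + ": " + t) }
    } finally {
      sc.stop()
    }
  }
}
```

If the lineage is the cause, the per-iteration times should stop climbing steadily and instead reset after each checkpoint. Printing coords.toDebugString before and after a checkpointed iteration is one way to see whether the chain was actually cut. This sketch does nothing about the second possible cause, garbage collection.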
