You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by "assaf.mendelson" <as...@rsa.com> on 2016/12/26 10:56:32 UTC

Shuffle intermidiate results not being cached

Hi,

Sorry to be bothering everyone on the holidays but I have found what may be a bug.

I am doing a "manual" streaming (see http://stackoverflow.com/questions/41266956/apache-spark-streaming-performance for the specific code) where I essentially read an additional dataframe each time from file, union it with previous dataframes to create a "window" and then do double aggregation on the result.
Having looked at the documentation (https://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose right above the headline) I expected spark to automatically cache the partial aggregation for each dataframe read and then continue with the aggregations from there. Instead it seems it reads each dataframe from file all over again.
Is this a bug? Am I doing something wrong?

Thanks.
                Assaf.




--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Shuffle-intermidiate-results-not-being-cached-tp20358.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

RE: Shuffle intermidiate results not being cached

Posted by Liang-Chi Hsieh <vi...@gmail.com>.
  var totalTime: Long = 0
  var allDF: DataFrame = null
  for {
    x <- dataframes
  } {
    val timeLen = time {
      allDF = if (allDF == null) x else {
        allDF.union(x).groupBy("cat1", "cat2").agg(sum($"v").alias("v"))
      }
    }
    println(s"Took $timeLen miliseconds")
    totalTime += timeLen
  }
  val timeLen2 = time {
    val grouped2 = allDF.groupBy("cat1").agg(sum($"v"), count($"cat2"))
    grouped2.show()
  }
  totalTime += timeLen2
  println(s"Overall time was $totalTime miliseconds")
} 



Liang-Chi Hsieh wrote
> The shuffle data can be reused only if you use the same RDD. When you
> union x1's RDD and x2's RDD in first iteration, and union x1's RDD and
> x2's RDD and x3's RDD in 2nd iteration, how do you think they are the same
> RDD?
> 
> I just use the previous example code to show that you should not recompute
> all data since the beginning of stream. Usually, a streaming job computes
> a summary of collected data in a window. If you want to compute all the
> data, you should use batch instead of streaming. In other words, if you
> run a long-running streaming job, would you like to recompute all data
> every few seconds after one year?
> 
> BTW, you don't need to compute:
> 
>       val grouped2 = allDF.groupBy("cat1").agg(sum($"v"), count($"cat2"))
> 
> for each iteration. This should be run after the loop if you want to
> compare it with batch. And you don't need to run two aggregation in the
> loop for allDF.
> 
>   var totalTime: Long = 0
>   var allDF: DataFrame = null
>   for {
>     x <- dataframes
>   } {
>     val timeLen = time {
>       allDF = if (allDF == null) x else {
>         allDF.union(x).groupBy("cat1", "cat2").agg(sum($"v").alias("v"))
>       }
>     }
>     val timeLen2 = time {
>       val grouped2 = allDF.groupBy("cat1").agg(sum($"v"), count($"cat2"))
>       grouped2.show()
>     }
>     totalTime += timeLen + timeLen2
>     println(s"Took $timeLen miliseconds")
>   }
>   println(s"Overall time was $totalTime miliseconds")
> }
> 
> Of course, this may not work for all aggregations. I just show that you do
> redundant work in this version when comparing to your batch code. For
> other aggregations, you may need other design to do similar job.
> 
> assaf.mendelson wrote
>> I understand the actual dataframe is different, but the underlying
>> partitions are not (hence the importance of mark's response). The code
>> you suggested would not work as allDF and x would have different schema's
>> (x is the original and allDF becomes the grouped).
>> I can do something like this:
>>   var totalTime: Long = 0
>>   var allDF: DataFrame = null
>>   for {
>>     x <- dataframes
>>   } {
>>     val timeLen = time {
>>       val grouped = x.groupBy("cat1",
>> "cat2").agg(sum($"valToAdd").alias("v"))
>>       allDF = if (allDF == null) grouped else {
>>         allDF.union(grouped).groupBy("cat1",
>> "cat2").agg(sum($"v").alias("v"))
>>       }
>>       val grouped2 = allDF.groupBy("cat1").agg(sum($"v"), count($"cat2"))
>>       grouped2.show()
>>     }
>>     totalTime += timeLen
>>     println(s"Took $timeLen miliseconds")
>>   }
>>   println(s"Overall time was $totalTime miliseconds")
>> }
>> 
>> and this indeed improves performance (I actually had a couple more tries)
>> but:
>> 
>> 1.       This still gives crappy performance (for 167 slices I get a
>> throughput which is 10 times lower than batch after doing some tuning
>> including caching and coalescing)
>> 
>> 2.       This works because the aggregation here is sum and we don't
>> forget. For more general aggregations we would have to join them together
>> (can't do it for count distinct for example) and we will need to "forget"
>> frames when moving out of the window (we can subtract a sum but not a
>> max).
>> 
>> The best solution I found so far (performance wise) was to write a custom
>> UDAF which does the window internally. This was still 8 times lower
>> throughput than batch and required a lot of coding and is not a general
>> solution.
>> 
>> I am looking for an approach to improve the performance even more
>> (preferably to either be on par with batch or a relatively low factor
>> which remains constant when the number of slices rise) and including the
>> option to "forget" frames.
>> 
>> Assaf.
>> 
>> 
>> 
>> 
>> From: Liang-Chi Hsieh [via Apache Spark Developers List] [mailto:

>> ml-node+s1001551n20371h90@.nabble

>> ]
>> Sent: Wednesday, December 28, 2016 3:59 AM
>> To: Mendelson, Assaf
>> Subject: RE: Shuffle intermidiate results not being cached
>> 
>> 
>> Hi,
>> 
>> Every iteration the data you run aggregation on it is different. As I
>> showed in previous reply:
>> 
>> 1st iteration: aggregation(x1 union x2)
>> 2nd iteration: aggregation(x3 union (x1 union x2))
>> 3rd iteration: aggregation(x4 union(x3 union (x1 union x2)))
>> 
>> In 1st you run aggregation on the data of x1 and x2. In 2nd the data is
>> x1, x2 and x3. Even you work on the same RDD, you won't see reuse of the
>> shuffle data because the shuffle data is different.
>> 
>> In your second example, I think the way to reduce the computation is
>> like:
>> 
>> var totalTime: Long = 0
>> var allDF: org.apache.spark.sql.DataFrame = null
>> for {
>>   x <- dataframes
>> } {
>>   val timeLen = time {
>>     allDF = if (allDF == null) x else allDF.union(x) // Union previous
>> aggregation summary with new dataframe in this window
>>     val grouped = allDF.groupBy("cat1",
>> "cat2").agg(sum($"valToAdd").alias("v"))
>>     val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2"))
>>     grouped2.show()
>>     allDF = grouped  // Replace the union of data with aggregated summary
>>   }
>>   totalTime += timeLen
>>   println(s"Took $timeLen miliseconds")
>> }
>> println(s"Total time was $totalTime miliseconds")
>> 
>> You don't need to recompute the aggregation of previous dataframes in
>> each iteration. You just need to get the summary and union it with new
>> dataframe to compute the newer aggregation summary in next iteration. It
>> is more similar to streaming case, I don't think you can/should recompute
>> all the data since the beginning of a stream.
>> 
>> assaf.mendelson wrote
>> The reason I thought some operations would be reused is the fact that
>> spark automatically caches shuffle data which means the partial
>> aggregation for pervious dataframes would be saved. Unfortunatly, as Mark
>> Hamstra explained this is not the case because this is considered a new
>> RDD and therefore the previous data is lost.
>> 
>> I am still wondering if there is any way to do high performance streaming
>> of SQL. Basically this is not far from what DStream would do assuming we
>> convert a sliding window (e.g. 24 hours every 15 minutes) as we would be
>> doing a foreachRDD which would do the joining behind the scenes.
>> The problem is that any attempt to do a streaming like this results in
>> performance which is hundreds of times slower than batch.
>> Is there a correct way to do such an aggregation on streaming data (using
>> dataframes rather than RDD operations).
>> Assaf.
>> 
>> 
>> 
>> From: Liang-Chi Hsieh [via Apache Spark Developers List] [mailto:[hidden
>> email]&lt;/user/SendEmail.jtp?type=node&amp;node=20371&amp;i=0&gt;]
>> Sent: Monday, December 26, 2016 5:42 PM
>> To: Mendelson, Assaf
>> Subject: Re: Shuffle intermidiate results not being cached
>> 
>> 
>> Hi,
>> 
>> Let me quote your example codes:
>> 
>> var totalTime: Long = 0
>> var allDF: org.apache.spark.sql.DataFrame = null
>> for {
>>   x <- dataframes
>> } {
>>   val timeLen = time {
>>     allDF = if (allDF == null) x else allDF.union(x)
>>     val grouped = allDF.groupBy("cat1",
>> "cat2").agg(sum($"valToAdd").alias("v"))
>>     val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2"))
>>     grouped2.show()
>>   }
>>   totalTime += timeLen
>>   println(s"Took $timeLen miliseconds")
>> }
>> println(s"Total time was $totalTime miliseconds")
>> 
>> 
>> Basically what you do is to union some dataframes for each iteration, and
>> do aggregation on this union data. I don't see any reused operations.
>> 
>> 1st iteration: aggregation(x1 union x2)
>> 2nd iteration: aggregation(x3 union (x1 union x2))
>> 3rd iteration: aggregation(x4 union(x3 union (x1 union x2)))
>> ...
>> 
>> Your first example just does two aggregation operations. But your second
>> example like above does this aggregation operations for each iteration.
>> So the time of second example grows as the iteration increases.
>> 
>> Liang-Chi Hsieh | @viirya
>> Spark Technology Center
>> http://www.spark.tc/
>> 
>> ________________________________
>> If you reply to this email, your message will be added to the discussion
>> below:
>> http://apache-spark-developers-list.1001551.n3.nabble.com/Shuffle-intermidiate-results-not-being-cached-tp20358p20361.html
>> To start a new topic under Apache Spark Developers List, email [hidden
>> email]&lt;/user/SendEmail.jtp?type=node&amp;node=20371&amp;i=1&gt;&lt;mailto:[hidden
>> email]&lt;/user/SendEmail.jtp?type=node&amp;node=20371&amp;i=2&gt;>
>> To unsubscribe from Apache Spark Developers List, click
>> here&lt;http://apache-spark-developers-list.1001551.n3.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&amp;node=1&amp;code=YXNzYWYubWVuZGVsc29uQHJzYS5jb218MXwtMTI4OTkxNTg1Mg==&gt;&lt;http://apache-spark-developers-list.1001551.n3.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&amp;node=1&amp;code=YXNzYWYubWVuZGVsc29uQHJzYS5jb218MXwtMTI4OTkxNTg1Mg==%3e&gt;.
>> NAML&lt;http://apache-spark-developers-list.1001551.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&amp;id=instant_html%21nabble%3Aemail.naml&amp;base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&amp;breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml&gt;&lt;http://apache-spark-developers-list.1001551.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&amp;id=instant_html%21nabble%3Aemail.naml&amp;base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&amp;breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml%3e&gt;
>> Liang-Chi Hsieh | @viirya
>> Spark Technology Center
>> http://www.spark.tc/
>> 
>> ________________________________
>> If you reply to this email, your message will be added to the discussion
>> below:
>> http://apache-spark-developers-list.1001551.n3.nabble.com/Shuffle-intermidiate-results-not-being-cached-tp20358p20371.html
>> To start a new topic under Apache Spark Developers List, email 

>> ml-node+s1001551n1h20@.nabble

>> &lt;mailto:

>> ml-node+s1001551n1h20@.nabble

>> &gt;
>> To unsubscribe from Apache Spark Developers List, click
>> here&lt;http://apache-spark-developers-list.1001551.n3.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&amp;node=1&amp;code=YXNzYWYubWVuZGVsc29uQHJzYS5jb218MXwtMTI4OTkxNTg1Mg==&gt;.
>> NAML&lt;http://apache-spark-developers-list.1001551.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&amp;id=instant_html%21nabble%3Aemail.naml&amp;base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&amp;breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml&gt;





-----
Liang-Chi Hsieh | @viirya 
Spark Technology Center 
http://www.spark.tc/ 
--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Shuffle-intermidiate-results-not-being-cached-tp20358p20386.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscribe@spark.apache.org


RE: Shuffle intermidiate results not being cached

Posted by Liang-Chi Hsieh <vi...@gmail.com>.
The shuffle data can be reused only if you use the same RDD. When you union
x1's RDD and x2's RDD in first iteration, and union x1's RDD and x2's RDD
and x3's RDD in 2nd iteration, how do you think they are the same RDD?

I just use the previous example code to show that you should not recompute
all data since the beginning of stream. Usually, a streaming job computes a
summary of collected data in a window. If you want to compute all the data,
you should use batch instead of streaming. In other words, if you run a
long-running streaming job, would you like to recompute all data every few
seconds after one year?

BTW, you don't need to compute:

      val grouped2 = allDF.groupBy("cat1").agg(sum($"v"), count($"cat2"))

for each iteration. This should be run after the loop if you want to compare
it with batch. And you don't need to run two aggregation in the loop for
allDF.

  var totalTime: Long = 0
  var allDF: DataFrame = null
  for {
    x <- dataframes
  } {
    val timeLen = time {
      allDF = if (allDF == null) x else {
        allDF.union(x).groupBy("cat1", "cat2").agg(sum($"v").alias("v"))
      }
    }
    val timeLen2 = time {
      val grouped2 = allDF.groupBy("cat1").agg(sum($"v"), count($"cat2"))
      grouped2.show()
    }
    totalTime += timeLen + timeLen2
    println(s"Took $timeLen miliseconds")
  }
  println(s"Overall time was $totalTime miliseconds")
}

Of course, this may not work for all aggregations. I just show that you do
redundant work in this version when comparing to your batch code. For other
aggregations, you may need other design to do similar job.



assaf.mendelson wrote
> I understand the actual dataframe is different, but the underlying
> partitions are not (hence the importance of mark's response). The code you
> suggested would not work as allDF and x would have different schema's (x
> is the original and allDF becomes the grouped).
> I can do something like this:
>   var totalTime: Long = 0
>   var allDF: DataFrame = null
>   for {
>     x <- dataframes
>   } {
>     val timeLen = time {
>       val grouped = x.groupBy("cat1",
> "cat2").agg(sum($"valToAdd").alias("v"))
>       allDF = if (allDF == null) grouped else {
>         allDF.union(grouped).groupBy("cat1",
> "cat2").agg(sum($"v").alias("v"))
>       }
>       val grouped2 = allDF.groupBy("cat1").agg(sum($"v"), count($"cat2"))
>       grouped2.show()
>     }
>     totalTime += timeLen
>     println(s"Took $timeLen miliseconds")
>   }
>   println(s"Overall time was $totalTime miliseconds")
> }
> 
> and this indeed improves performance (I actually had a couple more tries)
> but:
> 
> 1.       This still gives crappy performance (for 167 slices I get a
> throughput which is 10 times lower than batch after doing some tuning
> including caching and coalescing)
> 
> 2.       This works because the aggregation here is sum and we don't
> forget. For more general aggregations we would have to join them together
> (can't do it for count distinct for example) and we will need to "forget"
> frames when moving out of the window (we can subtract a sum but not a
> max).
> 
> The best solution I found so far (performance wise) was to write a custom
> UDAF which does the window internally. This was still 8 times lower
> throughput than batch and required a lot of coding and is not a general
> solution.
> 
> I am looking for an approach to improve the performance even more
> (preferably to either be on par with batch or a relatively low factor
> which remains constant when the number of slices rise) and including the
> option to "forget" frames.
> 
> Assaf.
> 
> 
> 
> 
> From: Liang-Chi Hsieh [via Apache Spark Developers List] [mailto:

> ml-node+s1001551n20371h90@.nabble

> ]
> Sent: Wednesday, December 28, 2016 3:59 AM
> To: Mendelson, Assaf
> Subject: RE: Shuffle intermidiate results not being cached
> 
> 
> Hi,
> 
> Every iteration the data you run aggregation on it is different. As I
> showed in previous reply:
> 
> 1st iteration: aggregation(x1 union x2)
> 2nd iteration: aggregation(x3 union (x1 union x2))
> 3rd iteration: aggregation(x4 union(x3 union (x1 union x2)))
> 
> In 1st you run aggregation on the data of x1 and x2. In 2nd the data is
> x1, x2 and x3. Even you work on the same RDD, you won't see reuse of the
> shuffle data because the shuffle data is different.
> 
> In your second example, I think the way to reduce the computation is like:
> 
> var totalTime: Long = 0
> var allDF: org.apache.spark.sql.DataFrame = null
> for {
>   x <- dataframes
> } {
>   val timeLen = time {
>     allDF = if (allDF == null) x else allDF.union(x) // Union previous
> aggregation summary with new dataframe in this window
>     val grouped = allDF.groupBy("cat1",
> "cat2").agg(sum($"valToAdd").alias("v"))
>     val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2"))
>     grouped2.show()
>     allDF = grouped  // Replace the union of data with aggregated summary
>   }
>   totalTime += timeLen
>   println(s"Took $timeLen miliseconds")
> }
> println(s"Total time was $totalTime miliseconds")
> 
> You don't need to recompute the aggregation of previous dataframes in each
> iteration. You just need to get the summary and union it with new
> dataframe to compute the newer aggregation summary in next iteration. It
> is more similar to streaming case, I don't think you can/should recompute
> all the data since the beginning of a stream.
> 
> assaf.mendelson wrote
> The reason I thought some operations would be reused is the fact that
> spark automatically caches shuffle data which means the partial
> aggregation for pervious dataframes would be saved. Unfortunatly, as Mark
> Hamstra explained this is not the case because this is considered a new
> RDD and therefore the previous data is lost.
> 
> I am still wondering if there is any way to do high performance streaming
> of SQL. Basically this is not far from what DStream would do assuming we
> convert a sliding window (e.g. 24 hours every 15 minutes) as we would be
> doing a foreachRDD which would do the joining behind the scenes.
> The problem is that any attempt to do a streaming like this results in
> performance which is hundreds of times slower than batch.
> Is there a correct way to do such an aggregation on streaming data (using
> dataframes rather than RDD operations).
> Assaf.
> 
> 
> 
> From: Liang-Chi Hsieh [via Apache Spark Developers List] [mailto:[hidden
> email]&lt;/user/SendEmail.jtp?type=node&amp;node=20371&amp;i=0&gt;]
> Sent: Monday, December 26, 2016 5:42 PM
> To: Mendelson, Assaf
> Subject: Re: Shuffle intermidiate results not being cached
> 
> 
> Hi,
> 
> Let me quote your example codes:
> 
> var totalTime: Long = 0
> var allDF: org.apache.spark.sql.DataFrame = null
> for {
>   x <- dataframes
> } {
>   val timeLen = time {
>     allDF = if (allDF == null) x else allDF.union(x)
>     val grouped = allDF.groupBy("cat1",
> "cat2").agg(sum($"valToAdd").alias("v"))
>     val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2"))
>     grouped2.show()
>   }
>   totalTime += timeLen
>   println(s"Took $timeLen miliseconds")
> }
> println(s"Total time was $totalTime miliseconds")
> 
> 
> Basically what you do is to union some dataframes for each iteration, and
> do aggregation on this union data. I don't see any reused operations.
> 
> 1st iteration: aggregation(x1 union x2)
> 2nd iteration: aggregation(x3 union (x1 union x2))
> 3rd iteration: aggregation(x4 union(x3 union (x1 union x2)))
> ...
> 
> Your first example just does two aggregation operations. But your second
> example like above does this aggregation operations for each iteration. So
> the time of second example grows as the iteration increases.
> 
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/
> 
> ________________________________
> If you reply to this email, your message will be added to the discussion
> below:
> http://apache-spark-developers-list.1001551.n3.nabble.com/Shuffle-intermidiate-results-not-being-cached-tp20358p20361.html
> To start a new topic under Apache Spark Developers List, email [hidden
> email]&lt;/user/SendEmail.jtp?type=node&amp;node=20371&amp;i=1&gt;&lt;mailto:[hidden
> email]&lt;/user/SendEmail.jtp?type=node&amp;node=20371&amp;i=2&gt;>
> To unsubscribe from Apache Spark Developers List, click
> here&lt;http://apache-spark-developers-list.1001551.n3.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&amp;node=1&amp;code=YXNzYWYubWVuZGVsc29uQHJzYS5jb218MXwtMTI4OTkxNTg1Mg==&gt;&lt;http://apache-spark-developers-list.1001551.n3.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&amp;node=1&amp;code=YXNzYWYubWVuZGVsc29uQHJzYS5jb218MXwtMTI4OTkxNTg1Mg==%3e&gt;.
> NAML&lt;http://apache-spark-developers-list.1001551.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&amp;id=instant_html%21nabble%3Aemail.naml&amp;base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&amp;breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml&gt;&lt;http://apache-spark-developers-list.1001551.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&amp;id=instant_html%21nabble%3Aemail.naml&amp;base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&amp;breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml%3e&gt;
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/
> 
> ________________________________
> If you reply to this email, your message will be added to the discussion
> below:
> http://apache-spark-developers-list.1001551.n3.nabble.com/Shuffle-intermidiate-results-not-being-cached-tp20358p20371.html
> To start a new topic under Apache Spark Developers List, email 

> ml-node+s1001551n1h20@.nabble

> &lt;mailto:

> ml-node+s1001551n1h20@.nabble

> &gt;
> To unsubscribe from Apache Spark Developers List, click
> here&lt;http://apache-spark-developers-list.1001551.n3.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&amp;node=1&amp;code=YXNzYWYubWVuZGVsc29uQHJzYS5jb218MXwtMTI4OTkxNTg1Mg==&gt;.
> NAML&lt;http://apache-spark-developers-list.1001551.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&amp;id=instant_html%21nabble%3Aemail.naml&amp;base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&amp;breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml&gt;





-----
Liang-Chi Hsieh | @viirya 
Spark Technology Center 
http://www.spark.tc/ 
--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Shuffle-intermidiate-results-not-being-cached-tp20358p20385.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscribe@spark.apache.org


RE: Shuffle intermidiate results not being cached

Posted by "assaf.mendelson" <as...@rsa.com>.
I understand the actual dataframe is different, but the underlying partitions are not (hence the importance of mark's response). The code you suggested would not work as allDF and x would have different schema's (x is the original and allDF becomes the grouped).
I can do something like this:
  var totalTime: Long = 0
  var allDF: DataFrame = null
  for {
    x <- dataframes
  } {
    val timeLen = time {
      val grouped = x.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
      allDF = if (allDF == null) grouped else {
        allDF.union(grouped).groupBy("cat1", "cat2").agg(sum($"v").alias("v"))
      }
      val grouped2 = allDF.groupBy("cat1").agg(sum($"v"), count($"cat2"))
      grouped2.show()
    }
    totalTime += timeLen
    println(s"Took $timeLen miliseconds")
  }
  println(s"Overall time was $totalTime miliseconds")
}

and this indeed improves performance (I actually had a couple more tries) but:

1.       This still gives crappy performance (for 167 slices I get a throughput which is 10 times lower than batch after doing some tuning including caching and coalescing)

2.       This works because the aggregation here is sum and we don't forget. For more general aggregations we would have to join them together (can't do it for count distinct for example) and we will need to "forget" frames when moving out of the window (we can subtract a sum but not a max).

The best solution I found so far (performance wise) was to write a custom UDAF which does the window internally. This was still 8 times lower throughput than batch and required a lot of coding and is not a general solution.

I am looking for an approach to improve the performance even more (preferably to either be on par with batch or a relatively low factor which remains constant when the number of slices rise) and including the option to "forget" frames.

Assaf.




From: Liang-Chi Hsieh [via Apache Spark Developers List] [mailto:ml-node+s1001551n20371h90@n3.nabble.com]
Sent: Wednesday, December 28, 2016 3:59 AM
To: Mendelson, Assaf
Subject: RE: Shuffle intermidiate results not being cached


Hi,

Every iteration the data you run aggregation on it is different. As I showed in previous reply:

1st iteration: aggregation(x1 union x2)
2nd iteration: aggregation(x3 union (x1 union x2))
3rd iteration: aggregation(x4 union(x3 union (x1 union x2)))

In 1st you run aggregation on the data of x1 and x2. In 2nd the data is x1, x2 and x3. Even you work on the same RDD, you won't see reuse of the shuffle data because the shuffle data is different.

In your second example, I think the way to reduce the computation is like:

var totalTime: Long = 0
var allDF: org.apache.spark.sql.DataFrame = null
for {
  x <- dataframes
} {
  val timeLen = time {
    allDF = if (allDF == null) x else allDF.union(x) // Union previous aggregation summary with new dataframe in this window
    val grouped = allDF.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
    val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2"))
    grouped2.show()
    allDF = grouped  // Replace the union of data with aggregated summary
  }
  totalTime += timeLen
  println(s"Took $timeLen miliseconds")
}
println(s"Total time was $totalTime miliseconds")

You don't need to recompute the aggregation of previous dataframes in each iteration. You just need to get the summary and union it with new dataframe to compute the newer aggregation summary in next iteration. It is more similar to streaming case, I don't think you can/should recompute all the data since the beginning of a stream.

assaf.mendelson wrote
The reason I thought some operations would be reused is the fact that spark automatically caches shuffle data which means the partial aggregation for pervious dataframes would be saved. Unfortunatly, as Mark Hamstra explained this is not the case because this is considered a new RDD and therefore the previous data is lost.

I am still wondering if there is any way to do high performance streaming of SQL. Basically this is not far from what DStream would do assuming we convert a sliding window (e.g. 24 hours every 15 minutes) as we would be doing a foreachRDD which would do the joining behind the scenes.
The problem is that any attempt to do a streaming like this results in performance which is hundreds of times slower than batch.
Is there a correct way to do such an aggregation on streaming data (using dataframes rather than RDD operations).
Assaf.



From: Liang-Chi Hsieh [via Apache Spark Developers List] [mailto:[hidden email]</user/SendEmail.jtp?type=node&node=20371&i=0>]
Sent: Monday, December 26, 2016 5:42 PM
To: Mendelson, Assaf
Subject: Re: Shuffle intermidiate results not being cached


Hi,

Let me quote your example codes:

var totalTime: Long = 0
var allDF: org.apache.spark.sql.DataFrame = null
for {
  x <- dataframes
} {
  val timeLen = time {
    allDF = if (allDF == null) x else allDF.union(x)
    val grouped = allDF.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
    val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2"))
    grouped2.show()
  }
  totalTime += timeLen
  println(s"Took $timeLen miliseconds")
}
println(s"Total time was $totalTime miliseconds")


Basically what you do is to union some dataframes for each iteration, and do aggregation on this union data. I don't see any reused operations.

1st iteration: aggregation(x1 union x2)
2nd iteration: aggregation(x3 union (x1 union x2))
3rd iteration: aggregation(x4 union(x3 union (x1 union x2)))
...

Your first example just does two aggregation operations. But your second example like above does this aggregation operations for each iteration. So the time of second example grows as the iteration increases.

Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/

________________________________
If you reply to this email, your message will be added to the discussion below:
http://apache-spark-developers-list.1001551.n3.nabble.com/Shuffle-intermidiate-results-not-being-cached-tp20358p20361.html
To start a new topic under Apache Spark Developers List, email [hidden email]</user/SendEmail.jtp?type=node&node=20371&i=1><mailto:[hidden email]</user/SendEmail.jtp?type=node&node=20371&i=2>>
To unsubscribe from Apache Spark Developers List, click here<http://apache-spark-developers-list.1001551.n3.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=1&code=YXNzYWYubWVuZGVsc29uQHJzYS5jb218MXwtMTI4OTkxNTg1Mg==><http://apache-spark-developers-list.1001551.n3.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=1&code=YXNzYWYubWVuZGVsc29uQHJzYS5jb218MXwtMTI4OTkxNTg1Mg==%3e>.
NAML<http://apache-spark-developers-list.1001551.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml><http://apache-spark-developers-list.1001551.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml%3e>
Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/

________________________________
If you reply to this email, your message will be added to the discussion below:
http://apache-spark-developers-list.1001551.n3.nabble.com/Shuffle-intermidiate-results-not-being-cached-tp20358p20371.html
To start a new topic under Apache Spark Developers List, email ml-node+s1001551n1h20@n3.nabble.com<ma...@n3.nabble.com>
To unsubscribe from Apache Spark Developers List, click here<http://apache-spark-developers-list.1001551.n3.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=1&code=YXNzYWYubWVuZGVsc29uQHJzYS5jb218MXwtMTI4OTkxNTg1Mg==>.
NAML<http://apache-spark-developers-list.1001551.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>




--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Shuffle-intermidiate-results-not-being-cached-tp20358p20377.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

RE: Shuffle intermidiate results not being cached

Posted by Liang-Chi Hsieh <vi...@gmail.com>.
Hi,

Every iteration the data you run aggregation on it is different. As I showed
in previous reply:

1st iteration: aggregation(x1 union x2)
2nd iteration: aggregation(x3 union (x1 union x2))
3rd iteration: aggregation(x4 union(x3 union (x1 union x2)))

In 1st you run aggregation on the data of x1 and x2. In 2nd the data is x1,
x2 and x3. Even you work on the same RDD, you won't see reuse of the shuffle
data because the shuffle data is different.

In your second example, I think the way to reduce the computation is like:

var totalTime: Long = 0
var allDF: org.apache.spark.sql.DataFrame = null
for {
  x <- dataframes
} {
  val timeLen = time {
    allDF = if (allDF == null) x else allDF.union(x) // Union previous
aggregation summary with new dataframe in this window
    val grouped = allDF.groupBy("cat1",
"cat2").agg(sum($"valToAdd").alias("v"))
    val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2"))
    grouped2.show()
    allDF = grouped  // Replace the union of data with aggregated summary
  }
  totalTime += timeLen
  println(s"Took $timeLen miliseconds")
}
println(s"Total time was $totalTime miliseconds")

You don't need to recompute the aggregation of previous dataframes in each
iteration. You just need to get the summary and union it with new dataframe
to compute the newer aggregation summary in next iteration. It is more
similar to streaming case, I don't think you can/should recompute all the
data since the beginning of a stream.



assaf.mendelson wrote
> The reason I thought some operations would be reused is the fact that
> spark automatically caches shuffle data which means the partial
> aggregation for pervious dataframes would be saved. Unfortunatly, as Mark
> Hamstra explained this is not the case because this is considered a new
> RDD and therefore the previous data is lost.
> 
> I am still wondering if there is any way to do high performance streaming
> of SQL. Basically this is not far from what DStream would do assuming we
> convert a sliding window (e.g. 24 hours every 15 minutes) as we would be
> doing a foreachRDD which would do the joining behind the scenes.
> The problem is that any attempt to do a streaming like this results in
> performance which is hundreds of times slower than batch.
> Is there a correct way to do such an aggregation on streaming data (using
> dataframes rather than RDD operations).
> Assaf.
> 
> 
> 
> From: Liang-Chi Hsieh [via Apache Spark Developers List] [mailto:

> ml-node+s1001551n20361h80@.nabble

> ]
> Sent: Monday, December 26, 2016 5:42 PM
> To: Mendelson, Assaf
> Subject: Re: Shuffle intermidiate results not being cached
> 
> 
> Hi,
> 
> Let me quote your example codes:
> 
> var totalTime: Long = 0
> var allDF: org.apache.spark.sql.DataFrame = null
> for {
>   x <- dataframes
> } {
>   val timeLen = time {
>     allDF = if (allDF == null) x else allDF.union(x)
>     val grouped = allDF.groupBy("cat1",
> "cat2").agg(sum($"valToAdd").alias("v"))
>     val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2"))
>     grouped2.show()
>   }
>   totalTime += timeLen
>   println(s"Took $timeLen miliseconds")
> }
> println(s"Total time was $totalTime miliseconds")
> 
> 
> Basically what you do is to union some dataframes for each iteration, and
> do aggregation on this union data. I don't see any reused operations.
> 
> 1st iteration: aggregation(x1 union x2)
> 2nd iteration: aggregation(x3 union (x1 union x2))
> 3rd iteration: aggregation(x4 union(x3 union (x1 union x2)))
> ...
> 
> Your first example just does two aggregation operations. But your second
> example like above does this aggregation operations for each iteration. So
> the time of second example grows as the iteration increases.
> 
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/
> 
> ________________________________
> If you reply to this email, your message will be added to the discussion
> below:
> http://apache-spark-developers-list.1001551.n3.nabble.com/Shuffle-intermidiate-results-not-being-cached-tp20358p20361.html
> To start a new topic under Apache Spark Developers List, email 

> ml-node+s1001551n1h20@.nabble

> &lt;mailto:

> ml-node+s1001551n1h20@.nabble

> &gt;
> To unsubscribe from Apache Spark Developers List, click
> here&lt;http://apache-spark-developers-list.1001551.n3.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&amp;node=1&amp;code=YXNzYWYubWVuZGVsc29uQHJzYS5jb218MXwtMTI4OTkxNTg1Mg==&gt;.
> NAML&lt;http://apache-spark-developers-list.1001551.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&amp;id=instant_html%21nabble%3Aemail.naml&amp;base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&amp;breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml&gt;





-----
Liang-Chi Hsieh | @viirya 
Spark Technology Center 
http://www.spark.tc/ 
--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Shuffle-intermidiate-results-not-being-cached-tp20358p20371.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscribe@spark.apache.org


RE: Shuffle intermidiate results not being cached

Posted by "assaf.mendelson" <as...@rsa.com>.
The reason I thought some operations would be reused is the fact that spark automatically caches shuffle data which means the partial aggregation for pervious dataframes would be saved. Unfortunatly, as Mark Hamstra explained this is not the case because this is considered a new RDD and therefore the previous data is lost.

I am still wondering if there is any way to do high performance streaming of SQL. Basically this is not far from what DStream would do assuming we convert a sliding window (e.g. 24 hours every 15 minutes) as we would be doing a foreachRDD which would do the joining behind the scenes.
The problem is that any attempt to do a streaming like this results in performance which is hundreds of times slower than batch.
Is there a correct way to do such an aggregation on streaming data (using dataframes rather than RDD operations).
Assaf.



From: Liang-Chi Hsieh [via Apache Spark Developers List] [mailto:ml-node+s1001551n20361h80@n3.nabble.com]
Sent: Monday, December 26, 2016 5:42 PM
To: Mendelson, Assaf
Subject: Re: Shuffle intermidiate results not being cached


Hi,

Let me quote your example codes:

var totalTime: Long = 0
var allDF: org.apache.spark.sql.DataFrame = null
for {
  x <- dataframes
} {
  val timeLen = time {
    allDF = if (allDF == null) x else allDF.union(x)
    val grouped = allDF.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
    val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2"))
    grouped2.show()
  }
  totalTime += timeLen
  println(s"Took $timeLen miliseconds")
}
println(s"Total time was $totalTime miliseconds")


Basically what you do is to union some dataframes for each iteration, and do aggregation on this union data. I don't see any reused operations.

1st iteration: aggregation(x1 union x2)
2nd iteration: aggregation(x3 union (x1 union x2))
3rd iteration: aggregation(x4 union(x3 union (x1 union x2)))
...

Your first example just does two aggregation operations. But your second example like above does this aggregation operations for each iteration. So the time of second example grows as the iteration increases.

Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/

________________________________
If you reply to this email, your message will be added to the discussion below:
http://apache-spark-developers-list.1001551.n3.nabble.com/Shuffle-intermidiate-results-not-being-cached-tp20358p20361.html
To start a new topic under Apache Spark Developers List, email ml-node+s1001551n1h20@n3.nabble.com<ma...@n3.nabble.com>
To unsubscribe from Apache Spark Developers List, click here<http://apache-spark-developers-list.1001551.n3.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=1&code=YXNzYWYubWVuZGVsc29uQHJzYS5jb218MXwtMTI4OTkxNTg1Mg==>.
NAML<http://apache-spark-developers-list.1001551.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>




--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Shuffle-intermidiate-results-not-being-cached-tp20358p20368.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

Re: Shuffle intermidiate results not being cached

Posted by Liang-Chi Hsieh <vi...@gmail.com>.
Hi,

Let me quote your example codes:

var totalTime: Long = 0
var allDF: org.apache.spark.sql.DataFrame = null
for {
  x <- dataframes
} {
  val timeLen = time {
    allDF = if (allDF == null) x else allDF.union(x)
    val grouped = allDF.groupBy("cat1",
"cat2").agg(sum($"valToAdd").alias("v"))
    val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2"))
    grouped2.show()
  }
  totalTime += timeLen
  println(s"Took $timeLen miliseconds")
}
println(s"Total time was $totalTime miliseconds")


Basically what you do is to union some dataframes for each iteration, and do
aggregation on this union data. I don't see any reused operations.

1st iteration: aggregation(x1 union x2)
2nd iteration: aggregation(x3 union (x1 union x2))
3rd iteration: aggregation(x4 union(x3 union (x1 union x2)))
...

Your first example just does two aggregation operations. But your second
example like above does this aggregation operations for each iteration. So
the time of second example grows as the iteration increases.
 




-----
Liang-Chi Hsieh | @viirya 
Spark Technology Center 
http://www.spark.tc/ 
--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Shuffle-intermidiate-results-not-being-cached-tp20358p20361.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscribe@spark.apache.org


Re: Shuffle intermidiate results not being cached

Posted by Mark Hamstra <ma...@clearstorydata.com>.
Shuffle results are only reused if you are reusing the exact same RDD.  If
you are working with Dataframes that you have not explicitly cached, then
they are going to be producing new RDDs within their physical plan creation
and evaluation, so you won't get implicit shuffle reuse.  This is what
https://issues.apache.org/jira/browse/SPARK-11838 is about.

On Mon, Dec 26, 2016 at 5:56 AM, assaf.mendelson <as...@rsa.com>
wrote:

> Hi,
>
>
>
> Sorry to be bothering everyone on the holidays but I have found what may
> be a bug.
>
>
>
> I am doing a “manual” streaming (see http://stackoverflow.com/
> questions/41266956/apache-spark-streaming-performance for the specific
> code) where I essentially read an additional dataframe each time from file,
> union it with previous dataframes to create a “window” and then do double
> aggregation on the result.
>
> Having looked at the documentation (https://spark.apache.org/
> docs/latest/programming-guide.html#which-storage-level-to-choose right
> above the headline) I expected spark to automatically cache the partial
> aggregation for each dataframe read and then continue with the aggregations
> from there. Instead it seems it reads each dataframe from file all over
> again.
>
> Is this a bug? Am I doing something wrong?
>
>
>
> Thanks.
>
>                 Assaf.
>
> ------------------------------
> View this message in context: Shuffle intermidiate results not being
> cached
> <http://apache-spark-developers-list.1001551.n3.nabble.com/Shuffle-intermidiate-results-not-being-cached-tp20358.html>
> Sent from the Apache Spark Developers List mailing list archive
> <http://apache-spark-developers-list.1001551.n3.nabble.com/> at
> Nabble.com.
>