Posted to issues@spark.apache.org by "Jungtaek Lim (JIRA)" <ji...@apache.org> on 2018/07/09 05:19:00 UTC

[jira] [Comment Edited] (SPARK-24763) Remove redundant key data from value in streaming aggregation

    [ https://issues.apache.org/jira/browse/SPARK-24763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16536539#comment-16536539 ] 

Jungtaek Lim edited comment on SPARK-24763 at 7/9/18 5:18 AM:
--------------------------------------------------------------

> Spark version
 * 2.4.0-SNAPSHOT
 * commit: 79c66894296840cc4a5bf6c8718ecfd2b08bcca8 (latest master) + SPARK-24441 + SPARK-24717
 * POC commit: [https://github.com/HeartSaVioR/spark/commit/378ce2ae30116fba80421873ef04ed2ca113630e]

 

> App (query)

[https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming/blob/master/src/main/scala/com/hortonworks/spark/BenchmarkMovingAggregationsListenerToFs.scala]
{code:java}
val outDf = df
  // accept events arriving up to 10 seconds late
  .withWatermark("timestamp", "10 seconds")
  .selectExpr(
    "timestamp", "mod(value, 100) as mod", "value",
    createCaseExprStr(50, "mod(CAST(RANDN(0) * 1000 as INTEGER), 50)") + " as word")
  // sliding window: 1 minute long, sliding every 10 seconds
  .groupBy(
    window($"timestamp", "1 minute", "10 seconds"),
    $"mod", $"word")
  .agg(max("value").as("max_value"), min("value").as("min_value"), avg("value").as("avg_value"))
{code}
 * key fields: window (start/end), mod (int), word (10-character string)
 * value fields: max_value (int), min_value (int), avg_value (double)

 

> How to test

Ran the above app three times for each option setting (enable/disable).

Used an AWS dedicated instance in the Seoul region: c5.xlarge (4 cores, 8 GB), 30 GB SSD (IOPS 100/3000).

No cluster was used: everything ran in a single JVM process to avoid any transfer latency.
{code:java}
--master local[*] --driver-memory 2g --executor-memory 8g --executor-cores 3{code}
 

> Test results (each analysis query followed by its result table)

>> query (source)
{code:java}
val source0InfoPopulatedDf = jsonDf
  .selectExpr("data.id", "data.runId", "data.batchId",
    "data.sources[0].numInputRows AS numInputRows",
    "data.sources[0].inputRowsPerSecond AS inputRowsPerSecond",
    "data.sources[0].processedRowsPerSecond AS processedRowsPerSecond",
    "data.sources[0].startOffset AS startOffset",
    "data.sources[0].endOffset AS endOffset")
  .distinct()
  .where("batchId >= 50 and batchId <= 150")
  .groupBy("id", "runId")
  .agg("numInputRows" -> "avg", "inputRowsPerSecond" -> "avg",
    "processedRowsPerSecond" -> "avg")
source0InfoPopulatedDf.show()
{code}
>> result
||mode||trial||avg(numInputRows)||avg(inputRowsPerSecond)||avg(processedRowsPerSecond)||
|disable|1|50000.0|10000.000110891095|22543.79742858393|
|disable|2|50000.0|10000.019900990894|22689.82215288191|
|disable|3|50000.0|9999.980265345746|22302.87036748844|
|enable|1|50000.0|10000.000110891096|22711.04445963631|
|enable|2|50000.0|10000.019893070104|22774.52502523933|
|enable|3|50000.0|10000.019908911689|22826.48595693046|

>> query (duration)
{code:java}
val durationInfoPopulatedDf = jsonDf
  .selectExpr("data.id", "data.runId", "data.batchId", "data.durationMs.*")
  .distinct()
  .where("batchId >= 50 and batchId <= 150")
  .groupBy("id", "runId")
  .agg("addBatch" -> "avg", "getBatch" -> "avg", "getOffset" -> "avg",
    "queryPlanning" -> "avg", "triggerExecution" -> "avg", "walCommit" -> "avg")
durationInfoPopulatedDf.show()
{code}
>> result
||mode||trial||avg(addBatch)||avg(getBatch)||avg(getOffset)||avg(queryPlanning)||avg(triggerExecution)||avg(walCommit)||
|disable|1|2145.58415841584|0.0693069306930693|null|55.23762376237624|2221.871287128713|12.762376237623762|
|disable|2|2131.00990099009|0.0495049504950495|null|55.49504950495049|2207.069306930693|12.405940594059405|
|disable|3|2172.99009900990|0.0495049504950495|null|55.97029702970297|2249.495049504950|12.445544554455445|
|enable|1|2130.77227722772|0.0594059405940594|null|55.31683168316832|2207.306930693069|12.871287128712872|
|enable|2|2126.14851485148|0.0396039603960396|null|54.98019801980198|2202.415841584158|12.891089108910892|
|enable|3|2120.19801980198|0.0495049504950495|null|54.82178217821782|2195.772277227723|12.643564356435643|

>> query (state information)
{code:java}
val state0InfoPopulatedDf = jsonDf
  .selectExpr("data.id", "data.runId", "data.batchId",
    "data.stateOperators[0].numRowsTotal AS numRowsTotal",
    "data.stateOperators[0].numRowsUpdated AS numRowsUpdated",
    "data.stateOperators[0].memoryUsedBytes AS memoryUsedBytes")
  .distinct()
  // specific batch id
  .where("batchId = 150")
state0InfoPopulatedDf.show()
{code}
>> result
||mode||trial||batchId||numRowsTotal||numRowsUpdated||memoryUsedBytes||
|disable|1|150|34811|29838|10585079|
|disable|2|150|34811|29838|10477367|
|disable|3|150|39734|31597|12056439|
|enable|1|150|39725|31660|9856055|
|enable|2|150|34811|29838|8631607|
|enable|3|150|39713|31769|9674807|

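>> efficiency of state row size when enabling the option (memoryUsedBytes / numRowsUpdated, enable relative to disable)

(8631607 / 29838) / (10477367 / 29838) * 100 = 82.38% (enable trial 2 vs. disable trial 2)
(9856055 / 31660) / (12056439 / 31597) * 100 = 81.58% (enable trial 1 vs. disable trial 3)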
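
As a rough cross-check, the two ratios can be recomputed with a small standalone Scala sketch. The names StateSample and StateRowSize are made up for illustration and are not part of Spark or the benchmark app; the numbers are copied from the state information table.
{code:scala}
// Hypothetical helper types, for illustration only.
final case class StateSample(mode: String, trial: Int, numRowsUpdated: Long, memoryUsedBytes: Long) {
  // Same per-row measure as the formulas above: state memory divided by updated rows.
  def bytesPerUpdatedRow: Double = memoryUsedBytes.toDouble / numRowsUpdated
}

object StateRowSize {
  // Per-row size with the option enabled, as a percentage of the size with it disabled.
  def relativeSizePercent(enabled: StateSample, disabled: StateSample): Double =
    enabled.bytesPerUpdatedRow / disabled.bytesPerUpdatedRow * 100.0

  def main(args: Array[String]): Unit = {
    // Figures from the result table at batchId = 150.
    val disable2 = StateSample("disable", 2, 29838L, 10477367L)
    val enable2 = StateSample("enable", 2, 29838L, 8631607L)
    val disable3 = StateSample("disable", 3, 31597L, 12056439L)
    val enable1 = StateSample("enable", 1, 31660L, 9856055L)

    println(f"${relativeSizePercent(enable2, disable2)}%.2f%%")
    println(f"${relativeSizePercent(enable1, disable3)}%.2f%%")
  }
}
{code}
Both ratios should come out at roughly 82%, i.e. a state row is about 18% smaller with the option enabled for this particular key/value composition.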

 

Overall, enabling the option runs slightly faster (only around 1%, so not a notable difference) and uses less memory for state (roughly 18% less per state row in this test; the saving depends on the composition of the key and value fields).
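
The "around 1%" figure can be reproduced from the per-trial averages in the source and duration tables. The following is only a sketch: ThroughputComparison and its members are illustrative names, and it simply averages the three trials of each mode before comparing them.
{code:scala}
object ThroughputComparison {
  // avg(processedRowsPerSecond) per trial, copied from the source result table.
  val processedDisable = Seq(22543.79742858393, 22689.82215288191, 22302.87036748844)
  val processedEnable = Seq(22711.04445963631, 22774.52502523933, 22826.48595693046)

  // avg(addBatch) per trial in milliseconds, copied from the duration result table.
  val addBatchDisable = Seq(2145.58415841584, 2131.00990099009, 2172.99009900990)
  val addBatchEnable = Seq(2130.77227722772, 2126.14851485148, 2120.19801980198)

  def mean(xs: Seq[Double]): Double = xs.sum / xs.size

  // Relative change of enable versus disable, in percent (positive means enable is larger).
  def relativeChangePercent(enable: Seq[Double], disable: Seq[Double]): Double =
    (mean(enable) / mean(disable) - 1.0) * 100.0

  def main(args: Array[String]): Unit = {
    println(f"processedRowsPerSecond: ${relativeChangePercent(processedEnable, processedDisable)}%+.2f%%")
    println(f"addBatch: ${relativeChangePercent(addBatchEnable, addBatchDisable)}%+.2f%%")
  }
}
{code}
With three trials per mode and differences this small, the result is better read as "no regression" than as a measured speedup.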


> Remove redundant key data from value in streaming aggregation
> -------------------------------------------------------------
>
>                 Key: SPARK-24763
>                 URL: https://issues.apache.org/jira/browse/SPARK-24763
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Jungtaek Lim
>            Priority: Major
>
> The key/value of state in streaming aggregation is formatted as below:
>  * key: UnsafeRow containing group-by fields
>  * value: UnsafeRow containing key fields and other fields for aggregation results
> so the key data is stored in both the key and the value.
> This avoids projecting the row to a value when storing, and joining the key and value to restore the original row, in order to boost performance. However, in a simple benchmark test I found it not much more helpful than "project and join". (I will paste the test result in a comment.)
> So I would propose a new option: remove the redundant key data in stateful aggregation. I'm avoiding modifying the default behavior of stateful aggregation, because the state value would not be compatible between the current format and the format with the option enabled.
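
The difference between the two state formats described above can be sketched in plain Scala. This is a simplified model, not Spark code: rows are plain vectors instead of UnsafeRow, all names are hypothetical, and it assumes the group-by fields come first in the row.
{code:scala}
object StateLayoutSketch {
  // A row is modelled as a plain vector of field values; Spark itself uses UnsafeRow.
  type Row = Vector[Any]

  // Current format: the key holds the group-by fields and the value holds the whole row,
  // so the group-by fields are stored twice.
  def toCurrentFormat(row: Row, numKeyFields: Int): (Row, Row) =
    (row.take(numKeyFields), row)

  // Proposed option: "project" the row so the value keeps only the non-key fields.
  def toCompactFormat(row: Row, numKeyFields: Int): (Row, Row) =
    (row.take(numKeyFields), row.drop(numKeyFields))

  // "Join" key and value to restore the original row when reading the compact format.
  def restore(key: Row, value: Row): Row = key ++ value

  def main(args: Array[String]): Unit = {
    // Made-up example row: window start, window end, mod, word, then three aggregation fields.
    val row: Row = Vector(0L, 60000L, 7, "aaaaaaaaaa", 907, 7, 457.0)
    val (key, fullValue) = toCurrentFormat(row, numKeyFields = 4)
    val (_, compactValue) = toCompactFormat(row, numKeyFields = 4)
    println(s"key fields: ${key.size}, value fields (current): ${fullValue.size}, value fields (compact): ${compactValue.size}")
    println(s"restored equals original: ${restore(key, compactValue) == row}")
  }
}
{code}
The compact format trades an extra projection on write and a join on read for a smaller value, which is also why state written in one format cannot be read back in the other.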


