Posted to issues@spark.apache.org by "Aby Philip (JIRA)" <ji...@apache.org> on 2018/11/05 05:51:00 UTC

[jira] [Created] (SPARK-25940) Add the ability to tag datasets so that tags appear with RDDs in StageCompleted events when the corresponding sources are read/written

Aby Philip created SPARK-25940:
----------------------------------

             Summary: Add the ability to tag datasets so that tags appear with RDDs in StageCompleted events when the corresponding sources are read/written
                 Key: SPARK-25940
                 URL: https://issues.apache.org/jira/browse/SPARK-25940
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core, SQL
    Affects Versions: 2.3.0, 2.2.0
            Reporter: Aby Philip
             Fix For: 2.3.1, 2.3.0


At work, I recently made some private changes to the Spark code for use with our product. I would like advice on how these changes can be merged upstream and how to go about it.

Summary:
- Datasets can be tagged using an API, e.g. with the tag "SRC_source1" (a usage sketch follows after this list).
- The tags (e.g. "SRC_source1") are propagated through the execution so that they appear in the StageCompleted events (along with the RDD names) when the corresponding source is read from or written to.
- This makes it possible to associate internal accumulables (e.g. spark.internal.metrics.recordsWritten) with a Dataset, and hence to treat the accumulable as the row count for that source.
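
For illustration, here is a rough sketch of how the proposed tagging API might be used. The method name tag() and the paths below are illustrative only; nothing here is an existing Spark API:

    // Hypothetical usage of the proposed tagging API (tag() is the new method).
    val source1 = spark.read.parquet("/data/source1")   // illustrative path
    val tagged  = source1.tag("SRC_source1")             // attach a tag to the Dataset

    tagged.write.parquet("/data/target1")                // illustrative path
    // When the write stage completes, the StageCompleted event would carry the tag
    // "SRC_source1" alongside the RDD info, so a listener can attribute
    // spark.internal.metrics.recordsWritten to that source.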

Background:
Spark is used to create ETL pipelines in the product I work on. Spark code is generated from a data pipeline, i.e. when the pipeline is executed, an equivalent Spark program is generated and run on a YARN cluster. A pipeline can have one or more sources and targets, and the requirement is to get the number of records read from each source and the number of records written to each target. I would like to avoid using count() for performance reasons. I have tried the solutions suggested earlier, e.g. named accumulators and createTempView() (as suggested in https://github.com/apache/spark/pull/16609#issuecomment-281865742), but each had a performance impact with big datasets (the number of records can run into millions).
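
For context, the straightforward approach I want to avoid looks roughly like the sketch below (paths and the transformation are illustrative, and spark is assumed to be a SparkSession in scope). Every count() is a separate action and forces an extra pass over the data:

    // Naive row counting, avoided for performance reasons on large datasets.
    val source1  = spark.read.parquet("/data/source1")   // illustrative path
    val rowsRead = source1.count()                        // extra scan of the source

    val target1     = source1.select("*")                 // stands in for the real transformations
    val rowsWritten = target1.count()                      // yet another action before the write
    target1.write.parquet("/data/target1")                 // illustrative path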

Changes: 
I made the following changes:
1. Added an API to tag() a DataFrame. The tag is propagated all the way through the execution and included in the SparkListener events. In the StageCompleted event, the tags corresponding to the RDDs (if any) are listed too. (This allows 'spark.internal.metrics.recordsWritten' to be correlated with a source whose tag is present in the event; a listener sketch follows after this list.)
2. The accumulable 'spark.internal.metrics.recordsWritten' was missing from the events because it was not incremented in some cases, so changes were made to increment it in those cases as well.
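
To show what consuming the event looks like, here is a minimal listener sketch. The listener API and the accumulables map are existing Spark; the accumulable lookup matches on a name substring just for illustration, and the tags mentioned in the comment are the hypothetical addition from this proposal:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

    // Minimal sketch: read the recordsWritten accumulable from StageCompleted events.
    class TagMetricsListener extends SparkListener {
      override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
        val info = event.stageInfo
        val recordsWritten = info.accumulables.values
          .find(_.name.exists(_.contains("recordsWritten")))   // match the metric by name
          .flatMap(_.value)
        // With the proposed change, the tags (e.g. "SRC_source1") would also be
        // available on the stage/RDD info here and could be reported next to the metric.
        println(s"stage ${info.stageId}: recordsWritten=$recordsWritten")
      }
    }

    // Registration, e.g.:
    // spark.sparkContext.addSparkListener(new TagMetricsListener())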

Similar JIRAs:
I got this idea from the comment https://github.com/apache/spark/pull/16609#pullrequestreview-17823555, which mentions 'tagging' a DataFrame. However, the focus of that discussion was the Spark UI. I have not made any changes to the Spark UI but have instead surfaced the tags only in the SparkListener events.

Limitations:
If a stage reads from more than one source or writes to more than one target, the recordsRead/recordsWritten accumulable represents the total number of rows across those sources. All of their tags appear in the event; the only downside is that the number of rows for each individual source cannot be distinguished.

Since this is the first time I am trying to submit a patch, I would like some advice on how to proceed.
- I made the changes on 2.1.0 initially but have since merged them onto 2.2.1, 2.3.0 and 2.3.1. I would need advice on which branch I should submit the patch against.
- The testing was done manually on the product, so I may have to add unit tests for this.
- I have not focused on the Spark UI; the product just uses SparkListenerStageCompleted events. Is it necessary to make UI changes before I submit the patch?
Please advise.


