You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2018/12/12 07:03:15 UTC

[GitHub] HyukjinKwon opened a new pull request #23263: [SPARK-23674][ML] Adds Spark ML Events

HyukjinKwon opened a new pull request #23263: [SPARK-23674][ML] Adds Spark ML Events
URL: https://github.com/apache/spark/pull/23263
 
 
   ## What changes were proposed in this pull request?
   
   This PR proposes to add ML events so that other developers can track and add some actions for them.
   
   ## Introduction
   
   ML events (like SQL events) can be quite useful when people want to track and make some actions for corresponding ML operations. For instance, I have been working on integrating 
   Apache Spark with [Apache Atlas](https://atlas.apache.org/QuickStart.html). With some custom changes with this PR, I can visualise ML pipeline as below:
   
   ![spark_ml_streaming_lineage](https://user-images.githubusercontent.com/6477701/49682779-394bca80-faf5-11e8-85b8-5fae28b784b3.png)
   
   Another good thing that might have to be considered is, that we can interact this with other SQL/Streaming events. For instance, where the input `Dataset` is originated. For instance, with current Apache Spark, I can visualise SQL operations as below:
   
   ![screen shot 2018-12-10 at 9 41 36 am](https://user-images.githubusercontent.com/6477701/49706269-d9bdfe00-fc5f-11e8-943a-3309d1856ba5.png)
   
   I think we can combine those existing lineages together to easily understand where the data comes and goes. Currently, ML side is a hole so the lineages can't be connected for the current Apache Spark ..
   
   To add up, I think it's not to mention how useful it is to track the SQL/Streaming operations. Likewise, I would like to propose ML events as well (as lowest stability `@Unstable` APIs for now - no guarantee about stability).
   
   ## Implementation Details
   
   ### Sends event (but not expose ML specific listener)
   
   **`mllib/src/main/scala/org/apache/spark/ml/events.scala`**
   
   ```scala
   @Unstable
   case class ...StartEvent(caller, input)
   @Unstable
   case class ...EndEvent(caller, output)
   
   object MLEvents {
     // Wrappers to send events:
     // def with...Event(body) = {
     //   body()
     //   SparkContext.getOrCreate().listenerBus.post(event)
     // }
   }
   ```
   
   This way mimics both:
   
   **1. Catalog events (see `org/apache/spark/sql/catalyst/catalog/events.scala`)**
   
   - This allows a Catalog specific listener to be added `ExternalCatalogEventListener` 
   
   - It's implemented in a way of wrapping whole `ExternalCatalog` named `ExternalCatalogWithListener`
   which delegates the operations to `ExternalCatalog`
   
   This is not quite possible in this case because most of instances (like `Pipeline`) will be directly created in most of cases. We might be able to do that via extending `ListenerBus` for all possible instances but IMHO it's too invasive. Also, exposing another ML specific listener sounds a bit too much at this stage. Therefore, I simply borrowed file name and structures here
   
   **2. SQL execution events (see `org/apache/spark/sql/execution/SQLExecution.scala`)**
   
   - Add an object that wraps a body to send events
   
   Current apporach is rather close to this. It has a `with...` wrapper to send events. I borrowed this approach to be consistent.
   
   
   ### Add `...Impl` methods to wrap each to send events
   
   **`mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala`**
   
   ```diff
   - def save(...) = { saveImpl(...) }
   + def save(...) = MLEvents.withSaveInstanceEvent { saveImpl(...) }
     def saveImpl(...): Unit = ...
   ```
   
     Note that `saveImpl` was already implemented unlike other instances below.
   
   
   ```diff
   - def load(...): T
   + def load(...): T = MLEvents.withLoadInstanceEvent { loadImple(...) }
   + def loadImpl(...): T
   ```
   
   **`mllib/src/main/scala/org/apache/spark/ml/Estimator.scala`**
   
   ```diff
   - def fit(...): Model
   + def fit(...): Model = MLEvents.withFitEvent { fitImpl(...) }
   + def fitImpl(...): Model
   ```
   
   **`mllib/src/main/scala/org/apache/spark/ml/Transformer.scala`**
   
   ```diff
   - def transform(...): DataFrame
   + def transform(...): DataFrame = MLEvents.withTransformEvent { transformImpl(...) }
   + def transformImpl(...): DataFrame
   ```
   
   This approach follows the existing way as below in ML:
   
   **1. `transform` and `transformImpl`**
   
   https://github.com/apache/spark/blob/9b1f6c8bab5401258c653d4e2efb50e97c6d282f/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala#L202-L213
   
   https://github.com/apache/spark/blob/9b1f6c8bab5401258c653d4e2efb50e97c6d282f/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala#L191-L196
   
   https://github.com/apache/spark/blob/9b1f6c8bab5401258c653d4e2efb50e97c6d282f/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala#L1037-L1042
   
   **2. `save` and `saveImpl`**
   
   https://github.com/apache/spark/blob/9b1f6c8bab5401258c653d4e2efb50e97c6d282f/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala#L166-L176
   
   Inherited ones are intentionally omitted here for simplicity. They are inherited and implemented at multiple places.
   
   ## Usage
   
   It needs a custom implementation for a query listener. For instance,
   
   with the custom listener below:
   
   ```scala
   class CustomMLListener extends SparkListener
     def onOtherEvents(e) = e match {
       case e: MLEvent => // do something
       case _ => // pass
     }
   }
   ```
   
   There are two (existing) ways to use this.
   
   ```scala
   spark.sparkContext.addSparkListener(new CustomMLListener)
   ```
   
   ```bash
   spark-submit ...\
     --conf spark.extraListeners=CustomMLListener\
     ...
   ```
   
   It's also similar with other existing implementation in SQL side.
   
   ## Target users
   
   1. I think someone in general would likely utilise this feature like other event listeners. At least, I can see some interests going on outside.
   
       - SQL Listener
         - https://stackoverflow.com/questions/46409339/spark-listener-to-an-sql-query
         - http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-Custom-Query-Execution-listener-via-conf-properties-td30979.html
   
       - Streaming Query Listener
         - https://jhui.github.io/2017/01/15/Apache-Spark-Streaming/
         -  http://apache-spark-developers-list.1001551.n3.nabble.com/Structured-Streaming-with-Watermark-td25413.html#a25416
   
   2. Someone would likely run this via Atlas. The plugin mirror intentionally is exposed at [spark-atlas-connector](https://github.com/hortonworks-spark/spark-atlas-connector) so that anyone could do something about lineage and governance in Atlas, yes, as you said. Yes, I'm trying to show integrated lineages in Apache Spark but this is a missing hole. There had to be this change for it.
   
   
   ## Backward Compatibility
   
   _This keeps both source and binary backward compatibility_. I was thinking enforcing `...Impl` by leaving it abstract methods to force to implement but just decided to leave a body that throws `UnsupportedOperationException` so that we can keep full source and binary compatibilities.
   
   - For user-faced API perspective, _there's no difference_. `...Impl` methods are protected and not visible to end users.
   
   - For developer API perspective, if some developers want to `...` methods instead of `...Impl`, that's still fine. It only does not handle events. If developers want to handle events from their custom implementation, they should implement `...Impl`. Of course, it is encouraged to implement `...Impl`
   
     For instance,
   
     ```scala
     class Pipeline extends Estimator[PipelineModel] {
       def fit(dataset: Dataset[_]): PipelineModel = {
         ...
       }
     }
     ```
     still works fine without any behaviour changes.
   
     If developers want their pipeline to emit events, they should change:
   
     ```diff
       class Pipeline extends Estimator[PipelineModel] {
     -   def fit(dataset: Dataset[_]): PipelineModel = {
     +   def fitImpl(dataset: Dataset[_]): PipelineModel = {
           ...
         }
       }
     ```
   
     _^ this is only API change this PR causes._
   
   
   ## How was this patch tested?
   
   Manually tested and unit tests were added.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org