You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by HyukjinKwon <gi...@git.apache.org> on 2018/12/08 09:43:56 UTC

[GitHub] spark pull request #23263: [SPARK-23674][ML] Adds Spark ML Events

GitHub user HyukjinKwon opened a pull request:

    https://github.com/apache/spark/pull/23263

    [SPARK-23674][ML] Adds Spark ML Events

    ## What changes were proposed in this pull request?
    
    This PR proposes to add ML events so that other developers can track and add some actions for them.
    
    ## Introduction
    
    This PR proposes to send some ML events like SQL. This is quite useful when people want to track and make some actions for corresponding ML operations. For instance, I have been working on integrating 
    Apache Spark with [Apache Atlas](https://atlas.apache.org/QuickStart.html). With some custom changes with this PR, I can visualise ML pipeline as below:
    
    ![spark_ml_streaming_lineage](https://user-images.githubusercontent.com/6477701/49682779-394bca80-faf5-11e8-85b8-5fae28b784b3.png)
    
    I think not to mention how useful it is to track the SQL operations. Likewise, I would like to propose ML events as well (as lowest stability `@Unstable` APIs for now - no guarantee about stability).
    
    ## Implementation Details
    
    ### Sends event (but not expose ML specific listener)
    
    In `events.scala`, it adds:
    
    ```scala
    @Unstable
    case class ...StartEvent(caller, input)
    @Unstable
    case class ...EndEvent(caller, output)
    
    object MLEvents {
      // Wrappers to send events:
      // def with...Event(body) = {
      //   body()
      //   SparkContext.getOrCreate().listenerBus.post(event)
      // }
    }
    ```
    
    This way mimics both:
    
    **1. Catalog events (see `org.apache.spark.sql.catalyst.catalog.events.scala`)**
    
    - This allows a Catalog specific listener to be added `ExternalCatalogEventListener` 
    
    - It's implemented in a way of wrapping whole `ExternalCatalog` named `ExternalCatalogWithListener`
    which delegates the operations to `ExternalCatalog`
    
    This is not quite possible in this case because most of instances (like `Pipeline`) will be directly created in most of cases. We might be able to do that via extending `ListenerBus` for all possible instances but IMHO it's too invasive. Also, exposing another ML specific listener sounds a bit too much at this stage. Therefore, I simply borrowed file name and structures here
    
    **2. SQL execution events (see `org.apache.spark.sql.execution.SQLExecution.scala`)**
    
    - Add an object that wraps a body to send events
    
    Current apporach is rather close to this. It has a `with...` wrapper to send events. I borrowed this approach to be consistent.
    
    
    ### Add `...Impl` methods to wrap each to send events
    
    **1. `mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala`**
    
    ```diff
    - def save(...) = { saveImpl(...) }
    + def save(...) = MLEvents.withSaveInstanceEvent { saveImpl(...) }
      def saveImpl(...): Unit = ...
    ```
    
      Note that `saveImpl` was already implemented unlike other instances below.
    
    
    ```diff
    - def load(...): T
    + def load(...): T = MLEvents.withLoadInstanceEvent { loadImple(...) }
    + def loadImpl(...): T
    ```
    
    **2. `mllib/src/main/scala/org/apache/spark/ml/Estimator.scala`**
    
    ```diff
    - def fit(...): Model
    + def fit(...): Model = MLEvents.withFitEvent { fitImpl(...) }
    + def fitImpl(...): Model
    ```
    
    **3. `mllib/src/main/scala/org/apache/spark/ml/Transformer.scala`**
    
    ```diff
    - def transform(...): DataFrame
    + def transform(...): DataFrame = MLEvents.withTransformEvent { transformImpl(...) }
    + def transformImpl(...): DataFrame
    ```
    
    This approach follows the existing way as below in ML:
    
    **1. `transform` and `transformImpl`**
    
    https://github.com/apache/spark/blob/9b1f6c8bab5401258c653d4e2efb50e97c6d282f/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala#L202-L213
    
    https://github.com/apache/spark/blob/9b1f6c8bab5401258c653d4e2efb50e97c6d282f/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala#L191-L196
    
    https://github.com/apache/spark/blob/9b1f6c8bab5401258c653d4e2efb50e97c6d282f/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala#L1037-L1042
    
    **2. `save` and `saveImpl`**
    
    https://github.com/apache/spark/blob/9b1f6c8bab5401258c653d4e2efb50e97c6d282f/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala#L166-L176
    
    Inherited ones are intentionally omitted here for simplicity. They are inherited and implemented at multiple places.
    
    ## Backward Compatibility
    
    _This keeps both source and binary backward compatibility_. I was thinking enforcing `...Impl` by leaving it abstract methods to force to implement but just decided to leave a body that throws `UnsupportedOperationException` so that we can keep full source and binary compatibilities.
    
    - For user-faced API perspective, _there's no difference_. `...Impl` methods are protected and not visible to end users.
    
    - For developer API perspective, if some developers want to `...` methods instead of `...Impl`, that's still fine. It only does not handle events. If developers want to handle events from their custom implementation, they should implement `...Impl`. Of course, it is encouraged to implement `...Impl`
    
    ## How was this patch tested?
    
    Manually tested and unit tests were added.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HyukjinKwon/spark SPARK-23674-1

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/23263.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #23263
    
----
commit a9112f33ff8fbfb66bad76bff6898abdef5b6881
Author: Hyukjin Kwon <gu...@...>
Date:   2018-12-05T06:38:05Z

    Adds Spark ML Events

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23263: [SPARK-23674][ML] Adds Spark ML Events

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23263
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99873/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23263: [SPARK-23674][ML] Adds Spark ML Events

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the issue:

    https://github.com/apache/spark/pull/23263
  
    My first impression is that it's a big change, which is reason for caution here.
    
    Visualizing a workflow is nice, but Spark's Pipelines are typically pretty straightforward and linear. I could imagine producing a nicer visualization than what you get from reading the Spark UI, although of course we already have some degree of history and data there.
    
    These are just the hooks, right? someone would have to implement something to use these events. I see the value in the API to some degree, but with no concrete implementation, does it add anything for Spark users out of the box?
    
    It seems like the history this generates would belong in the history server, although that already has a pretty particular purpose, storing granular history of events in Spark. Is that what someone would likely do? or would someone likely have to run Atlas to use this? If that's a good example of the use case, and Atlas is really about lineage and governance, is that the thrust of this change, to help with something to do with model lineage and reproducibility?
    
    It's good that the API changes little, though it does change a bit.
    
    I think I mostly have questions right now.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23263: [SPARK-23674][ML] Adds Spark ML Events

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/23263
  
    The tests pass in my local. I'll fix them shortly.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23263: [SPARK-23674][ML] Adds Spark ML Events

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23263#discussion_r240003674
  
    --- Diff: mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala ---
    @@ -0,0 +1,199 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.ml
    +
    +import java.io.File
    +
    +import scala.collection.mutable
    +import scala.concurrent.duration._
    +
    +import org.apache.hadoop.fs.Path
    +import org.mockito.Matchers.{any, eq => meq}
    +import org.mockito.Mockito.when
    +import org.scalatest.BeforeAndAfterEach
    +import org.scalatest.concurrent.Eventually
    +import org.scalatest.mockito.MockitoSugar.mock
    +
    +import org.apache.spark.{SparkContext, SparkFunSuite}
    +import org.apache.spark.ml.param.ParamMap
    +import org.apache.spark.ml.util._
    +import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
    +import org.apache.spark.sql._
    +import org.apache.spark.util.Utils
    +
    +
    +class MLEventsSuite
    +    extends SparkFunSuite
    +    with BeforeAndAfterEach
    +    with DefaultReadWriteTest
    +    with Eventually {
    +
    +  private var spark: SparkSession = _
    +  private var sc: SparkContext = _
    +  private var checkpointDir: String = _
    +  private var listener: SparkListener = _
    +  private val dirName: String = "pipeline"
    +  private val events = mutable.ArrayBuffer.empty[MLEvent]
    +
    +  override def beforeAll(): Unit = {
    +    super.beforeAll()
    +    sc = new SparkContext("local[2]", "SparkListenerSuite")
    +    listener = new SparkListener {
    +      override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    +        case e: FitStart[_] => events.append(e)
    +        case e: FitEnd[_] => events.append(e)
    +        case e: TransformStart => events.append(e)
    +        case e: TransformEnd => events.append(e)
    +        case e: SaveInstanceStart if e.path.endsWith(dirName) => events.append(e)
    +        case e: SaveInstanceEnd if e.path.endsWith(dirName) => events.append(e)
    +        case _ =>
    +      }
    +    }
    +    sc.addSparkListener(listener)
    +
    +    spark = SparkSession.builder()
    +      .sparkContext(sc)
    +      .getOrCreate()
    +
    +    checkpointDir = Utils.createDirectory(tempDir.getCanonicalPath, "checkpoints").toString
    +    sc.setCheckpointDir(checkpointDir)
    --- End diff --
    
    I may miss it, where do we use checkpoint?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23263: [SPARK-23674][ML] Adds Spark ML Events

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23263
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23263: [SPARK-23674][ML] Adds Spark ML Events

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23263#discussion_r240003563
  
    --- Diff: mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala ---
    @@ -132,7 +132,8 @@ class Pipeline @Since("1.4.0") (
        * @return fitted pipeline
        */
       @Since("2.0.0")
    -  override def fit(dataset: Dataset[_]): PipelineModel = {
    +  override def fit(dataset: Dataset[_]): PipelineModel = super.fit(dataset)
    --- End diff --
    
    Is there any `fit` method which doesn't do `super.fit()`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23263: [SPARK-23674][ML] Adds Spark ML Events

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23263
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99872/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23263: [SPARK-23674][ML] Adds Spark ML Events

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23263
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5885/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23263: [SPARK-23674][ML] Adds Spark ML Events

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23263
  
    **[Test build #99872 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99872/testReport)** for PR 23263 at commit [`4f2fda2`](https://github.com/apache/spark/commit/4f2fda2272ee7e7d62df139c2f994ef7a122bf7c).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23263: [SPARK-23674][ML] Adds Spark ML Events

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23263
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23263: [SPARK-23674][ML] Adds Spark ML Events

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23263
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23263: [SPARK-23674][ML] Adds Spark ML Events

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23263
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5889/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23263: [SPARK-23674][ML] Adds Spark ML Events

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23263#discussion_r240003952
  
    --- Diff: mllib/src/main/scala/org/apache/spark/ml/Estimator.scala ---
    @@ -65,7 +65,19 @@ abstract class Estimator[M <: Model[M]] extends PipelineStage {
        * Fits a model to the input data.
        */
       @Since("2.0.0")
    -  def fit(dataset: Dataset[_]): M
    +  def fit(dataset: Dataset[_]): M = MLEvents.withFitEvent(this, dataset) {
    +    fitImpl(dataset)
    +  }
    +
    +  /**
    +   * `fit()` handles events and then calls this method. Subclasses should override this
    +   * method to implement the actual fiting a model to the input data.
    +   */
    +  @Since("3.0.0")
    +  protected def fitImpl(dataset: Dataset[_]): M = {
    +    // Keep this default body for backward compatibility.
    +    throw new UnsupportedOperationException("fitImpl is not implemented.")
    --- End diff --
    
    For current change, Spark ML developers can still choose to override `fit` instead `fitImpl` so their ML model can work without ML event?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23263: [SPARK-23674][ML] Adds Spark ML Events

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23263#discussion_r240003885
  
    --- Diff: mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala ---
    @@ -0,0 +1,199 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.ml
    +
    +import java.io.File
    +
    +import scala.collection.mutable
    +import scala.concurrent.duration._
    +
    +import org.apache.hadoop.fs.Path
    +import org.mockito.Matchers.{any, eq => meq}
    +import org.mockito.Mockito.when
    +import org.scalatest.BeforeAndAfterEach
    +import org.scalatest.concurrent.Eventually
    +import org.scalatest.mockito.MockitoSugar.mock
    +
    +import org.apache.spark.{SparkContext, SparkFunSuite}
    +import org.apache.spark.ml.param.ParamMap
    +import org.apache.spark.ml.util._
    +import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
    +import org.apache.spark.sql._
    +import org.apache.spark.util.Utils
    +
    +
    +class MLEventsSuite
    +    extends SparkFunSuite
    +    with BeforeAndAfterEach
    +    with DefaultReadWriteTest
    +    with Eventually {
    +
    +  private var spark: SparkSession = _
    +  private var sc: SparkContext = _
    +  private var checkpointDir: String = _
    +  private var listener: SparkListener = _
    +  private val dirName: String = "pipeline"
    +  private val events = mutable.ArrayBuffer.empty[MLEvent]
    +
    +  override def beforeAll(): Unit = {
    +    super.beforeAll()
    +    sc = new SparkContext("local[2]", "SparkListenerSuite")
    +    listener = new SparkListener {
    +      override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    +        case e: FitStart[_] => events.append(e)
    +        case e: FitEnd[_] => events.append(e)
    +        case e: TransformStart => events.append(e)
    +        case e: TransformEnd => events.append(e)
    +        case e: SaveInstanceStart if e.path.endsWith(dirName) => events.append(e)
    +        case e: SaveInstanceEnd if e.path.endsWith(dirName) => events.append(e)
    +        case _ =>
    +      }
    +    }
    +    sc.addSparkListener(listener)
    +
    +    spark = SparkSession.builder()
    +      .sparkContext(sc)
    +      .getOrCreate()
    +
    +    checkpointDir = Utils.createDirectory(tempDir.getCanonicalPath, "checkpoints").toString
    +    sc.setCheckpointDir(checkpointDir)
    --- End diff --
    
    Let me double check and address this while fixing the test. I just copied this from `MLlibTestSparkContext`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23263: [SPARK-23674][ML] Adds Spark ML Events

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23263
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99868/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23263: [SPARK-23674][ML] Adds Spark ML Events

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23263#discussion_r240004006
  
    --- Diff: mllib/src/main/scala/org/apache/spark/ml/Estimator.scala ---
    @@ -65,7 +65,19 @@ abstract class Estimator[M <: Model[M]] extends PipelineStage {
        * Fits a model to the input data.
        */
       @Since("2.0.0")
    -  def fit(dataset: Dataset[_]): M
    +  def fit(dataset: Dataset[_]): M = MLEvents.withFitEvent(this, dataset) {
    +    fitImpl(dataset)
    +  }
    +
    +  /**
    +   * `fit()` handles events and then calls this method. Subclasses should override this
    +   * method to implement the actual fiting a model to the input data.
    +   */
    +  @Since("3.0.0")
    +  protected def fitImpl(dataset: Dataset[_]): M = {
    +    // Keep this default body for backward compatibility.
    +    throw new UnsupportedOperationException("fitImpl is not implemented.")
    --- End diff --
    
    Yes, that was my intention. I wanted to force to implement `fitImpl` but was thinking that might be too breaking change (it's going to at least break source compatibility). I am willing to follow other suggestions - I am pretty sure you or other guys are more familiar with ML side.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23263: [SPARK-23674][ML] Adds Spark ML Events

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23263#discussion_r239999747
  
    --- Diff: mllib/src/main/scala/org/apache/spark/ml/Predictor.scala ---
    @@ -210,7 +214,7 @@ abstract class PredictionModel[FeaturesType, M <: PredictionModel[FeaturesType,
         }
       }
     
    -  protected def transformImpl(dataset: Dataset[_]): DataFrame = {
    +  override protected def transformImpl(dataset: Dataset[_]): DataFrame = {
    --- End diff --
    
    `transformImpl` for some abstraction and `saveImpl` are already existent.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23263: [SPARK-23674][ML] Adds Spark ML Events

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23263
  
    **[Test build #99872 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99872/testReport)** for PR 23263 at commit [`4f2fda2`](https://github.com/apache/spark/commit/4f2fda2272ee7e7d62df139c2f994ef7a122bf7c).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23263: [SPARK-23674][ML] Adds Spark ML Events

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23263#discussion_r240003869
  
    --- Diff: mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala ---
    @@ -132,7 +132,8 @@ class Pipeline @Since("1.4.0") (
        * @return fitted pipeline
        */
       @Since("2.0.0")
    -  override def fit(dataset: Dataset[_]): PipelineModel = {
    +  override def fit(dataset: Dataset[_]): PipelineModel = super.fit(dataset)
    --- End diff --
    
    Ah, it's there just only to keep the `@Since`. Looks some classes don't explicitly note that so I didn't call `super` in other places.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23263: [SPARK-23674][ML] Adds Spark ML Events

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23263
  
    **[Test build #99868 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99868/testReport)** for PR 23263 at commit [`a9112f3`](https://github.com/apache/spark/commit/a9112f33ff8fbfb66bad76bff6898abdef5b6881).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class TransformStart(transformer: Transformer, input: Dataset[_]) extends MLEvent`
      * `case class TransformEnd(transformer: Transformer, output: Dataset[_]) extends MLEvent`
      * `case class FitStart[M <: Model[M]](estimator: Estimator[M], dataset: Dataset[_]) extends MLEvent`
      * `case class FitEnd[M <: Model[M]](estimator: Estimator[M], model: M) extends MLEvent`
      * `case class LoadInstanceStart[T](reader: MLReader[T], path: String) extends MLEvent`
      * `case class LoadInstanceEnd[T](reader: MLReader[T], instance: T) extends MLEvent`
      * `case class SaveInstanceStart(writer: MLWriter, path: String) extends MLEvent`
      * `case class SaveInstanceEnd(writer: MLWriter, path: String) extends MLEvent`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23263: [SPARK-23674][ML] Adds Spark ML Events

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23263
  
    **[Test build #99873 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99873/testReport)** for PR 23263 at commit [`ba9db6e`](https://github.com/apache/spark/commit/ba9db6eb2c98a1c84f982a093ff982a030b9eab7).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23263: [SPARK-23674][ML] Adds Spark ML Events

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23263
  
    **[Test build #99868 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99868/testReport)** for PR 23263 at commit [`a9112f3`](https://github.com/apache/spark/commit/a9112f33ff8fbfb66bad76bff6898abdef5b6881).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23263: [SPARK-23674][ML] Adds Spark ML Events

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/23263
  
    cc @srowen, @cloud-fan (since it mimics SQL's event listener), @jkbradley, @mengxr and @yanboliang. Mind if I ask to take a look please? WDYT about this?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23263: [SPARK-23674][ML] Adds Spark ML Events

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23263
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23263: [SPARK-23674][ML] Adds Spark ML Events

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23263
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23263: [SPARK-23674][ML] Adds Spark ML Events

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23263
  
    **[Test build #99873 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99873/testReport)** for PR 23263 at commit [`ba9db6e`](https://github.com/apache/spark/commit/ba9db6eb2c98a1c84f982a093ff982a030b9eab7).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23263: [SPARK-23674][ML] Adds Spark ML Events

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/23263
  
    > Visualizing a workflow is nice, but Spark's Pipelines are typically pretty straightforward and linear. I could imagine producing a nicer visualization than what you get from reading the Spark UI, although of course we already have some degree of history and data there.
    
    Another good thing that might have to be considered is, that we can interact this with other SQL events. For instance, where the input `Dataset` is originated. For instance, with current Apache Spark, I can visualises SQL operations as below:
    
    ![screen shot 2018-12-10 at 9 41 36 am](https://user-images.githubusercontent.com/6477701/49706269-d9bdfe00-fc5f-11e8-943a-3309d1856ba5.png)
    
    I think we can combine those existing lineages together to easily understand where the data comes and goes.
    
    (BTW, I hope it doesn't sound like I'm pushing this case for my one specific case - I think this hook-like feature can be useful in many way. I currently have one explicit example to show so I'm referring my case.)
    
    > These are just the hooks, right? someone would have to implement something to use these events. I see the value in the API to some degree, but with no concrete implementation, does it add anything for Spark users out of the box?
    
    Yes, right. It does not add anything to Spark users out of the box. It needs a custom implementation for a query listener. For instance,
    
    with the custom listener below:
    
    ```scala
    class CustomSparkListener extends SparkListener
      def onOtherEvents(e: SparkListenerEvent) = e match {
        case e: MLEvent => // do something
        case _ => // pass
      }
    ```
    
    There are two (existing) ways to use this.
    
    ```scala
    spark.sparkContext.addSparkListener(new CustomSparkListener)
    ```
    
    ```bash
    spark-submit
      --conf spark.extraListeners=CustomSparkListener\
    ...
    ```
    
    It's also similar with other existing implementation in SQL side (catalog events described above in PR description).
    
    One actual example that I had with SQL query listener was that I had to close one connection every time after SQL execution.
    
    > Is that what someone would likely do? or would someone likely have to run Atlas to use this? If that's a good example of the use case, and Atlas is really about lineage and governance, is that the thrust of this change, to help with something to do with model lineage and reproducibility?
    
    There are two reasons.
    
    1. I think someone in general would likely utilise this feature like other event listeners. At least, I can see some interests going on outside.
    
    - SQL Listener
      - https://stackoverflow.com/questions/46409339/spark-listener-to-an-sql-query
      - http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-Custom-Query-Execution-listener-via-conf-properties-td30979.html
    
    - Streaming Query Listener
      - https://jhui.github.io/2017/01/15/Apache-Spark-Streaming/
      -  http://apache-spark-developers-list.1001551.n3.nabble.com/Structured-Streaming-with-Watermark-td25413.html#a25416
    
    2. Someone would likely run this via Atlas so that someone could do something about lineage and governance, yes, as you said. Yes, I'm trying to show integrated lineages in Apache Spark but this is a missing hole. There had to be this change for this.



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org