You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/07/19 10:33:30 UTC

[spark] branch master updated: Revert "[SPARK-34806][SQL] Add Observation helper for Dataset.observe"

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 506b333  Revert "[SPARK-34806][SQL] Add Observation helper for Dataset.observe"
506b333 is described below

commit 506b333a2f7829c14c25380c8290b7fe9c0610c5
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Mon Jul 19 19:32:54 2021 +0900

    Revert "[SPARK-34806][SQL] Add Observation helper for Dataset.observe"
    
    This reverts commit cc940ff3f8f2126e89ed708fbcf43f7328c9de5f.
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  25 ----
 .../scala/org/apache/spark/sql/Observation.scala   | 146 ---------------------
 .../org/apache/spark/sql/DataFrameSuite.scala      |  45 -------
 3 files changed, 216 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index ac9dc37..12112ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1948,31 +1948,6 @@ class Dataset[T] private[sql](
   }
 
   /**
-   * Observe (named) metrics through an [[org.apache.spark.sql.Observation]] instance.
-   * This is equivalent to calling [[Dataset.observe(String, Column, Column*)]] but does
-   * not require adding [[org.apache.spark.sql.util.QueryExecutionListener]] to the spark session.
-   * This method does not support streaming datasets.
-   *
-   * A user can retrieve the metrics by accessing [[org.apache.spark.sql.Observation.get]].
-   *
-   * {{{
-   *   // Observe row count (rows) and highest id (maxid) in the Dataset while writing it
-   *   val observation = Observation("my_metrics")
-   *   val observed_ds = ds.observe(observation, count(lit(1)).as("rows"), max($"id").as("maxid"))
-   *   observed_ds.write.parquet("ds.parquet")
-   *   val metrics = observation.get
-   * }}}
-   *
-   * @throws IllegalArgumentException If this is a streaming Dataset (this.isStreaming == true)
-   *
-   * @group typedrel
-   * @since 3.3.0
-   */
-  def observe(observation: Observation, expr: Column, exprs: Column*): Dataset[T] = {
-    observation.on(this, expr, exprs: _*)
-  }
-
-  /**
    * Returns a new Dataset by taking the first `n` rows. The difference between this function
    * and `head` is that `head` is an action and returns an array (by triggering query execution)
    * while `limit` returns a new Dataset.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Observation.scala b/sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
deleted file mode 100644
index 5a30453..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.util.UUID
-
-import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.util.QueryExecutionListener
-
-
-/**
- * Helper class to simplify usage of [[Dataset#observe observe]]:
- *
- * {{{
- *   // Observe row count (rows) and highest id (maxid) in the Dataset while writing it
- *   val observation = Observation("my metrics")
- *   val observed_ds = ds.observe(observation, count(lit(1)).as("rows"), max($"id").as("maxid"))
- *   observed_ds.write.parquet("ds.parquet")
- *   val metrics = observation.get
- * }}}
- *
- * This collects the metrics while the first action is executed on the observed dataset. Subsequent
- * actions do not modify the metrics returned by [[get]]. Retrieval of the metric via [[get]]
- * blocks until the first action has finished and metrics become available.
- *
- * This class does not support streaming datasets.
- *
- * @param name name of the metric
- * @since 3.3.0
- */
-class Observation(name: String) {
-
-  private val listener: ObservationListener = ObservationListener(this)
-
-  @volatile private var sparkSession: Option[SparkSession] = None
-
-  @volatile private var row: Option[Row] = None
-
-  /**
-   * Attach this observation to the given [[Dataset]] to observe aggregation expressions.
-   *
-   * @param ds dataset
-   * @param expr first aggregation expression
-   * @param exprs more aggregation expressions
-   * @tparam T dataset type
-   * @return observed dataset
-   * @throws IllegalArgumentException If this is a streaming Dataset (ds.isStreaming == true)
-   */
-  private[spark] def on[T](ds: Dataset[T], expr: Column, exprs: Column*): Dataset[T] = {
-    if (ds.isStreaming) {
-      throw new IllegalArgumentException("Observation does not support streaming Datasets")
-    }
-    register(ds.sparkSession)
-    ds.observe(name, expr, exprs: _*)
-  }
-
-  /**
-   * Get the observed metrics. This waits for the observed dataset to finish its first action.
-   * Only the result of the first action is available. Subsequent actions do not modify the result.
-   *
-   * @return the observed metrics as a [[Row]]
-   * @throws java.lang.InterruptedException interrupted while waiting
-   */
-  def get: Row = {
-    synchronized {
-      // we need to loop as wait might return without us calling notify
-      // https://en.wikipedia.org/w/index.php?title=Spurious_wakeup&oldid=992601610
-      while (this.row.isEmpty) {
-        wait()
-      }
-    }
-
-    this.row.get
-  }
-
-  private def register(sparkSession: SparkSession): Unit = {
-    // makes this class thread-safe:
-    // only the first thread entering this block can set sparkSession
-    // all other threads will see the exception, as it is only allowed to do this once
-    synchronized {
-      if (this.sparkSession.isDefined) {
-        throw new IllegalArgumentException("An Observation can be used with a Dataset only once")
-      }
-      this.sparkSession = Some(sparkSession)
-    }
-
-    sparkSession.listenerManager.register(this.listener)
-  }
-
-  private def unregister(): Unit = {
-    this.sparkSession.foreach(_.listenerManager.unregister(this.listener))
-  }
-
-  private[spark] def onFinish(qe: QueryExecution): Unit = {
-    synchronized {
-      if (this.row.isEmpty) {
-        this.row = qe.observedMetrics.get(name)
-        if (this.row.isDefined) {
-          notifyAll()
-          unregister()
-        }
-      }
-    }
-  }
-
-}
-
-private[sql] case class ObservationListener(observation: Observation)
-  extends QueryExecutionListener {
-
-  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
-    observation.onFinish(qe)
-
-  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
-    observation.onFinish(qe)
-
-}
-
-object Observation {
-
-  /**
-   * Observation constructor for creating an anonymous observation.
-   */
-  def apply(): Observation = new Observation(UUID.randomUUID().toString)
-
-  /**
-   * Observation constructor for creating a named observation.
-   */
-  def apply(name: String): Observation = new Observation(name)
-
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index ac76565..1e3d2192 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -42,7 +42,6 @@ import org.apache.spark.sql.execution.{FilterExec, QueryExecution, WholeStageCod
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
-import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.expressions.{Aggregator, Window}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -2383,50 +2382,6 @@ class DataFrameSuite extends QueryTest
     }
   }
 
-  test("SPARK-34806: observation on datasets") {
-    val namedObservation = Observation("named")
-    val unnamedObservation = Observation()
-
-    val df = spark
-      .range(100)
-      .observe(
-        namedObservation,
-        min($"id").as("min_val"),
-        max($"id").as("max_val"),
-        sum($"id").as("sum_val"),
-        count(when($"id" % 2 === 0, 1)).as("num_even")
-      )
-      .observe(
-        unnamedObservation,
-        avg($"id").cast("int").as("avg_val")
-      )
-
-    def checkMetrics(namedMetric: Row, unnamedMetric: Row): Unit = {
-      assert(namedMetric === Row(0L, 99L, 4950L, 50L))
-      assert(unnamedMetric === Row(49))
-    }
-
-    // First run
-    df.collect()
-    checkMetrics(namedObservation.get, unnamedObservation.get)
-    // we can get the result multiple times
-    checkMetrics(namedObservation.get, unnamedObservation.get)
-
-    // an observation can be used only once
-    val err = intercept[IllegalArgumentException] {
-      spark.range(100).observe(namedObservation, sum($"id").as("sum_val"))
-    }
-    assert(err.getMessage.contains("An Observation can be used with a Dataset only once"))
-
-    // streaming datasets are not supported
-    val streamDf = new MemoryStream[Int](0, sqlContext).toDF()
-    val streamObservation = Observation("stream")
-    val streamErr = intercept[IllegalArgumentException] {
-      streamDf.observe(streamObservation, avg($"value").cast("int").as("avg_val"))
-    }
-    assert(streamErr.getMessage.contains("Observation does not support streaming Datasets"))
-  }
-
   test("SPARK-25159: json schema inference should only trigger one job") {
     withTempPath { path =>
       // This test is to prove that the `JsonInferSchema` does not use `RDD#toLocalIterator` which

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org