Posted to commits@spark.apache.org by gu...@apache.org on 2021/07/19 10:33:30 UTC
[spark] branch master updated: Revert "[SPARK-34806][SQL] Add Observation helper for Dataset.observe"
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 506b333 Revert "[SPARK-34806][SQL] Add Observation helper for Dataset.observe"
506b333 is described below
commit 506b333a2f7829c14c25380c8290b7fe9c0610c5
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Mon Jul 19 19:32:54 2021 +0900
Revert "[SPARK-34806][SQL] Add Observation helper for Dataset.observe"
This reverts commit cc940ff3f8f2126e89ed708fbcf43f7328c9de5f.
---
.../main/scala/org/apache/spark/sql/Dataset.scala | 25 ----
.../scala/org/apache/spark/sql/Observation.scala | 146 ---------------------
.../org/apache/spark/sql/DataFrameSuite.scala | 45 -------
3 files changed, 216 deletions(-)
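
For context, the reverted change let callers collect named metrics through an Observation handle instead of wiring up a QueryExecutionListener by hand. A minimal sketch of the removed usage pattern, reconstructed from the scaladoc example in the diff below (it only compiles against a tree that still includes SPARK-34806; `ds` stands for any existing non-streaming Dataset):

    import org.apache.spark.sql.functions._
    import spark.implicits._  // for the $"" column interpolator

    // Observe the row count and the highest id while writing the Dataset out.
    val observation = Observation("my_metrics")
    val observed = ds.observe(observation,
      count(lit(1)).as("rows"), max($"id").as("maxid"))
    observed.write.parquet("ds.parquet")  // the first action collects the metrics
    val metrics = observation.get         // blocks until that action has finished
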
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index ac9dc37..12112ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1948,31 +1948,6 @@ class Dataset[T] private[sql](
}
/**
- * Observe (named) metrics through an [[org.apache.spark.sql.Observation]] instance.
- * This is equivalent to calling [[Dataset.observe(String, Column, Column*)]] but does
- * not require adding [[org.apache.spark.sql.util.QueryExecutionListener]] to the spark session.
- * This method does not support streaming datasets.
- *
- * A user can retrieve the metrics by accessing [[org.apache.spark.sql.Observation.get]].
- *
- * {{{
- * // Observe row count (rows) and highest id (maxid) in the Dataset while writing it
- * val observation = Observation("my_metrics")
- * val observed_ds = ds.observe(observation, count(lit(1)).as("rows"), max($"id").as("maxid"))
- * observed_ds.write.parquet("ds.parquet")
- * val metrics = observation.get
- * }}}
- *
- * @throws IllegalArgumentException If this is a streaming Dataset (this.isStreaming == true)
- *
- * @group typedrel
- * @since 3.3.0
- */
- def observe(observation: Observation, expr: Column, exprs: Column*): Dataset[T] = {
- observation.on(this, expr, exprs: _*)
- }
-
- /**
* Returns a new Dataset by taking the first `n` rows. The difference between this function
* and `head` is that `head` is an action and returns an array (by triggering query execution)
* while `limit` returns a new Dataset.
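
With the helper gone, the same metrics remain reachable through the long-standing Dataset.observe(String, Column, Column*) overload plus a manually registered listener; this is exactly the wiring the removed scaladoc says Observation was meant to hide. A hedged sketch, spark-shell style (note the listener fires asynchronously after the action completes, which is why the removed helper had to block in get):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.execution.QueryExecution
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.util.QueryExecutionListener

    @volatile var metrics: Option[Row] = None
    val listener = new QueryExecutionListener {
      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
        metrics = qe.observedMetrics.get("my_metrics")  // keyed by the observe() name
      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
    }
    spark.listenerManager.register(listener)

    val observed = ds.observe("my_metrics", count(lit(1)).as("rows"))
    observed.write.parquet("ds.parquet")
    // metrics is filled in asynchronously once the listener fires
    spark.listenerManager.unregister(listener)
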
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Observation.scala b/sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
deleted file mode 100644
index 5a30453..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.util.UUID
-
-import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.util.QueryExecutionListener
-
-
-/**
- * Helper class to simplify usage of [[Dataset#observe observe]]:
- *
- * {{{
- * // Observe row count (rows) and highest id (maxid) in the Dataset while writing it
- * val observation = Observation("my metrics")
- * val observed_ds = ds.observe(observation, count(lit(1)).as("rows"), max($"id").as("maxid"))
- * observed_ds.write.parquet("ds.parquet")
- * val metrics = observation.get
- * }}}
- *
- * This collects the metrics while the first action is executed on the observed dataset. Subsequent
- * actions do not modify the metrics returned by [[get]]. Retrieval of the metric via [[get]]
- * blocks until the first action has finished and metrics become available.
- *
- * This class does not support streaming datasets.
- *
- * @param name name of the metric
- * @since 3.3.0
- */
-class Observation(name: String) {
-
- private val listener: ObservationListener = ObservationListener(this)
-
- @volatile private var sparkSession: Option[SparkSession] = None
-
- @volatile private var row: Option[Row] = None
-
- /**
- * Attach this observation to the given [[Dataset]] to observe aggregation expressions.
- *
- * @param ds dataset
- * @param expr first aggregation expression
- * @param exprs more aggregation expressions
- * @tparam T dataset type
- * @return observed dataset
- * @throws IllegalArgumentException If this is a streaming Dataset (ds.isStreaming == true)
- */
- private[spark] def on[T](ds: Dataset[T], expr: Column, exprs: Column*): Dataset[T] = {
- if (ds.isStreaming) {
- throw new IllegalArgumentException("Observation does not support streaming Datasets")
- }
- register(ds.sparkSession)
- ds.observe(name, expr, exprs: _*)
- }
-
- /**
- * Get the observed metrics. This waits for the observed dataset to finish its first action.
- * Only the result of the first action is available. Subsequent actions do not modify the result.
- *
- * @return the observed metrics as a [[Row]]
- * @throws java.lang.InterruptedException interrupted while waiting
- */
- def get: Row = {
- synchronized {
- // we need to loop as wait might return without us calling notify
- // https://en.wikipedia.org/w/index.php?title=Spurious_wakeup&oldid=992601610
- while (this.row.isEmpty) {
- wait()
- }
- }
-
- this.row.get
- }
-
- private def register(sparkSession: SparkSession): Unit = {
- // makes this class thread-safe:
- // only the first thread entering this block can set sparkSession
- // all other threads will see the exception, as it is only allowed to do this once
- synchronized {
- if (this.sparkSession.isDefined) {
- throw new IllegalArgumentException("An Observation can be used with a Dataset only once")
- }
- this.sparkSession = Some(sparkSession)
- }
-
- sparkSession.listenerManager.register(this.listener)
- }
-
- private def unregister(): Unit = {
- this.sparkSession.foreach(_.listenerManager.unregister(this.listener))
- }
-
- private[spark] def onFinish(qe: QueryExecution): Unit = {
- synchronized {
- if (this.row.isEmpty) {
- this.row = qe.observedMetrics.get(name)
- if (this.row.isDefined) {
- notifyAll()
- unregister()
- }
- }
- }
- }
-
-}
-
-private[sql] case class ObservationListener(observation: Observation)
- extends QueryExecutionListener {
-
- override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
- observation.onFinish(qe)
-
- override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
- observation.onFinish(qe)
-
-}
-
-object Observation {
-
- /**
- * Observation constructor for creating an anonymous observation.
- */
- def apply(): Observation = new Observation(UUID.randomUUID().toString)
-
- /**
- * Observation constructor for creating a named observation.
- */
- def apply(name: String): Observation = new Observation(name)
-
-}
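
A side note on the get implementation removed above: it uses the standard Java monitor idiom, re-checking the condition in a loop because wait() can return without a matching notify (the spurious wakeup the comment links to), and publishing the result under the same lock before notifyAll(). A self-contained sketch of just that pattern, with the Spark specifics stripped out:

    class Latch[A] {
      private var value: Option[A] = None

      // Blocks until put() has been called; loops to survive spurious wakeups.
      def get: A = synchronized {
        while (value.isEmpty) wait()
        value.get
      }

      // Publishes the value once and wakes every thread blocked in get.
      def put(a: A): Unit = synchronized {
        if (value.isEmpty) {
          value = Some(a)
          notifyAll()
        }
      }
    }

    // Usage: one thread calls latch.put(result); any number of threads
    // blocked in latch.get wake up and all observe the same value.
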
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index ac76565..1e3d2192 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -42,7 +42,6 @@ import org.apache.spark.sql.execution.{FilterExec, QueryExecution, WholeStageCod
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
-import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.expressions.{Aggregator, Window}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -2383,50 +2382,6 @@ class DataFrameSuite extends QueryTest
}
}
- test("SPARK-34806: observation on datasets") {
- val namedObservation = Observation("named")
- val unnamedObservation = Observation()
-
- val df = spark
- .range(100)
- .observe(
- namedObservation,
- min($"id").as("min_val"),
- max($"id").as("max_val"),
- sum($"id").as("sum_val"),
- count(when($"id" % 2 === 0, 1)).as("num_even")
- )
- .observe(
- unnamedObservation,
- avg($"id").cast("int").as("avg_val")
- )
-
- def checkMetrics(namedMetric: Row, unnamedMetric: Row): Unit = {
- assert(namedMetric === Row(0L, 99L, 4950L, 50L))
- assert(unnamedMetric === Row(49))
- }
-
- // First run
- df.collect()
- checkMetrics(namedObservation.get, unnamedObservation.get)
- // we can get the result multiple times
- checkMetrics(namedObservation.get, unnamedObservation.get)
-
- // an observation can be used only once
- val err = intercept[IllegalArgumentException] {
- spark.range(100).observe(namedObservation, sum($"id").as("sum_val"))
- }
- assert(err.getMessage.contains("An Observation can be used with a Dataset only once"))
-
- // streaming datasets are not supported
- val streamDf = new MemoryStream[Int](0, sqlContext).toDF()
- val streamObservation = Observation("stream")
- val streamErr = intercept[IllegalArgumentException] {
- streamDf.observe(streamObservation, avg($"value").cast("int").as("avg_val"))
- }
- assert(streamErr.getMessage.contains("Observation does not support streaming Datasets"))
- }
-
test("SPARK-25159: json schema inference should only trigger one job") {
withTempPath { path =>
// This test is to prove that the `JsonInferSchema` does not use `RDD#toLocalIterator` which