Posted to commits@spark.apache.org by we...@apache.org on 2018/09/05 05:39:43 UTC

spark git commit: [SPARK-25336][SS] Revert SPARK-24863 and SPARK-24748

Repository: spark
Updated Branches:
  refs/heads/master ca861fea2 -> 2119e518d


[SPARK-25336][SS] Revert SPARK-24863 and SPARK-24748

## What changes were proposed in this pull request?

Revert SPARK-24863 (#21819) and SPARK-24748 (#21721), per the discussion in #21721. We will revisit them once the data source V2 APIs are finalized.

## How was this patch tested?

Jenkins

Closes #22334 from zsxwing/revert-SPARK-24863-SPARK-24748.

Authored-by: Shixiong Zhu <zs...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2119e518
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2119e518
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2119e518

Branch: refs/heads/master
Commit: 2119e518d31331e65415e0f817a6f28ff18d2b42
Parents: ca861fe
Author: Shixiong Zhu <zs...@gmail.com>
Authored: Wed Sep 5 13:39:34 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Sep 5 13:39:34 2018 +0800

----------------------------------------------------------------------
 .../apache/spark/sql/kafka010/JsonUtils.scala   | 33 +++--------
 .../kafka010/KafkaMicroBatchReadSupport.scala   | 30 +---------
 .../kafka010/KafkaMicroBatchSourceSuite.scala   | 37 -------------
 .../spark/sql/sources/v2/CustomMetrics.java     | 33 -----------
 .../streaming/SupportsCustomReaderMetrics.java  | 47 ----------------
 .../streaming/SupportsCustomWriterMetrics.java  | 47 ----------------
 .../execution/streaming/ProgressReporter.scala  | 58 ++------------------
 .../execution/streaming/sources/memoryV2.scala  | 22 +-------
 .../apache/spark/sql/streaming/progress.scala   | 46 ++--------------
 .../execution/streaming/MemorySinkV2Suite.scala | 21 -------
 .../sql/streaming/StreamingQuerySuite.scala     | 27 ---------
 11 files changed, 22 insertions(+), 379 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2119e518/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
index 92b13f2..868edb5 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
@@ -29,11 +29,6 @@ import org.json4s.jackson.Serialization
  */
 private object JsonUtils {
   private implicit val formats = Serialization.formats(NoTypeHints)
-  implicit val ordering = new Ordering[TopicPartition] {
-    override def compare(x: TopicPartition, y: TopicPartition): Int = {
-      Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
-    }
-  }
 
   /**
    * Read TopicPartitions from json string
@@ -56,7 +51,7 @@ private object JsonUtils {
    * Write TopicPartitions as json string
    */
   def partitions(partitions: Iterable[TopicPartition]): String = {
-    val result = HashMap.empty[String, List[Int]]
+    val result = new HashMap[String, List[Int]]
     partitions.foreach { tp =>
       val parts: List[Int] = result.getOrElse(tp.topic, Nil)
       result += tp.topic -> (tp.partition::parts)
@@ -85,31 +80,19 @@ private object JsonUtils {
    * Write per-TopicPartition offsets as json string
    */
   def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
-    val result = HashMap.empty[String, HashMap[Int, Long]]
+    val result = new HashMap[String, HashMap[Int, Long]]()
+    implicit val ordering = new Ordering[TopicPartition] {
+      override def compare(x: TopicPartition, y: TopicPartition): Int = {
+        Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
+      }
+    }
     val partitions = partitionOffsets.keySet.toSeq.sorted  // sort for more determinism
     partitions.foreach { tp =>
         val off = partitionOffsets(tp)
-        val parts = result.getOrElse(tp.topic, HashMap.empty[Int, Long])
+        val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
         parts += tp.partition -> off
         result += tp.topic -> parts
     }
     Serialization.write(result)
   }
-
-  /**
-   * Write per-topic partition lag as json string
-   */
-  def partitionLags(
-      latestOffsets: Map[TopicPartition, Long],
-      processedOffsets: Map[TopicPartition, Long]): String = {
-    val result = HashMap.empty[String, HashMap[Int, Long]]
-    val partitions = latestOffsets.keySet.toSeq.sorted
-    partitions.foreach { tp =>
-      val lag = latestOffsets(tp) - processedOffsets.getOrElse(tp, 0L)
-      val parts = result.getOrElse(tp.topic, HashMap.empty[Int, Long])
-      parts += tp.partition -> lag
-      result += tp.topic -> parts
-    }
-    Serialization.write(Map("lag" -> result))
-  }
 }
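
A quick reference for the JSON shape this file produces: partitionOffsets serializes offsets as
topic -> (partition -> offset). The snippet below is a minimal standalone sketch, not part of the
Spark sources; the object name and sample offsets are made up for illustration, but the serializer
setup (jackson-backed json4s with NoTypeHints) matches JsonUtils above.

  import org.json4s.NoTypeHints
  import org.json4s.jackson.Serialization

  object PartitionOffsetsJsonShape {
    private implicit val formats = Serialization.formats(NoTypeHints)

    def main(args: Array[String]): Unit = {
      // topic -> (partition -> offset), with partition numbers ending up as JSON object keys.
      val offsets = Map("topicA" -> Map(0 -> 23L, 1 -> 42L), "topicB" -> Map(0 -> 7L))
      // Prints something like {"topicA":{"0":23,"1":42},"topicB":{"0":7}}
      println(Serialization.write(offsets))
    }
  }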

http://git-wip-us.apache.org/repos/asf/spark/blob/2119e518/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
index 70f37e3..bb4de67 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
@@ -22,7 +22,6 @@ import java.io._
 import java.nio.charset.StandardCharsets
 
 import org.apache.commons.io.IOUtils
-import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
@@ -33,9 +32,9 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset, SimpleStreamingScanConfig, SimpleStreamingScanConfigBuilder}
 import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchReadSupport
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
-import org.apache.spark.sql.sources.v2.{CustomMetrics, DataSourceOptions}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset, SupportsCustomReaderMetrics}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.UninterruptibleThread
 
@@ -61,8 +60,7 @@ private[kafka010] class KafkaMicroBatchReadSupport(
     options: DataSourceOptions,
     metadataPath: String,
     startingOffsets: KafkaOffsetRangeLimit,
-    failOnDataLoss: Boolean)
-  extends RateControlMicroBatchReadSupport with SupportsCustomReaderMetrics with Logging {
+    failOnDataLoss: Boolean) extends RateControlMicroBatchReadSupport with Logging {
 
   private val pollTimeoutMs = options.getLong(
     "kafkaConsumer.pollTimeoutMs",
@@ -156,13 +154,6 @@ private[kafka010] class KafkaMicroBatchReadSupport(
     KafkaMicroBatchReaderFactory
   }
 
-  // TODO: figure out the life cycle of custom metrics, and make this method take `ScanConfig` as
-  // a parameter.
-  override def getCustomMetrics(): CustomMetrics = {
-    KafkaCustomMetrics(
-      kafkaOffsetReader.fetchLatestOffsets(), endPartitionOffsets.partitionToOffsets)
-  }
-
   override def deserializeOffset(json: String): Offset = {
     KafkaSourceOffset(JsonUtils.partitionOffsets(json))
   }
@@ -384,18 +375,3 @@ private[kafka010] case class KafkaMicroBatchPartitionReader(
     }
   }
 }
-
-/**
- * Currently reports per topic-partition lag.
- * This is the difference between the offset of the latest available data
- * in a topic-partition and the latest offset that has been processed.
- */
-private[kafka010] case class KafkaCustomMetrics(
-    latestOffsets: Map[TopicPartition, Long],
-    processedOffsets: Map[TopicPartition, Long]) extends CustomMetrics {
-  override def json(): String = {
-    JsonUtils.partitionLags(latestOffsets, processedOffsets)
-  }
-
-  override def toString: String = json()
-}
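
The KafkaCustomMetrics class removed just above reported, for each topic-partition, the gap between
the latest offset available in Kafka and the latest offset already processed. The following
standalone sketch shows that arithmetic; the names are illustrative, and the 50/5/45 figures simply
mirror the expectation in the removed "custom lag metrics" test later in this diff.

  import org.apache.kafka.common.TopicPartition

  object PartitionLagSketch {
    // lag(tp) = latest available offset - latest processed offset (0 if nothing processed yet)
    def lags(
        latest: Map[TopicPartition, Long],
        processed: Map[TopicPartition, Long]): Map[TopicPartition, Long] =
      latest.map { case (tp, end) => tp -> (end - processed.getOrElse(tp, 0L)) }

    def main(args: Array[String]): Unit = {
      val tp0 = new TopicPartition("topic", 0)
      val tp1 = new TopicPartition("topic", 1)
      // 50 records available and 5 processed per partition -> a lag of 45 for each partition.
      println(lags(Map(tp0 -> 50L, tp1 -> 50L), Map(tp0 -> 5L, tp1 -> 5L)))
    }
  }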

http://git-wip-us.apache.org/repos/asf/spark/blob/2119e518/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 78249f7..8e246db 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -30,8 +30,6 @@ import scala.util.Random
 
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.TopicPartition
-import org.json4s.DefaultFormats
-import org.json4s.jackson.JsonMethods._
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
@@ -958,41 +956,6 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
     intercept[IllegalArgumentException] { test(minPartitions = "-1", 1, true) }
   }
 
-  test("custom lag metrics") {
-    import testImplicits._
-    val topic = newTopic()
-    testUtils.createTopic(topic, partitions = 2)
-    testUtils.sendMessages(topic, (1 to 100).map(_.toString).toArray)
-    require(testUtils.getLatestOffsets(Set(topic)).size === 2)
-
-    val kafka = spark
-      .readStream
-      .format("kafka")
-      .option("subscribe", topic)
-      .option("startingOffsets", s"earliest")
-      .option("maxOffsetsPerTrigger", 10)
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .load()
-      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-      .as[(String, String)]
-
-    implicit val formats = DefaultFormats
-
-    val mapped = kafka.map(kv => kv._2.toInt + 1)
-    testStream(mapped)(
-      StartStream(trigger = OneTimeTrigger),
-      AssertOnQuery { query =>
-        query.awaitTermination()
-        val source = query.lastProgress.sources(0)
-        // maxOffsetsPerTrigger is 10, and there are two partitions containing 50 events each
-        // so 5 events should be processed from each partition and a lag of 45 events
-        val custom = parse(source.customMetrics)
-          .extract[Map[String, Map[String, Map[String, Long]]]]
-        custom("lag")(topic)("0") == 45 && custom("lag")(topic)("1") == 45
-      }
-    )
-  }
-
 }
 
 abstract class KafkaSourceSuiteBase extends KafkaSourceTest {

http://git-wip-us.apache.org/repos/asf/spark/blob/2119e518/sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java
deleted file mode 100644
index 7011a70..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.annotation.InterfaceStability;
-
-/**
- * An interface for reporting custom metrics from streaming sources and sinks
- */
-@InterfaceStability.Evolving
-public interface CustomMetrics {
-  /**
-   * Returns a JSON serialized representation of custom metrics
-   *
-   * @return JSON serialized representation of custom metrics
-   */
-  String json();
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/2119e518/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java
deleted file mode 100644
index 8693154..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader.streaming;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.CustomMetrics;
-
-/**
- * A mix in interface for {@link StreamingReadSupport}. Data sources can implement this interface
- * to report custom metrics that gets reported under the
- * {@link org.apache.spark.sql.streaming.SourceProgress}
- */
-@InterfaceStability.Evolving
-public interface SupportsCustomReaderMetrics extends StreamingReadSupport {
-
-  /**
-   * Returns custom metrics specific to this data source.
-   */
-  CustomMetrics getCustomMetrics();
-
-  /**
-   * Invoked if the custom metrics returned by {@link #getCustomMetrics()} is invalid
-   * (e.g. Invalid data that cannot be parsed). Throwing an error here would ensure that
-   * your custom metrics work right and correct values are reported always. The default action
-   * on invalid metrics is to ignore it.
-   *
-   * @param ex the exception
-   */
-  default void onInvalidMetrics(Exception ex) {
-    // default is to ignore invalid custom metrics
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/2119e518/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java
deleted file mode 100644
index 2b018c7..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.writer.streaming;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.CustomMetrics;
-
-/**
- * A mix in interface for {@link StreamingWriteSupport}. Data sources can implement this interface
- * to report custom metrics that gets reported under the
- * {@link org.apache.spark.sql.streaming.SinkProgress}
- */
-@InterfaceStability.Evolving
-public interface SupportsCustomWriterMetrics extends StreamingWriteSupport {
-
-  /**
-   * Returns custom metrics specific to this data source.
-   */
-  CustomMetrics getCustomMetrics();
-
-  /**
-   * Invoked if the custom metrics returned by {@link #getCustomMetrics()} is invalid
-   * (e.g. Invalid data that cannot be parsed). Throwing an error here would ensure that
-   * your custom metrics work right and correct values are reported always. The default action
-   * on invalid metrics is to ignore it.
-   *
-   * @param ex the exception
-   */
-  default void onInvalidMetrics(Exception ex) {
-    // default is to ignore invalid custom metrics
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/2119e518/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 417b6b3..d4b5065 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -22,20 +22,14 @@ import java.util.{Date, UUID}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.util.control.NonFatal
-
-import org.json4s.jackson.JsonMethods.parse
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, WriteToDataSourceV2Exec}
-import org.apache.spark.sql.execution.streaming.sources.MicroBatchWritSupport
-import org.apache.spark.sql.sources.v2.CustomMetrics
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, SupportsCustomReaderMetrics}
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWriteSupport, SupportsCustomWriterMetrics}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
 import org.apache.spark.util.Clock
@@ -162,31 +156,7 @@ trait ProgressReporter extends Logging {
     }
     logDebug(s"Execution stats: $executionStats")
 
-    // extracts and validates custom metrics from readers and writers
-    def extractMetrics(
-        getMetrics: () => Option[CustomMetrics],
-        onInvalidMetrics: (Exception) => Unit): Option[String] = {
-      try {
-        getMetrics().map(m => {
-          val json = m.json()
-          parse(json)
-          json
-        })
-      } catch {
-        case ex: Exception if NonFatal(ex) =>
-          onInvalidMetrics(ex)
-          None
-      }
-    }
-
     val sourceProgress = sources.distinct.map { source =>
-      val customReaderMetrics = source match {
-        case s: SupportsCustomReaderMetrics =>
-          extractMetrics(() => Option(s.getCustomMetrics), s.onInvalidMetrics)
-
-        case _ => None
-      }
-
       val numRecords = executionStats.inputRows.getOrElse(source, 0L)
       new SourceProgress(
         description = source.toString,
@@ -194,19 +164,11 @@ trait ProgressReporter extends Logging {
         endOffset = currentTriggerEndOffsets.get(source).orNull,
         numInputRows = numRecords,
         inputRowsPerSecond = numRecords / inputTimeSec,
-        processedRowsPerSecond = numRecords / processingTimeSec,
-        customReaderMetrics.orNull
+        processedRowsPerSecond = numRecords / processingTimeSec
       )
     }
 
-    val customWriterMetrics = extractWriteSupport() match {
-      case Some(s: SupportsCustomWriterMetrics) =>
-        extractMetrics(() => Option(s.getCustomMetrics), s.onInvalidMetrics)
-
-      case _ => None
-    }
-
-    val sinkProgress = new SinkProgress(sink.toString, customWriterMetrics.orNull)
+    val sinkProgress = new SinkProgress(sink.toString)
 
     val newProgress = new StreamingQueryProgress(
       id = id,
@@ -235,18 +197,6 @@ trait ProgressReporter extends Logging {
     currentStatus = currentStatus.copy(isTriggerActive = false)
   }
 
-  /** Extract writer from the executed query plan. */
-  private def extractWriteSupport(): Option[StreamingWriteSupport] = {
-    if (lastExecution == null) return None
-    lastExecution.executedPlan.collect {
-      case p if p.isInstanceOf[WriteToDataSourceV2Exec] =>
-        p.asInstanceOf[WriteToDataSourceV2Exec].writeSupport
-    }.headOption match {
-      case Some(w: MicroBatchWritSupport) => Some(w.writeSupport)
-      case _ => None
-    }
-  }
-
   /** Extract statistics about stateful operators from the executed query plan. */
   private def extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress] = {
     if (lastExecution == null) return Nil
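
The extractMetrics helper removed above applied a validate-by-parsing guard: a metrics string was
kept only if json4s could parse it; otherwise the source or sink was notified through
onInvalidMetrics and the value was dropped. Below is a standalone sketch of that pattern; the
object and parameter names are illustrative, not Spark APIs.

  import scala.util.control.NonFatal

  import org.json4s.jackson.JsonMethods.parse

  object ValidateByParsing {
    // Keep the JSON string only if it parses; otherwise report the failure and drop it.
    def extract(getJson: () => Option[String], onInvalid: Exception => Unit): Option[String] = {
      try {
        getJson().map { json =>
          parse(json) // throws if the string is not valid JSON
          json
        }
      } catch {
        case ex: Exception if NonFatal(ex) =>
          onInvalid(ex)
          None
      }
    }

    def main(args: Array[String]): Unit = {
      println(extract(() => Some("""{"numRows":3}"""), _ => ()))         // Some({"numRows":3})
      println(extract(() => Some("not json"), e => println(s"bad: $e"))) // None, after reporting
    }
  }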

http://git-wip-us.apache.org/repos/asf/spark/blob/2119e518/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
index 2509450..c50dc7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -23,9 +23,6 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
-import org.json4s.NoTypeHints
-import org.json4s.jackson.Serialization
-
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
@@ -35,9 +32,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
 import org.apache.spark.sql.execution.streaming.{MemorySinkBase, Sink}
-import org.apache.spark.sql.sources.v2.{CustomMetrics, DataSourceOptions, DataSourceV2, StreamingWriteSupportProvider}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, StreamingWriteSupportProvider}
 import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport, SupportsCustomWriterMetrics}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
@@ -119,26 +116,15 @@ class MemorySinkV2 extends DataSourceV2 with StreamingWriteSupportProvider
     batches.clear()
   }
 
-  def numRows: Int = synchronized {
-    batches.foldLeft(0)(_ + _.data.length)
-  }
-
   override def toString(): String = "MemorySinkV2"
 }
 
 case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row])
   extends WriterCommitMessage {}
 
-class MemoryV2CustomMetrics(sink: MemorySinkV2) extends CustomMetrics {
-  private implicit val formats = Serialization.formats(NoTypeHints)
-  override def json(): String = Serialization.write(Map("numRows" -> sink.numRows))
-}
-
 class MemoryStreamingWriteSupport(
     val sink: MemorySinkV2, outputMode: OutputMode, schema: StructType)
-  extends StreamingWriteSupport with SupportsCustomWriterMetrics {
-
-  private val customMemoryV2Metrics = new MemoryV2CustomMetrics(sink)
+  extends StreamingWriteSupport {
 
   override def createStreamingWriterFactory: MemoryWriterFactory = {
     MemoryWriterFactory(outputMode, schema)
@@ -154,8 +140,6 @@ class MemoryStreamingWriteSupport(
   override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
     // Don't accept any of the new input.
   }
-
-  override def getCustomMetrics: CustomMetrics = customMemoryV2Metrics
 }
 
 case class MemoryWriterFactory(outputMode: OutputMode, schema: StructType)
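
For context, the removed numRows/MemoryV2CustomMetrics pair counted the rows accumulated across
all committed batches and serialized the total as a one-field JSON object. A standalone sketch of
that count-and-serialize step, with an illustrative object name and sample batch sizes:

  import org.json4s.NoTypeHints
  import org.json4s.jackson.Serialization

  object NumRowsMetricSketch {
    private implicit val formats = Serialization.formats(NoTypeHints)

    // Sum the sizes of all committed batches and write the total as {"numRows":N}.
    def numRowsJson(batchSizes: Seq[Int]): String =
      Serialization.write(Map("numRows" -> batchSizes.sum))

    def main(args: Array[String]): Unit = {
      // Three commit messages of two rows each -> {"numRows":6}, the value asserted by the
      // removed "writer metrics" test after batch 0.
      println(numRowsJson(Seq(2, 2, 2)))
    }
  }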

http://git-wip-us.apache.org/repos/asf/spark/blob/2119e518/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index cf9375d..f2173aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -172,27 +172,7 @@ class SourceProgress protected[sql](
   val endOffset: String,
   val numInputRows: Long,
   val inputRowsPerSecond: Double,
-  val processedRowsPerSecond: Double,
-  val customMetrics: String) extends Serializable {
-
-  /** SourceProgress without custom metrics. */
-  protected[sql] def this(
-      description: String,
-      startOffset: String,
-      endOffset: String,
-      numInputRows: Long,
-      inputRowsPerSecond: Double,
-      processedRowsPerSecond: Double) {
-
-    this(
-      description,
-      startOffset,
-      endOffset,
-      numInputRows,
-      inputRowsPerSecond,
-      processedRowsPerSecond,
-      null)
-  }
+  val processedRowsPerSecond: Double) extends Serializable {
 
   /** The compact JSON representation of this progress. */
   def json: String = compact(render(jsonValue))
@@ -207,18 +187,12 @@ class SourceProgress protected[sql](
       if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
     }
 
-    val jsonVal = ("description" -> JString(description)) ~
+    ("description" -> JString(description)) ~
       ("startOffset" -> tryParse(startOffset)) ~
       ("endOffset" -> tryParse(endOffset)) ~
       ("numInputRows" -> JInt(numInputRows)) ~
       ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
       ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond))
-
-    if (customMetrics != null) {
-      jsonVal ~ ("customMetrics" -> parse(customMetrics))
-    } else {
-      jsonVal
-    }
   }
 
   private def tryParse(json: String) = try {
@@ -237,13 +211,7 @@ class SourceProgress protected[sql](
  */
 @InterfaceStability.Evolving
 class SinkProgress protected[sql](
-    val description: String,
-    val customMetrics: String) extends Serializable {
-
-  /** SinkProgress without custom metrics. */
-  protected[sql] def this(description: String) {
-    this(description, null)
-  }
+    val description: String) extends Serializable {
 
   /** The compact JSON representation of this progress. */
   def json: String = compact(render(jsonValue))
@@ -254,12 +222,6 @@ class SinkProgress protected[sql](
   override def toString: String = prettyJson
 
   private[sql] def jsonValue: JValue = {
-    val jsonVal = ("description" -> JString(description))
-
-    if (customMetrics != null) {
-      jsonVal ~ ("customMetrics" -> parse(customMetrics))
-    } else {
-      jsonVal
-    }
+    ("description" -> JString(description))
   }
 }
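
The jsonValue changes above drop the conditional merge of a customMetrics string into the progress
JSON. A minimal sketch of that json4s pattern in isolation, with an illustrative object name and
the {"numRows":3} payload borrowed from the removed tests:

  import org.json4s._
  import org.json4s.JsonDSL._
  import org.json4s.jackson.JsonMethods.{compact, parse, render}

  object OptionalCustomMetricsField {
    // Append a "customMetrics" object to the base JSON only when a metrics string is present.
    def jsonValue(description: String, customMetrics: String): JValue = {
      val jsonVal = ("description" -> JString(description))
      if (customMetrics != null) {
        jsonVal ~ ("customMetrics" -> parse(customMetrics))
      } else {
        jsonVal
      }
    }

    def main(args: Array[String]): Unit = {
      // {"description":"MemorySinkV2","customMetrics":{"numRows":3}}
      println(compact(render(jsonValue("MemorySinkV2", """{"numRows":3}"""))))
      // {"description":"MemorySinkV2"}
      println(compact(render(jsonValue("MemorySinkV2", null))))
    }
  }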

http://git-wip-us.apache.org/repos/asf/spark/blob/2119e518/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
index 50f13be..6185736 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
@@ -63,25 +63,4 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
 
     assert(sink.allData.map(_.getInt(0)).sorted == Seq(1, 2, 3, 4, 6, 7, 11, 22, 33))
   }
-
-  test("writer metrics") {
-    val sink = new MemorySinkV2
-    val schema = new StructType().add("i", "int")
-    val writeSupport = new MemoryStreamingWriteSupport(
-      sink, OutputMode.Append(), schema)
-    // batch 0
-    writeSupport.commit(0,
-      Array(
-        MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))),
-        MemoryWriterCommitMessage(1, Seq(Row(3), Row(4))),
-        MemoryWriterCommitMessage(2, Seq(Row(5), Row(6)))
-      ))
-    assert(writeSupport.getCustomMetrics.json() == "{\"numRows\":6}")
-    // batch 1
-    writeSupport.commit(1,
-      Array(
-        MemoryWriterCommitMessage(0, Seq(Row(7), Row(8)))
-      ))
-    assert(writeSupport.getCustomMetrics.json() == "{\"numRows\":8}")
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2119e518/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 7359252..1dd8175 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -22,8 +22,6 @@ import java.util.concurrent.CountDownLatch
 import scala.collection.mutable
 
 import org.apache.commons.lang3.RandomStringUtils
-import org.json4s.NoTypeHints
-import org.json4s.jackson.Serialization
 import org.scalactic.TolerantNumerics
 import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -457,31 +455,6 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }
 
-  test("Check if custom metrics are reported") {
-    val streamInput = MemoryStream[Int]
-    implicit val formats = Serialization.formats(NoTypeHints)
-    testStream(streamInput.toDF(), useV2Sink = true)(
-      AddData(streamInput, 1, 2, 3),
-      CheckAnswer(1, 2, 3),
-      AssertOnQuery { q =>
-        val lastProgress = getLastProgressWithData(q)
-        assert(lastProgress.nonEmpty)
-        assert(lastProgress.get.numInputRows == 3)
-        assert(lastProgress.get.sink.customMetrics == "{\"numRows\":3}")
-        true
-      },
-      AddData(streamInput, 4, 5, 6, 7),
-      CheckAnswer(1, 2, 3, 4, 5, 6, 7),
-      AssertOnQuery { q =>
-        val lastProgress = getLastProgressWithData(q)
-        assert(lastProgress.nonEmpty)
-        assert(lastProgress.get.numInputRows == 4)
-        assert(lastProgress.get.sink.customMetrics == "{\"numRows\":7}")
-        true
-      }
-    )
-  }
-
   test("input row calculation with same V1 source used twice in self-join") {
     val streamingTriggerDF = spark.createDataset(1 to 10).toDF
     val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF).toDF("value")

