Posted to commits@spark.apache.org by we...@apache.org on 2018/09/05 05:39:43 UTC
spark git commit: [SPARK-25336][SS]Revert SPARK-24863 and SPARK-24748
Repository: spark
Updated Branches:
refs/heads/master ca861fea2 -> 2119e518d
[SPARK-25336][SS]Revert SPARK-24863 and SPARK-24748
## What changes were proposed in this pull request?
Revert SPARK-24863 (#21819) and SPARK-24748 (#21721) as per discussion in #21721. We will revisit them when the data source v2 APIs are out.
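For context, the reverted commits had added a `CustomMetrics` mix-in that let streaming sources and sinks surface a JSON metrics blob through `StreamingQueryProgress` (see the deleted `CustomMetrics.java`, `SupportsCustomReaderMetrics.java` and `SupportsCustomWriterMetrics.java` below). A minimal standalone sketch of the shape being removed, with the trait redeclared locally for illustration and `LagMetrics`/`Demo` as made-up names rather than Spark classes:

// Standalone sketch of the reverted API shape; the real interfaces lived under
// org.apache.spark.sql.sources.v2 and are deleted by this commit.
trait CustomMetrics {
  // JSON-serialized representation of the custom metrics
  def json(): String
}

// Illustrative implementation mirroring the removed KafkaCustomMetrics: per
// topic-partition lag, i.e. the gap between the latest available offset and
// the last processed offset.
case class LagMetrics(
    latestOffsets: Map[(String, Int), Long],
    processedOffsets: Map[(String, Int), Long]) extends CustomMetrics {
  override def json(): String = {
    val lags = latestOffsets.map { case (tp, latest) =>
      s""""${tp._1}-${tp._2}": ${latest - processedOffsets.getOrElse(tp, 0L)}"""
    }
    s"""{"lag": {${lags.mkString(", ")}}}"""
  }
}

object Demo extends App {
  val metrics = LagMetrics(
    latestOffsets = Map(("topic", 0) -> 50L, ("topic", 1) -> 50L),
    processedOffsets = Map(("topic", 0) -> 5L, ("topic", 1) -> 5L))
  println(metrics.json()) // {"lag": {"topic-0": 45, "topic-1": 45}}
}

The revert drops this surface entirely; per the discussion in #21721, a metrics-reporting hook will be reconsidered once the data source v2 APIs stabilize.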
## How was this patch tested?
Jenkins
Closes #22334 from zsxwing/revert-SPARK-24863-SPARK-24748.
Authored-by: Shixiong Zhu <zs...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2119e518
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2119e518
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2119e518
Branch: refs/heads/master
Commit: 2119e518d31331e65415e0f817a6f28ff18d2b42
Parents: ca861fe
Author: Shixiong Zhu <zs...@gmail.com>
Authored: Wed Sep 5 13:39:34 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Sep 5 13:39:34 2018 +0800
----------------------------------------------------------------------
.../apache/spark/sql/kafka010/JsonUtils.scala | 33 +++--------
.../kafka010/KafkaMicroBatchReadSupport.scala | 30 +---------
.../kafka010/KafkaMicroBatchSourceSuite.scala | 37 -------------
.../spark/sql/sources/v2/CustomMetrics.java | 33 -----------
.../streaming/SupportsCustomReaderMetrics.java | 47 ----------------
.../streaming/SupportsCustomWriterMetrics.java | 47 ----------------
.../execution/streaming/ProgressReporter.scala | 58 ++------------------
.../execution/streaming/sources/memoryV2.scala | 22 +-------
.../apache/spark/sql/streaming/progress.scala | 46 ++--------------
.../execution/streaming/MemorySinkV2Suite.scala | 21 -------
.../sql/streaming/StreamingQuerySuite.scala | 27 ---------
11 files changed, 22 insertions(+), 379 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/2119e518/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
index 92b13f2..868edb5 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
@@ -29,11 +29,6 @@ import org.json4s.jackson.Serialization
*/
private object JsonUtils {
private implicit val formats = Serialization.formats(NoTypeHints)
- implicit val ordering = new Ordering[TopicPartition] {
- override def compare(x: TopicPartition, y: TopicPartition): Int = {
- Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
- }
- }
/**
* Read TopicPartitions from json string
@@ -56,7 +51,7 @@ private object JsonUtils {
* Write TopicPartitions as json string
*/
def partitions(partitions: Iterable[TopicPartition]): String = {
- val result = HashMap.empty[String, List[Int]]
+ val result = new HashMap[String, List[Int]]
partitions.foreach { tp =>
val parts: List[Int] = result.getOrElse(tp.topic, Nil)
result += tp.topic -> (tp.partition::parts)
@@ -85,31 +80,19 @@ private object JsonUtils {
* Write per-TopicPartition offsets as json string
*/
def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
- val result = HashMap.empty[String, HashMap[Int, Long]]
+ val result = new HashMap[String, HashMap[Int, Long]]()
+ implicit val ordering = new Ordering[TopicPartition] {
+ override def compare(x: TopicPartition, y: TopicPartition): Int = {
+ Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
+ }
+ }
val partitions = partitionOffsets.keySet.toSeq.sorted // sort for more determinism
partitions.foreach { tp =>
val off = partitionOffsets(tp)
- val parts = result.getOrElse(tp.topic, HashMap.empty[Int, Long])
+ val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
parts += tp.partition -> off
result += tp.topic -> parts
}
Serialization.write(result)
}
-
- /**
- * Write per-topic partition lag as json string
- */
- def partitionLags(
- latestOffsets: Map[TopicPartition, Long],
- processedOffsets: Map[TopicPartition, Long]): String = {
- val result = HashMap.empty[String, HashMap[Int, Long]]
- val partitions = latestOffsets.keySet.toSeq.sorted
- partitions.foreach { tp =>
- val lag = latestOffsets(tp) - processedOffsets.getOrElse(tp, 0L)
- val parts = result.getOrElse(tp.topic, HashMap.empty[Int, Long])
- parts += tp.partition -> lag
- result += tp.topic -> parts
- }
- Serialization.write(Map("lag" -> result))
- }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/2119e518/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
index 70f37e3..bb4de67 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
@@ -22,7 +22,6 @@ import java.io._
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
-import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
@@ -33,9 +32,9 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset, SimpleStreamingScanConfig, SimpleStreamingScanConfigBuilder}
import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchReadSupport
import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
-import org.apache.spark.sql.sources.v2.{CustomMetrics, DataSourceOptions}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset, SupportsCustomReaderMetrics}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.UninterruptibleThread
@@ -61,8 +60,7 @@ private[kafka010] class KafkaMicroBatchReadSupport(
options: DataSourceOptions,
metadataPath: String,
startingOffsets: KafkaOffsetRangeLimit,
- failOnDataLoss: Boolean)
- extends RateControlMicroBatchReadSupport with SupportsCustomReaderMetrics with Logging {
+ failOnDataLoss: Boolean) extends RateControlMicroBatchReadSupport with Logging {
private val pollTimeoutMs = options.getLong(
"kafkaConsumer.pollTimeoutMs",
@@ -156,13 +154,6 @@ private[kafka010] class KafkaMicroBatchReadSupport(
KafkaMicroBatchReaderFactory
}
- // TODO: figure out the life cycle of custom metrics, and make this method take `ScanConfig` as
- // a parameter.
- override def getCustomMetrics(): CustomMetrics = {
- KafkaCustomMetrics(
- kafkaOffsetReader.fetchLatestOffsets(), endPartitionOffsets.partitionToOffsets)
- }
-
override def deserializeOffset(json: String): Offset = {
KafkaSourceOffset(JsonUtils.partitionOffsets(json))
}
@@ -384,18 +375,3 @@ private[kafka010] case class KafkaMicroBatchPartitionReader(
}
}
}
-
-/**
- * Currently reports per topic-partition lag.
- * This is the difference between the offset of the latest available data
- * in a topic-partition and the latest offset that has been processed.
- */
-private[kafka010] case class KafkaCustomMetrics(
- latestOffsets: Map[TopicPartition, Long],
- processedOffsets: Map[TopicPartition, Long]) extends CustomMetrics {
- override def json(): String = {
- JsonUtils.partitionLags(latestOffsets, processedOffsets)
- }
-
- override def toString: String = json()
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/2119e518/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 78249f7..8e246db 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -30,8 +30,6 @@ import scala.util.Random
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.TopicPartition
-import org.json4s.DefaultFormats
-import org.json4s.jackson.JsonMethods._
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._
@@ -958,41 +956,6 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
intercept[IllegalArgumentException] { test(minPartitions = "-1", 1, true) }
}
- test("custom lag metrics") {
- import testImplicits._
- val topic = newTopic()
- testUtils.createTopic(topic, partitions = 2)
- testUtils.sendMessages(topic, (1 to 100).map(_.toString).toArray)
- require(testUtils.getLatestOffsets(Set(topic)).size === 2)
-
- val kafka = spark
- .readStream
- .format("kafka")
- .option("subscribe", topic)
- .option("startingOffsets", s"earliest")
- .option("maxOffsetsPerTrigger", 10)
- .option("kafka.bootstrap.servers", testUtils.brokerAddress)
- .load()
- .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .as[(String, String)]
-
- implicit val formats = DefaultFormats
-
- val mapped = kafka.map(kv => kv._2.toInt + 1)
- testStream(mapped)(
- StartStream(trigger = OneTimeTrigger),
- AssertOnQuery { query =>
- query.awaitTermination()
- val source = query.lastProgress.sources(0)
- // masOffsetsPerTrigger is 10, and there are two partitions containing 50 events each
- // so 5 events should be processed from each partition and a lag of 45 events
- val custom = parse(source.customMetrics)
- .extract[Map[String, Map[String, Map[String, Long]]]]
- custom("lag")(topic)("0") == 45 && custom("lag")(topic)("1") == 45
- }
- )
- }
-
}
abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
http://git-wip-us.apache.org/repos/asf/spark/blob/2119e518/sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java
deleted file mode 100644
index 7011a70..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.annotation.InterfaceStability;
-
-/**
- * An interface for reporting custom metrics from streaming sources and sinks
- */
-@InterfaceStability.Evolving
-public interface CustomMetrics {
- /**
- * Returns a JSON serialized representation of custom metrics
- *
- * @return JSON serialized representation of custom metrics
- */
- String json();
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/2119e518/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java
deleted file mode 100644
index 8693154..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader.streaming;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.CustomMetrics;
-
-/**
- * A mix in interface for {@link StreamingReadSupport}. Data sources can implement this interface
- * to report custom metrics that gets reported under the
- * {@link org.apache.spark.sql.streaming.SourceProgress}
- */
-@InterfaceStability.Evolving
-public interface SupportsCustomReaderMetrics extends StreamingReadSupport {
-
- /**
- * Returns custom metrics specific to this data source.
- */
- CustomMetrics getCustomMetrics();
-
- /**
- * Invoked if the custom metrics returned by {@link #getCustomMetrics()} is invalid
- * (e.g. Invalid data that cannot be parsed). Throwing an error here would ensure that
- * your custom metrics work right and correct values are reported always. The default action
- * on invalid metrics is to ignore it.
- *
- * @param ex the exception
- */
- default void onInvalidMetrics(Exception ex) {
- // default is to ignore invalid custom metrics
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/2119e518/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java
deleted file mode 100644
index 2b018c7..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.writer.streaming;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.CustomMetrics;
-
-/**
- * A mix in interface for {@link StreamingWriteSupport}. Data sources can implement this interface
- * to report custom metrics that gets reported under the
- * {@link org.apache.spark.sql.streaming.SinkProgress}
- */
-@InterfaceStability.Evolving
-public interface SupportsCustomWriterMetrics extends StreamingWriteSupport {
-
- /**
- * Returns custom metrics specific to this data source.
- */
- CustomMetrics getCustomMetrics();
-
- /**
- * Invoked if the custom metrics returned by {@link #getCustomMetrics()} is invalid
- * (e.g. Invalid data that cannot be parsed). Throwing an error here would ensure that
- * your custom metrics work right and correct values are reported always. The default action
- * on invalid metrics is to ignore it.
- *
- * @param ex the exception
- */
- default void onInvalidMetrics(Exception ex) {
- // default is to ignore invalid custom metrics
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/2119e518/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 417b6b3..d4b5065 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -22,20 +22,14 @@ import java.util.{Date, UUID}
import scala.collection.JavaConverters._
import scala.collection.mutable
-import scala.util.control.NonFatal
-
-import org.json4s.jackson.JsonMethods.parse
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, WriteToDataSourceV2Exec}
-import org.apache.spark.sql.execution.streaming.sources.MicroBatchWritSupport
-import org.apache.spark.sql.sources.v2.CustomMetrics
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, SupportsCustomReaderMetrics}
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWriteSupport, SupportsCustomWriterMetrics}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
import org.apache.spark.util.Clock
@@ -162,31 +156,7 @@ trait ProgressReporter extends Logging {
}
logDebug(s"Execution stats: $executionStats")
- // extracts and validates custom metrics from readers and writers
- def extractMetrics(
- getMetrics: () => Option[CustomMetrics],
- onInvalidMetrics: (Exception) => Unit): Option[String] = {
- try {
- getMetrics().map(m => {
- val json = m.json()
- parse(json)
- json
- })
- } catch {
- case ex: Exception if NonFatal(ex) =>
- onInvalidMetrics(ex)
- None
- }
- }
-
val sourceProgress = sources.distinct.map { source =>
- val customReaderMetrics = source match {
- case s: SupportsCustomReaderMetrics =>
- extractMetrics(() => Option(s.getCustomMetrics), s.onInvalidMetrics)
-
- case _ => None
- }
-
val numRecords = executionStats.inputRows.getOrElse(source, 0L)
new SourceProgress(
description = source.toString,
@@ -194,19 +164,11 @@ trait ProgressReporter extends Logging {
endOffset = currentTriggerEndOffsets.get(source).orNull,
numInputRows = numRecords,
inputRowsPerSecond = numRecords / inputTimeSec,
- processedRowsPerSecond = numRecords / processingTimeSec,
- customReaderMetrics.orNull
+ processedRowsPerSecond = numRecords / processingTimeSec
)
}
- val customWriterMetrics = extractWriteSupport() match {
- case Some(s: SupportsCustomWriterMetrics) =>
- extractMetrics(() => Option(s.getCustomMetrics), s.onInvalidMetrics)
-
- case _ => None
- }
-
- val sinkProgress = new SinkProgress(sink.toString, customWriterMetrics.orNull)
+ val sinkProgress = new SinkProgress(sink.toString)
val newProgress = new StreamingQueryProgress(
id = id,
@@ -235,18 +197,6 @@ trait ProgressReporter extends Logging {
currentStatus = currentStatus.copy(isTriggerActive = false)
}
- /** Extract writer from the executed query plan. */
- private def extractWriteSupport(): Option[StreamingWriteSupport] = {
- if (lastExecution == null) return None
- lastExecution.executedPlan.collect {
- case p if p.isInstanceOf[WriteToDataSourceV2Exec] =>
- p.asInstanceOf[WriteToDataSourceV2Exec].writeSupport
- }.headOption match {
- case Some(w: MicroBatchWritSupport) => Some(w.writeSupport)
- case _ => None
- }
- }
-
/** Extract statistics about stateful operators from the executed query plan. */
private def extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress] = {
if (lastExecution == null) return Nil
http://git-wip-us.apache.org/repos/asf/spark/blob/2119e518/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
index 2509450..c50dc7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -23,9 +23,6 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
-import org.json4s.NoTypeHints
-import org.json4s.jackson.Serialization
-
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
@@ -35,9 +32,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
import org.apache.spark.sql.execution.streaming.{MemorySinkBase, Sink}
-import org.apache.spark.sql.sources.v2.{CustomMetrics, DataSourceOptions, DataSourceV2, StreamingWriteSupportProvider}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, StreamingWriteSupportProvider}
import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport, SupportsCustomWriterMetrics}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@@ -119,26 +116,15 @@ class MemorySinkV2 extends DataSourceV2 with StreamingWriteSupportProvider
batches.clear()
}
- def numRows: Int = synchronized {
- batches.foldLeft(0)(_ + _.data.length)
- }
-
override def toString(): String = "MemorySinkV2"
}
case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row])
extends WriterCommitMessage {}
-class MemoryV2CustomMetrics(sink: MemorySinkV2) extends CustomMetrics {
- private implicit val formats = Serialization.formats(NoTypeHints)
- override def json(): String = Serialization.write(Map("numRows" -> sink.numRows))
-}
-
class MemoryStreamingWriteSupport(
val sink: MemorySinkV2, outputMode: OutputMode, schema: StructType)
- extends StreamingWriteSupport with SupportsCustomWriterMetrics {
-
- private val customMemoryV2Metrics = new MemoryV2CustomMetrics(sink)
+ extends StreamingWriteSupport {
override def createStreamingWriterFactory: MemoryWriterFactory = {
MemoryWriterFactory(outputMode, schema)
@@ -154,8 +140,6 @@ class MemoryStreamingWriteSupport(
override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
// Don't accept any of the new input.
}
-
- override def getCustomMetrics: CustomMetrics = customMemoryV2Metrics
}
case class MemoryWriterFactory(outputMode: OutputMode, schema: StructType)
http://git-wip-us.apache.org/repos/asf/spark/blob/2119e518/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index cf9375d..f2173aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -172,27 +172,7 @@ class SourceProgress protected[sql](
val endOffset: String,
val numInputRows: Long,
val inputRowsPerSecond: Double,
- val processedRowsPerSecond: Double,
- val customMetrics: String) extends Serializable {
-
- /** SourceProgress without custom metrics. */
- protected[sql] def this(
- description: String,
- startOffset: String,
- endOffset: String,
- numInputRows: Long,
- inputRowsPerSecond: Double,
- processedRowsPerSecond: Double) {
-
- this(
- description,
- startOffset,
- endOffset,
- numInputRows,
- inputRowsPerSecond,
- processedRowsPerSecond,
- null)
- }
+ val processedRowsPerSecond: Double) extends Serializable {
/** The compact JSON representation of this progress. */
def json: String = compact(render(jsonValue))
@@ -207,18 +187,12 @@ class SourceProgress protected[sql](
if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
}
- val jsonVal = ("description" -> JString(description)) ~
+ ("description" -> JString(description)) ~
("startOffset" -> tryParse(startOffset)) ~
("endOffset" -> tryParse(endOffset)) ~
("numInputRows" -> JInt(numInputRows)) ~
("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond))
-
- if (customMetrics != null) {
- jsonVal ~ ("customMetrics" -> parse(customMetrics))
- } else {
- jsonVal
- }
}
private def tryParse(json: String) = try {
@@ -237,13 +211,7 @@ class SourceProgress protected[sql](
*/
@InterfaceStability.Evolving
class SinkProgress protected[sql](
- val description: String,
- val customMetrics: String) extends Serializable {
-
- /** SinkProgress without custom metrics. */
- protected[sql] def this(description: String) {
- this(description, null)
- }
+ val description: String) extends Serializable {
/** The compact JSON representation of this progress. */
def json: String = compact(render(jsonValue))
@@ -254,12 +222,6 @@ class SinkProgress protected[sql](
override def toString: String = prettyJson
private[sql] def jsonValue: JValue = {
- val jsonVal = ("description" -> JString(description))
-
- if (customMetrics != null) {
- jsonVal ~ ("customMetrics" -> parse(customMetrics))
- } else {
- jsonVal
- }
+ ("description" -> JString(description))
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/2119e518/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
index 50f13be..6185736 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
@@ -63,25 +63,4 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
assert(sink.allData.map(_.getInt(0)).sorted == Seq(1, 2, 3, 4, 6, 7, 11, 22, 33))
}
-
- test("writer metrics") {
- val sink = new MemorySinkV2
- val schema = new StructType().add("i", "int")
- val writeSupport = new MemoryStreamingWriteSupport(
- sink, OutputMode.Append(), schema)
- // batch 0
- writeSupport.commit(0,
- Array(
- MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))),
- MemoryWriterCommitMessage(1, Seq(Row(3), Row(4))),
- MemoryWriterCommitMessage(2, Seq(Row(5), Row(6)))
- ))
- assert(writeSupport.getCustomMetrics.json() == "{\"numRows\":6}")
- // batch 1
- writeSupport.commit(1,
- Array(
- MemoryWriterCommitMessage(0, Seq(Row(7), Row(8)))
- ))
- assert(writeSupport.getCustomMetrics.json() == "{\"numRows\":8}")
- }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/2119e518/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 7359252..1dd8175 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -22,8 +22,6 @@ import java.util.concurrent.CountDownLatch
import scala.collection.mutable
import org.apache.commons.lang3.RandomStringUtils
-import org.json4s.NoTypeHints
-import org.json4s.jackson.Serialization
import org.scalactic.TolerantNumerics
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -457,31 +455,6 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
}
}
- test("Check if custom metrics are reported") {
- val streamInput = MemoryStream[Int]
- implicit val formats = Serialization.formats(NoTypeHints)
- testStream(streamInput.toDF(), useV2Sink = true)(
- AddData(streamInput, 1, 2, 3),
- CheckAnswer(1, 2, 3),
- AssertOnQuery { q =>
- val lastProgress = getLastProgressWithData(q)
- assert(lastProgress.nonEmpty)
- assert(lastProgress.get.numInputRows == 3)
- assert(lastProgress.get.sink.customMetrics == "{\"numRows\":3}")
- true
- },
- AddData(streamInput, 4, 5, 6, 7),
- CheckAnswer(1, 2, 3, 4, 5, 6, 7),
- AssertOnQuery { q =>
- val lastProgress = getLastProgressWithData(q)
- assert(lastProgress.nonEmpty)
- assert(lastProgress.get.numInputRows == 4)
- assert(lastProgress.get.sink.customMetrics == "{\"numRows\":7}")
- true
- }
- )
- }
-
test("input row calculation with same V1 source used twice in self-join") {
val streamingTriggerDF = spark.createDataset(1 to 10).toDF
val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF).toDF("value")