You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2019/03/04 06:20:52 UTC

[spark] branch master updated: [SPARK-26956][SS] remove streaming output mode from data source v2 APIs

This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 382d5a8  [SPARK-26956][SS] remove streaming output mode from data source v2 APIs
382d5a8 is described below

commit 382d5a82b0e8ef1dd01209e828eae67fe7993a56
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Sun Mar 3 22:20:31 2019 -0800

    [SPARK-26956][SS] remove streaming output mode from data source v2 APIs
    
    ## What changes were proposed in this pull request?
    
    Similar to `SaveMode`, we should remove the streaming `OutputMode` from the data source v2 API and use operations that have clear semantics.
    
    The changes are:
    1. append mode: create `StreamingWrite` directly. By default, the `WriteBuilder` will create `Write` to append data.
    2. complete mode: call `SupportsTruncate#truncate`. Complete mode means truncating all the old data and appending the new data of the current epoch. `SupportsTruncate` has exactly the same semantics.
    3. update mode: fail. The current streaming framework can't propagate the update keys, so v2 sinks are not able to implement update mode. In the future we can introduce a `SupportsUpdate` trait.
    
    The behavior changes:
    1. all the v2 sinks (foreach, console, memory, kafka, noop) don't support update mode. In fact, all the v2 sinks previously implemented update mode incorrectly. None of them can really support it.
    2. the kafka sink doesn't support complete mode. In fact, the kafka sink can only append data.
    
    ## How was this patch tested?
    
    existing tests
    
    Closes #23859 from cloud-fan/update.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: gatorsmile <ga...@gmail.com>
---
 .../spark/sql/kafka010/KafkaSourceProvider.scala   |  6 +--
 .../v2/writer/streaming/SupportsOutputMode.java    | 29 ------------
 .../datasources/noop/NoopDataSource.scala          |  7 ++-
 .../execution/streaming/MicroBatchExecution.scala  | 10 +----
 .../sql/execution/streaming/StreamExecution.scala  | 34 +++++++++++++++
 .../spark/sql/execution/streaming/console.scala    | 10 ++---
 .../streaming/continuous/ContinuousExecution.scala | 10 +----
 .../streaming/sources/ForeachWriterTable.scala     | 11 ++---
 .../sql/execution/streaming/sources/memoryV2.scala | 51 +++++++++-------------
 .../execution/streaming/MemorySinkV2Suite.scala    |  4 +-
 10 files changed, 75 insertions(+), 97 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index a139573..4dc6955 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.sources.v2.writer.WriteBuilder
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWrite, SupportsOutputMode}
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
@@ -362,7 +362,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     }
 
     override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
-      new WriteBuilder with SupportsOutputMode {
+      new WriteBuilder {
         private var inputSchema: StructType = _
 
         override def withInputDataSchema(schema: StructType): WriteBuilder = {
@@ -370,8 +370,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
           this
         }
 
-        override def outputMode(mode: OutputMode): WriteBuilder = this
-
         override def buildForStreaming(): StreamingWrite = {
           import scala.collection.JavaConverters._
 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsOutputMode.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsOutputMode.java
deleted file mode 100644
index 832dcfa..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsOutputMode.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.writer.streaming;
-
-import org.apache.spark.annotation.Unstable;
-import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
-import org.apache.spark.sql.streaming.OutputMode;
-
-// TODO: remove it when we have `SupportsTruncate`
-@Unstable
-public interface SupportsOutputMode extends WriteBuilder {
-
-  WriteBuilder outputMode(OutputMode mode);
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
index 8f2072c..22a74e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
@@ -22,8 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
-import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -42,9 +41,9 @@ private[noop] object NoopTable extends Table with SupportsBatchWrite with Suppor
 }
 
 private[noop] object NoopWriteBuilder extends WriteBuilder
-  with SupportsSaveMode with SupportsOutputMode {
+  with SupportsSaveMode with SupportsTruncate {
   override def mode(mode: SaveMode): WriteBuilder = this
-  override def outputMode(mode: OutputMode): WriteBuilder = this
+  override def truncate(): WriteBuilder = this
   override def buildForBatch(): BatchWrite = NoopBatchWrite
   override def buildForStreaming(): StreamingWrite = NoopStreamingWrite
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index cca2790..de7cbe2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWrite, RateCo
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2}
-import org.apache.spark.sql.sources.v2.writer.streaming.SupportsOutputMode
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
 import org.apache.spark.util.Clock
 
@@ -515,14 +514,7 @@ class MicroBatchExecution(
     val triggerLogicalPlan = sink match {
       case _: Sink => newAttributePlan
       case s: SupportsStreamingWrite =>
-        // TODO: we should translate OutputMode to concrete write actions like truncate, but
-        // the truncate action is being developed in SPARK-26666.
-        val writeBuilder = s.newWriteBuilder(new DataSourceOptions(extraOptions.asJava))
-          .withQueryId(runId.toString)
-          .withInputDataSchema(newAttributePlan.schema)
-        val streamingWrite = writeBuilder.asInstanceOf[SupportsOutputMode]
-          .outputMode(outputMode)
-          .buildForStreaming()
+        val streamingWrite = createStreamingWrite(s, extraOptions, newAttributePlan)
         WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, streamingWrite), newAttributePlan)
       case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 5c21dfe..bba640e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
 import java.util.concurrent.locks.{Condition, ReentrantLock}
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.{Map => MutableMap}
 import scala.util.control.NonFatal
 
@@ -34,10 +35,14 @@ import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.StreamingExplainCommand
 import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite}
+import org.apache.spark.sql.sources.v2.writer.SupportsTruncate
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
 import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
 
@@ -574,6 +579,35 @@ abstract class StreamExecution(
     Option(name).map(_ + "<br/>").getOrElse("") +
       s"id = $id<br/>runId = $runId<br/>batch = $batchDescription"
   }
+
+  protected def createStreamingWrite(
+      table: SupportsStreamingWrite,
+      options: Map[String, String],
+      inputPlan: LogicalPlan): StreamingWrite = {
+    val writeBuilder = table.newWriteBuilder(new DataSourceOptions(options.asJava))
+      .withQueryId(runId.toString)
+      .withInputDataSchema(inputPlan.schema)
+    outputMode match {
+      case Append =>
+        writeBuilder.buildForStreaming()
+
+      case Complete =>
+        // TODO: we should do this check earlier when we have capability API.
+        require(writeBuilder.isInstanceOf[SupportsTruncate],
+          table.name + " does not support Complete mode.")
+        writeBuilder.asInstanceOf[SupportsTruncate].truncate().buildForStreaming()
+
+      case Update =>
+        // Although no v2 sinks really support Update mode now, but during tests we do want them
+        // to pretend to support Update mode, and treat Update mode same as Append mode.
+        if (Utils.isTesting) {
+          writeBuilder.buildForStreaming()
+        } else {
+          throw new IllegalArgumentException(
+            "Data source v2 streaming sinks does not support Update mode.")
+        }
+    }
+  }
 }
 
 object StreamExecution {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index 348bc76..923bd74 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -21,9 +21,8 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming.sources.ConsoleWrite
 import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
 import org.apache.spark.sql.sources.v2._
-import org.apache.spark.sql.sources.v2.writer.WriteBuilder
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWrite, SupportsOutputMode}
-import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.sources.v2.writer.{SupportsTruncate, WriteBuilder}
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
 import org.apache.spark.sql.types.StructType
 
 case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame)
@@ -64,7 +63,7 @@ object ConsoleTable extends Table with SupportsStreamingWrite {
   override def schema(): StructType = StructType(Nil)
 
   override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
-    new WriteBuilder with SupportsOutputMode {
+    new WriteBuilder with SupportsTruncate {
       private var inputSchema: StructType = _
 
       override def withInputDataSchema(schema: StructType): WriteBuilder = {
@@ -72,7 +71,8 @@ object ConsoleTable extends Table with SupportsStreamingWrite {
         this
       }
 
-      override def outputMode(mode: OutputMode): WriteBuilder = this
+      // Do nothing for truncate. Console sink is special that it just prints all the records.
+      override def truncate(): WriteBuilder = this
 
       override def buildForStreaming(): StreamingWrite = {
         assert(inputSchema != null)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 3f88bcf..26b5642 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -35,7 +35,6 @@ import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _}
 import org.apache.spark.sql.sources.v2
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsContinuousRead, SupportsStreamingWrite}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
-import org.apache.spark.sql.sources.v2.writer.streaming.SupportsOutputMode
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
 import org.apache.spark.util.Clock
 
@@ -179,14 +178,7 @@ class ContinuousExecution(
           "CurrentTimestamp and CurrentDate not yet supported for continuous processing")
     }
 
-    // TODO: we should translate OutputMode to concrete write actions like truncate, but
-    // the truncate action is being developed in SPARK-26666.
-    val writeBuilder = sink.newWriteBuilder(new DataSourceOptions(extraOptions.asJava))
-      .withQueryId(runId.toString)
-      .withInputDataSchema(withNewSources.schema)
-    val streamingWrite = writeBuilder.asInstanceOf[SupportsOutputMode]
-      .outputMode(outputMode)
-      .buildForStreaming()
+    val streamingWrite = createStreamingWrite(sink, extraOptions, withNewSources)
     val planWithSink = WriteToContinuousDataSource(streamingWrite, withNewSources)
 
     reportTimeTaken("queryPlanning") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
index 6fbb59c..c0ae44a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
@@ -23,9 +23,8 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.python.PythonForeachWriter
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite, Table}
-import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriteBuilder, WriterCommitMessage}
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
-import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.sources.v2.writer.{DataWriter, SupportsTruncate, WriteBuilder, WriterCommitMessage}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -46,7 +45,7 @@ case class ForeachWriterTable[T](
   override def schema(): StructType = StructType(Nil)
 
   override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
-    new WriteBuilder with SupportsOutputMode {
+    new WriteBuilder with SupportsTruncate {
       private var inputSchema: StructType = _
 
       override def withInputDataSchema(schema: StructType): WriteBuilder = {
@@ -54,7 +53,9 @@ case class ForeachWriterTable[T](
         this
       }
 
-      override def outputMode(mode: OutputMode): WriteBuilder = this
+      // Do nothing for truncate. Foreach sink is special that it just forwards all the records to
+      // ForeachWriter.
+      override def truncate(): WriteBuilder = this
 
       override def buildForStreaming(): StreamingWrite = {
         new StreamingWrite {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
index 3fc2cbe..397c5ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -30,12 +30,10 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
-import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
 import org.apache.spark.sql.execution.streaming.{MemorySinkBase, Sink}
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite}
 import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
-import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -49,12 +47,12 @@ class MemorySinkV2 extends SupportsStreamingWrite with MemorySinkBase with Loggi
   override def schema(): StructType = StructType(Nil)
 
   override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
-    new WriteBuilder with SupportsOutputMode {
-      private var mode: OutputMode = _
+    new WriteBuilder with SupportsTruncate {
+      private var needTruncate: Boolean = false
       private var inputSchema: StructType = _
 
-      override def outputMode(mode: OutputMode): WriteBuilder = {
-        this.mode = mode
+      override def truncate(): WriteBuilder = {
+        this.needTruncate = true
         this
       }
 
@@ -64,7 +62,7 @@ class MemorySinkV2 extends SupportsStreamingWrite with MemorySinkBase with Loggi
       }
 
       override def buildForStreaming(): StreamingWrite = {
-        new MemoryStreamingWrite(MemorySinkV2.this, mode, inputSchema)
+        new MemoryStreamingWrite(MemorySinkV2.this, inputSchema, needTruncate)
       }
     }
   }
@@ -101,27 +99,20 @@ class MemorySinkV2 extends SupportsStreamingWrite with MemorySinkBase with Loggi
     }.mkString("\n")
   }
 
-  def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row]): Unit = {
+  def write(batchId: Long, needTruncate: Boolean, newRows: Array[Row]): Unit = {
     val notCommitted = synchronized {
       latestBatchId.isEmpty || batchId > latestBatchId.get
     }
     if (notCommitted) {
       logDebug(s"Committing batch $batchId to $this")
-      outputMode match {
-        case Append | Update =>
-          val rows = AddedData(batchId, newRows)
-          synchronized { batches += rows }
-
-        case Complete =>
-          val rows = AddedData(batchId, newRows)
-          synchronized {
-            batches.clear()
-            batches += rows
-          }
-
-        case _ =>
-          throw new IllegalArgumentException(
-            s"Output mode $outputMode is not supported by MemorySinkV2")
+      val rows = AddedData(batchId, newRows)
+      if (needTruncate) {
+        synchronized {
+          batches.clear()
+          batches += rows
+        }
+      } else {
+        synchronized { batches += rows }
       }
     } else {
       logDebug(s"Skipping already committed batch: $batchId")
@@ -139,18 +130,18 @@ case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row])
   extends WriterCommitMessage {}
 
 class MemoryStreamingWrite(
-    val sink: MemorySinkV2, outputMode: OutputMode, schema: StructType)
+    val sink: MemorySinkV2, schema: StructType, needTruncate: Boolean)
   extends StreamingWrite {
 
   override def createStreamingWriterFactory: MemoryWriterFactory = {
-    MemoryWriterFactory(outputMode, schema)
+    MemoryWriterFactory(schema)
   }
 
   override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
     val newRows = messages.flatMap {
       case message: MemoryWriterCommitMessage => message.data
     }
-    sink.write(epochId, outputMode, newRows)
+    sink.write(epochId, needTruncate, newRows)
   }
 
   override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
@@ -158,13 +149,13 @@ class MemoryStreamingWrite(
   }
 }
 
-case class MemoryWriterFactory(outputMode: OutputMode, schema: StructType)
+case class MemoryWriterFactory(schema: StructType)
   extends DataWriterFactory with StreamingDataWriterFactory {
 
   override def createWriter(
       partitionId: Int,
       taskId: Long): DataWriter[InternalRow] = {
-    new MemoryDataWriter(partitionId, outputMode, schema)
+    new MemoryDataWriter(partitionId, schema)
   }
 
   override def createWriter(
@@ -175,7 +166,7 @@ case class MemoryWriterFactory(outputMode: OutputMode, schema: StructType)
   }
 }
 
-class MemoryDataWriter(partition: Int, outputMode: OutputMode, schema: StructType)
+class MemoryDataWriter(partition: Int, schema: StructType)
   extends DataWriter[InternalRow] with Logging {
 
   private val data = mutable.Buffer[Row]()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
index e804377..a90acf8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
@@ -29,7 +29,7 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
   test("data writer") {
     val partition = 1234
     val writer = new MemoryDataWriter(
-      partition, OutputMode.Append(), new StructType().add("i", "int"))
+      partition, new StructType().add("i", "int"))
     writer.write(InternalRow(1))
     writer.write(InternalRow(2))
     writer.write(InternalRow(44))
@@ -44,7 +44,7 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
   test("streaming writer") {
     val sink = new MemorySinkV2
     val write = new MemoryStreamingWrite(
-      sink, OutputMode.Append(), new StructType().add("i", "int"))
+      sink, new StructType().add("i", "int"), needTruncate = false)
     write.commit(0,
       Array(
         MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org