You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by br...@apache.org on 2020/01/31 06:02:12 UTC

[spark] branch master updated: [SPARK-30669][SS] Introduce AdmissionControl APIs for StructuredStreaming

This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 940510c  [SPARK-30669][SS] Introduce AdmissionControl APIs for StructuredStreaming
940510c is described below

commit 940510cb1e43a4166d2fe7d7eb4ace8561d24c9b
Author: Burak Yavuz <br...@gmail.com>
AuthorDate: Thu Jan 30 22:01:53 2020 -0800

    [SPARK-30669][SS] Introduce AdmissionControl APIs for StructuredStreaming
    
    ### What changes were proposed in this pull request?
    
    We propose to add two new interfaces, `SupportsAdmissionControl` and `ReadLimit`. A `ReadLimit` defines how much data should be read in the next micro-batch. `SupportsAdmissionControl` specifies that a source can rate limit its ingest into the system. The source can tell the system what the user specified as a read limit, and the system can enforce this limit within each micro-batch or impose its own limit, for example when the trigger is Trigger.Once().
    
    We then use this interface in FileStreamSource, KafkaSource, and KafkaMicroBatchStream.
    
    ### Why are the changes needed?
    
    Sources currently have no information around execution semantics such as whether the stream is being executed in Trigger.Once() mode. This interface will pass this information into the sources as part of planning. With a trigger like Trigger.Once(), the semantics are to process all the data available to the datasource in a single micro-batch. However, this semantic can be broken when data source options such as `maxOffsetsPerTrigger` (in the Kafka source) rate limit the amount of data [...]
    
    ### Does this PR introduce any user-facing change?
    
    DataSource developers can extend this interface for their streaming sources to add admission control into their system and correctly support Trigger.Once().
    
    ### How was this patch tested?
    
    Existing tests, as this API is mostly internal
    
    Closes #27380 from brkyvz/rateLimit.
    
    Lead-authored-by: Burak Yavuz <br...@gmail.com>
    Co-authored-by: Burak Yavuz <bu...@databricks.com>
    Signed-off-by: Burak Yavuz <br...@gmail.com>
---
 .../spark/sql/kafka010/KafkaMicroBatchStream.scala | 25 ++++++----
 .../apache/spark/sql/kafka010/KafkaSource.scala    | 29 +++++++----
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 22 +++++++++
 .../read/streaming/ReadAllAvailable.java}          | 28 +++++++----
 .../sql/connector/read/streaming/ReadLimit.java}   | 25 ++++++----
 .../sql/connector/read/streaming/ReadMaxFiles.java | 55 +++++++++++++++++++++
 .../sql/connector/read/streaming/ReadMaxRows.java  | 55 +++++++++++++++++++++
 .../read/streaming/SupportsAdmissionControl.java   | 56 ++++++++++++++++++++++
 .../sql/execution/streaming/FileStreamSource.scala | 25 ++++++++--
 .../execution/streaming/MicroBatchExecution.scala  | 49 +++++++++++++------
 .../sql/execution/streaming/StreamExecution.scala  |  6 +--
 .../streaming/continuous/ContinuousExecution.scala |  4 +-
 .../sql/streaming/FileStreamSourceSuite.scala      | 56 ++++++++++++++++++++++
 13 files changed, 376 insertions(+), 59 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index 844c963..6599e7e 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -27,8 +27,7 @@ import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
-import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}
-import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchStream
+import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset, ReadAllAvailable, ReadLimit, ReadMaxRows, SupportsAdmissionControl}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.UninterruptibleThread
@@ -55,7 +54,7 @@ private[kafka010] class KafkaMicroBatchStream(
     options: CaseInsensitiveStringMap,
     metadataPath: String,
     startingOffsets: KafkaOffsetRangeLimit,
-    failOnDataLoss: Boolean) extends RateControlMicroBatchStream with Logging {
+    failOnDataLoss: Boolean) extends SupportsAdmissionControl with MicroBatchStream with Logging {
 
   private[kafka010] val pollTimeoutMs = options.getLong(
     KafkaSourceProvider.CONSUMER_POLL_TIMEOUT,
@@ -77,13 +76,23 @@ private[kafka010] class KafkaMicroBatchStream(
     KafkaSourceOffset(getOrCreateInitialPartitionOffsets())
   }
 
-  override def latestOffset(start: Offset): Offset = {
+  override def getDefaultReadLimit: ReadLimit = {
+    maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(super.getDefaultReadLimit)
+  }
+
+  override def latestOffset(): Offset = {
+    throw new UnsupportedOperationException(
+      "latestOffset(Offset, ReadLimit) should be called instead of this method")
+  }
+
+  override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = {
     val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
     val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
-    endPartitionOffsets = KafkaSourceOffset(maxOffsetsPerTrigger.map { maxOffsets =>
-      rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
-    }.getOrElse {
-      latestPartitionOffsets
+    endPartitionOffsets = KafkaSourceOffset(readLimit match {
+      case rows: ReadMaxRows =>
+        rateLimit(rows.maxRows(), startPartitionOffsets, latestPartitionOffsets)
+      case _: ReadAllAvailable =>
+        latestPartitionOffsets
     })
     endPartitionOffsets
   }
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index f0b3bf1..57879c7 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -32,6 +32,8 @@ import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.connector.read.streaming
+import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxRows, SupportsAdmissionControl}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.kafka010.KafkaSource._
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
@@ -79,7 +81,7 @@ private[kafka010] class KafkaSource(
     metadataPath: String,
     startingOffsets: KafkaOffsetRangeLimit,
     failOnDataLoss: Boolean)
-  extends Source with Logging {
+  extends SupportsAdmissionControl with Source with Logging {
 
   private val sc = sqlContext.sparkContext
 
@@ -114,6 +116,10 @@ private[kafka010] class KafkaSource(
     }.partitionToOffsets
   }
 
+  override def getDefaultReadLimit: ReadLimit = {
+    maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(super.getDefaultReadLimit)
+  }
+
   private var currentPartitionOffsets: Option[Map[TopicPartition, Long]] = None
 
   private val converter = new KafkaRecordToRowConverter()
@@ -122,23 +128,30 @@ private[kafka010] class KafkaSource(
 
   /** Returns the maximum available offset for this source. */
   override def getOffset: Option[Offset] = {
+    throw new UnsupportedOperationException(
+      "latestOffset(Offset, ReadLimit) should be called instead of this method")
+  }
+
+  override def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset = {
     // Make sure initialPartitionOffsets is initialized
     initialPartitionOffsets
 
     val latest = kafkaReader.fetchLatestOffsets(
       currentPartitionOffsets.orElse(Some(initialPartitionOffsets)))
-    val offsets = maxOffsetsPerTrigger match {
-      case None =>
+    val offsets = limit match {
+      case rows: ReadMaxRows =>
+        if (currentPartitionOffsets.isEmpty) {
+          rateLimit(rows.maxRows(), initialPartitionOffsets, latest)
+        } else {
+          rateLimit(rows.maxRows(), currentPartitionOffsets.get, latest)
+        }
+      case _: ReadAllAvailable =>
         latest
-      case Some(limit) if currentPartitionOffsets.isEmpty =>
-        rateLimit(limit, initialPartitionOffsets, latest)
-      case Some(limit) =>
-        rateLimit(limit, currentPartitionOffsets.get, latest)
     }
 
     currentPartitionOffsets = Some(offsets)
     logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
-    Some(KafkaSourceOffset(offsets))
+    KafkaSourceOffset(offsets)
   }
 
   /** Proportionally distribute limit number of offsets among topicpartitions */
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 468b21c..a4601b9 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -297,6 +297,28 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
         13, 126, 127, 128, 129, 130, 131, 132, 133, 134
       )
     )
+
+    // When Trigger.Once() is used, the read limit should be ignored
+    val allData = Seq(1) ++ (10 to 20) ++ (100 to 200)
+    withTempDir { dir =>
+      testStream(mapped)(
+        StartStream(Trigger.Once(), checkpointLocation = dir.getCanonicalPath),
+        AssertOnQuery { q =>
+          q.processAllAvailable()
+          true
+        },
+        CheckAnswer(allData: _*),
+        StopStream,
+
+        AddKafkaData(Set(topic), 1000 to 1010: _*),
+        StartStream(Trigger.Once(), checkpointLocation = dir.getCanonicalPath),
+        AssertOnQuery { q =>
+          q.processAllAvailable()
+          true
+        },
+        CheckAnswer((allData ++ 1000.to(1010)): _*)
+      )
+    }
   }
 
   test("input row metrics") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchStream.scala b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadAllAvailable.java
similarity index 51%
copy from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchStream.scala
copy to sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadAllAvailable.java
index fb46f76..5a946ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchStream.scala
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadAllAvailable.java
@@ -15,17 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.streaming.sources
+package org.apache.spark.sql.connector.read.streaming;
 
-import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}
+import org.apache.spark.annotation.Evolving;
 
-// A special `MicroBatchStream` that can get latestOffset with a start offset.
-trait RateControlMicroBatchStream extends MicroBatchStream {
+/**
+ * Represents a {@link ReadLimit} where the {@link MicroBatchStream} must scan all the data
+ * available at the streaming source. This is meant to be a hard specification as being able
+ * to return all available data is necessary for Trigger.Once() to work correctly.
+ * If a source is unable to scan all available data, then it must throw an error.
+ *
+ * @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
+ * @since 3.0.0
+ */
+@Evolving
+public final class ReadAllAvailable implements ReadLimit {
+  static final ReadAllAvailable INSTANCE = new ReadAllAvailable();
 
-  override def latestOffset(): Offset = {
-    throw new IllegalAccessException(
-      "latestOffset should not be called for RateControlMicroBatchReadSupport")
-  }
+  private ReadAllAvailable() {}
 
-  def latestOffset(start: Offset): Offset
+  @Override
+  public String toString() {
+    return "All Available";
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchStream.scala b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java
similarity index 53%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchStream.scala
rename to sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java
index fb46f76..121ed1a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchStream.scala
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java
@@ -15,17 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.streaming.sources
+package org.apache.spark.sql.connector.read.streaming;
 
-import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}
+import org.apache.spark.annotation.Evolving;
 
-// A special `MicroBatchStream` that can get latestOffset with a start offset.
-trait RateControlMicroBatchStream extends MicroBatchStream {
+/**
+ * Interface representing limits on how much to read from a {@link MicroBatchStream} when it
+ * implements {@link SupportsAdmissionControl}. Several implementing classes represent the
+ * various kinds of limits.
+ *
+ * @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
+ * @see ReadAllAvailable
+ * @see ReadMaxRows
+ */
+@Evolving
+public interface ReadLimit {
+  static ReadLimit maxRows(long rows) { return new ReadMaxRows(rows); }
 
-  override def latestOffset(): Offset = {
-    throw new IllegalAccessException(
-      "latestOffset should not be called for RateControlMicroBatchReadSupport")
-  }
+  static ReadLimit maxFiles(int files) { return new ReadMaxFiles(files); }
 
-  def latestOffset(start: Offset): Offset
+  static ReadLimit allAvailable() { return ReadAllAvailable.INSTANCE; }
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxFiles.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxFiles.java
new file mode 100644
index 0000000..441a6c8
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxFiles.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.streaming;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Represents a {@link ReadLimit} where the {@link MicroBatchStream} should scan approximately the
+ * given maximum number of files.
+ *
+ * @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
+ * @since 3.0.0
+ */
+@Evolving
+public class ReadMaxFiles implements ReadLimit {
+  private int files;
+
+  ReadMaxFiles(int maxFiles) {
+    this.files = maxFiles;
+  }
+
+  /** Approximate maximum files to scan. */
+  public int maxFiles() { return this.files; }
+
+  @Override
+  public String toString() {
+    return "MaxFiles: " + maxFiles();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    ReadMaxFiles other = (ReadMaxFiles) o;
+    return other.maxFiles() == maxFiles();
+  }
+
+  @Override
+  public int hashCode() { return files; }
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxRows.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxRows.java
new file mode 100644
index 0000000..65a68c5
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxRows.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.streaming;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Represents a {@link ReadLimit} where the {@link MicroBatchStream} should scan approximately the
+ * given maximum number of rows.
+ *
+ * @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
+ * @since 3.0.0
+ */
+@Evolving
+public final class ReadMaxRows implements ReadLimit {
+  private long rows;
+
+  ReadMaxRows(long rows) {
+    this.rows = rows;
+  }
+
+  /** Approximate maximum rows to scan. */
+  public long maxRows() { return this.rows; }
+
+  @Override
+  public String toString() {
+    return "MaxRows: " + maxRows();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    ReadMaxRows other = (ReadMaxRows) o;
+    return other.maxRows() == maxRows();
+  }
+
+  @Override
+  public int hashCode() { return Long.hashCode(this.rows); }
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsAdmissionControl.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsAdmissionControl.java
new file mode 100644
index 0000000..027763c
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsAdmissionControl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.streaming;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * A mix-in interface for {@link SparkDataStream} streaming sources to signal that they can control
+ * the rate of data ingested into the system. These rate limits can come implicitly from the
+ * contract of triggers, e.g. Trigger.Once() requires that a micro-batch process all data
+ * available to the system at the start of the micro-batch. Alternatively, sources can decide to
+ * limit ingest through data source options.
+ *
+ * Through this interface, a MicroBatchStream should be able to return the offset up to which it
+ * will process data, given a {@link ReadLimit}.
+ *
+ * @since 3.0.0
+ */
+@Evolving
+public interface SupportsAdmissionControl extends SparkDataStream {
+
+  /**
+   * Returns the read limits potentially passed to the data source through options when creating
+   * the data source.
+   */
+  default ReadLimit getDefaultReadLimit() { return ReadLimit.allAvailable(); }
+
+  /**
+   * Returns the most recent offset available given a read limit. The start offset can be used
+   * to figure out how much new data should be read given the limit. Users should implement this
+   * method instead of latestOffset for a MicroBatchStream or getOffset for Source.
+   *
+   * When this method is called on a `Source`, the source can return `null` if there is no
+   * data to process. In addition, for the very first micro-batch, the `startOffset` will be
+   * null as well.
+   *
+   * When this method is called on a MicroBatchStream, the `startOffset` will be `initialOffset`
+   * for the very first micro-batch. The source can return `null` if there is no data to process.
+   */
+  Offset latestOffset(Offset startOffset, ReadLimit limit);
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 36f7002..e8ce8e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -30,6 +30,8 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.connector.read.streaming
+import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxFiles, SupportsAdmissionControl}
 import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
@@ -45,7 +47,7 @@ class FileStreamSource(
     override val schema: StructType,
     partitionColumns: Seq[String],
     metadataPath: String,
-    options: Map[String, String]) extends Source with Logging {
+    options: Map[String, String]) extends SupportsAdmissionControl with Source with Logging {
 
   import FileStreamSource._
 
@@ -115,15 +117,17 @@ class FileStreamSource(
    * `synchronized` on this method is for solving race conditions in tests. In the normal usage,
    * there is no race here, so the cost of `synchronized` should be rare.
    */
-  private def fetchMaxOffset(): FileStreamSourceOffset = synchronized {
+  private def fetchMaxOffset(limit: ReadLimit): FileStreamSourceOffset = synchronized {
     // All the new files found - ignore aged files and files that we have seen.
     val newFiles = fetchAllFiles().filter {
       case (path, timestamp) => seenFiles.isNewFile(path, timestamp)
     }
 
     // Obey user's setting to limit the number of files in this batch trigger.
-    val batchFiles =
-      if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else newFiles
+    val batchFiles = limit match {
+      case files: ReadMaxFiles => newFiles.take(files.maxFiles())
+      case _: ReadAllAvailable => newFiles
+    }
 
     batchFiles.foreach { file =>
       seenFiles.add(file._1, file._2)
@@ -150,6 +154,10 @@ class FileStreamSource(
     FileStreamSourceOffset(metadataLogCurrentOffset)
   }
 
+  override def getDefaultReadLimit: ReadLimit = {
+    maxFilesPerBatch.map(ReadLimit.maxFiles).getOrElse(super.getDefaultReadLimit)
+  }
+
   /**
    * For test only. Run `func` with the internal lock to make sure when `func` is running,
    * the current offset won't be changed and no new batch will be emitted.
@@ -269,7 +277,14 @@ class FileStreamSource(
     files
   }
 
-  override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.logOffset == -1)
+  override def getOffset: Option[Offset] = {
+    throw new UnsupportedOperationException(
+      "latestOffset(Offset, ReadLimit) should be called instead of this method")
+  }
+
+  override def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset = {
+    Some(fetchMaxOffset(limit)).filterNot(_.logOffset == -1).orNull
+  }
 
   override def toString: String = s"FileStreamSource[$qualifiedBasePath]"
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 872c367..83bc347 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -25,10 +25,10 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatch
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
-import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset => OffsetV2, SparkDataStream}
+import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset => OffsetV2, ReadLimit, SparkDataStream, SupportsAdmissionControl}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
-import org.apache.spark.sql.execution.streaming.sources.{RateControlMicroBatchStream, WriteToMicroBatchDataSource}
+import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSource
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{OutputMode, Trigger}
 import org.apache.spark.util.Clock
@@ -79,7 +79,7 @@ class MicroBatchExecution(
 
     import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
     val _logicalPlan = analyzedPlan.transform {
-      case streamingRelation@StreamingRelation(dataSourceV1, sourceName, output) =>
+      case streamingRelation @ StreamingRelation(dataSourceV1, sourceName, output) =>
         toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
           // Materialize source to avoid creating it in every batch
           val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
@@ -122,7 +122,18 @@ class MicroBatchExecution(
       // v2 source
       case r: StreamingDataSourceV2Relation => r.stream
     }
-    uniqueSources = sources.distinct
+    uniqueSources = sources.distinct.map {
+      case source: SupportsAdmissionControl =>
+        val limit = source.getDefaultReadLimit
+        if (trigger == OneTimeTrigger && limit != ReadLimit.allAvailable()) {
+          logWarning(s"The read limit $limit for $source is ignored when Trigger.Once() is used.")
+          source -> ReadLimit.allAvailable()
+        } else {
+          source -> limit
+        }
+      case other =>
+        other -> ReadLimit.allAvailable()
+    }.toMap
 
     // TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
     sink match {
@@ -354,25 +365,33 @@ class MicroBatchExecution(
 
     // Generate a map from each unique source to the next available offset.
     val latestOffsets: Map[SparkDataStream, Option[OffsetV2]] = uniqueSources.map {
-      case s: Source =>
+      case (s: SupportsAdmissionControl, limit) =>
         updateStatusMessage(s"Getting offsets from $s")
-        reportTimeTaken("getOffset") {
-          (s, s.getOffset)
+        reportTimeTaken("latestOffset") {
+          val startOffsetOpt = availableOffsets.get(s)
+          val startOffset = s match {
+            case _: Source =>
+              startOffsetOpt.orNull
+            case v2: MicroBatchStream =>
+              startOffsetOpt.map(offset => v2.deserializeOffset(offset.json))
+                .getOrElse(v2.initialOffset())
+          }
+          (s, Option(s.latestOffset(startOffset, limit)))
         }
-      case s: RateControlMicroBatchStream =>
+      case (s: Source, _) =>
         updateStatusMessage(s"Getting offsets from $s")
-        reportTimeTaken("latestOffset") {
-          val startOffset = availableOffsets
-            .get(s).map(off => s.deserializeOffset(off.json))
-            .getOrElse(s.initialOffset())
-          (s, Option(s.latestOffset(startOffset)))
+        reportTimeTaken("getOffset") {
+          (s, s.getOffset)
         }
-      case s: MicroBatchStream =>
+      case (s: MicroBatchStream, _) =>
         updateStatusMessage(s"Getting offsets from $s")
         reportTimeTaken("latestOffset") {
           (s, Option(s.latestOffset()))
         }
-    }.toMap
+      case (s, _) =>
+        // for some reason, the compiler is unhappy and thinks the match is not exhaustive
+        throw new IllegalStateException(s"Unexpected source: $s")
+    }
     availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)
 
     // Update the query metadata
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index ed908a8..8b3534b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
 import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table}
-import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, SparkDataStream}
+import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, ReadLimit, SparkDataStream}
 import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsTruncate}
 import org.apache.spark.sql.connector.write.streaming.StreamingWrite
 import org.apache.spark.sql.execution.QueryExecution
@@ -206,7 +206,7 @@ abstract class StreamExecution(
   /**
    * A list of unique sources in the query plan. This will be set when generating logical plan.
    */
-  @volatile protected var uniqueSources: Seq[SparkDataStream] = Seq.empty
+  @volatile protected var uniqueSources: Map[SparkDataStream, ReadLimit] = Map.empty
 
   /** Defines the internal state of execution */
   protected val state = new AtomicReference[State](INITIALIZING)
@@ -425,7 +425,7 @@ abstract class StreamExecution(
 
   /** Stops all streaming sources safely. */
   protected def stopSources(): Unit = {
-    uniqueSources.foreach { source =>
+    uniqueSources.foreach { case (source, _) =>
       try {
         source.stop()
       } catch {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 481552a..a9b724a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, TableCapability}
-import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, Offset => OffsetV2, PartitionOffset}
+import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, Offset => OffsetV2, PartitionOffset, ReadLimit}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _}
@@ -84,7 +84,7 @@ class ContinuousExecution(
     sources = _logicalPlan.collect {
       case r: StreamingDataSourceV2Relation => r.stream.asInstanceOf[ContinuousStream]
     }
-    uniqueSources = sources.distinct
+    uniqueSources = sources.distinct.map(s => s -> ReadLimit.allAvailable()).toMap
 
     // TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
     WriteToContinuousDataSource(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 632e007..fa32033 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1174,6 +1174,62 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("SPARK-30669: maxFilesPerTrigger - ignored when using Trigger.Once") {
+    withTempDirs { (src, target) =>
+      val checkpoint = new File(target, "chk").getCanonicalPath
+      val targetDir = new File(target, "data").getCanonicalPath
+      var lastFileModTime: Option[Long] = None
+
+      /** Create a text file holding `data`, stamped 1000 ms after the previous file (if any) */
+      def createFile(data: Int): File = {
+        val file = stringToFile(new File(src, s"$data.txt"), data.toString)
+        if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get + 1000)
+        lastFileModTime = Some(file.lastModified)
+        file
+      }
+
+      createFile(1)
+      createFile(2)
+      createFile(3)
+
+      // Limit the source to one file per trigger; Trigger.Once is expected to ignore this limit
+      val df = spark
+        .readStream
+        .option("maxFilesPerTrigger", 1)
+        .text(src.getCanonicalPath)
+
+      def startQuery(): StreamingQuery = {
+        df.writeStream
+          .format("parquet")
+          .trigger(Trigger.Once)
+          .option("checkpointLocation", checkpoint)
+          .start(targetDir)
+      }
+      val q = startQuery()
+
+      try {
+        assert(q.awaitTermination(streamingTimeout.toMillis))
+        assert(q.recentProgress.count(_.numInputRows != 0) == 1) // exactly one batch had input rows
+        checkAnswer(sql(s"SELECT * from parquet.`$targetDir`"), (1 to 3).map(_.toString).toDF)
+      } finally {
+        q.stop()
+      }
+
+      createFile(4)
+      createFile(5)
+
+      // Restart with the same checkpoint; the two new files should also arrive in a single batch
+      val q2 = startQuery()
+      try {
+        assert(q2.awaitTermination(streamingTimeout.toMillis))
+        assert(q2.recentProgress.count(_.numInputRows != 0) == 1) // exactly one batch had input rows
+        checkAnswer(sql(s"SELECT * from parquet.`$targetDir`"), (1 to 5).map(_.toString).toDF)
+      } finally {
+        q2.stop()
+      }
+    }
+  }
+
   test("explain") {
     withTempDirs { case (src, tmp) =>
       src.mkdirs()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org