Posted to reviews@spark.apache.org by "pengzhon-db (via GitHub)" <gi...@apache.org> on 2023/04/16 21:21:26 UTC

[GitHub] [spark] pengzhon-db commented on a diff in pull request #40783: [SPARK-43129] Scala core API for streaming Spark Connect

pengzhon-db commented on code in PR #40783:
URL: https://github.com/apache/spark/pull/40783#discussion_r1168016036


##########
connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming;
+
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.Duration;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.execution.streaming.AvailableNowTrigger$;
+import org.apache.spark.sql.execution.streaming.ContinuousTrigger;
+import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;
+import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger;
+
+/**
+ * Policy used to indicate how often results should be produced by a [[StreamingQuery]].
+ *
+ * @since 3.5.0
+ */
+@Evolving
+public class Trigger {
+  // This is a copy of the same class in sql/core/.../streaming/Trigger.java
+
+  /**
+   * A trigger policy that runs a query periodically based on an interval in processing time.
+   * If `interval` is 0, the query will run as fast as possible.
+   *
+   * @since 3.5.0
+   */
+  public static Trigger ProcessingTime(long intervalMs) {
+      return ProcessingTimeTrigger.create(intervalMs, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * (Java-friendly)
+   * A trigger policy that runs a query periodically based on an interval in processing time.
+   * If `interval` is 0, the query will run as fast as possible.
+   *
+   * {{{
+   *    import java.util.concurrent.TimeUnit
+   *    df.writeStream().trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 3.5.0
+   */
+  public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) {
+      return ProcessingTimeTrigger.create(interval, timeUnit);
+  }
+
+  /**
+   * (Scala-friendly)
+   * A trigger policy that runs a query periodically based on an interval in processing time.
+   * If `duration` is 0, the query will run as fast as possible.
+   *
+   * {{{
+   *    import scala.concurrent.duration._
+   *    df.writeStream.trigger(Trigger.ProcessingTime(10.seconds))
+   * }}}
+   * @since 3.5.0
+   */
+  public static Trigger ProcessingTime(Duration interval) {
+      return ProcessingTimeTrigger.apply(interval);
+  }
+
+  /**
+   * A trigger policy that runs a query periodically based on an interval in processing time.
+   * If `interval` is effectively 0, the query will run as fast as possible.
+   *
+   * {{{
+   *    df.writeStream.trigger(Trigger.ProcessingTime("10 seconds"))
+   * }}}
+   * @since 3.5.0
+   */
+  public static Trigger ProcessingTime(String interval) {
+      return ProcessingTimeTrigger.apply(interval);
+  }
+
+  /**
+   * A trigger that processes all available data in a single batch then terminates the query.
+   *
+   * @since 3.5.0
+   * @deprecated This is deprecated as of Spark 3.4.0. Use {@link #AvailableNow()} to leverage
+   *             better guarantee of processing, fine-grained scale of batches, and better gradual
+   *             processing of watermark advancement including no-data batch.
+   *             See the NOTES in {@link #AvailableNow()} for details.
+   */
+  @Deprecated

Review Comment:
   Do we still want to include this API? The doc says it's deprecated as of 3.4.0, but it is also marked `@since 3.5.0`.



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -2935,6 +2936,16 @@ class Dataset[T] private[sql] (
     new DataFrameWriterV2[T](table, this)
   }
 
+  /**
+   * Interface for saving the content of the streaming Dataset out into external storage.
+   *
+   * @group basic
+   * @since 3.5.0
+   */
+  def writeStream: DataStreamWriter[T] = {
+    new DataStreamWriter[T](this)

Review Comment:
   Should we add an `isStreaming` check here, similar to this one? https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L3983
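
   A minimal sketch of the kind of guard meant here, not the actual Connect client code. `DatasetSketch` and `StreamWriterSketch` are hypothetical stand-ins for `Dataset` and `DataStreamWriter`, and the exception type and message are illustrative only. In the Connect client, evaluating `isStreaming` may need a round trip to the server, so the cost of the check is worth weighing.

```scala
// Hypothetical stand-in for DataStreamWriter; it only remembers its source.
final class StreamWriterSketch(val source: DatasetSketch)

// Hypothetical stand-in for Dataset; isStreaming is a plain flag here.
final class DatasetSketch(val isStreaming: Boolean) {

  // Fail fast on the client when the Dataset is not a streaming one.
  def writeStream: StreamWriterSketch = {
    if (!isStreaming) {
      // Illustrative error only; the real API may use a different error class.
      throw new IllegalStateException(
        "writeStream can be called only on a streaming Dataset")
    }
    new StreamWriterSketch(this)
  }
}
```

   With a guard like this, calling `writeStream` on a batch Dataset fails immediately instead of when the query is started.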



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3017,8 +3028,37 @@ class Dataset[T] private[sql] (
         .getStorageLevel)
   }
 
+  /**
+   * Defines an event time watermark for this [[Dataset]]. A watermark tracks a point in time
+   * before which we assume no more late data is going to arrive.
+   *
+   * Spark will use this watermark for several purposes: <ul> <li>To know when a given time window
+   * aggregation can be finalized and thus can be emitted when using output modes that do not
+   * allow updates.</li> <li>To minimize the amount of state that we need to keep for on-going
+   * aggregations, `mapGroupsWithState` and `dropDuplicates` operators.</li> </ul> The current
+   * watermark is computed by looking at the `MAX(eventTime)` seen across all of the partitions in
+   * the query minus a user specified `delayThreshold`. Due to the cost of coordinating this value
+   * across partitions, the actual watermark used is only guaranteed to be at least
+   * `delayThreshold` behind the actual event time. In some cases we may still process records
+   * that arrive more than `delayThreshold` late.
+   *
+   * @param eventTime
+   *   the name of the column that contains the event time of the row.
+   * @param delayThreshold
+   *   the minimum delay to wait to data to arrive late, relative to the latest record that has
+   *   been processed in the form of an interval (e.g. "1 minute" or "5 hours"). NOTE: This should
+   *   not be negative.
+   *
+   * @group streaming
+   * @since 3.5.0
+   */
   def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] = {
-    throw new UnsupportedOperationException("withWatermark is not implemented.")
+    sparkSession.newDataset(encoder) { builder =>

Review Comment:
   Should we add a client-side check to ensure `delayThreshold` is non-negative, similar to the existing `withWatermark`?
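
   A rough sketch of such a check, under stated assumptions: `WatermarkDelayCheck` and `validateDelayThreshold` are hypothetical names, and `scala.concurrent.duration.Duration` is used as a stand-in parser. It accepts strings such as "1 minute" or "5 hours" but not every interval form Spark accepts, so a real check would need Spark's own interval parsing.

```scala
import scala.concurrent.duration.Duration

// Hypothetical helper, not part of the PR.
object WatermarkDelayCheck {

  // Parses the delay and rejects negative values before any request is built.
  // Duration(...) is a simplified stand-in for Spark's interval parser.
  def validateDelayThreshold(delayThreshold: String): Unit = {
    val parsed = Duration(delayThreshold)
    require(
      parsed >= Duration.Zero,
      s"delay threshold ($delayThreshold) should not be negative.")
  }
}
```

   `validateDelayThreshold("1 minute")` returns normally, while `validateDelayThreshold("-1 minute")` throws an `IllegalArgumentException` from `require`.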



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.connect.proto.Read.DataSource
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Interface used to load a streaming `Dataset` from external storage systems (e.g. file systems,
+ * key-value stores, etc). Use `SparkSession.readStream` to access this.
+ *
+ * @since 3.5.0
+ */
+@Evolving
+final class DataStreamReader private[sql] (sparkSession: SparkSession) {
+
+  /**
+   * Specifies the input data source format.
+   *
+   * @since 3.5.0
+   */
+  def format(source: String): DataStreamReader = {
+    sourceBuilder.setFormat(source)
+    this
+  }
+
+  /**
+   * Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
+   * automatically from data. By specifying the schema here, the underlying data source can skip
+   * the schema inference step, and thus speed up data loading.
+   *
+   * @since 3.5.0
+   */
+  def schema(schema: StructType): DataStreamReader = {
+    if (schema != null) {
+      sourceBuilder.setSchema(schema.json) // Use json. DDL does not retail all the attributes.
+    }
+    this
+  }
+
+  /**
+   * Specifies the schema by using the input DDL-formatted string. Some data sources (e.g. JSON)
+   * can infer the input schema automatically from data. By specifying the schema here, the
+   * underlying data source can skip the schema inference step, and thus speed up data loading.
+   *
+   * @since 3.5.0
+   */
+  def schema(schemaString: String): DataStreamReader = {
+    sourceBuilder.setSchema(schemaString)
+    this
+  }
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: String): DataStreamReader = {
+    sourceBuilder.putOptions(key, value)
+    this
+  }
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: Boolean): DataStreamReader = option(key, value.toString)
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: Long): DataStreamReader = option(key, value.toString)
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: Double): DataStreamReader = option(key, value.toString)
+
+  /**
+   * (Scala-specific) Adds input options for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def options(options: scala.collection.Map[String, String]): DataStreamReader = {
+    this.options(options.asJava)
+    this
+  }
+
+  /**
+   * (Java-specific) Adds input options for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def options(options: java.util.Map[String, String]): DataStreamReader = {
+    sourceBuilder.putAllOptions(options)
+    this
+  }
+
+  /**
+   * Loads input data stream in as a `DataFrame`, for data streams that don't require a path (e.g.
+   * external key-value stores).
+   *
+   * @since 3.5.0
+   */
+  def load(): DataFrame = {
+    sparkSession.newDataFrame { relationBuilder =>
+      relationBuilder.getReadBuilder
+        .setIsStreaming(true)
+        .setDataSource(sourceBuilder.build())
+    }
+  }
+
+  /**
+   * Loads input in as a `DataFrame`, for data streams that read from some path.
+   *
+   * @since 3.5.0
+   */
+  def load(path: String): DataFrame = {
+    sourceBuilder.clearPaths()

Review Comment:
   What's this for?



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.connect.proto.Read.DataSource
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Interface used to load a streaming `Dataset` from external storage systems (e.g. file systems,
+ * key-value stores, etc). Use `SparkSession.readStream` to access this.
+ *
+ * @since 3.5.0
+ */
+@Evolving
+final class DataStreamReader private[sql] (sparkSession: SparkSession) {
+
+  /**
+   * Specifies the input data source format.
+   *
+   * @since 3.5.0
+   */
+  def format(source: String): DataStreamReader = {
+    sourceBuilder.setFormat(source)
+    this
+  }
+
+  /**
+   * Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
+   * automatically from data. By specifying the schema here, the underlying data source can skip
+   * the schema inference step, and thus speed up data loading.
+   *
+   * @since 3.5.0
+   */
+  def schema(schema: StructType): DataStreamReader = {
+    if (schema != null) {
+      sourceBuilder.setSchema(schema.json) // Use json. DDL does not retail all the attributes.
+    }
+    this
+  }
+
+  /**
+   * Specifies the schema by using the input DDL-formatted string. Some data sources (e.g. JSON)
+   * can infer the input schema automatically from data. By specifying the schema here, the
+   * underlying data source can skip the schema inference step, and thus speed up data loading.
+   *
+   * @since 3.5.0
+   */
+  def schema(schemaString: String): DataStreamReader = {

Review Comment:
   Does `schemaString` here support only DDL, or can it be JSON as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


For additional commands, e-mail: reviews-help@spark.apache.org