Posted to reviews@spark.apache.org by "HeartSaVioR (via GitHub)" <gi...@apache.org> on 2023/03/29 06:06:43 UTC

[GitHub] [spark] HeartSaVioR commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark

HeartSaVioR commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1151415904


##########
docs/structured-streaming-programming-guide.md:
##########
@@ -2132,6 +2132,48 @@ streamingDf <- withWatermark(streamingDf, "eventTime", "10 seconds")
 streamingDf <- dropDuplicates(streamingDf, "guid", "eventTime")
 {% endhighlight %}
 
+</div>

Review Comment:
   Updated the description.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -464,6 +469,19 @@ object UnsupportedOperationChecker extends Logging {
               throwError(s"Join type $joinType is not supported with streaming DataFrame/Dataset")
           }
 
+        case d: DeduplicateWithinWatermark if d.isStreaming =>
+          // Find any attributes that are associated with an eventTime watermark.
+          val watermarkAttributes = d.child.output.collect {
+            case a: Attribute if a.metadata.contains(EventTimeWatermark.delayKey) => a
+          }
+
+          // DeduplicateWithinWatermark requires event time column being set in the input DataFrame
+          if (watermarkAttributes.isEmpty) {
+            throwError(
+              "dropDuplicatesWithinWatermark is not supported on streaming DataFrames/DataSets " +

Review Comment:
   Let's leave it as is; this follows the same pattern as the error we raise for streaming aggregation without a watermark.
   
   ```
           // watermark a group is never "finished" so we would never output anything.
           if (watermarkAttributes.isEmpty) {
             throwError(
               s"$outputMode output mode not supported when there are streaming aggregations on " +
                   s"streaming DataFrames/DataSets without watermark")(plan)
           }
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3038,6 +3038,118 @@ class Dataset[T] private[sql](
     dropDuplicates(colNames)
   }
 
+  /**
+   * Returns a new Dataset with duplicates rows removed, as long as event times of duplicated rows
+   * are within delay threshold of watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be
+   * set via [[withWatermark]].
+   *
+   * This will keep all data across triggers as intermediate state to drop duplicated rows. The
+   * state will be kept to guarantee the following, "If event time of the first arrived event is
+   * 'ts', this guarantees all duplicated rows will be dropped where these rows are within the time
+   * range of (ts - delay threshold, ts + delay threshold)". In practice, users are encouraged to
+   * set the delay threshold of watermark longer than max timestamp differences among duplicated
+   * events.
+   *
+   * In addition, too late data older than watermark will be dropped to avoid any possibility
+   * of duplicates.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def dropDuplicatesWithinWatermark(): Dataset[T] = {
+    dropDuplicatesWithinWatermark(this.columns)
+  }
+
+  /**
+   * Returns a new Dataset with duplicates rows removed, considering only the subset of columns,
+   * as long as event times of duplicated rows are within delay threshold of watermark.

Review Comment:
   What about documenting only the guarantee in the comment below and not mentioning the details? I don't think there is an easy way to explain the tricky, nondeterministic behavior, and we probably don't need to.



##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3038,6 +3038,118 @@ class Dataset[T] private[sql](
     dropDuplicates(colNames)
   }
 
+  /**
+   * Returns a new Dataset with duplicates rows removed, as long as event times of duplicated rows
+   * are within delay threshold of watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be

Review Comment:
   I'd prefer to simply disallow this operator in batch queries. The guarantee of this operator is limited, and beyond that guarantee the streaming output can differ substantially from the batch output. That is a very different UX from other operators.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -679,6 +679,8 @@ object RemoveNoopUnion extends Rule[LogicalPlan] {
       d.withNewChildren(Seq(simplifyUnion(u)))
     case d @ Deduplicate(_, u: Union) =>
       d.withNewChildren(Seq(simplifyUnion(u)))
+    case d @ DeduplicateWithinWatermark(_, u: Union) =>

Review Comment:
   I'd like to treat this operator very differently, because it is probably the first operator for which we explicitly say that batch and streaming do not produce the same output. Let's hear a second opinion on this.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -980,3 +980,117 @@ object StreamingDeduplicateExec {
   private val EMPTY_ROW =
     UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
 }
+
+case class StreamingDeduplicateWithinWatermarkExec(
+    keyExpressions: Seq[Attribute],
+    child: SparkPlan,
+    stateInfo: Option[StatefulOperatorStateInfo] = None,
+    eventTimeWatermarkForLateEvents: Option[Long] = None,
+    eventTimeWatermarkForEviction: Option[Long] = None)
+  extends UnaryExecNode with StateStoreWriter with WatermarkSupport {
+
+  /** Distribute by grouping attributes */
+  override def requiredChildDistribution: Seq[Distribution] = {
+    StatefulOperatorPartitioning.getCompatibleDistribution(
+      keyExpressions, getStateInfo, conf) :: Nil
+  }
+
+  private val schemaForTimeoutRow: StructType = StructType(
+    Array(StructField("expiresAt", LongType, nullable = false)))
+  private val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output,
+    allowMultipleEventTimeColumns = false).get
+  private val delayThresholdMillis = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey)
+  private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol)
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    metrics // force lazy init at driver
+
+    child.execute().mapPartitionsWithStateStore(
+      getStateInfo,
+      keyExpressions.toStructType,
+      schemaForTimeoutRow,
+      numColsPrefixKey = 0,
+      session.sessionState,
+      Some(session.streams.stateStoreCoordinator)) { (store, iter) =>
+      val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
+
+      val timeoutToUnsafeRow = UnsafeProjection.create(schemaForTimeoutRow)
+      val timeoutRow = timeoutToUnsafeRow(new SpecificInternalRow(schemaForTimeoutRow))
+
+      val numOutputRows = longMetric("numOutputRows")
+      val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+      val numRemovedStateRows = longMetric("numRemovedStateRows")
+      val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
+      val allRemovalsTimeMs = longMetric("allRemovalsTimeMs")
+      val commitTimeMs = longMetric("commitTimeMs")
+      val numDroppedDuplicateRows = longMetric("numDroppedDuplicateRows")
+
+      val baseIterator = watermarkPredicateForDataForLateEvents match {
+        case Some(predicate) => applyRemovingRowsOlderThanWatermark(iter, predicate)
+        case None => iter
+      }
+
+      val updatesStartTimeNs = System.nanoTime
+
+      val result = baseIterator.filter { r =>

Review Comment:
   This is a physical node. Again, I would like to clearly distinguish this from the existing API, since the semantics we provide are very different. I'll try to deal with the code redundancy.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -980,3 +980,117 @@ object StreamingDeduplicateExec {
   private val EMPTY_ROW =
     UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
 }
+
+case class StreamingDeduplicateWithinWatermarkExec(
+    keyExpressions: Seq[Attribute],
+    child: SparkPlan,
+    stateInfo: Option[StatefulOperatorStateInfo] = None,
+    eventTimeWatermarkForLateEvents: Option[Long] = None,
+    eventTimeWatermarkForEviction: Option[Long] = None)
+  extends UnaryExecNode with StateStoreWriter with WatermarkSupport {
+
+  /** Distribute by grouping attributes */
+  override def requiredChildDistribution: Seq[Distribution] = {
+    StatefulOperatorPartitioning.getCompatibleDistribution(
+      keyExpressions, getStateInfo, conf) :: Nil
+  }
+
+  private val schemaForTimeoutRow: StructType = StructType(
+    Array(StructField("expiresAt", LongType, nullable = false)))
+  private val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output,
+    allowMultipleEventTimeColumns = false).get
+  private val delayThresholdMillis = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey)
+  private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol)
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    metrics // force lazy init at driver
+
+    child.execute().mapPartitionsWithStateStore(
+      getStateInfo,
+      keyExpressions.toStructType,
+      schemaForTimeoutRow,
+      numColsPrefixKey = 0,
+      session.sessionState,
+      Some(session.streams.stateStoreCoordinator)) { (store, iter) =>
+      val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
+
+      val timeoutToUnsafeRow = UnsafeProjection.create(schemaForTimeoutRow)
+      val timeoutRow = timeoutToUnsafeRow(new SpecificInternalRow(schemaForTimeoutRow))
+
+      val numOutputRows = longMetric("numOutputRows")
+      val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+      val numRemovedStateRows = longMetric("numRemovedStateRows")
+      val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
+      val allRemovalsTimeMs = longMetric("allRemovalsTimeMs")
+      val commitTimeMs = longMetric("commitTimeMs")
+      val numDroppedDuplicateRows = longMetric("numDroppedDuplicateRows")
+
+      val baseIterator = watermarkPredicateForDataForLateEvents match {
+        case Some(predicate) => applyRemovingRowsOlderThanWatermark(iter, predicate)
+        case None => iter
+      }
+
+      val updatesStartTimeNs = System.nanoTime
+
+      val result = baseIterator.filter { r =>
+        val row = r.asInstanceOf[UnsafeRow]
+        val key = getKey(row)
+        val value = store.get(key)
+        if (value == null) {
+          val timestamp = row.getLong(eventTimeColOrdinal)
+          // The unit of timestamp in Spark is microseconds, convert the delay threshold.
+          val expiresAt = timestamp + delayThresholdMillis * 1000
+
+          timeoutRow.setLong(0, expiresAt)
+          store.put(key, timeoutRow)
+
+          numUpdatedStateRows += 1
+          numOutputRows += 1
+          true
+        } else {
+          // Drop duplicated rows
+          numDroppedDuplicateRows += 1
+          false
+        }
+      }
+
+      CompletionIterator[InternalRow, Iterator[InternalRow]](result, {
+        allUpdatesTimeMs += NANOSECONDS.toMillis(System.nanoTime - updatesStartTimeNs)
+        allRemovalsTimeMs += timeTakenMs {
+          // Convert watermark value to microsecond
+          val watermarkForEviction = eventTimeWatermarkForEviction.get * 1000
+          store.iterator().foreach { rowPair =>

Review Comment:
   All stateful operators do this. There may be approaches that address it, but they tend to incur write amplification, so I'm not sure they would be much better.
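   
   For illustration only, a minimal plain-Scala sketch of that trade-off (no Spark dependency). `EvictionSketch`, `evictByScan` and `IndexedState` are hypothetical names, the expiry-ordered secondary index is just one assumed alternative and not something proposed in this PR, and the `expiresAt <= watermark` eviction rule is an assumption because the diff above is cut off before the predicate.
   
   ```scala
   import scala.collection.immutable.TreeMap

   object EvictionSketch {
     // Full scan, as stateful operators do today: one write per new key,
     // but eviction visits every entry in the state.
     // Assumption: an entry is evicted when expiresAt <= watermark.
     def evictByScan(state: Map[String, Long], watermark: Long): Map[String, Long] =
       state.filter { case (_, expiresAt) => expiresAt > watermark }

     // Hypothetical alternative: a secondary index ordered by expiry time.
     // Eviction visits only expired entries, but every new key costs two writes.
     final case class IndexedState(
         byKey: Map[String, Long] = Map.empty[String, Long],
         byExpiry: TreeMap[Long, Set[String]] = TreeMap.empty[Long, Set[String]]) {

       // Assumes `key` is not yet present, which holds for deduplication
       // because only the first event per key is stored.
       def put(key: String, expiresAt: Long): IndexedState = {
         val keys = byExpiry.getOrElse(expiresAt, Set.empty[String]) + key
         copy(byKey = byKey + (key -> expiresAt), byExpiry = byExpiry + (expiresAt -> keys))
       }

       def evict(watermark: Long): IndexedState = {
         // Walk the ordered index and stop at the first entry that has not expired.
         val expired = byExpiry.iterator
           .takeWhile { case (expiresAt, _) => expiresAt <= watermark }
           .toList
         copy(
           byKey = byKey -- expired.flatMap(_._2),
           byExpiry = byExpiry -- expired.map(_._1))
       }
     }
   }
   ```
   
   Both variants leave the same live entries; the difference is where the cost lands (scan on eviction versus an extra write on every insert).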



##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3038,6 +3038,118 @@ class Dataset[T] private[sql](
     dropDuplicates(colNames)
   }
 
+  /**
+   * Returns a new Dataset with duplicates rows removed, as long as event times of duplicated rows
+   * are within delay threshold of watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be
+   * set via [[withWatermark]].
+   *
+   * This will keep all data across triggers as intermediate state to drop duplicated rows. The
+   * state will be kept to guarantee the following, "If event time of the first arrived event is
+   * 'ts', this guarantees all duplicated rows will be dropped where these rows are within the time
+   * range of (ts - delay threshold, ts + delay threshold)". In practice, users are encouraged to
+   * set the delay threshold of watermark longer than max timestamp differences among duplicated
+   * events.
+   *
+   * In addition, too late data older than watermark will be dropped to avoid any possibility
+   * of duplicates.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def dropDuplicatesWithinWatermark(): Dataset[T] = {
+    dropDuplicatesWithinWatermark(this.columns)
+  }
+
+  /**
+   * Returns a new Dataset with duplicates rows removed, considering only the subset of columns,
+   * as long as event times of duplicated rows are within delay threshold of watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be
+   * set via [[withWatermark]].
+   *
+   * This will keep all data across triggers as intermediate state to drop duplicated rows. The
+   * state will be kept to guarantee the following, "If event time of the first arrived event is
+   * 'ts', this guarantees all duplicated rows will be dropped where these rows are within the time
+   * range of (ts - delay threshold, ts + delay threshold)". In practice, users are encouraged to
+   * set the delay threshold of watermark longer than max timestamp differences among duplicated
+   * events.
+   *
+   * In addition, too late data older than watermark will be dropped to avoid any possibility
+   * of duplicates.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def dropDuplicatesWithinWatermark(colNames: Seq[String]): Dataset[T] = withTypedPlan {
+    val resolver = sparkSession.sessionState.analyzer.resolver

Review Comment:
   I'll dedup the code. Using the same logical node (and physical node) is a different issue.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -980,3 +980,117 @@ object StreamingDeduplicateExec {
   private val EMPTY_ROW =
     UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
 }
+
+case class StreamingDeduplicateWithinWatermarkExec(
+    keyExpressions: Seq[Attribute],
+    child: SparkPlan,
+    stateInfo: Option[StatefulOperatorStateInfo] = None,
+    eventTimeWatermarkForLateEvents: Option[Long] = None,
+    eventTimeWatermarkForEviction: Option[Long] = None)
+  extends UnaryExecNode with StateStoreWriter with WatermarkSupport {
+
+  /** Distribute by grouping attributes */
+  override def requiredChildDistribution: Seq[Distribution] = {
+    StatefulOperatorPartitioning.getCompatibleDistribution(
+      keyExpressions, getStateInfo, conf) :: Nil
+  }
+
+  private val schemaForTimeoutRow: StructType = StructType(
+    Array(StructField("expiresAt", LongType, nullable = false)))
+  private val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output,
+    allowMultipleEventTimeColumns = false).get
+  private val delayThresholdMillis = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey)

Review Comment:
   The intention was to distinguish millis from micros (especially since we are dealing with micros here), but it's not a big deal: we use "Ms" for millis in many places, and I documented the cases that use micros.
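   
   To make the unit handling concrete, a small plain-Scala sketch (no Spark dependency). `ExpirySketch` and `expiresAtMicros` are hypothetical names; only the arithmetic `timestamp + delayThresholdMillis * 1000` is taken from the diff.
   
   ```scala
   import java.util.concurrent.TimeUnit

   object ExpirySketch {
     // Event-time values are in microseconds, while the watermark delay
     // threshold is stored in milliseconds, so the delay is converted first.
     def expiresAtMicros(eventTimeMicros: Long, delayThresholdMillis: Long): Long =
       eventTimeMicros + TimeUnit.MILLISECONDS.toMicros(delayThresholdMillis)
   }
   ```
   
   Carrying the unit in the name (`...Micros`, `...Millis`) keeps the conversion visible at the call site.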



##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3038,6 +3038,118 @@ class Dataset[T] private[sql](
     dropDuplicates(colNames)
   }
 
+  /**
+   * Returns a new Dataset with duplicates rows removed, as long as event times of duplicated rows
+   * are within delay threshold of watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be
+   * set via [[withWatermark]].
+   *
+   * This will keep all data across triggers as intermediate state to drop duplicated rows. The
+   * state will be kept to guarantee the following, "If event time of the first arrived event is
+   * 'ts', this guarantees all duplicated rows will be dropped where these rows are within the time
+   * range of (ts - delay threshold, ts + delay threshold)". In practice, users are encouraged to
+   * set the delay threshold of watermark longer than max timestamp differences among duplicated
+   * events.
+   *
+   * In addition, too late data older than watermark will be dropped to avoid any possibility
+   * of duplicates.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def dropDuplicatesWithinWatermark(): Dataset[T] = {
+    dropDuplicatesWithinWatermark(this.columns)
+  }
+
+  /**
+   * Returns a new Dataset with duplicates rows removed, considering only the subset of columns,
+   * as long as event times of duplicated rows are within delay threshold of watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be
+   * set via [[withWatermark]].
+   *
+   * This will keep all data across triggers as intermediate state to drop duplicated rows. The
+   * state will be kept to guarantee the following, "If event time of the first arrived event is
+   * 'ts', this guarantees all duplicated rows will be dropped where these rows are within the time
+   * range of (ts - delay threshold, ts + delay threshold)". In practice, users are encouraged to

Review Comment:
   "ts + delay threshold" is something we guarantee. There could be edge cases in the guarantee for past events, since that side depends more on the watermark.
   
   It is probably better to just say "This guarantees events are deduplicated as long as the time distance between the earliest and latest events is smaller than the delay threshold of the watermark." I'm not sure we want to be more verbose, e.g. explicitly saying that this may deduplicate more events than the time range covers. I don't think it's required, but I'm open to other opinions.
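   
   As a rough illustration of that wording, here is a simplified plain-Scala model of the operator (no Spark dependency). Everything in it is an assumption made for the sketch: the names `DedupWithinWatermarkSketch`, `Event` and `run`, a single watermark per batch computed as (max event time seen in earlier batches) minus the delay, dropping rows with event time `<=` watermark as too late, and evicting state when `expiresAt <= watermark` at the end of each batch.
   
   ```scala
   object DedupWithinWatermarkSketch {
     final case class Event(key: String, eventTimeMicros: Long)

     /** Returns the events that survive deduplication, in arrival order. */
     def run(batches: Seq[Seq[Event]], delayMicros: Long): Seq[Event] = {
       var state = Map.empty[String, Long] // key -> expiresAt (micros)
       var maxSeen: Option[Long] = None    // max event time observed so far
       val out = Seq.newBuilder[Event]

       for (batch <- batches) {
         // Watermark for this batch, derived from earlier batches only.
         val watermark: Option[Long] = maxSeen.map(_ - delayMicros)
         for (e <- batch) {
           maxSeen = Some(maxSeen.fold(e.eventTimeMicros)(m => math.max(m, e.eventTimeMicros)))
           val tooLate = watermark.exists(e.eventTimeMicros <= _)
           if (!tooLate && !state.contains(e.key)) {
             // First arrival for this key: remember it until ts + delay threshold.
             state += (e.key -> (e.eventTimeMicros + delayMicros))
             out += e
           }
         }
         // Evict expired keys only at the end of the batch.
         watermark.foreach { wm =>
           state = state.filter { case (_, expiresAt) => expiresAt > wm }
         }
       }
       out.result()
     }
   }
   ```
   
   With a 10 second delay and the batches `[a@0s, a@5s]`, `[a@8s, b@30s]`, `[a@25s]`, `[a@26s]`, this model emits `a@0s`, `b@30s` and `a@26s`. The duplicates at 5s and 8s fall within the guarantee. The one at 25s is also dropped, although it is outside "ts + delay threshold", because the state for `a` had not been evicted yet; that is the "may deduplicate more events than the time range" case.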



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org
