Posted to reviews@spark.apache.org by "rangadi (via GitHub)" <gi...@apache.org> on 2023/03/29 04:47:47 UTC

[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark

rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1151374343


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -464,6 +469,19 @@ object UnsupportedOperationChecker extends Logging {
               throwError(s"Join type $joinType is not supported with streaming DataFrame/Dataset")
           }
 
+        case d: DeduplicateWithinWatermark if d.isStreaming =>
+          // Find any attributes that are associated with an eventTime watermark.
+          val watermarkAttributes = d.child.output.collect {
+            case a: Attribute if a.metadata.contains(EventTimeWatermark.delayKey) => a
+          }
+
+          // DeduplicateWithinWatermark requires event time column being set in the input DataFrame
+          if (watermarkAttributes.isEmpty) {
+            throwError(
+              "dropDuplicatesWithinWatermark is not supported on streaming DataFrames/DataSets " +

Review Comment:
   [optional]
   "dropDuplicatesWithinWatermark() requires a watermark to be set on the DataFrame, but there is no watermark set."

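For illustration only, the check in the hunk above could be modeled outside Spark as below. This is a minimal sketch, not the PR's code: `Attr`, `WatermarkCheck`, and the `DelayKey` value are hypothetical stand-ins for Catalyst's `Attribute` and `EventTimeWatermark.delayKey`, and the message is just one wording along the lines suggested in this comment.

```scala
// Toy stand-in for a Catalyst attribute; all names here are hypothetical.
final case class Attr(name: String, metadata: Map[String, Long])

object WatermarkCheck {
  // Assumed metadata key marking an event-time column (placeholder value).
  val DelayKey = "watermarkDelayMs"

  /** Returns the watermarked attributes, or an error message if none exist. */
  def check(output: Seq[Attr]): Either[String, Seq[Attr]] = {
    val watermarkAttributes = output.filter(_.metadata.contains(DelayKey))
    if (watermarkAttributes.isEmpty) {
      Left("dropDuplicatesWithinWatermark() requires a watermark to be set on the " +
        "DataFrame, but there is no watermark set.")
    } else {
      Right(watermarkAttributes)
    }
  }
}
```

Returning an `Either` here only keeps the sketch testable; the real checker throws through `throwError`.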


##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3038,6 +3038,118 @@ class Dataset[T] private[sql](
     dropDuplicates(colNames)
   }
 
+  /**
+   * Returns a new Dataset with duplicates rows removed, as long as event times of duplicated rows
+   * are within delay threshold of watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be
+   * set via [[withWatermark]].
+   *
+   * This will keep all data across triggers as intermediate state to drop duplicated rows. The
+   * state will be kept to guarantee the following, "If event time of the first arrived event is
+   * 'ts', this guarantees all duplicated rows will be dropped where these rows are within the time
+   * range of (ts - delay threshold, ts + delay threshold)". In practice, users are encouraged to
+   * set the delay threshold of watermark longer than max timestamp differences among duplicated
+   * events.
+   *
+   * In addition, too late data older than watermark will be dropped to avoid any possibility
+   * of duplicates.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def dropDuplicatesWithinWatermark(): Dataset[T] = {
+    dropDuplicatesWithinWatermark(this.columns)
+  }
+
+  /**
+   * Returns a new Dataset with duplicates rows removed, considering only the subset of columns,
+   * as long as event times of duplicated rows are within delay threshold of watermark.

Review Comment:
   I know it is a tricky thing, but it might be better to rephrase. 



##########
docs/structured-streaming-programming-guide.md:
##########
@@ -2132,6 +2132,48 @@ streamingDf <- withWatermark(streamingDf, "eventTime", "10 seconds")
 streamingDf <- dropDuplicates(streamingDf, "guid", "eventTime")
 {% endhighlight %}
 
+</div>

Review Comment:
   [From PR description]
   > Only guarantee to deduplicate events within the watermark.
   
   Suggest: 'within watermark delay'.



##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3038,6 +3038,118 @@ class Dataset[T] private[sql](
     dropDuplicates(colNames)
   }
 
+  /**
+   * Returns a new Dataset with duplicates rows removed, as long as event times of duplicated rows
+   * are within delay threshold of watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be
+   * set via [[withWatermark]].
+   *
+   * This will keep all data across triggers as intermediate state to drop duplicated rows. The
+   * state will be kept to guarantee the following, "If event time of the first arrived event is
+   * 'ts', this guarantees all duplicated rows will be dropped where these rows are within the time
+   * range of (ts - delay threshold, ts + delay threshold)". In practice, users are encouraged to
+   * set the delay threshold of watermark longer than max timestamp differences among duplicated
+   * events.
+   *
+   * In addition, too late data older than watermark will be dropped to avoid any possibility
+   * of duplicates.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def dropDuplicatesWithinWatermark(): Dataset[T] = {
+    dropDuplicatesWithinWatermark(this.columns)
+  }
+
+  /**
+   * Returns a new Dataset with duplicates rows removed, considering only the subset of columns,
+   * as long as event times of duplicated rows are within delay threshold of watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be
+   * set via [[withWatermark]].
+   *
+   * This will keep all data across triggers as intermediate state to drop duplicated rows. The
+   * state will be kept to guarantee the following, "If event time of the first arrived event is
+   * 'ts', this guarantees all duplicated rows will be dropped where these rows are within the time
+   * range of (ts - delay threshold, ts + delay threshold)". In practice, users are encouraged to

Review Comment:
   I don't think we guarantee this condition: `(ts - delay threshold, ts + delay threshold)`.
   We likely need to rephrase it. We can look at the scaladoc towards the end, before merging this.
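To make the discussion concrete, here is a minimal single-partition model of the mechanics, under stated assumptions: state maps each key to `expiresAt = first event time + delay threshold`, rows at or below the watermark are dropped as late, and the watermark advances at the end of each batch. `Event` and `WithinWatermarkDedup` are hypothetical names, and this is a sketch for reasoning about the wording, not the operator in this PR.

```scala
import scala.collection.mutable

// Simplified model of watermark-bounded deduplication; names are hypothetical.
final case class Event(key: String, eventTimeMs: Long)

final class WithinWatermarkDedup(delayThresholdMs: Long) {
  // key -> expiresAt (event time of the first arrival + delay threshold)
  private val state = mutable.Map.empty[String, Long]
  private var maxEventTimeMs = Long.MinValue
  private var watermarkMs = Long.MinValue

  /** Processes one micro-batch and returns the rows that are emitted. */
  def processBatch(batch: Seq[Event]): Seq[Event] = {
    val emitted = batch.filter { e =>
      if (e.eventTimeMs <= watermarkMs) {
        false // late row: not newer than the current watermark
      } else if (state.contains(e.key)) {
        false // duplicate of a key still held in state
      } else {
        state(e.key) = e.eventTimeMs + delayThresholdMs
        true
      }
    }
    // Advance the watermark after the batch, then evict expired keys.
    if (batch.nonEmpty) {
      maxEventTimeMs = math.max(maxEventTimeMs, batch.map(_.eventTimeMs).max)
      watermarkMs = math.max(watermarkMs, maxEventTimeMs - delayThresholdMs)
    }
    val expired = state.collect { case (k, expiresAt) if expiresAt <= watermarkMs => k }.toList
    expired.foreach(state.remove)
    emitted
  }

  def stateSize: Int = state.size
}
```

In this model, with a delay of 10 and a first event for key `a` at 100, a later `a` at 115 is still dropped if it arrives before the watermark passes 110, even though 115 lies outside the documented range. Once the state is evicted, a new `a` is emitted again. So the model drops duplicates for as long as the key's state survives, which is not the same as a fixed event-time range.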



##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3038,6 +3038,118 @@ class Dataset[T] private[sql](
     dropDuplicates(colNames)
   }
 
+  /**
+   * Returns a new Dataset with duplicates rows removed, as long as event times of duplicated rows
+   * are within delay threshold of watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be
+   * set via [[withWatermark]].
+   *
+   * This will keep all data across triggers as intermediate state to drop duplicated rows. The
+   * state will be kept to guarantee the following, "If event time of the first arrived event is
+   * 'ts', this guarantees all duplicated rows will be dropped where these rows are within the time
+   * range of (ts - delay threshold, ts + delay threshold)". In practice, users are encouraged to
+   * set the delay threshold of watermark longer than max timestamp differences among duplicated
+   * events.
+   *
+   * In addition, too late data older than watermark will be dropped to avoid any possibility
+   * of duplicates.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def dropDuplicatesWithinWatermark(): Dataset[T] = {
+    dropDuplicatesWithinWatermark(this.columns)
+  }
+
+  /**
+   * Returns a new Dataset with duplicates rows removed, considering only the subset of columns,
+   * as long as event times of duplicated rows are within delay threshold of watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be
+   * set via [[withWatermark]].
+   *
+   * This will keep all data across triggers as intermediate state to drop duplicated rows. The
+   * state will be kept to guarantee the following, "If event time of the first arrived event is
+   * 'ts', this guarantees all duplicated rows will be dropped where these rows are within the time
+   * range of (ts - delay threshold, ts + delay threshold)". In practice, users are encouraged to
+   * set the delay threshold of watermark longer than max timestamp differences among duplicated
+   * events.
+   *
+   * In addition, too late data older than watermark will be dropped to avoid any possibility
+   * of duplicates.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def dropDuplicatesWithinWatermark(colNames: Seq[String]): Dataset[T] = withTypedPlan {
+    val resolver = sparkSession.sessionState.analyzer.resolver

Review Comment:
   Can we share this with dropDuplicates()? Or, even better, we can reuse the 'Deduplicate()' node(s).



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -980,3 +980,117 @@ object StreamingDeduplicateExec {
   private val EMPTY_ROW =
     UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
 }
+
+case class StreamingDeduplicateWithinWatermarkExec(
+    keyExpressions: Seq[Attribute],
+    child: SparkPlan,
+    stateInfo: Option[StatefulOperatorStateInfo] = None,
+    eventTimeWatermarkForLateEvents: Option[Long] = None,
+    eventTimeWatermarkForEviction: Option[Long] = None)
+  extends UnaryExecNode with StateStoreWriter with WatermarkSupport {
+
+  /** Distribute by grouping attributes */
+  override def requiredChildDistribution: Seq[Distribution] = {
+    StatefulOperatorPartitioning.getCompatibleDistribution(
+      keyExpressions, getStateInfo, conf) :: Nil
+  }
+
+  private val schemaForTimeoutRow: StructType = StructType(
+    Array(StructField("expiresAt", LongType, nullable = false)))
+  private val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output,
+    allowMultipleEventTimeColumns = false).get
+  private val delayThresholdMillis = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey)
+  private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol)
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    metrics // force lazy init at driver
+
+    child.execute().mapPartitionsWithStateStore(
+      getStateInfo,
+      keyExpressions.toStructType,
+      schemaForTimeoutRow,
+      numColsPrefixKey = 0,
+      session.sessionState,
+      Some(session.streams.stateStoreCoordinator)) { (store, iter) =>
+      val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
+
+      val timeoutToUnsafeRow = UnsafeProjection.create(schemaForTimeoutRow)
+      val timeoutRow = timeoutToUnsafeRow(new SpecificInternalRow(schemaForTimeoutRow))
+
+      val numOutputRows = longMetric("numOutputRows")
+      val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+      val numRemovedStateRows = longMetric("numRemovedStateRows")
+      val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
+      val allRemovalsTimeMs = longMetric("allRemovalsTimeMs")
+      val commitTimeMs = longMetric("commitTimeMs")
+      val numDroppedDuplicateRows = longMetric("numDroppedDuplicateRows")
+
+      val baseIterator = watermarkPredicateForDataForLateEvents match {
+        case Some(predicate) => applyRemovingRowsOlderThanWatermark(iter, predicate)
+        case None => iter
+      }
+
+      val updatesStartTimeNs = System.nanoTime
+
+      val result = baseIterator.filter { r =>

Review Comment:
   If we reuse the Deduplicate() logical node, mainly this part and the state removal would differ based on a 'dropWithinWatermark' flag. That way, most of the remaining code remains unchanged.



##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3038,6 +3038,118 @@ class Dataset[T] private[sql](
     dropDuplicates(colNames)
   }
 
+  /**
+   * Returns a new Dataset with duplicates rows removed, as long as event times of duplicated rows
+   * are within delay threshold of watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be
+   * set via [[withWatermark]].
+   *
+   * This will keep all data across triggers as intermediate state to drop duplicated rows. The
+   * state will be kept to guarantee the following, "If event time of the first arrived event is
+   * 'ts', this guarantees all duplicated rows will be dropped where these rows are within the time
+   * range of (ts - delay threshold, ts + delay threshold)". In practice, users are encouraged to
+   * set the delay threshold of watermark longer than max timestamp differences among duplicated
+   * events.
+   *
+   * In addition, too late data older than watermark will be dropped to avoid any possibility
+   * of duplicates.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def dropDuplicatesWithinWatermark(): Dataset[T] = {
+    dropDuplicatesWithinWatermark(this.columns)
+  }
+
+  /**
+   * Returns a new Dataset with duplicates rows removed, considering only the subset of columns,
+   * as long as event times of duplicated rows are within delay threshold of watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be
+   * set via [[withWatermark]].
+   *
+   * This will keep all data across triggers as intermediate state to drop duplicated rows. The
+   * state will be kept to guarantee the following, "If event time of the first arrived event is
+   * 'ts', this guarantees all duplicated rows will be dropped where these rows are within the time
+   * range of (ts - delay threshold, ts + delay threshold)". In practice, users are encouraged to
+   * set the delay threshold of watermark longer than max timestamp differences among duplicated
+   * events.
+   *
+   * In addition, too late data older than watermark will be dropped to avoid any possibility
+   * of duplicates.

Review Comment:
   Same here. 



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -679,6 +679,8 @@ object RemoveNoopUnion extends Rule[LogicalPlan] {
       d.withNewChildren(Seq(simplifyUnion(u)))
     case d @ Deduplicate(_, u: Union) =>
       d.withNewChildren(Seq(simplifyUnion(u)))
+    case d @ DeduplicateWithinWatermark(_, u: Union) =>

Review Comment:
   Rather than making this a separate logical node, can we make the new behavior an option in the Deduplicate node? That way we don't need to distinguish them in the implementation except in a couple of places.
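A rough sketch of what that could look like, using a toy plan ADT rather than Catalyst classes. Everything here is hypothetical: `Plan`, `Relation`, this `Union`, the `withinWatermark` field, and `simplifyUnion`, which only stands in for the real helper in `RemoveNoopUnion`.

```scala
// Toy logical-plan ADT; these are stand-ins, not Catalyst classes.
sealed trait Plan { def children: Seq[Plan] }
final case class Relation(name: String) extends Plan {
  val children: Seq[Plan] = Nil
}
final case class Union(children: Seq[Plan]) extends Plan
final case class Deduplicate(
    keys: Seq[String],
    child: Plan,
    withinWatermark: Boolean = false) extends Plan { // hypothetical flag
  val children: Seq[Plan] = Seq(child)
}

object ToyRules {
  // Drops repeated union branches and collapses a single-branch union.
  def simplifyUnion(u: Union): Plan = u.children.distinct match {
    case Seq(single) => single
    case many => Union(many)
  }

  // One case covers both flavors of deduplication; the flag is carried along.
  def rewrite(plan: Plan): Plan = plan match {
    case d @ Deduplicate(_, u: Union, _) => d.copy(child = simplifyUnion(u))
    case other => other
  }
}
```

With a flag, rules such as the one in this hunk need no second `case`. Only the places that care about the semantics (the streaming planner and the state handling) would branch on `withinWatermark`.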



##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3038,6 +3038,118 @@ class Dataset[T] private[sql](
     dropDuplicates(colNames)
   }
 
+  /**
+   * Returns a new Dataset with duplicates rows removed, as long as event times of duplicated rows
+   * are within delay threshold of watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be

Review Comment:
   As also mentioned above: `withWatermark()` is allowed in batch, and this could be allowed in the same way. Essentially, in batch this is the same as normal dropDuplicates().



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: reviews-help@spark.apache.org