Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/04/05 01:30:21 UTC

[GitHub] [spark] viirya commented on a change in pull request #31989: [SPARK-34891][SS] Introduce state store manager for session window in streaming query

viirya commented on a change in pull request #31989:
URL: https://github.com/apache/spark/pull/31989#discussion_r606880784



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingSessionWindowStateManager.scala
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal, SpecificInternalRow, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
+import org.apache.spark.sql.types.{ArrayType, StructType, TimestampType}
+import org.apache.spark.util.NextIterator
+
+/**
+ * Base trait for a state manager intended to be used by streaming session window aggregation.
+ */
+sealed trait StreamingSessionWindowStateManager extends Serializable {
+
+  def getKey(row: InternalRow): UnsafeRow
+
+  def getStartTime(row: InternalRow): Long
+
+  /**
+   * Returns all stored keys.
+   */
+  def getAllKeys(): Iterator[UnsafeRow]
+
+  /**
+   * Returns a list of states for the key. These states are candidates for session window
+   * merging.
+   */
+  def getStates(key: UnsafeRow): Seq[UnsafeRow]
+
+  /**
+   * Returns a list of start times for session windows belonging to the given key.
+   */
+  def getStartTimeList(key: UnsafeRow): Seq[Long]
+
+  /**
+   * Returns the state for the given key and start time.
+   */
+  def getState(key: UnsafeRow, startTime: Long): UnsafeRow
+
+  /**
+   * Returns a list of states for all keys.
+   */
+  def getStates(): Seq[UnsafeRow]
+
+  /**
+   * Puts a state row into the state for the given key and start time. Note that this method
+   * does not add the start time to the list of start times for the given key. To update
+   * the list, call `putStartTimeList`.
+   */
+  def putState(key: UnsafeRow, startTime: Long, value: UnsafeRow): Unit
+
+  /**
+   * Puts a list of states for the given key. This method also updates the list of start times
+   * for the given key.
+   */
+  def putStates(key: UnsafeRow, values: Seq[UnsafeRow]): Unit
+
+  /**
+   * Puts a list of start times of session windows for the given key into the state. The list
+   * of start times must be sorted.
+   */
+  def putStartTimeList(key: UnsafeRow, startTimes: Seq[Long]): Unit
+
+  /**
+   * Removes the state for the given key and start time.
+   */
+  def removeState(key: UnsafeRow, startTime: Long): Unit
+
+  /**
+   * Removes the given key from the state store. This method removes all states associated
+   * with the given key, if any.
+   */
+  def removeKey(key: UnsafeRow): Unit
+
+  /**
+   * Given a callback function used to update state store metrics, updates the metrics of all
+   * state stores.
+   */
+  def updateMetrics(updateFunc: StateStoreMetrics => Unit): Unit
+
+  /**
+   * Removes values using a predicate.
+   *
+   * At a high level, this produces an iterator over the values that satisfy the predicate;
+   * producing an element removes that value from the state store, and producing all elements
+   * for a given key updates the stored state of that key accordingly.
+   *
+   * This implies the iterator must be consumed fully, without interleaving any other operations
+   * on this manager or the underlying store.
+   */
+  def removeByValueCondition(removalCondition: UnsafeRow => Boolean): Iterator[UnsafeRow]

Review comment:
       This doesn't seem to be in my implementation, and I can't find it in #31937 either. Am I missing it?
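
For readers following along, here is a minimal usage sketch of the removeByValueCondition contract quoted above. It is not part of the PR; the helper name evictExpiredSessions, the sessionEndOrdinal column position, and the watermarkMs value are assumptions made purely for illustration.

    import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    import org.apache.spark.sql.execution.streaming.state.StreamingSessionWindowStateManager

    // Evicts sessions whose end timestamp has fallen behind the watermark and
    // returns the removed rows. Per the scaladoc, the iterator returned by
    // removeByValueCondition has to be drained fully before any other operation
    // touches the manager or the underlying store, hence the strict toList.
    def evictExpiredSessions(
        manager: StreamingSessionWindowStateManager,
        sessionEndOrdinal: Int, // assumed ordinal of the session end timestamp column
        watermarkMs: Long): Seq[UnsafeRow] = {
      manager.removeByValueCondition { value =>
        value.getLong(sessionEndOrdinal) < watermarkMs
      }.map(_.copy()).toList // copy() in case the underlying iterator reuses row buffers
    }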




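In the same spirit, a minimal sketch of the write path described by the putStates / removeKey scaladoc above (again not from the PR; writeMergedSessions, key, and mergedSessions are hypothetical names):

    import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    import org.apache.spark.sql.execution.streaming.state.StreamingSessionWindowStateManager

    // Writes back the merged sessions for one key. removeKey drops the key and
    // all states associated with it; putStates stores the session rows and, per
    // its scaladoc, also updates the list of start times for the key, so no
    // separate putStartTimeList call should be needed on this path.
    def writeMergedSessions(
        manager: StreamingSessionWindowStateManager,
        key: UnsafeRow,
        mergedSessions: Seq[UnsafeRow]): Unit = {
      if (mergedSessions.isEmpty) {
        manager.removeKey(key)
      } else {
        manager.putStates(key, mergedSessions)
      }
    }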