Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/03/29 04:12:41 UTC

[GitHub] [spark] HeartSaVioR opened a new pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

HeartSaVioR opened a new pull request #31986:
URL: https://github.com/apache/spark/pull/31986


   Introduction: this PR is a part of SPARK-10816 (`EventTime based sessionization (session window)`). Please refer to #31937 for an overall view of the code change. (Note that the code diff may have diverged a bit.)
   
   ### What changes were proposed in this pull request?
   
   This PR introduces UpdatingSessionsIterator, which analyzes neighboring elements and adjusts the session information on each element.
   
   UpdatingSessionsIterator calculates and updates the session window for each element in the given iterator, so that elements in the same session window have the same session spec. Downstream operators can then apply aggregation to merge the elements bound to the same session window.
   
   UpdatingSessionsIterator works on the precondition that the given iterator is sorted by "group keys + start time of session window", and its output retains that sort order.
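A minimal sketch of the per-element update described above, assuming a hypothetical `Event(key, start, end)` case class in place of Spark's `InternalRow` and plain Scala collections in place of `ExternalAppendOnlyUnsafeRowArray`. It mirrors the merge rule in the quoted iterator (a row whose start is `<=` the current session end joins that session and may extend its end), but it is an illustration under those assumptions, not the PR's implementation:

```scala
// Hypothetical simplified row: grouping key plus session window bounds.
final case class Event(key: String, start: Long, end: Long)

object UpdatingSessionsSketch {
  /** Precondition: `events` is sorted by (key, start). Returns one output per input,
    * each rewritten to carry the merged session window it belongs to. */
  def updateSessions(events: Seq[Event]): Seq[Event] = {
    val out = Vector.newBuilder[Event]
    var buffer = Vector.empty[Event] // rows bound to the session being built
    var curKey: Option[String] = None
    var curStart = 0L
    var curEnd = 0L

    // Emit the buffered rows rewritten with the final session bounds.
    def closeSession(): Unit = {
      out ++= buffer.map(_.copy(start = curStart, end = curEnd))
      buffer = Vector.empty
    }

    events.foreach { e =>
      val sameKey = curKey.contains(e.key)
      // A start earlier than the current session start means the input was not sorted.
      if (sameKey) require(e.start >= curStart, s"input not sorted by (key, start): $e")

      if (sameKey && e.start <= curEnd) {
        // Overlapping: join the current session, extending its end if needed.
        curEnd = math.max(curEnd, e.end)
        buffer :+= e
      } else {
        // Key changed or a gap was found: close the current session, start a new one.
        closeSession()
        curKey = Some(e.key)
        curStart = e.start
        curEnd = e.end
        buffer = Vector(e)
      }
    }
    closeSession() // no further rows
    out.result()
  }
}
```

Unlike the quoted iterator, this sketch does not detect a key that reappears after another key (the `processedKeys` check), and it materializes all output eagerly instead of returning rows lazily.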
   
   UpdatingSessionsIterator copies each element so that it can be updated safely, and buffers the elements bound to the same session window. Because of this overhead, MergingSessionsIterator, which will be introduced via SPARK-34889, should be used whenever possible.
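Why the copy matters can be shown with a small sketch, assuming a hypothetical `MutableRow` whose single instance an upstream iterator reuses across `next()` calls (the quoted code comments note that multiple rows may reference the same memory when the iterator is optimized). Buffering references without copying leaves every buffered entry pointing at the last value:

```scala
// Hypothetical stand-in for a row object that upstream may reuse.
final class MutableRow(var value: Long) {
  def copy(): MutableRow = new MutableRow(value)
}

object CopyBeforeBufferSketch {
  // Upstream iterator that returns the SAME object on every next() call.
  def reusingIterator(values: Seq[Long]): Iterator[MutableRow] = {
    val shared = new MutableRow(0L)
    values.iterator.map { v => shared.value = v; shared }
  }

  // Buffers references only: all entries alias the shared object.
  def bufferWithoutCopy(it: Iterator[MutableRow]): Seq[Long] =
    it.toVector.map(_.value)

  // Copies each row before buffering, as the quoted iterator does with `iter.next().copy()`.
  def bufferWithCopy(it: Iterator[MutableRow]): Seq[Long] =
    it.map(_.copy()).toVector.map(_.value)
}
```

Here `bufferWithoutCopy` yields `3, 3, 3` for inputs `1, 2, 3`, while `bufferWithCopy` preserves `1, 2, 3`; the price is one allocation per row, which is part of the overhead mentioned above.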
   
   This PR also introduces UpdatingSessionsExec, the physical node that leverages UpdatingSessionsIterator to update the session information on input rows, which it requires to be sorted as described above.
   
   ### Why are the changes needed?
   
   This is one of the parts required to implement SPARK-10816.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   New test suite added.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a change in pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31986:
URL: https://github.com/apache/spark/pull/31986#discussion_r619919017



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.aggregate
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+
+/**
+ * This node updates the session window spec of each input rows via analyzing neighbor rows and
+ * determining rows belong to the same session window. The number of input rows remains the same.
+ * This node requires sort on input rows by group keys + the start time of session window.
+ *
+ * There are lots of overhead compared to [[MergingSessionsExec]]. Use [[MergingSessionsExec]]
+ * instead whenever possible. Use this node only when we cannot apply both calculations
+ * determining session windows and aggregating rows in session window altogether.
+ *
+ * Refer [[UpdatingSessionsIterator]] for more details.
+ */
+case class UpdatingSessionsExec(
+    keyExpressions: Seq[Attribute],
+    sessionExpression: Attribute,
+    child: SparkPlan) extends UnaryExecNode {
+
+  private val groupWithoutSessionExpression = keyExpressions.filterNot {
+    p => p.semanticEquals(sessionExpression)
+  }
+  private val groupingWithoutSessionAttributes = groupWithoutSessionExpression.map(_.toAttribute)
+
+  val childOrdering = Seq((groupingWithoutSessionAttributes ++ Seq(sessionExpression))
+    .map(SortOrder(_, Ascending)))
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val inMemoryThreshold = sqlContext.conf.windowExecBufferInMemoryThreshold
+    val spillThreshold = sqlContext.conf.windowExecBufferSpillThreshold
+
+    child.execute().mapPartitions { iter =>
+      new UpdatingSessionsIterator(iter, keyExpressions, sessionExpression,
+        child.output, inMemoryThreshold, spillThreshold)
+    }
+  }
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (groupWithoutSessionExpression.isEmpty) {
+      AllTuples :: Nil

Review comment:
       Yes, for batch queries. The limitation comes from the state format of session windows, so it shouldn't be restricted to batch queries either.
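The `requiredChildDistribution` quoted above asks for `AllTuples` when there is no grouping key other than the session, and for `ClusteredDistribution` otherwise, so that all rows of a given key reach the same task and the iterator sees each key's full sorted run. A minimal sketch of that routing decision, assuming hash partitioning as one way to satisfy the clustered case (the object and function names are hypothetical, not Spark API):

```scala
object SessionRoutingSketch {
  /** Picks the partition for a row, given its grouping keys without the session column.
    * No keys: every row goes to one partition (the AllTuples case).
    * Otherwise: equal keys hash to the same partition (one way to cluster rows). */
  def partitionFor(groupingKeysWithoutSession: Seq[Any], numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    if (groupingKeysWithoutSession.isEmpty) 0
    else Math.floorMod(groupingKeysWithoutSession.hashCode, numPartitions)
  }
}
```

`Math.floorMod` keeps the result non-negative even when `hashCode` is negative.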

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsIterator.scala
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.aggregate
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray
+
+/**
+ * This class calculates and updates the session window for each element in the given iterator,
+ * which makes elements in the same session window having same session spec. Downstream can apply
+ * aggregation to finally merge these elements bound to the same session window.
+ *
+ * This class works on the precondition that given iterator is sorted by "group keys + start time
+ * of session window", and this iterator still retains the characteristic of the sort.
+ *
+ * This class copies the elements to safely update on each element, as well as buffers elements
+ * which are bound to the same session window. Due to such overheads, [[MergingSessionsIterator]]
+ * should be used whenever possible.
+ */
+class UpdatingSessionsIterator(
+    iter: Iterator[InternalRow],
+    groupingExpressions: Seq[NamedExpression],
+    sessionExpression: NamedExpression,
+    inputSchema: Seq[Attribute],
+    inMemoryThreshold: Int,
+    spillThreshold: Int) extends Iterator[InternalRow] {
+
+  private val groupingWithoutSession: Seq[NamedExpression] =
+    groupingExpressions.diff(Seq(sessionExpression))
+  private val groupingWithoutSessionAttributes: Seq[Attribute] =
+    groupingWithoutSession.map(_.toAttribute)
+  private[this] val groupingWithoutSessionProjection: UnsafeProjection =
+    UnsafeProjection.create(groupingWithoutSession, inputSchema)
+
+  private val valuesExpressions: Seq[Attribute] = inputSchema.diff(groupingWithoutSession)
+
+  private[this] val sessionProjection: UnsafeProjection =
+    UnsafeProjection.create(Seq(sessionExpression), inputSchema)
+
+  private var currentKeys: InternalRow = _
+  private var currentSession: UnsafeRow = _
+
+  private var currentRows: ExternalAppendOnlyUnsafeRowArray = _
+
+  private var returnRows: ExternalAppendOnlyUnsafeRowArray = _
+  private var returnRowsIter: Iterator[InternalRow] = _
+  private var errorOnIterator: Boolean = false
+
+  private val processedKeys: mutable.HashSet[InternalRow] = new mutable.HashSet[InternalRow]()
+
+  override def hasNext: Boolean = {
+    assertIteratorNotCorrupted()
+
+    if (returnRowsIter != null && returnRowsIter.hasNext) {
+      return true
+    }
+
+    if (returnRowsIter != null) {
+      returnRowsIter = null
+      returnRows.clear()
+    }
+
+    iter.hasNext
+  }
+
+  override def next(): InternalRow = {
+    assertIteratorNotCorrupted()
+
+    if (returnRowsIter != null && returnRowsIter.hasNext) {
+      return returnRowsIter.next()
+    }
+
+    var exitCondition = false
+    while (iter.hasNext && !exitCondition) {
+      // we are going to modify the row, so we should make sure multiple objects are not
+      // referencing same memory, which could be possible when optimizing iterator
+      // without this, multiple rows in same key will be returned with same content
+      val row = iter.next().copy()
+
+      val keys = groupingWithoutSessionProjection(row)
+      val session = sessionProjection(row)
+      val sessionStruct = session.getStruct(0, 2)
+      val sessionStart = getSessionStart(sessionStruct)
+      val sessionEnd = getSessionEnd(sessionStruct)
+
+      if (currentKeys == null) {
+        startNewSession(row, keys, sessionStruct)
+      } else if (keys != currentKeys) {
+        closeCurrentSession(keyChanged = true)
+        processedKeys.add(currentKeys)
+        startNewSession(row, keys, sessionStruct)
+        exitCondition = true
+      } else {
+        if (sessionStart < getSessionStart(currentSession)) {
+          handleBrokenPreconditionForSort()
+        } else if (sessionStart <= getSessionEnd(currentSession)) {
+          // expanding session length if needed
+          expandEndOfCurrentSession(sessionEnd)
+          currentRows.add(row.asInstanceOf[UnsafeRow])
+        } else {
+          closeCurrentSession(keyChanged = false)
+          startNewSession(row, keys, sessionStruct)
+          exitCondition = true
+        }
+      }
+    }
+
+    if (!iter.hasNext) {
+      // no further row: closing session
+      closeCurrentSession(keyChanged = false)
+    }
+
+    // here returnRowsIter should be able to provide at least one row
+    require(returnRowsIter != null && returnRowsIter.hasNext)
+
+    returnRowsIter.next()
+  }
+
+  private def startNewSession(
+      currentRow: InternalRow,
+      groupingKey: UnsafeRow,
+      sessionStruct: UnsafeRow): Unit = {
+    if (processedKeys.contains(groupingKey)) {
+      handleBrokenPreconditionForSort()
+    }
+
+    currentKeys = groupingKey.copy()
+    currentSession = sessionStruct.copy()
+
+    currentRows = new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
+    currentRows.add(currentRow.asInstanceOf[UnsafeRow])
+  }
+
+  private def getSessionStart(sessionStruct: UnsafeRow): Long = {
+    sessionStruct.getLong(0)
+  }
+
+  private def getSessionEnd(sessionStruct: UnsafeRow): Long = {
+    sessionStruct.getLong(1)
+  }
+
+  def updateSessionEnd(sessionStruct: UnsafeRow, sessionEnd: Long): Unit = {
+    sessionStruct.setLong(1, sessionEnd)
+  }
+
+  private def expandEndOfCurrentSession(sessionEnd: Long): Unit = {
+    if (sessionEnd > getSessionEnd(currentSession)) {
+      updateSessionEnd(currentSession, sessionEnd)
+    }
+  }
+
+  private def handleBrokenPreconditionForSort(): Unit = {
+    errorOnIterator = true

Review comment:
       This ensures the iterator won't keep working "after" raising an exception. The iterator itself should make sense without any context on how Spark works.
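The `errorOnIterator` flag makes the failure sticky. A minimal sketch of the same pattern on a generic iterator, assuming the precondition is simply ascending order under an implicit `Ordering` (the class name is hypothetical): once an out-of-order element is seen, every later `hasNext`/`next()` call throws instead of silently continuing.

```scala
/** Hypothetical fail-fast iterator: requires ascending input and, once the
  * precondition is broken, refuses to serve any further element. */
final class FailFastSortedIterator[T](underlying: Iterator[T])(implicit ord: Ordering[T])
    extends Iterator[T] {
  private var last: Option[T] = None
  private var errorOnIterator = false

  private def assertIteratorNotCorrupted(): Unit =
    if (errorOnIterator) throw new IllegalStateException("iterator already failed; it must not be reused")

  override def hasNext: Boolean = {
    assertIteratorNotCorrupted()
    underlying.hasNext
  }

  override def next(): T = {
    assertIteratorNotCorrupted()
    val elem = underlying.next()
    last.foreach { prev =>
      if (ord.lt(elem, prev)) {
        errorOnIterator = true // sticky: every later call fails too
        throw new IllegalStateException(s"input is not sorted: $elem came after $prev")
      }
    }
    last = Some(elem)
    elem
  }
}
```

The iterator does not rely on the caller (for example, a task being aborted) to stop using it after the first exception, which matches the point that it should make sense on its own.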

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
##########
@@ -0,0 +1,79 @@
+  override protected def doExecute(): RDD[InternalRow] = {
+    val inMemoryThreshold = sqlContext.conf.windowExecBufferInMemoryThreshold
+    val spillThreshold = sqlContext.conf.windowExecBufferSpillThreshold

Review comment:
       Makes sense. Let's add new configs specific to session windows.






[GitHub] [spark] HeartSaVioR commented on a change in pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31986:
URL: https://github.com/apache/spark/pull/31986#discussion_r629830829



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
##########
@@ -0,0 +1,77 @@
+
+package org.apache.spark.sql.execution.aggregate
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+
+/**
+ * This node updates the session window spec of each input rows via analyzing neighbor rows and
+ * determining rows belong to the same session window. The number of input rows remains the same.
+ * This node requires sort on input rows by group keys + the start time of session window.
+ *
+ * There are lots of overhead compared to [[MergingSessionsExec]]. Use [[MergingSessionsExec]]
+ * instead whenever possible. Use this node only when we cannot apply both calculations
+ * determining session windows and aggregating rows in session window altogether.
+ *
+ * Refer [[UpdatingSessionsIterator]] for more details.
+ */
+case class UpdatingSessionsExec(
+    keyExpressions: Seq[Attribute],
+    sessionExpression: Attribute,
+    child: SparkPlan) extends UnaryExecNode {
+
+  private val groupingWithoutSessionExpression = keyExpressions.filterNot {
+    p => p.semanticEquals(sessionExpression)
+  }
+  private val groupingWithoutSessionAttributes =
+    groupingWithoutSessionExpression.map(_.toAttribute)
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val inMemoryThreshold = sqlContext.conf.sessionWindowBufferInMemoryThreshold
+    val spillThreshold = sqlContext.conf.sessionWindowBufferSpillThreshold
+
+    child.execute().mapPartitions { iter =>
+      new UpdatingSessionsIterator(iter, keyExpressions, sessionExpression,
+        child.output, inMemoryThreshold, spillThreshold)
+    }
+  }
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (groupingWithoutSessionExpression.isEmpty) {
+      AllTuples :: Nil
+    } else {
+      ClusteredDistribution(groupingWithoutSessionExpression) :: Nil
+    }
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
+    Seq((groupingWithoutSessionAttributes ++ Seq(sessionExpression))

Review comment:
       Ah, I remember the reason now. You can't safely assume the session expression is placed at the end of the grouping expressions. Let me revert it.
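To see why the ordering is built from the keys without the session, followed by the session, rather than from the grouping keys in their given order: if the session column happens to come first among the keys, sorting by the keys as given interleaves rows of different keys and breaks the iterator's precondition. A minimal sketch using column-name strings and a hypothetical `Row` case class in place of Catalyst attributes:

```scala
object SessionSortOrderSketch {
  /** Same idea as `groupingWithoutSessionAttributes ++ Seq(sessionExpression)`: remove the
    * session column wherever it sits among the keys, then append it last. */
  def requiredOrdering(keyColumns: Seq[String], sessionColumn: String): Seq[String] =
    keyColumns.filterNot(_ == sessionColumn) :+ sessionColumn

  // Hypothetical row with one grouping key and the session start time.
  final case class Row(user: String, sessionStart: Long)

  // Session first: rows of the same user are no longer adjacent.
  def sortSessionFirst(rows: Seq[Row]): Seq[Row] = rows.sortBy(r => (r.sessionStart, r.user))

  // Keys first, session last: each user's rows form one run ordered by start time.
  def sortKeysThenSession(rows: Seq[Row]): Seq[Row] = rows.sortBy(r => (r.user, r.sessionStart))
}
```

For example, `requiredOrdering(Seq("session", "user", "device"), "session")` moves `session` to the end, giving `user, device, session`.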






[GitHub] [spark] AmplabJenkins removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-809109297


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/136628/
   




[GitHub] [spark] HeartSaVioR closed pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
HeartSaVioR closed pull request #31986:
URL: https://github.com/apache/spark/pull/31986


   




[GitHub] [spark] AmplabJenkins commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-810074108


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/136688/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-809144449


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/41211/
   




[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-809058962


   **[Test build #136628 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/136628/testReport)** for PR 31986 at commit [`3e8dd5c`](https://github.com/apache/spark/commit/3e8dd5ccd8c2c136f5c3a4ff64269267edbdf81e).




[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-826828002


   **[Test build #137949 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137949/testReport)** for PR 31986 at commit [`5ce623b`](https://github.com/apache/spark/commit/5ce623be7aec9e3e9230604b485be57c993a280a).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-809133797


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41211/
   




[GitHub] [spark] SparkQA removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-817403314


   **[Test build #137184 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137184/testReport)** for PR 31986 at commit [`2ae446e`](https://github.com/apache/spark/commit/2ae446ed2d79d1f286a2995e9cb9d092d975e53f).




[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-842283124


   **[Test build #138618 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138618/testReport)** for PR 31986 at commit [`b0d434a`](https://github.com/apache/spark/commit/b0d434a046454b1c4dd1e164cc89e1c70fa04eaf).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-809931402


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/136677/
   




[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-809903940


   **[Test build #136688 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/136688/testReport)** for PR 31986 at commit [`9c36bde`](https://github.com/apache/spark/commit/9c36bdea0ad10b59296ed1b1d002ad0cf8420867).




[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-826825883


   **[Test build #137948 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137948/testReport)** for PR 31986 at commit [`ef37887`](https://github.com/apache/spark/commit/ef37887da6454fc0d4ebfe7cb40a42bf5b23a88d).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] xuanyuanking commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-842054675


   +1 to merging individual PRs. IMO, that will make the history easier to track in the future.




[GitHub] [spark] HeartSaVioR commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-842269846


   OK GA passed. Merging to master.




[GitHub] [spark] xuanyuanking commented on a change in pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on a change in pull request #31986:
URL: https://github.com/apache/spark/pull/31986#discussion_r617495129



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.aggregate
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+
+/**
+ * This node updates the session window spec of each input rows via analyzing neighbor rows and
+ * determining rows belong to the same session window. The number of input rows remains the same.
+ * This node requires sort on input rows by group keys + the start time of session window.
+ *
+ * There are lots of overhead compared to [[MergingSessionsExec]]. Use [[MergingSessionsExec]]
+ * instead whenever possible. Use this node only when we cannot apply both calculations
+ * determining session windows and aggregating rows in session window altogether.
+ *
+ * Refer [[UpdatingSessionsIterator]] for more details.
+ */
+case class UpdatingSessionsExec(
+    keyExpressions: Seq[Attribute],
+    sessionExpression: Attribute,
+    child: SparkPlan) extends UnaryExecNode {
+
+  private val groupWithoutSessionExpression = keyExpressions.filterNot {
+    p => p.semanticEquals(sessionExpression)
+  }
+  private val groupingWithoutSessionAttributes = groupWithoutSessionExpression.map(_.toAttribute)
+
+  val childOrdering = Seq((groupingWithoutSessionAttributes ++ Seq(sessionExpression))

Review comment:
       It seems `childOrdering` is an unused val.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
##########
@@ -0,0 +1,79 @@
+
+package org.apache.spark.sql.execution.aggregate
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+
+/**
+ * This node updates the session window spec of each input rows via analyzing neighbor rows and
+ * determining rows belong to the same session window. The number of input rows remains the same.
+ * This node requires sort on input rows by group keys + the start time of session window.
+ *
+ * There are lots of overhead compared to [[MergingSessionsExec]]. Use [[MergingSessionsExec]]
+ * instead whenever possible. Use this node only when we cannot apply both calculations
+ * determining session windows and aggregating rows in session window altogether.
+ *
+ * Refer [[UpdatingSessionsIterator]] for more details.
+ */
+case class UpdatingSessionsExec(
+    keyExpressions: Seq[Attribute],
+    sessionExpression: Attribute,
+    child: SparkPlan) extends UnaryExecNode {
+
+  private val groupWithoutSessionExpression = keyExpressions.filterNot {
+    p => p.semanticEquals(sessionExpression)
+  }
+  private val groupingWithoutSessionAttributes = groupWithoutSessionExpression.map(_.toAttribute)
+
+  val childOrdering = Seq((groupingWithoutSessionAttributes ++ Seq(sessionExpression))
+    .map(SortOrder(_, Ascending)))
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val inMemoryThreshold = sqlContext.conf.windowExecBufferInMemoryThreshold
+    val spillThreshold = sqlContext.conf.windowExecBufferSpillThreshold
+
+    child.execute().mapPartitions { iter =>
+      new UpdatingSessionsIterator(iter, keyExpressions, sessionExpression,
+        child.output, inMemoryThreshold, spillThreshold)
+    }
+  }
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (groupWithoutSessionExpression.isEmpty) {
+      AllTuples :: Nil

Review comment:
       So we allow the case where the session window is the only grouping key?
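A minimal plain-Scala model of the branch quoted above, for illustration only: `RequiredDistribution`, `AllTuplesLike`, `ClusteredBy` and `DistributionChoice` are hypothetical stand-ins, not Spark's `Distribution` API, and string equality replaces `semanticEquals`. The idea it sketches: when the session window is the only grouping key, no other key can co-locate rows, so overlapping sessions can only be merged if a single task sees every row.

```scala
// Hypothetical, simplified model of the planner-side decision; not Spark's API.
sealed trait RequiredDistribution
case object AllTuplesLike extends RequiredDistribution
final case class ClusteredBy(keys: Seq[String]) extends RequiredDistribution

object DistributionChoice {
  def requiredDistribution(keys: Seq[String], session: String): RequiredDistribution = {
    // Drop the session attribute; only the remaining keys can be used for clustering.
    val groupingWithoutSession = keys.filterNot(_ == session)
    if (groupingWithoutSession.isEmpty) AllTuplesLike // every row must reach one task
    else ClusteredBy(groupingWithoutSession)          // co-locate rows sharing non-session keys
  }
}
```

Under this reading the session-only case is allowed, but it gives up parallelism, since all rows end up in a single partition.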

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsIterator.scala
##########
@@ -0,0 +1,219 @@
+
+package org.apache.spark.sql.execution.aggregate
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray
+
+/**
+ * This class calculates and updates the session window for each element in the given iterator,
+ * which makes elements in the same session window having same session spec. Downstream can apply
+ * aggregation to finally merge these elements bound to the same session window.
+ *
+ * This class works on the precondition that given iterator is sorted by "group keys + start time
+ * of session window", and this iterator still retains the characteristic of the sort.
+ *
+ * This class copies the elements to safely update on each element, as well as buffers elements
+ * which are bound to the same session window. Due to such overheads, [[MergingSessionsIterator]]
+ * should be used whenever possible.
+ */
+class UpdatingSessionsIterator(
+    iter: Iterator[InternalRow],
+    groupingExpressions: Seq[NamedExpression],
+    sessionExpression: NamedExpression,
+    inputSchema: Seq[Attribute],
+    inMemoryThreshold: Int,
+    spillThreshold: Int) extends Iterator[InternalRow] {
+
+  private val groupingWithoutSession: Seq[NamedExpression] =
+    groupingExpressions.diff(Seq(sessionExpression))
+  private val groupingWithoutSessionAttributes: Seq[Attribute] =
+    groupingWithoutSession.map(_.toAttribute)
+  private[this] val groupingWithoutSessionProjection: UnsafeProjection =
+    UnsafeProjection.create(groupingWithoutSession, inputSchema)
+
+  private val valuesExpressions: Seq[Attribute] = inputSchema.diff(groupingWithoutSession)
+
+  private[this] val sessionProjection: UnsafeProjection =
+    UnsafeProjection.create(Seq(sessionExpression), inputSchema)
+
+  private var currentKeys: InternalRow = _
+  private var currentSession: UnsafeRow = _
+
+  private var currentRows: ExternalAppendOnlyUnsafeRowArray = _
+
+  private var returnRows: ExternalAppendOnlyUnsafeRowArray = _
+  private var returnRowsIter: Iterator[InternalRow] = _
+  private var errorOnIterator: Boolean = false
+
+  private val processedKeys: mutable.HashSet[InternalRow] = new mutable.HashSet[InternalRow]()
+
+  override def hasNext: Boolean = {
+    assertIteratorNotCorrupted()
+
+    if (returnRowsIter != null && returnRowsIter.hasNext) {
+      return true
+    }
+
+    if (returnRowsIter != null) {
+      returnRowsIter = null
+      returnRows.clear()
+    }
+
+    iter.hasNext
+  }
+
+  override def next(): InternalRow = {
+    assertIteratorNotCorrupted()
+
+    if (returnRowsIter != null && returnRowsIter.hasNext) {
+      return returnRowsIter.next()
+    }
+
+    var exitCondition = false
+    while (iter.hasNext && !exitCondition) {
+      // we are going to modify the row, so we should make sure multiple objects are not
+      // referencing same memory, which could be possible when optimizing iterator
+      // without this, multiple rows in same key will be returned with same content
+      val row = iter.next().copy()
+
+      val keys = groupingWithoutSessionProjection(row)
+      val session = sessionProjection(row)
+      val sessionStruct = session.getStruct(0, 2)
+      val sessionStart = getSessionStart(sessionStruct)
+      val sessionEnd = getSessionEnd(sessionStruct)
+
+      if (currentKeys == null) {
+        startNewSession(row, keys, sessionStruct)
+      } else if (keys != currentKeys) {
+        closeCurrentSession(keyChanged = true)
+        processedKeys.add(currentKeys)
+        startNewSession(row, keys, sessionStruct)
+        exitCondition = true
+      } else {
+        if (sessionStart < getSessionStart(currentSession)) {
+          handleBrokenPreconditionForSort()
+        } else if (sessionStart <= getSessionEnd(currentSession)) {
+          // expanding session length if needed
+          expandEndOfCurrentSession(sessionEnd)
+          currentRows.add(row.asInstanceOf[UnsafeRow])
+        } else {
+          closeCurrentSession(keyChanged = false)
+          startNewSession(row, keys, sessionStruct)
+          exitCondition = true
+        }
+      }
+    }
+
+    if (!iter.hasNext) {
+      // no further row: closing session
+      closeCurrentSession(keyChanged = false)
+    }
+
+    // here returnRowsIter should be able to provide at least one row
+    require(returnRowsIter != null && returnRowsIter.hasNext)
+
+    returnRowsIter.next()
+  }
+
+  private def startNewSession(
+      currentRow: InternalRow,
+      groupingKey: UnsafeRow,
+      sessionStruct: UnsafeRow): Unit = {
+    if (processedKeys.contains(groupingKey)) {
+      handleBrokenPreconditionForSort()
+    }
+
+    currentKeys = groupingKey.copy()
+    currentSession = sessionStruct.copy()
+
+    currentRows = new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
+    currentRows.add(currentRow.asInstanceOf[UnsafeRow])
+  }
+
+  private def getSessionStart(sessionStruct: UnsafeRow): Long = {
+    sessionStruct.getLong(0)
+  }
+
+  private def getSessionEnd(sessionStruct: UnsafeRow): Long = {
+    sessionStruct.getLong(1)
+  }
+
+  def updateSessionEnd(sessionStruct: UnsafeRow, sessionEnd: Long): Unit = {
+    sessionStruct.setLong(1, sessionEnd)
+  }
+
+  private def expandEndOfCurrentSession(sessionEnd: Long): Unit = {
+    if (sessionEnd > getSessionEnd(currentSession)) {
+      updateSessionEnd(currentSession, sessionEnd)
+    }
+  }
+
+  private def handleBrokenPreconditionForSort(): Unit = {
+    errorOnIterator = true

Review comment:
       Not sure whether the var `errorOnIterator` is necessary, since we throw an exception right after setting it to `true`.
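One plausible reason to keep a sticky flag such as `errorOnIterator`, sketched in plain Scala under stated assumptions: if a caller catches the exception and keeps calling `hasNext`/`next()`, the flag makes every later call fail instead of quietly continuing over broken state. `Event` and `SortCheckingIterator` are hypothetical; the sketch also assumes keys sort lexicographically, whereas the quoted code detects a reappearing key via `processedKeys`.

```scala
// Hypothetical row type: a grouping key plus a session [start, end).
final case class Event(key: String, start: Long, end: Long)

// Hypothetical wrapper that enforces the "sorted by (key, start)" precondition.
final class SortCheckingIterator(underlying: Iterator[Event]) extends Iterator[Event] {
  // Last (key, start) seen; None before the first element.
  private var last: Option[(String, Long)] = None
  // Sticky flag: once the precondition is broken, every later call fails too.
  private var corrupted = false

  private def assertNotCorrupted(): Unit =
    if (corrupted) throw new IllegalStateException("iterator was corrupted by an earlier error")

  override def hasNext: Boolean = { assertNotCorrupted(); underlying.hasNext }

  override def next(): Event = {
    assertNotCorrupted()
    val e = underlying.next()
    last.foreach { case (k, s) =>
      val cmp = e.key.compareTo(k)
      if (cmp < 0 || (cmp == 0 && e.start < s)) {
        corrupted = true
        throw new IllegalStateException(s"input is not sorted by (key, start): $e")
      }
    }
    last = Some((e.key, e.start))
    e
  }
}
```

In this example, without the flag the final `hasNext` would simply return `false` (the two-element input is already consumed), hiding the broken sort from the caller.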

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
##########
@@ -0,0 +1,79 @@
+
+package org.apache.spark.sql.execution.aggregate
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+
+/**
+ * This node updates the session window spec of each input rows via analyzing neighbor rows and
+ * determining rows belong to the same session window. The number of input rows remains the same.
+ * This node requires sort on input rows by group keys + the start time of session window.
+ *
+ * There are lots of overhead compared to [[MergingSessionsExec]]. Use [[MergingSessionsExec]]
+ * instead whenever possible. Use this node only when we cannot apply both calculations
+ * determining session windows and aggregating rows in session window altogether.
+ *
+ * Refer [[UpdatingSessionsIterator]] for more details.
+ */
+case class UpdatingSessionsExec(
+    keyExpressions: Seq[Attribute],
+    sessionExpression: Attribute,
+    child: SparkPlan) extends UnaryExecNode {
+
+  private val groupWithoutSessionExpression = keyExpressions.filterNot {
+    p => p.semanticEquals(sessionExpression)
+  }
+  private val groupingWithoutSessionAttributes = groupWithoutSessionExpression.map(_.toAttribute)
+
+  val childOrdering = Seq((groupingWithoutSessionAttributes ++ Seq(sessionExpression))
+    .map(SortOrder(_, Ascending)))
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val inMemoryThreshold = sqlContext.conf.windowExecBufferInMemoryThreshold
+    val spillThreshold = sqlContext.conf.windowExecBufferSpillThreshold

Review comment:
       An open question: since `UpdatingSessionsExec` follows a different pattern from `WindowExec`, should we add new configs for session window update/merge?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsIterator.scala
##########
@@ -0,0 +1,219 @@
+
+package org.apache.spark.sql.execution.aggregate
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray
+
+/**
+ * This class calculates and updates the session window for each element in the given iterator,
+ * which makes elements in the same session window having same session spec. Downstream can apply
+ * aggregation to finally merge these elements bound to the same session window.
+ *
+ * This class works on the precondition that given iterator is sorted by "group keys + start time
+ * of session window", and this iterator still retains the characteristic of the sort.
+ *
+ * This class copies the elements to safely update on each element, as well as buffers elements
+ * which are bound to the same session window. Due to such overheads, [[MergingSessionsIterator]]
+ * should be used whenever possible.
+ */
+class UpdatingSessionsIterator(
+    iter: Iterator[InternalRow],
+    groupingExpressions: Seq[NamedExpression],
+    sessionExpression: NamedExpression,
+    inputSchema: Seq[Attribute],
+    inMemoryThreshold: Int,
+    spillThreshold: Int) extends Iterator[InternalRow] {
+
+  private val groupingWithoutSession: Seq[NamedExpression] =
+    groupingExpressions.diff(Seq(sessionExpression))
+  private val groupingWithoutSessionAttributes: Seq[Attribute] =
+    groupingWithoutSession.map(_.toAttribute)
+  private[this] val groupingWithoutSessionProjection: UnsafeProjection =
+    UnsafeProjection.create(groupingWithoutSession, inputSchema)
+
+  private val valuesExpressions: Seq[Attribute] = inputSchema.diff(groupingWithoutSession)
+
+  private[this] val sessionProjection: UnsafeProjection =
+    UnsafeProjection.create(Seq(sessionExpression), inputSchema)
+
+  private var currentKeys: InternalRow = _

Review comment:
       I spent some time reviewing all the vars. Could we add a comment on each variable?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsIterator.scala
##########
@@ -0,0 +1,219 @@
+
+package org.apache.spark.sql.execution.aggregate
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray
+
+/**
+ * This class calculates and updates the session window for each element in the given iterator,
+ * which makes elements in the same session window having same session spec. Downstream can apply
+ * aggregation to finally merge these elements bound to the same session window.
+ *
+ * This class works on the precondition that given iterator is sorted by "group keys + start time
+ * of session window", and this iterator still retains the characteristic of the sort.
+ *
+ * This class copies the elements to safely update on each element, as well as buffers elements
+ * which are bound to the same session window. Due to such overheads, [[MergingSessionsIterator]]
+ * should be used whenever possible.
+ */
+class UpdatingSessionsIterator(
+    iter: Iterator[InternalRow],
+    groupingExpressions: Seq[NamedExpression],
+    sessionExpression: NamedExpression,
+    inputSchema: Seq[Attribute],
+    inMemoryThreshold: Int,
+    spillThreshold: Int) extends Iterator[InternalRow] {
+
+  private val groupingWithoutSession: Seq[NamedExpression] =
+    groupingExpressions.diff(Seq(sessionExpression))
+  private val groupingWithoutSessionAttributes: Seq[Attribute] =
+    groupingWithoutSession.map(_.toAttribute)
+  private[this] val groupingWithoutSessionProjection: UnsafeProjection =
+    UnsafeProjection.create(groupingWithoutSession, inputSchema)
+
+  private val valuesExpressions: Seq[Attribute] = inputSchema.diff(groupingWithoutSession)
+
+  private[this] val sessionProjection: UnsafeProjection =
+    UnsafeProjection.create(Seq(sessionExpression), inputSchema)
+
+  private var currentKeys: InternalRow = _

Review comment:
       Or rename some vars, e.g. `currentRows` -> `rowsForCurrentSession`?
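The core of what the class doc describes (rows sorted by grouping key + session start, overlapping sessions merged, every row re-emitted with the merged session) can be sketched in plain Scala, with a comment on each piece of state and the `rowsForCurrentSession` name suggested above. This is a simplified, hypothetical model: `SessionRow` and `updateSessions` are not part of the PR, and the sketch omits the row copying, unsafe projections, spilling buffer, and precondition checks of the real iterator.

```scala
// Hypothetical row: non-session grouping key, its own session [start, end), and a payload.
final case class SessionRow(key: String, start: Long, end: Long, value: Int)

object UpdateSessionsSketch {
  /** Assumes `rows` is sorted by (key, start). Returns the same rows, in the same order,
    * with (start, end) replaced by the merged session each row belongs to. */
  def updateSessions(rows: Seq[SessionRow]): Seq[SessionRow] = {
    val out = Vector.newBuilder[SessionRow]
    // Grouping key of the session being built; None before the first row.
    var currentKey: Option[String] = None
    // Start and (possibly expanded) end of the session being built.
    var currentStart = 0L
    var currentEnd = 0L
    // Rows buffered until the current session is closed and its final end is known.
    val rowsForCurrentSession = scala.collection.mutable.ArrayBuffer.empty[SessionRow]

    // Emit buffered rows stamped with the final session bounds, then reset the buffer.
    def closeCurrentSession(): Unit = {
      rowsForCurrentSession.foreach(r => out += r.copy(start = currentStart, end = currentEnd))
      rowsForCurrentSession.clear()
    }

    rows.foreach { row =>
      if (currentKey.contains(row.key) && row.start <= currentEnd) {
        // Overlaps (or touches) the current session: extend its end if needed.
        currentEnd = math.max(currentEnd, row.end)
      } else {
        // Key changed or a gap was found: flush the finished session, start a new one.
        closeCurrentSession()
        currentKey = Some(row.key)
        currentStart = row.start
        currentEnd = row.end
      }
      rowsForCurrentSession += row
    }
    closeCurrentSession()
    out.result()
  }
}
```

The number of rows is unchanged, matching the "number of input rows remains the same" note in the `UpdatingSessionsExec` doc; only the session bounds are rewritten so a downstream aggregation can group on them.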

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
##########
@@ -0,0 +1,79 @@
+
+package org.apache.spark.sql.execution.aggregate
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+
+/**
+ * This node updates the session window spec of each input rows via analyzing neighbor rows and
+ * determining rows belong to the same session window. The number of input rows remains the same.
+ * This node requires sort on input rows by group keys + the start time of session window.
+ *
+ * There are lots of overhead compared to [[MergingSessionsExec]]. Use [[MergingSessionsExec]]
+ * instead whenever possible. Use this node only when we cannot apply both calculations
+ * determining session windows and aggregating rows in session window altogether.
+ *
+ * Refer [[UpdatingSessionsIterator]] for more details.
+ */
+case class UpdatingSessionsExec(
+    keyExpressions: Seq[Attribute],
+    sessionExpression: Attribute,
+    child: SparkPlan) extends UnaryExecNode {
+
+  private val groupWithoutSessionExpression = keyExpressions.filterNot {

Review comment:
       nit: groupingWithout...?






[GitHub] [spark] AmplabJenkins removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-810074108


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/136688/
   




[GitHub] [spark] SparkQA removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-809839890


   **[Test build #136677 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/136677/testReport)** for PR 31986 at commit [`3e8dd5c`](https://github.com/apache/spark/commit/3e8dd5ccd8c2c136f5c3a4ff64269267edbdf81e).




[GitHub] [spark] SparkQA removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-826629092


   **[Test build #137949 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137949/testReport)** for PR 31986 at commit [`5ce623b`](https://github.com/apache/spark/commit/5ce623be7aec9e3e9230604b485be57c993a280a).




[GitHub] [spark] viirya commented on a change in pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #31986:
URL: https://github.com/apache/spark/pull/31986#discussion_r629774312



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2000,6 +2000,26 @@ object SQLConf {
       .intConf
       .createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
 
+  val SESSION_WINDOW_BUFFER_IN_MEMORY_THRESHOLD =
+    buildConf("spark.sql.sessionWindow.buffer.in.memory.threshold")

Review comment:
       `in.memory` looks weird to me. Maybe just `inMemory`?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
##########
@@ -0,0 +1,77 @@
+
+package org.apache.spark.sql.execution.aggregate
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+
+/**
+ * This node updates the session window spec of each input rows via analyzing neighbor rows and
+ * determining rows belong to the same session window. The number of input rows remains the same.
+ * This node requires sort on input rows by group keys + the start time of session window.
+ *
+ * There are lots of overhead compared to [[MergingSessionsExec]]. Use [[MergingSessionsExec]]
+ * instead whenever possible. Use this node only when we cannot apply both calculations
+ * determining session windows and aggregating rows in session window altogether.
+ *
+ * Refer [[UpdatingSessionsIterator]] for more details.
+ */
+case class UpdatingSessionsExec(
+    keyExpressions: Seq[Attribute],
+    sessionExpression: Attribute,
+    child: SparkPlan) extends UnaryExecNode {
+
+  private val groupingWithoutSessionExpression = keyExpressions.filterNot {
+    p => p.semanticEquals(sessionExpression)
+  }
+  private val groupingWithoutSessionAttributes =
+    groupingWithoutSessionExpression.map(_.toAttribute)
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val inMemoryThreshold = sqlContext.conf.sessionWindowBufferInMemoryThreshold
+    val spillThreshold = sqlContext.conf.sessionWindowBufferSpillThreshold
+
+    child.execute().mapPartitions { iter =>
+      new UpdatingSessionsIterator(iter, keyExpressions, sessionExpression,
+        child.output, inMemoryThreshold, spillThreshold)
+    }
+  }
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (groupingWithoutSessionExpression.isEmpty) {
+      AllTuples :: Nil
+    } else {
+      ClusteredDistribution(groupingWithoutSessionExpression) :: Nil
+    }
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
+    Seq((groupingWithoutSessionAttributes ++ Seq(sessionExpression))

Review comment:
       Isn't this just `keyExpressions`?
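A minimal sketch of the question. It models attributes as plain strings compared with `==`, whereas the real code uses Catalyst `Attribute`s and `semanticEquals`. Dropping the session attribute and appending it again reproduces `keyExpressions` only when the session attribute is already the last element. `OrderingCheck` and `requiredOrder` are hypothetical names.

```scala
object OrderingCheck {
  // Mirrors requiredChildOrdering: non-session grouping keys first, then the session column.
  // Attributes are modelled as strings (simplification; not Catalyst's Attribute).
  def requiredOrder(keys: Seq[String], session: String): Seq[String] =
    keys.filterNot(_ == session) :+ session

  def main(args: Array[String]): Unit = {
    val sessionLast  = Seq("userId", "session_window")
    val sessionFirst = Seq("session_window", "userId")
    // Identical to the key list when the session attribute is already last ...
    println(requiredOrder(sessionLast, "session_window") == sessionLast)   // true
    // ... but not otherwise: the reordering moves the session column to the end.
    println(requiredOrder(sessionFirst, "session_window") == sessionFirst) // false
  }
}
```

Under this model, `keyExpressions` could be used as-is only if every caller guarantees the session attribute comes last. The explicit reordering enforces the "group keys + session start" sort regardless of where the session attribute appears.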

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
##########
@@ -0,0 +1,77 @@
+package org.apache.spark.sql.execution.aggregate
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+
+/**
+ * This node updates the session window spec of each input rows via analyzing neighbor rows and
+ * determining rows belong to the same session window. The number of input rows remains the same.
+ * This node requires sort on input rows by group keys + the start time of session window.
+ *
+ * There are lots of overhead compared to [[MergingSessionsExec]]. Use [[MergingSessionsExec]]
+ * instead whenever possible. Use this node only when we cannot apply both calculations
+ * determining session windows and aggregating rows in session window altogether.

Review comment:
       Where do we list the possible cases in which we cannot apply both of them together?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsIterator.scala
##########
@@ -0,0 +1,224 @@
+package org.apache.spark.sql.execution.aggregate
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray
+
+/**
+ * This class calculates and updates the session window for each element in the given iterator,
+ * which makes elements in the same session window having same session spec. Downstream can apply
+ * aggregation to finally merge these elements bound to the same session window.
+ *
+ * This class works on the precondition that given iterator is sorted by "group keys + start time
+ * of session window", and this iterator still retains the characteristic of the sort.
+ *
+ * This class copies the elements to safely update on each element, as well as buffers elements
+ * which are bound to the same session window. Due to such overheads, [[MergingSessionsIterator]]
+ * should be used whenever possible.
+ */
+class UpdatingSessionsIterator(
+    iter: Iterator[InternalRow],
+    groupingExpressions: Seq[NamedExpression],
+    sessionExpression: NamedExpression,
+    inputSchema: Seq[Attribute],
+    inMemoryThreshold: Int,
+    spillThreshold: Int) extends Iterator[InternalRow] {
+
+  private val groupingWithoutSession: Seq[NamedExpression] =
+    groupingExpressions.diff(Seq(sessionExpression))
+  private val groupingWithoutSessionAttributes: Seq[Attribute] =
+    groupingWithoutSession.map(_.toAttribute)
+  private[this] val groupingWithoutSessionProjection: UnsafeProjection =
+    UnsafeProjection.create(groupingWithoutSession, inputSchema)
+
+  private val valuesExpressions: Seq[Attribute] = inputSchema.diff(groupingWithoutSession)
+
+  private[this] val sessionProjection: UnsafeProjection =
+    UnsafeProjection.create(Seq(sessionExpression), inputSchema)
+
+  // Below three variables hold the information for "current session".
+  private var currentKeys: InternalRow = _
+  private var currentSession: UnsafeRow = _
+  private var rowsForCurrentSession: ExternalAppendOnlyUnsafeRowArray = _
+
+  // Below two variables hold the information for "returning rows". The reason we have this in
+  // addition to "current session" is that there could be the chance that iterator for returning
+  // rows on previous session wasn't fully consumed and there's a new session being started.
+  private var returnRows: ExternalAppendOnlyUnsafeRowArray = _
+  private var returnRowsIter: Iterator[InternalRow] = _
+
+  // Mark this to raise error on any operations after the iterator figures out the error.
+  private var errorOnIterator: Boolean = false
+
+  private val processedKeys: mutable.HashSet[InternalRow] = new mutable.HashSet[InternalRow]()
+
+  override def hasNext: Boolean = {
+    assertIteratorNotCorrupted()
+
+    if (returnRowsIter != null && returnRowsIter.hasNext) {
+      return true
+    }
+
+    if (returnRowsIter != null) {
+      returnRowsIter = null
+      returnRows.clear()
+    }
+
+    iter.hasNext
+  }
+
+  override def next(): InternalRow = {
+    assertIteratorNotCorrupted()
+
+    if (returnRowsIter != null && returnRowsIter.hasNext) {
+      return returnRowsIter.next()
+    }
+
+    var exitCondition = false
+    while (iter.hasNext && !exitCondition) {
+      // we are going to modify the row, so we should make sure multiple objects are not
+      // referencing same memory, which could be possible when optimizing iterator
+      // without this, multiple rows in same key will be returned with same content
+      val row = iter.next().copy()
+
+      val keys = groupingWithoutSessionProjection(row)
+      val session = sessionProjection(row)
+      val sessionStruct = session.getStruct(0, 2)
+      val sessionStart = getSessionStart(sessionStruct)
+      val sessionEnd = getSessionEnd(sessionStruct)
+
+      if (currentKeys == null) {
+        startNewSession(row, keys, sessionStruct)
+      } else if (keys != currentKeys) {
+        closeCurrentSession(keyChanged = true)
+        processedKeys.add(currentKeys)
+        startNewSession(row, keys, sessionStruct)
+        exitCondition = true
+      } else {
+        if (sessionStart < getSessionStart(currentSession)) {
+          handleBrokenPreconditionForSort()
+        } else if (sessionStart <= getSessionEnd(currentSession)) {
+          // expanding session length if needed
+          expandEndOfCurrentSession(sessionEnd)
+          rowsForCurrentSession.add(row.asInstanceOf[UnsafeRow])
+        } else {
+          closeCurrentSession(keyChanged = false)
+          startNewSession(row, keys, sessionStruct)
+          exitCondition = true
+        }
+      }
+    }
+
+    if (!iter.hasNext) {
+      // no further row: closing session
+      closeCurrentSession(keyChanged = false)
+    }
+
+    // here returnRowsIter should be able to provide at least one row
+    require(returnRowsIter != null && returnRowsIter.hasNext)
+
+    returnRowsIter.next()
+  }
+
+  private def startNewSession(
+      currentRow: InternalRow,
+      groupingKey: UnsafeRow,
+      sessionStruct: UnsafeRow): Unit = {
+    if (processedKeys.contains(groupingKey)) {
+      handleBrokenPreconditionForSort()
+    }
+
+    currentKeys = groupingKey.copy()
+    currentSession = sessionStruct.copy()
+
+    rowsForCurrentSession = new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
+    rowsForCurrentSession.add(currentRow.asInstanceOf[UnsafeRow])
+  }
+
+  private def getSessionStart(sessionStruct: UnsafeRow): Long = {
+    sessionStruct.getLong(0)
+  }
+
+  private def getSessionEnd(sessionStruct: UnsafeRow): Long = {
+    sessionStruct.getLong(1)
+  }
+
+  def updateSessionEnd(sessionStruct: UnsafeRow, sessionEnd: Long): Unit = {
+    sessionStruct.setLong(1, sessionEnd)
+  }
+
+  private def expandEndOfCurrentSession(sessionEnd: Long): Unit = {
+    if (sessionEnd > getSessionEnd(currentSession)) {
+      updateSessionEnd(currentSession, sessionEnd)
+    }
+  }
+
+  private def handleBrokenPreconditionForSort(): Unit = {
+    errorOnIterator = true
+    throw new IllegalStateException("The iterator must be sorted by key and session start!")
+  }
+
+  private val join = new JoinedRow
+  private val join2 = new JoinedRow
+
+  private val groupingKeyProj = GenerateUnsafeProjection.generate(groupingExpressions,
+    groupingWithoutSessionAttributes :+ sessionExpression.toAttribute)
+  private val valueProj = GenerateUnsafeProjection.generate(valuesExpressions, inputSchema)
+  private val restoreProj = GenerateUnsafeProjection.generate(inputSchema,
+    groupingExpressions.map(_.toAttribute) ++ valuesExpressions.map(_.toAttribute))
+
+  private def generateGroupingKey(): UnsafeRow = {
+    val newRow = new SpecificInternalRow(Seq(sessionExpression.toAttribute).toStructType)
+    newRow.update(0, currentSession)
+    val joined = join(currentKeys, newRow)
+
+    groupingKeyProj(joined)
+  }
+
+  private def closeCurrentSession(keyChanged: Boolean): Unit = {
+    assert(returnRowsIter == null || !returnRowsIter.hasNext)
+
+    returnRows = rowsForCurrentSession
+    rowsForCurrentSession = null
+
+    val groupingKey = generateGroupingKey()
+
+    val currentRowsIter = returnRows.generateIterator().map { internalRow =>
+      val valueRow = valueProj(internalRow)
+      restoreProj(join2(groupingKey, valueRow)).copy()

Review comment:
       The projection of `groupingKey` looks redundant? It looks like we only need to do `restoreProj`?
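The suggestion rests on the fact that two consecutive column-reordering projections can be folded into one. The sketch below reduces a projection to a list of source ordinals and ignores the parts Catalyst projections also handle, such as evaluating expressions and building `UnsafeRow`s. `project` and `compose` are hypothetical helpers, and the ordinal lists are illustrative and not derived from the PR.

```scala
object ProjectionComposition {
  // A row is a vector of column values; a "projection" is a list of source ordinals.
  type Row = Vector[Any]

  def project(row: Row, ordinals: Seq[Int]): Row = ordinals.map(row).toVector

  // Folding two projections: output column i of `second` reads column second(i) of the
  // first projection's output, i.e. column first(second(i)) of the original input.
  def compose(first: Seq[Int], second: Seq[Int]): Seq[Int] = second.map(first)

  def main(args: Array[String]): Unit = {
    val joined: Row = Vector("k1", "k2", "session", "v1", "v2") // keys ++ session ++ values
    val reorder = Seq(2, 0, 1, 3, 4) // an intermediate, groupingKeyProj-like reordering
    val restore = Seq(1, 2, 0, 3, 4) // a restoreProj-like mapping back to the input order

    val twoSteps = project(project(joined, reorder), restore)
    val oneStep  = project(joined, compose(reorder, restore))
    println(twoSteps == oneStep) // true
  }
}
```

In the iterator's terms, this would mean binding `restoreProj` directly against the layout of the joined keys/session/value row. The intermediate `groupingKeyProj` step would then no longer be needed.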

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
##########
@@ -0,0 +1,77 @@
+package org.apache.spark.sql.execution.aggregate
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+
+/**
+ * This node updates the session window spec of each input rows via analyzing neighbor rows and
+ * determining rows belong to the same session window. The number of input rows remains the same.
+ * This node requires sort on input rows by group keys + the start time of session window.
+ *
+ * There are lots of overhead compared to [[MergingSessionsExec]]. Use [[MergingSessionsExec]]
+ * instead whenever possible. Use this node only when we cannot apply both calculations
+ * determining session windows and aggregating rows in session window altogether.
+ *
+ * Refer [[UpdatingSessionsIterator]] for more details.
+ */
+case class UpdatingSessionsExec(
+    keyExpressions: Seq[Attribute],

Review comment:
       `keyExpressions` seems to also include the session window expression and the session keys? Maybe `groupingExpressions`?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsIterator.scala
##########
@@ -0,0 +1,224 @@
+package org.apache.spark.sql.execution.aggregate
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray
+
+/**
+ * This class calculates and updates the session window for each element in the given iterator,
+ * which makes elements in the same session window having same session spec. Downstream can apply
+ * aggregation to finally merge these elements bound to the same session window.
+ *
+ * This class works on the precondition that given iterator is sorted by "group keys + start time
+ * of session window", and this iterator still retains the characteristic of the sort.
+ *
+ * This class copies the elements to safely update on each element, as well as buffers elements
+ * which are bound to the same session window. Due to such overheads, [[MergingSessionsIterator]]
+ * should be used whenever possible.
+ */
+class UpdatingSessionsIterator(
+    iter: Iterator[InternalRow],
+    groupingExpressions: Seq[NamedExpression],
+    sessionExpression: NamedExpression,
+    inputSchema: Seq[Attribute],
+    inMemoryThreshold: Int,
+    spillThreshold: Int) extends Iterator[InternalRow] {
+
+  private val groupingWithoutSession: Seq[NamedExpression] =
+    groupingExpressions.diff(Seq(sessionExpression))
+  private val groupingWithoutSessionAttributes: Seq[Attribute] =
+    groupingWithoutSession.map(_.toAttribute)
+  private[this] val groupingWithoutSessionProjection: UnsafeProjection =
+    UnsafeProjection.create(groupingWithoutSession, inputSchema)
+
+  private val valuesExpressions: Seq[Attribute] = inputSchema.diff(groupingWithoutSession)
+
+  private[this] val sessionProjection: UnsafeProjection =
+    UnsafeProjection.create(Seq(sessionExpression), inputSchema)
+
+  // Below three variables hold the information for "current session".
+  private var currentKeys: InternalRow = _
+  private var currentSession: UnsafeRow = _
+  private var rowsForCurrentSession: ExternalAppendOnlyUnsafeRowArray = _
+
+  // Below two variables hold the information for "returning rows". The reason we have this in
+  // addition to "current session" is that there could be the chance that iterator for returning
+  // rows on previous session wasn't fully consumed and there's a new session being started.
+  private var returnRows: ExternalAppendOnlyUnsafeRowArray = _
+  private var returnRowsIter: Iterator[InternalRow] = _
+
+  // Mark this to raise error on any operations after the iterator figures out the error.
+  private var errorOnIterator: Boolean = false
+
+  private val processedKeys: mutable.HashSet[InternalRow] = new mutable.HashSet[InternalRow]()
+
+  override def hasNext: Boolean = {
+    assertIteratorNotCorrupted()
+
+    if (returnRowsIter != null && returnRowsIter.hasNext) {
+      return true
+    }
+
+    if (returnRowsIter != null) {
+      returnRowsIter = null
+      returnRows.clear()
+    }
+
+    iter.hasNext
+  }
+
+  override def next(): InternalRow = {
+    assertIteratorNotCorrupted()
+
+    if (returnRowsIter != null && returnRowsIter.hasNext) {
+      return returnRowsIter.next()
+    }
+
+    var exitCondition = false
+    while (iter.hasNext && !exitCondition) {
+      // we are going to modify the row, so we should make sure multiple objects are not
+      // referencing same memory, which could be possible when optimizing iterator
+      // without this, multiple rows in same key will be returned with same content
+      val row = iter.next().copy()
+
+      val keys = groupingWithoutSessionProjection(row)
+      val session = sessionProjection(row)
+      val sessionStruct = session.getStruct(0, 2)
+      val sessionStart = getSessionStart(sessionStruct)
+      val sessionEnd = getSessionEnd(sessionStruct)
+
+      if (currentKeys == null) {
+        startNewSession(row, keys, sessionStruct)
+      } else if (keys != currentKeys) {
+        closeCurrentSession(keyChanged = true)
+        processedKeys.add(currentKeys)
+        startNewSession(row, keys, sessionStruct)
+        exitCondition = true
+      } else {
+        if (sessionStart < getSessionStart(currentSession)) {
+          handleBrokenPreconditionForSort()
+        } else if (sessionStart <= getSessionEnd(currentSession)) {
+          // expanding session length if needed
+          expandEndOfCurrentSession(sessionEnd)
+          rowsForCurrentSession.add(row.asInstanceOf[UnsafeRow])
+        } else {
+          closeCurrentSession(keyChanged = false)
+          startNewSession(row, keys, sessionStruct)
+          exitCondition = true

Review comment:
       Is the session gap already added to the session end?
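The check `sessionStart <= getSessionEnd(currentSession)` only works without extra arithmetic if each row's session end already includes the gap, that is, end = event time + gap. The sketch below shows the merge under that assumption, using plain Scala values instead of `UnsafeRow`s. `Row` and `updateSessions` are hypothetical names. Unlike the PR's iterator, the sketch does not spill to disk and does not detect a grouping key that reappears later (`processedKeys`).

```scala
object SessionMerge {
  // One input row: grouping key, its own session window, and a payload value.
  // Assumption: `end` already equals eventTime + gap, so merging adds no gap here.
  final case class Row(key: String, start: Long, end: Long, value: Int)

  // Input must be sorted by (key, start). Every output row carries the merged window
  // of the session it belongs to; the number of rows stays the same.
  def updateSessions(sorted: Seq[Row]): Seq[Row] = {
    val out = Vector.newBuilder[Row]
    var buffer = Vector.empty[Row] // rows of the session currently being built
    var curKey: Option[String] = None
    var curStart = 0L
    var curEnd = 0L

    def closeSession(): Unit = {
      out ++= buffer.map(_.copy(start = curStart, end = curEnd))
      buffer = Vector.empty
    }

    for (r <- sorted) {
      val sameKey = curKey.contains(r.key)
      if (sameKey && r.start < curStart)
        throw new IllegalStateException("input must be sorted by key and session start")
      if (sameKey && r.start <= curEnd) {
        // Overlaps the current session: extend its end if this row reaches further.
        curEnd = math.max(curEnd, r.end)
        buffer :+= r
      } else {
        // New key, or the row starts after the current session ends: emit and start over.
        if (curKey.isDefined) closeSession()
        curKey = Some(r.key); curStart = r.start; curEnd = r.end
        buffer = Vector(r)
      }
    }
    if (curKey.isDefined) closeSession()
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val gap = 10L
    val events = Seq(("a", 0L, 1), ("a", 5L, 2), ("a", 30L, 3), ("b", 3L, 4))
    val rows = events.map { case (k, t, v) => Row(k, t, t + gap, v) }
    updateSessions(rows).foreach(println)
    // Row(a,0,15,1)
    // Row(a,0,15,2)
    // Row(a,30,40,3)
    // Row(b,3,13,4)
  }
}
```

Events at 0 and 5 for key `a` collapse into one session [0, 15], while the event at 30 starts a new one. Had the gap not been folded into `end` beforehand, the overlap test would have to compare against `end + gap` instead.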

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsIterator.scala
##########
@@ -0,0 +1,224 @@
+package org.apache.spark.sql.execution.aggregate
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray
+
+/**
+ * This class calculates and updates the session window for each element in the given iterator,
+ * which makes elements in the same session window having same session spec. Downstream can apply
+ * aggregation to finally merge these elements bound to the same session window.
+ *
+ * This class works on the precondition that given iterator is sorted by "group keys + start time
+ * of session window", and this iterator still retains the characteristic of the sort.
+ *
+ * This class copies the elements to safely update on each element, as well as buffers elements
+ * which are bound to the same session window. Due to such overheads, [[MergingSessionsIterator]]
+ * should be used whenever possible.
+ */
+class UpdatingSessionsIterator(
+    iter: Iterator[InternalRow],
+    groupingExpressions: Seq[NamedExpression],
+    sessionExpression: NamedExpression,
+    inputSchema: Seq[Attribute],
+    inMemoryThreshold: Int,
+    spillThreshold: Int) extends Iterator[InternalRow] {
+
+  private val groupingWithoutSession: Seq[NamedExpression] =
+    groupingExpressions.diff(Seq(sessionExpression))
+  private val groupingWithoutSessionAttributes: Seq[Attribute] =
+    groupingWithoutSession.map(_.toAttribute)
+  private[this] val groupingWithoutSessionProjection: UnsafeProjection =
+    UnsafeProjection.create(groupingWithoutSession, inputSchema)
+
+  private val valuesExpressions: Seq[Attribute] = inputSchema.diff(groupingWithoutSession)

Review comment:
       Does `valuesExpressions` include session window?
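Assume the session attribute is itself a column of `inputSchema`. The code binds `sessionProjection` against `inputSchema`, which suggests it is. Under that assumption, the `diff` only removes the non-session grouping keys, so the session window stays in `valuesExpressions`. A minimal sketch with attributes modelled as strings; the column names and `valuesColumns` are hypothetical:

```scala
object ValuesColumns {
  // Mirrors the two `diff` calls in the iterator, with attributes modelled as strings.
  def valuesColumns(inputSchema: Seq[String], grouping: Seq[String], session: String): Seq[String] = {
    val groupingWithoutSession = grouping.diff(Seq(session))
    inputSchema.diff(groupingWithoutSession)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical schema: two grouping keys, the session window column, and one payload column.
    val inputSchema = Seq("userId", "deviceId", "session_window", "clicks")
    val grouping    = Seq("userId", "deviceId", "session_window")
    println(valuesColumns(inputSchema, grouping, "session_window")) // List(session_window, clicks)
  }
}
```

If that holds, the joined row fed to `restoreProj` carries the session column twice: the updated copy inside `groupingKey` and the original copy inside `valueRow`.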




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-842172316








[GitHub] [spark] HeartSaVioR commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-826627455


   @viirya @xuanyuanking 
   I'd appreciate another round of review. Thanks!




[GitHub] [spark] AmplabJenkins removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-838135143


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138356/
   




[GitHub] [spark] HeartSaVioR commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-842104032


   retest this, please




[GitHub] [spark] HeartSaVioR commented on a change in pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31986:
URL: https://github.com/apache/spark/pull/31986#discussion_r619919322



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsIterator.scala
##########
@@ -0,0 +1,219 @@
+package org.apache.spark.sql.execution.aggregate
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray
+
+/**
+ * This class calculates and updates the session window for each element in the given iterator,
+ * which makes elements in the same session window having same session spec. Downstream can apply
+ * aggregation to finally merge these elements bound to the same session window.
+ *
+ * This class works on the precondition that given iterator is sorted by "group keys + start time
+ * of session window", and this iterator still retains the characteristic of the sort.
+ *
+ * This class copies the elements to safely update on each element, as well as buffers elements
+ * which are bound to the same session window. Due to such overheads, [[MergingSessionsIterator]]
+ * should be used whenever possible.
+ */
+class UpdatingSessionsIterator(
+    iter: Iterator[InternalRow],
+    groupingExpressions: Seq[NamedExpression],
+    sessionExpression: NamedExpression,
+    inputSchema: Seq[Attribute],
+    inMemoryThreshold: Int,
+    spillThreshold: Int) extends Iterator[InternalRow] {
+
+  private val groupingWithoutSession: Seq[NamedExpression] =
+    groupingExpressions.diff(Seq(sessionExpression))
+  private val groupingWithoutSessionAttributes: Seq[Attribute] =
+    groupingWithoutSession.map(_.toAttribute)
+  private[this] val groupingWithoutSessionProjection: UnsafeProjection =
+    UnsafeProjection.create(groupingWithoutSession, inputSchema)
+
+  private val valuesExpressions: Seq[Attribute] = inputSchema.diff(groupingWithoutSession)
+
+  private[this] val sessionProjection: UnsafeProjection =
+    UnsafeProjection.create(Seq(sessionExpression), inputSchema)
+
+  private var currentKeys: InternalRow = _
+  private var currentSession: UnsafeRow = _
+
+  private var currentRows: ExternalAppendOnlyUnsafeRowArray = _
+
+  private var returnRows: ExternalAppendOnlyUnsafeRowArray = _
+  private var returnRowsIter: Iterator[InternalRow] = _
+  private var errorOnIterator: Boolean = false
+
+  private val processedKeys: mutable.HashSet[InternalRow] = new mutable.HashSet[InternalRow]()
+
+  override def hasNext: Boolean = {
+    assertIteratorNotCorrupted()
+
+    if (returnRowsIter != null && returnRowsIter.hasNext) {
+      return true
+    }
+
+    if (returnRowsIter != null) {
+      returnRowsIter = null
+      returnRows.clear()
+    }
+
+    iter.hasNext
+  }
+
+  override def next(): InternalRow = {
+    assertIteratorNotCorrupted()
+
+    if (returnRowsIter != null && returnRowsIter.hasNext) {
+      return returnRowsIter.next()
+    }
+
+    var exitCondition = false
+    while (iter.hasNext && !exitCondition) {
+      // we are going to modify the row, so we should make sure multiple objects are not
+      // referencing same memory, which could be possible when optimizing iterator
+      // without this, multiple rows in same key will be returned with same content
+      val row = iter.next().copy()
+
+      val keys = groupingWithoutSessionProjection(row)
+      val session = sessionProjection(row)
+      val sessionStruct = session.getStruct(0, 2)
+      val sessionStart = getSessionStart(sessionStruct)
+      val sessionEnd = getSessionEnd(sessionStruct)
+
+      if (currentKeys == null) {
+        startNewSession(row, keys, sessionStruct)
+      } else if (keys != currentKeys) {
+        closeCurrentSession(keyChanged = true)
+        processedKeys.add(currentKeys)
+        startNewSession(row, keys, sessionStruct)
+        exitCondition = true
+      } else {
+        if (sessionStart < getSessionStart(currentSession)) {
+          handleBrokenPreconditionForSort()
+        } else if (sessionStart <= getSessionEnd(currentSession)) {
+          // expanding session length if needed
+          expandEndOfCurrentSession(sessionEnd)
+          currentRows.add(row.asInstanceOf[UnsafeRow])
+        } else {
+          closeCurrentSession(keyChanged = false)
+          startNewSession(row, keys, sessionStruct)
+          exitCondition = true
+        }
+      }
+    }
+
+    if (!iter.hasNext) {
+      // no further row: closing session
+      closeCurrentSession(keyChanged = false)
+    }
+
+    // here returnRowsIter should be able to provide at least one row
+    require(returnRowsIter != null && returnRowsIter.hasNext)
+
+    returnRowsIter.next()
+  }
+
+  private def startNewSession(
+      currentRow: InternalRow,
+      groupingKey: UnsafeRow,
+      sessionStruct: UnsafeRow): Unit = {
+    if (processedKeys.contains(groupingKey)) {
+      handleBrokenPreconditionForSort()
+    }
+
+    currentKeys = groupingKey.copy()
+    currentSession = sessionStruct.copy()
+
+    currentRows = new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
+    currentRows.add(currentRow.asInstanceOf[UnsafeRow])
+  }
+
+  private def getSessionStart(sessionStruct: UnsafeRow): Long = {
+    sessionStruct.getLong(0)
+  }
+
+  private def getSessionEnd(sessionStruct: UnsafeRow): Long = {
+    sessionStruct.getLong(1)
+  }
+
+  def updateSessionEnd(sessionStruct: UnsafeRow, sessionEnd: Long): Unit = {
+    sessionStruct.setLong(1, sessionEnd)
+  }
+
+  private def expandEndOfCurrentSession(sessionEnd: Long): Unit = {
+    if (sessionEnd > getSessionEnd(currentSession)) {
+      updateSessionEnd(currentSession, sessionEnd)
+    }
+  }
+
+  private def handleBrokenPreconditionForSort(): Unit = {
+    errorOnIterator = true

Review comment:
       This ensures the iterator won't keep working "after" it raises an exception. The iterator itself should make sense without any context of how Spark works.
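The fail-fast behavior discussed in this comment can be illustrated with a small, self-contained sketch. It does not use Spark classes: `SortCheckingIterator` is a hypothetical name, and the `keyOf`/`startOf` extractors are assumptions standing in for the unsafe projections in the diff. Once the iterator detects that its input is not sorted by key and session start (a start going backwards within a key, or a key reappearing after it was closed, mirroring `processedKeys`), it sets a flag so that every later `hasNext`/`next` call throws as well.

```scala
import scala.collection.mutable

// Hypothetical sketch: an iterator that "poisons" itself once its sort
// precondition (key, then session start) is found to be broken.
final class SortCheckingIterator[K, T](
    underlying: Iterator[T],
    keyOf: T => K,
    startOf: T => Long) extends Iterator[T] {

  private var errorOnIterator = false
  private var lastKey: Option[K] = None
  private var lastStart = Long.MinValue
  // Keys whose rows are finished; seeing one again means the input is unsorted.
  private val processedKeys = mutable.HashSet.empty[K]

  private def assertNotCorrupted(): Unit =
    if (errorOnIterator) {
      throw new IllegalStateException("The iterator is already corrupted.")
    }

  private def failOnBrokenSort(): Nothing = {
    // Set the flag before throwing so any later call also fails.
    errorOnIterator = true
    throw new IllegalStateException("The iterator must be sorted by key and session start!")
  }

  override def hasNext: Boolean = {
    assertNotCorrupted()
    underlying.hasNext
  }

  override def next(): T = {
    assertNotCorrupted()
    val elem = underlying.next()
    val key = keyOf(elem)
    val start = startOf(elem)
    lastKey match {
      case Some(prev) if prev == key =>
        // Same key: session start must not go backwards.
        if (start < lastStart) failOnBrokenSort()
      case Some(prev) =>
        // Key changed: close the previous key and reject keys seen before.
        processedKeys += prev
        if (processedKeys.contains(key)) failOnBrokenSort()
      case None => // first element, nothing to compare against
    }
    lastKey = Some(key)
    lastStart = start
    elem
  }
}
```

Setting the flag before throwing is what makes the iterator safe to reason about in isolation: a caller that catches the exception and keeps iterating gets another exception instead of silently wrong output.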




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a change in pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31986:
URL: https://github.com/apache/spark/pull/31986#discussion_r629795680



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.aggregate
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+
+/**
+ * This node updates the session window spec of each input rows via analyzing neighbor rows and
+ * determining rows belong to the same session window. The number of input rows remains the same.
+ * This node requires sort on input rows by group keys + the start time of session window.
+ *
+ * There are lots of overhead compared to [[MergingSessionsExec]]. Use [[MergingSessionsExec]]
+ * instead whenever possible. Use this node only when we cannot apply both calculations
+ * determining session windows and aggregating rows in session window altogether.
+ *
+ * Refer [[UpdatingSessionsIterator]] for more details.
+ */
+case class UpdatingSessionsExec(
+    keyExpressions: Seq[Attribute],
+    sessionExpression: Attribute,
+    child: SparkPlan) extends UnaryExecNode {
+
+  private val groupingWithoutSessionExpression = keyExpressions.filterNot {
+    p => p.semanticEquals(sessionExpression)
+  }
+  private val groupingWithoutSessionAttributes =
+    groupingWithoutSessionExpression.map(_.toAttribute)
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val inMemoryThreshold = sqlContext.conf.sessionWindowBufferInMemoryThreshold
+    val spillThreshold = sqlContext.conf.sessionWindowBufferSpillThreshold
+
+    child.execute().mapPartitions { iter =>
+      new UpdatingSessionsIterator(iter, keyExpressions, sessionExpression,
+        child.output, inMemoryThreshold, spillThreshold)
+    }
+  }
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (groupingWithoutSessionExpression.isEmpty) {
+      AllTuples :: Nil
+    } else {
+      ClusteredDistribution(groupingWithoutSessionExpression) :: Nil
+    }
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
+    Seq((groupingWithoutSessionAttributes ++ Seq(sessionExpression))

Review comment:
       Nice catch. Will update.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsIterator.scala
##########
@@ -0,0 +1,224 @@
+package org.apache.spark.sql.execution.aggregate
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray
+
+/**
+ * This class calculates and updates the session window for each element in the given iterator,
+ * which makes elements in the same session window having same session spec. Downstream can apply
+ * aggregation to finally merge these elements bound to the same session window.
+ *
+ * This class works on the precondition that given iterator is sorted by "group keys + start time
+ * of session window", and this iterator still retains the characteristic of the sort.
+ *
+ * This class copies the elements to safely update on each element, as well as buffers elements
+ * which are bound to the same session window. Due to such overheads, [[MergingSessionsIterator]]
+ * should be used whenever possible.
+ */
+class UpdatingSessionsIterator(
+    iter: Iterator[InternalRow],
+    groupingExpressions: Seq[NamedExpression],
+    sessionExpression: NamedExpression,
+    inputSchema: Seq[Attribute],
+    inMemoryThreshold: Int,
+    spillThreshold: Int) extends Iterator[InternalRow] {
+
+  private val groupingWithoutSession: Seq[NamedExpression] =
+    groupingExpressions.diff(Seq(sessionExpression))
+  private val groupingWithoutSessionAttributes: Seq[Attribute] =
+    groupingWithoutSession.map(_.toAttribute)
+  private[this] val groupingWithoutSessionProjection: UnsafeProjection =
+    UnsafeProjection.create(groupingWithoutSession, inputSchema)
+
+  private val valuesExpressions: Seq[Attribute] = inputSchema.diff(groupingWithoutSession)

Review comment:
       Yes, but it looks like it's not necessary. Will fix.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsIterator.scala
##########
@@ -0,0 +1,224 @@
+package org.apache.spark.sql.execution.aggregate
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray
+
+/**
+ * This class calculates and updates the session window for each element in the given iterator,
+ * which makes elements in the same session window having same session spec. Downstream can apply
+ * aggregation to finally merge these elements bound to the same session window.
+ *
+ * This class works on the precondition that given iterator is sorted by "group keys + start time
+ * of session window", and this iterator still retains the characteristic of the sort.
+ *
+ * This class copies the elements to safely update on each element, as well as buffers elements
+ * which are bound to the same session window. Due to such overheads, [[MergingSessionsIterator]]
+ * should be used whenever possible.
+ */
+class UpdatingSessionsIterator(
+    iter: Iterator[InternalRow],
+    groupingExpressions: Seq[NamedExpression],
+    sessionExpression: NamedExpression,
+    inputSchema: Seq[Attribute],
+    inMemoryThreshold: Int,
+    spillThreshold: Int) extends Iterator[InternalRow] {
+
+  private val groupingWithoutSession: Seq[NamedExpression] =
+    groupingExpressions.diff(Seq(sessionExpression))
+  private val groupingWithoutSessionAttributes: Seq[Attribute] =
+    groupingWithoutSession.map(_.toAttribute)
+  private[this] val groupingWithoutSessionProjection: UnsafeProjection =
+    UnsafeProjection.create(groupingWithoutSession, inputSchema)
+
+  private val valuesExpressions: Seq[Attribute] = inputSchema.diff(groupingWithoutSession)
+
+  private[this] val sessionProjection: UnsafeProjection =
+    UnsafeProjection.create(Seq(sessionExpression), inputSchema)
+
+  // Below three variables hold the information for "current session".
+  private var currentKeys: InternalRow = _
+  private var currentSession: UnsafeRow = _
+  private var rowsForCurrentSession: ExternalAppendOnlyUnsafeRowArray = _
+
+  // Below two variables hold the information for "returning rows". The reason we have this in
+  // addition to "current session" is that there could be the chance that iterator for returning
+  // rows on previous session wasn't fully consumed and there's a new session being started.
+  private var returnRows: ExternalAppendOnlyUnsafeRowArray = _
+  private var returnRowsIter: Iterator[InternalRow] = _
+
+  // Mark this to raise error on any operations after the iterator figures out the error.
+  private var errorOnIterator: Boolean = false
+
+  private val processedKeys: mutable.HashSet[InternalRow] = new mutable.HashSet[InternalRow]()
+
+  override def hasNext: Boolean = {
+    assertIteratorNotCorrupted()
+
+    if (returnRowsIter != null && returnRowsIter.hasNext) {
+      return true
+    }
+
+    if (returnRowsIter != null) {
+      returnRowsIter = null
+      returnRows.clear()
+    }
+
+    iter.hasNext
+  }
+
+  override def next(): InternalRow = {
+    assertIteratorNotCorrupted()
+
+    if (returnRowsIter != null && returnRowsIter.hasNext) {
+      return returnRowsIter.next()
+    }
+
+    var exitCondition = false
+    while (iter.hasNext && !exitCondition) {
+      // we are going to modify the row, so we should make sure multiple objects are not
+      // referencing same memory, which could be possible when optimizing iterator
+      // without this, multiple rows in same key will be returned with same content
+      val row = iter.next().copy()
+
+      val keys = groupingWithoutSessionProjection(row)
+      val session = sessionProjection(row)
+      val sessionStruct = session.getStruct(0, 2)
+      val sessionStart = getSessionStart(sessionStruct)
+      val sessionEnd = getSessionEnd(sessionStruct)
+
+      if (currentKeys == null) {
+        startNewSession(row, keys, sessionStruct)
+      } else if (keys != currentKeys) {
+        closeCurrentSession(keyChanged = true)
+        processedKeys.add(currentKeys)
+        startNewSession(row, keys, sessionStruct)
+        exitCondition = true
+      } else {
+        if (sessionStart < getSessionStart(currentSession)) {
+          handleBrokenPreconditionForSort()
+        } else if (sessionStart <= getSessionEnd(currentSession)) {
+          // expanding session length if needed
+          expandEndOfCurrentSession(sessionEnd)
+          rowsForCurrentSession.add(row.asInstanceOf[UnsafeRow])
+        } else {
+          closeCurrentSession(keyChanged = false)
+          startNewSession(row, keys, sessionStruct)
+          exitCondition = true

Review comment:
       Yes, every row has a session end with the session gap already applied. Merging sessions expands the session end to the later one, which also has the session gap applied.
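To illustrate the point about session ends, here is a rough sketch in plain Scala; it is not the PR's implementation. The hypothetical `SessionRow` already carries `[start, end]` with the gap applied (end = event time + gap), the input is assumed to be sorted by key and start, and a row whose start is <= the current session end extends that session to the later of the two ends. Like UpdatingSessionsIterator, it emits every input row exactly once, rewritten with the merged window, so the row count is unchanged.

```scala
// Hypothetical simplified row: grouping key plus the per-event session window,
// with the session gap already applied to `end`.
final case class SessionRow(key: String, start: Long, end: Long)

object UpdatingSessionsSketch {
  // Assumes `rows` is sorted by (key, start). Overlapping windows of the same
  // key are merged; each input row is re-emitted carrying the merged window.
  def updateSessions(rows: Seq[SessionRow]): Seq[SessionRow] = {
    val out = Vector.newBuilder[SessionRow]
    var buffered = Vector.empty[SessionRow] // rows bound to the current session
    var curKey: Option[String] = None
    var curStart = 0L
    var curEnd = 0L

    // Emit all buffered rows of the current session with the merged window.
    def flush(): Unit = {
      buffered.foreach(r => out += r.copy(start = curStart, end = curEnd))
      buffered = Vector.empty
    }

    rows.foreach { r =>
      if (curKey.contains(r.key) && r.start <= curEnd) {
        // Same key and overlapping: expand the session end if this row ends later.
        curEnd = math.max(curEnd, r.end)
      } else {
        // Key changed or gap exceeded: close the current session, start a new one.
        flush()
        curKey = Some(r.key)
        curStart = r.start
        curEnd = r.end
      }
      buffered :+= r
    }
    flush()
    out.result()
  }
}
```

For example, with a gap of 10, events for key "a" at 0, 5 and 30 produce the windows [0, 15], [0, 15] and [30, 40]: the second event starts before the first session ends, so the session end moves to 15, while the third starts after 15 and opens a new session.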

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
##########
@@ -0,0 +1,77 @@
+package org.apache.spark.sql.execution.aggregate
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+
+/**
+ * This node updates the session window spec of each input rows via analyzing neighbor rows and
+ * determining rows belong to the same session window. The number of input rows remains the same.
+ * This node requires sort on input rows by group keys + the start time of session window.
+ *
+ * There are lots of overhead compared to [[MergingSessionsExec]]. Use [[MergingSessionsExec]]
+ * instead whenever possible. Use this node only when we cannot apply both calculations
+ * determining session windows and aggregating rows in session window altogether.
+ *
+ * Refer [[UpdatingSessionsIterator]] for more details.
+ */
+case class UpdatingSessionsExec(
+    keyExpressions: Seq[Attribute],

Review comment:
       Yeah, I agree the name is confusing. Let me rename it to `groupingExpressions`. Thanks!

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
##########
@@ -0,0 +1,77 @@
+package org.apache.spark.sql.execution.aggregate
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+
+/**
+ * This node updates the session window spec of each input rows via analyzing neighbor rows and
+ * determining rows belong to the same session window. The number of input rows remains the same.
+ * This node requires sort on input rows by group keys + the start time of session window.
+ *
+ * There are lots of overhead compared to [[MergingSessionsExec]]. Use [[MergingSessionsExec]]
+ * instead whenever possible. Use this node only when we cannot apply both calculations
+ * determining session windows and aggregating rows in session window altogether.

Review comment:
       I don't list the cases explicitly, as we use UpdatingSessionsExec whenever needed; you can find them in the original PR. Please let me know if we'd like to specify some cases explicitly here as well.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsIterator.scala
##########
@@ -0,0 +1,224 @@
+package org.apache.spark.sql.execution.aggregate
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray
+
+/**
+ * This class calculates and updates the session window for each element in the given iterator,
+ * which makes elements in the same session window having same session spec. Downstream can apply
+ * aggregation to finally merge these elements bound to the same session window.
+ *
+ * This class works on the precondition that given iterator is sorted by "group keys + start time
+ * of session window", and this iterator still retains the characteristic of the sort.
+ *
+ * This class copies the elements to safely update on each element, as well as buffers elements
+ * which are bound to the same session window. Due to such overheads, [[MergingSessionsIterator]]
+ * should be used whenever possible.
+ */
+class UpdatingSessionsIterator(
+    iter: Iterator[InternalRow],
+    groupingExpressions: Seq[NamedExpression],
+    sessionExpression: NamedExpression,
+    inputSchema: Seq[Attribute],
+    inMemoryThreshold: Int,
+    spillThreshold: Int) extends Iterator[InternalRow] {
+
+  private val groupingWithoutSession: Seq[NamedExpression] =
+    groupingExpressions.diff(Seq(sessionExpression))
+  private val groupingWithoutSessionAttributes: Seq[Attribute] =
+    groupingWithoutSession.map(_.toAttribute)
+  private[this] val groupingWithoutSessionProjection: UnsafeProjection =
+    UnsafeProjection.create(groupingWithoutSession, inputSchema)
+
+  private val valuesExpressions: Seq[Attribute] = inputSchema.diff(groupingWithoutSession)
+
+  private[this] val sessionProjection: UnsafeProjection =
+    UnsafeProjection.create(Seq(sessionExpression), inputSchema)
+
+  // Below three variables hold the information for "current session".
+  private var currentKeys: InternalRow = _
+  private var currentSession: UnsafeRow = _
+  private var rowsForCurrentSession: ExternalAppendOnlyUnsafeRowArray = _
+
+  // Below two variables hold the information for "returning rows". The reason we have this in
+  // addition to "current session" is that there could be the chance that iterator for returning
+  // rows on previous session wasn't fully consumed and there's a new session being started.
+  private var returnRows: ExternalAppendOnlyUnsafeRowArray = _
+  private var returnRowsIter: Iterator[InternalRow] = _
+
+  // Mark this to raise error on any operations after the iterator figures out the error.
+  private var errorOnIterator: Boolean = false
+
+  private val processedKeys: mutable.HashSet[InternalRow] = new mutable.HashSet[InternalRow]()
+
+  override def hasNext: Boolean = {
+    assertIteratorNotCorrupted()
+
+    if (returnRowsIter != null && returnRowsIter.hasNext) {
+      return true
+    }
+
+    if (returnRowsIter != null) {
+      returnRowsIter = null
+      returnRows.clear()
+    }
+
+    iter.hasNext
+  }
+
+  override def next(): InternalRow = {
+    assertIteratorNotCorrupted()
+
+    if (returnRowsIter != null && returnRowsIter.hasNext) {
+      return returnRowsIter.next()
+    }
+
+    var exitCondition = false
+    while (iter.hasNext && !exitCondition) {
+      // we are going to modify the row, so we should make sure multiple objects are not
+      // referencing same memory, which could be possible when optimizing iterator
+      // without this, multiple rows in same key will be returned with same content
+      val row = iter.next().copy()
+
+      val keys = groupingWithoutSessionProjection(row)
+      val session = sessionProjection(row)
+      val sessionStruct = session.getStruct(0, 2)
+      val sessionStart = getSessionStart(sessionStruct)
+      val sessionEnd = getSessionEnd(sessionStruct)
+
+      if (currentKeys == null) {
+        startNewSession(row, keys, sessionStruct)
+      } else if (keys != currentKeys) {
+        closeCurrentSession(keyChanged = true)
+        processedKeys.add(currentKeys)
+        startNewSession(row, keys, sessionStruct)
+        exitCondition = true
+      } else {
+        if (sessionStart < getSessionStart(currentSession)) {
+          handleBrokenPreconditionForSort()
+        } else if (sessionStart <= getSessionEnd(currentSession)) {
+          // expanding session length if needed
+          expandEndOfCurrentSession(sessionEnd)
+          rowsForCurrentSession.add(row.asInstanceOf[UnsafeRow])
+        } else {
+          closeCurrentSession(keyChanged = false)
+          startNewSession(row, keys, sessionStruct)
+          exitCondition = true
+        }
+      }
+    }
+
+    if (!iter.hasNext) {
+      // no further row: closing session
+      closeCurrentSession(keyChanged = false)
+    }
+
+    // here returnRowsIter should be able to provide at least one row
+    require(returnRowsIter != null && returnRowsIter.hasNext)
+
+    returnRowsIter.next()
+  }
+
+  private def startNewSession(
+      currentRow: InternalRow,
+      groupingKey: UnsafeRow,
+      sessionStruct: UnsafeRow): Unit = {
+    if (processedKeys.contains(groupingKey)) {
+      handleBrokenPreconditionForSort()
+    }
+
+    currentKeys = groupingKey.copy()
+    currentSession = sessionStruct.copy()
+
+    rowsForCurrentSession = new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
+    rowsForCurrentSession.add(currentRow.asInstanceOf[UnsafeRow])
+  }
+
+  private def getSessionStart(sessionStruct: UnsafeRow): Long = {
+    sessionStruct.getLong(0)
+  }
+
+  private def getSessionEnd(sessionStruct: UnsafeRow): Long = {
+    sessionStruct.getLong(1)
+  }
+
+  def updateSessionEnd(sessionStruct: UnsafeRow, sessionEnd: Long): Unit = {
+    sessionStruct.setLong(1, sessionEnd)
+  }
+
+  private def expandEndOfCurrentSession(sessionEnd: Long): Unit = {
+    if (sessionEnd > getSessionEnd(currentSession)) {
+      updateSessionEnd(currentSession, sessionEnd)
+    }
+  }
+
+  private def handleBrokenPreconditionForSort(): Unit = {
+    errorOnIterator = true
+    throw new IllegalStateException("The iterator must be sorted by key and session start!")
+  }
+
+  private val join = new JoinedRow
+  private val join2 = new JoinedRow
+
+  private val groupingKeyProj = GenerateUnsafeProjection.generate(groupingExpressions,
+    groupingWithoutSessionAttributes :+ sessionExpression.toAttribute)
+  private val valueProj = GenerateUnsafeProjection.generate(valuesExpressions, inputSchema)
+  private val restoreProj = GenerateUnsafeProjection.generate(inputSchema,
+    groupingExpressions.map(_.toAttribute) ++ valuesExpressions.map(_.toAttribute))
+
+  private def generateGroupingKey(): UnsafeRow = {
+    val newRow = new SpecificInternalRow(Seq(sessionExpression.toAttribute).toStructType)
+    newRow.update(0, currentSession)
+    val joined = join(currentKeys, newRow)
+
+    groupingKeyProj(joined)
+  }
+
+  private def closeCurrentSession(keyChanged: Boolean): Unit = {
+    assert(returnRowsIter == null || !returnRowsIter.hasNext)
+
+    returnRows = rowsForCurrentSession
+    rowsForCurrentSession = null
+
+    val groupingKey = generateGroupingKey()
+
+    val currentRowsIter = returnRows.generateIterator().map { internalRow =>
+      val valueRow = valueProj(internalRow)
+      restoreProj(join2(groupingKey, valueRow)).copy()

Review comment:
       Yeah, I just removed the projection of groupingKeys; we only needed to join the rows. Thanks!






[GitHub] [spark] viirya commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-841340730


   > Personally I'd like to rely on merging individual PRs; I'd also prefer pushing sub-PRs to a feature branch and finally merging the feature branch into master, but the Apache Spark project doesn't seem to use that practice. I see the project just merges the sub-PRs.
   
   Okay. I'm fine with merging individual PRs. cc @xuanyuanking @gaborgsomogyi  @zsxwing @tdas 
   
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-817419976


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/41762/
   




[GitHub] [spark] xuanyuanking commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-842054675


   +1 to merging individual PRs. IMO, it makes the history easier to track in the future.




[GitHub] [spark] HeartSaVioR closed pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
HeartSaVioR closed pull request #31986:
URL: https://github.com/apache/spark/pull/31986


   




[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-841365324


   **[Test build #138559 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138559/testReport)** for PR 31986 at commit [`b0d434a`](https://github.com/apache/spark/commit/b0d434a046454b1c4dd1e164cc89e1c70fa04eaf).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-842292443


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138618/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-837833241


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42878/
   




[GitHub] [spark] viirya commented on a change in pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #31986:
URL: https://github.com/apache/spark/pull/31986#discussion_r629877249



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.aggregate
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+
+/**
+ * This node updates the session window spec of each input rows via analyzing neighbor rows and
+ * determining rows belong to the same session window. The number of input rows remains the same.
+ * This node requires sort on input rows by group keys + the start time of session window.
+ *
+ * There are lots of overhead compared to [[MergingSessionsExec]]. Use [[MergingSessionsExec]]
+ * instead whenever possible. Use this node only when we cannot apply both calculations
+ * determining session windows and aggregating rows in session window altogether.
+ *
+ * Refer [[UpdatingSessionsIterator]] for more details.
+ */
+case class UpdatingSessionsExec(
+    keyExpressions: Seq[Attribute],
+    sessionExpression: Attribute,
+    child: SparkPlan) extends UnaryExecNode {
+
+  private val groupingWithoutSessionExpression = keyExpressions.filterNot {
+    p => p.semanticEquals(sessionExpression)
+  }
+  private val groupingWithoutSessionAttributes =
+    groupingWithoutSessionExpression.map(_.toAttribute)
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val inMemoryThreshold = sqlContext.conf.sessionWindowBufferInMemoryThreshold
+    val spillThreshold = sqlContext.conf.sessionWindowBufferSpillThreshold
+
+    child.execute().mapPartitions { iter =>
+      new UpdatingSessionsIterator(iter, keyExpressions, sessionExpression,
+        child.output, inMemoryThreshold, spillThreshold)
+    }
+  }
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (groupingWithoutSessionExpression.isEmpty) {
+      AllTuples :: Nil
+    } else {
+      ClusteredDistribution(groupingWithoutSessionExpression) :: Nil
+    }
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
+    Seq((groupingWithoutSessionAttributes ++ Seq(sessionExpression))

Review comment:
       Oh, I thought the caller side always appends the session expression at the end of the grouping keys.
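A minimal plain-Scala sketch of the ordering and distribution requirements in the hunk above, under simplifying assumptions: column names (`String`) stand in for Catalyst `Attribute`s, `==` stands in for `semanticEquals`, and `SessionOrderingSketch` is a hypothetical name, not a Spark class. It suggests why the caller's placement of the session column should not matter: the node filters the session column out of the keys and appends it last.

```scala
// Hypothetical, simplified model: column names stand in for Catalyst Attributes,
// and `==` stands in for semanticEquals.
object SessionOrderingSketch {
  // Mirrors groupingWithoutSessionAttributes ++ Seq(sessionExpression): the session column is
  // removed from wherever it appears among the keys and appended as the last sort column.
  def requiredOrdering(keys: Seq[String], session: String): Seq[String] =
    keys.filterNot(_ == session) :+ session

  // Mirrors requiredChildDistribution: with no non-session key, all rows must be co-located
  // (AllTuples); otherwise rows only need to be clustered by the non-session keys.
  def requiredDistribution(keys: Seq[String], session: String): String = {
    val nonSession = keys.filterNot(_ == session)
    if (nonSession.isEmpty) "AllTuples"
    else s"ClusteredDistribution(${nonSession.mkString(", ")})"
  }
}

// The session column sits in the middle of the caller's keys, yet it still ends up last.
val ordering = SessionOrderingSketch.requiredOrdering(Seq("userId", "session", "deviceId"), "session")
println(ordering) // List(userId, deviceId, session)
```

Under this model, `Seq("userId", "deviceId", "session")` yields the same ordering, so code relying on "session last" would hold either way.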






[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-817468948


   **[Test build #137184 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137184/testReport)** for PR 31986 at commit [`2ae446e`](https://github.com/apache/spark/commit/2ae446ed2d79d1f286a2995e9cb9d092d975e53f).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-842180144


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/43141/
   




[GitHub] [spark] viirya commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-841087267


   Maybe @xuanyuanking wants to take another look?
   
   I'm not sure which merge strategy we will take. Do we finish all the individual PRs and then merge the original one? Or merge each individual PR once its review is finished?




[GitHub] [spark] HeartSaVioR commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-842101797


   Thanks everyone! I'll retrigger the test and merge once the test passes.




[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-837818648








[GitHub] [spark] SparkQA removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-841213114


   **[Test build #138559 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138559/testReport)** for PR 31986 at commit [`b0d434a`](https://github.com/apache/spark/commit/b0d434a046454b1c4dd1e164cc89e1c70fa04eaf).




[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-842106040


   **[Test build #138618 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138618/testReport)** for PR 31986 at commit [`b0d434a`](https://github.com/apache/spark/commit/b0d434a046454b1c4dd1e164cc89e1c70fa04eaf).




[GitHub] [spark] AmplabJenkins commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-809859985


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/41259/
   




[GitHub] [spark] viirya commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-838800572


   Thanks for updating! I will take another look.




[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-826623402


   **[Test build #137948 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137948/testReport)** for PR 31986 at commit [`ef37887`](https://github.com/apache/spark/commit/ef37887da6454fc0d4ebfe7cb40a42bf5b23a88d).




[GitHub] [spark] AmplabJenkins commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-817393275


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/41760/
   




[GitHub] [spark] AmplabJenkins commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-809931402


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/136677/
   




[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-837775575


   **[Test build #138356 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138356/testReport)** for PR 31986 at commit [`7764c72`](https://github.com/apache/spark/commit/7764c72a932aa058f9c864c8da8a5479c2be0c68).




[GitHub] [spark] HeartSaVioR commented on a change in pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31986:
URL: https://github.com/apache/spark/pull/31986#discussion_r629900797



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
##########
+    Seq((groupingWithoutSessionAttributes ++ Seq(sessionExpression))

Review comment:
       That said, other parts should be checked as well; they might have some logic relying on this.
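To illustrate why downstream logic can depend on the sort order, here is a minimal plain-Scala sketch of updating session spans on sorted rows. It is an assumption-laden model, not Spark's `UpdatingSessionsIterator`: `SessionRow` and `UpdatingSessionsSketch` are hypothetical names, a row carries one grouping key plus a `[start, end)` span, and treating a row whose start equals the current end as part of the same session is an assumption about the boundary rule. Each row is compared only with the session currently being built, so rows of one key must arrive contiguously and in ascending start time; rows are buffered until the session closes because the merged end is unknown until then, and exactly one row is emitted per input row.

```scala
// Hypothetical row model: one grouping key plus a session span [start, end).
final case class SessionRow(key: String, start: Long, end: Long)

object UpdatingSessionsSketch {
  // Assumes `rows` is sorted by (key, start), as requiredChildOrdering would enforce.
  // Returns one output row per input row, each rewritten to its merged session span.
  def updateSessions(rows: Seq[SessionRow]): Seq[SessionRow] = {
    val out = Seq.newBuilder[SessionRow]
    val buffer = scala.collection.mutable.ArrayBuffer.empty[SessionRow]
    var curKey: String = null
    var curStart = 0L
    var curEnd = 0L

    // Emits every buffered row with the final span of the session it belongs to.
    def flush(): Unit = {
      buffer.foreach(r => out += r.copy(start = curStart, end = curEnd))
      buffer.clear()
    }

    rows.foreach { r =>
      // Same key and overlapping (or touching) span: extend the current session.
      val sameSession = buffer.nonEmpty && r.key == curKey && r.start <= curEnd
      if (sameSession) {
        curEnd = math.max(curEnd, r.end)
      } else {
        // Session closed: emit buffered rows, then start a new session from this row.
        flush()
        curKey = r.key
        curStart = r.start
        curEnd = r.end
      }
      buffer += r
    }
    flush()
    out.result()
  }
}

val sessions = UpdatingSessionsSketch.updateSessions(Seq(
  SessionRow("a", 0, 10), SessionRow("a", 5, 15), SessionRow("a", 20, 30), SessionRow("b", 3, 13)))
// The two "a" rows starting at 0 and 5 both become [0, 15); the other rows keep their spans.
```

If the same input arrived unsorted (for example, the "a" row at 20 before the one at 5), this comparison-with-neighbor approach would split one logical session into several, which is the kind of dependency on ordering the comment refers to.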







[GitHub] [spark] AmplabJenkins removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-842292443


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138618/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-826668676


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42469/
   




[GitHub] [spark] HeartSaVioR commented on a change in pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31986:
URL: https://github.com/apache/spark/pull/31986#discussion_r619919740



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.aggregate
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+
+/**
+ * This node updates the session window spec of each input rows via analyzing neighbor rows and
+ * determining rows belong to the same session window. The number of input rows remains the same.
+ * This node requires sort on input rows by group keys + the start time of session window.
+ *
+ * There are lots of overhead compared to [[MergingSessionsExec]]. Use [[MergingSessionsExec]]
+ * instead whenever possible. Use this node only when we cannot apply both calculations
+ * determining session windows and aggregating rows in session window altogether.
+ *
+ * Refer [[UpdatingSessionsIterator]] for more details.
+ */
+case class UpdatingSessionsExec(
+    keyExpressions: Seq[Attribute],
+    sessionExpression: Attribute,
+    child: SparkPlan) extends UnaryExecNode {
+
+  private val groupWithoutSessionExpression = keyExpressions.filterNot {
+    p => p.semanticEquals(sessionExpression)
+  }
+  private val groupingWithoutSessionAttributes = groupWithoutSessionExpression.map(_.toAttribute)
+
+  val childOrdering = Seq((groupingWithoutSessionAttributes ++ Seq(sessionExpression))
+    .map(SortOrder(_, Ascending)))
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val inMemoryThreshold = sqlContext.conf.windowExecBufferInMemoryThreshold
+    val spillThreshold = sqlContext.conf.windowExecBufferSpillThreshold

Review comment:
       Makes sense. Let's add new configs specific to session windows.
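For reference, the ordering that `childOrdering` encodes (non-session grouping attributes first, then the session column, all ascending) can be illustrated in plain Scala. This is a minimal sketch, not Spark code: `SessionRow`, `SessionOrdering`, and `isSortedForSessions` are hypothetical names, and the session column is reduced to its start time, which is what the documented sort precondition refers to.

```scala
// Hypothetical stand-in for an input row: grouping key plus a session window [start, end).
final case class SessionRow(key: String, start: Long, end: Long)

object SessionOrdering {
  // Mirrors "group keys + start time of session window", both ascending.
  val requiredOrdering: Ordering[SessionRow] =
    Ordering.by((r: SessionRow) => (r.key, r.start))

  // Checks the precondition the session iterator relies on:
  // adjacent rows must never go backwards under requiredOrdering.
  def isSortedForSessions(rows: Seq[SessionRow]): Boolean =
    rows.iterator.sliding(2).forall {
      case Seq(a, b) => requiredOrdering.lteq(a, b)
      case _         => true // zero or one row is trivially sorted
    }

  def main(args: Array[String]): Unit = {
    val raw = Seq(SessionRow("b", 5, 15), SessionRow("a", 12, 22), SessionRow("a", 0, 10))
    val sorted = raw.sorted(requiredOrdering)
    println(sorted.map(r => s"${r.key}@${r.start}").mkString(", ")) // a@0, a@12, b@5
    println(isSortedForSessions(raw))                               // false
    println(isSortedForSessions(sorted))                            // true
  }
}
```

After sorting, all rows of one key are contiguous and their starts are non-decreasing, which is what lets a single forward pass decide session boundaries by looking only at neighboring rows.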




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-842304823


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138621/
   




[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-810047339


   **[Test build #136688 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/136688/testReport)** for PR 31986 at commit [`9c36bde`](https://github.com/apache/spark/commit/9c36bdea0ad10b59296ed1b1d002ad0cf8420867).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-817392727


   **[Test build #137182 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137182/testReport)** for PR 31986 at commit [`7880b44`](https://github.com/apache/spark/commit/7880b44ba0f8f8d5f13211787de9ffd4c2a32ae6).
    * This patch **fails to build**.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-842303430


   **[Test build #138621 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138621/testReport)** for PR 31986 at commit [`8c2eed8`](https://github.com/apache/spark/commit/8c2eed82d553ddacb36088954232ae704a855094).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-809925236


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/41270/
   




[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-817391934


   **[Test build #137182 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137182/testReport)** for PR 31986 at commit [`7880b44`](https://github.com/apache/spark/commit/7880b44ba0f8f8d5f13211787de9ffd4c2a32ae6).




[GitHub] [spark] AmplabJenkins commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-841244616


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/43080/
   




[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-838086565


   **[Test build #138356 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138356/testReport)** for PR 31986 at commit [`7764c72`](https://github.com/apache/spark/commit/7764c72a932aa058f9c864c8da8a5479c2be0c68).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] HeartSaVioR commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-841194454


   > I'm not sure which merging strategy we will take. Will we finish all the individual PRs and merge the original one, or merge each individual PR once its review is finished?
   
   Personally I'd like to rely on merging individual PRs. I'd also prefer pushing sub-PRs to a feature branch and finally merging the feature branch into master, but the Apache Spark project doesn't seem to use such a practice; as far as I can see, the project just merges the sub-PRs.




[GitHub] [spark] HeartSaVioR commented on a change in pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31986:
URL: https://github.com/apache/spark/pull/31986#discussion_r620081071



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsIterator.scala
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.aggregate
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray
+
+/**
+ * This class calculates and updates the session window for each element in the given iterator,
+ * so that elements in the same session window have the same session spec. Downstream can apply
+ * aggregation to finally merge the elements bound to the same session window.
+ *
+ * This class works on the precondition that the given iterator is sorted by "group keys + start
+ * time of session window", and this iterator still retains the characteristic of the sort.
+ *
+ * This class copies the elements so that each element can be updated safely, and buffers the
+ * elements bound to the same session window. Due to such overheads, [[MergingSessionsIterator]]
+ * should be used whenever possible.
+ */
+class UpdatingSessionsIterator(
+    iter: Iterator[InternalRow],
+    groupingExpressions: Seq[NamedExpression],
+    sessionExpression: NamedExpression,
+    inputSchema: Seq[Attribute],
+    inMemoryThreshold: Int,
+    spillThreshold: Int) extends Iterator[InternalRow] {
+
+  private val groupingWithoutSession: Seq[NamedExpression] =
+    groupingExpressions.diff(Seq(sessionExpression))
+  private val groupingWithoutSessionAttributes: Seq[Attribute] =
+    groupingWithoutSession.map(_.toAttribute)
+  private[this] val groupingWithoutSessionProjection: UnsafeProjection =
+    UnsafeProjection.create(groupingWithoutSession, inputSchema)
+
+  private val valuesExpressions: Seq[Attribute] = inputSchema.diff(groupingWithoutSession)
+
+  private[this] val sessionProjection: UnsafeProjection =
+    UnsafeProjection.create(Seq(sessionExpression), inputSchema)
+
+  private var currentKeys: InternalRow = _

Review comment:
       Changed the variable names and added some documentation on the variables.
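The behavior described in this class's doc comment can be modeled without Spark types. This is a simplified sketch, not the PR's implementation: `Event` and `UpdatingSessions` are hypothetical names, an in-memory `ArrayBuffer` stands in for `ExternalAppendOnlyUnsafeRowArray` (so nothing spills to disk), and the session struct is reduced to a `(start, end)` pair. It keeps the documented properties: one output row per input row, in input order, each stamped with its merged session, plus an `IllegalStateException` when the sort precondition is broken.

```scala
import scala.collection.mutable

// Hypothetical row: grouping key, session window [start, end), and a payload value.
final case class Event(key: String, start: Long, end: Long, value: Int)

// Simplified, in-memory model of the session-updating iterator.
final class UpdatingSessions(input: Iterator[Event]) extends Iterator[Event] {
  private val seenKeys = mutable.HashSet.empty[String] // keys whose sessions are all closed
  private var buffered = mutable.ArrayBuffer.empty[Event] // rows of the open session
  private var curKey: String = null
  private var curStart = 0L
  private var curEnd = 0L
  private var pending: Iterator[Event] = Iterator.empty // closed session being emitted

  override def hasNext: Boolean = pending.hasNext || input.hasNext || buffered.nonEmpty

  override def next(): Event = {
    if (!pending.hasNext) fill()
    pending.next()
  }

  // Consumes input until one session closes, then exposes its rows through `pending`.
  private def fill(): Unit = {
    var closed = false
    while (!closed && input.hasNext) {
      val e = input.next()
      if (buffered.isEmpty) open(e)
      else if (e.key != curKey) {
        seenKeys += curKey // this key must never show up again
        close()
        open(e)
        closed = true
      } else if (e.start < curStart) {
        throw new IllegalStateException("The iterator must be sorted by key and session start!")
      } else if (e.start <= curEnd) {
        curEnd = math.max(curEnd, e.end) // overlapping row: extend the session if needed
        buffered += e
      } else {
        close() // gap: same key, new session
        open(e)
        closed = true
      }
    }
    if (!closed) close() // input exhausted: flush the last open session
  }

  private def open(e: Event): Unit = {
    if (seenKeys.contains(e.key)) {
      throw new IllegalStateException("The iterator must be sorted by key and session start!")
    }
    curKey = e.key; curStart = e.start; curEnd = e.end
    buffered += e
  }

  // Hands the filled buffer off for emission and starts a fresh one.
  private def close(): Unit = {
    val (s, en) = (curStart, curEnd)
    pending = buffered.iterator.map(_.copy(start = s, end = en))
    buffered = mutable.ArrayBuffer.empty[Event]
  }
}
```

Feeding it `a[0,10)`, `a[5,15)`, `a[20,30)`, `b[1,11)` yields four rows; the first two are stamped with the merged window `[0,15)`, and the other two keep their own windows. As in the PR, rows are only released once their session is known to be closed, which is why buffering is unavoidable here.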






[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-809857792


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41259/
   




[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-809140467


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41211/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-817469602


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137184/
   




[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-817403314


   **[Test build #137184 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137184/testReport)** for PR 31986 at commit [`2ae446e`](https://github.com/apache/spark/commit/2ae446ed2d79d1f286a2995e9cb9d092d975e53f).




[GitHub] [spark] viirya commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-842057838


   Okay for me.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-809859985


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/41259/
   




[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-842141372


   **[Test build #138621 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138621/testReport)** for PR 31986 at commit [`8c2eed8`](https://github.com/apache/spark/commit/8c2eed82d553ddacb36088954232ae704a855094).




[GitHub] [spark] HeartSaVioR commented on a change in pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31986:
URL: https://github.com/apache/spark/pull/31986#discussion_r611255893



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsIterator.scala
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.aggregate
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray
+
+/**
+ * This class calculates and updates the session window for each element in the given iterator,
+ * so that elements in the same session window have the same session spec. Downstream can apply
+ * aggregation to finally merge the elements bound to the same session window.
+ *
+ * This class works on the precondition that the given iterator is sorted by "group keys + start
+ * time of session window", and this iterator still retains the characteristic of the sort.
+ *
+ * This class copies the elements so that each element can be updated safely, and buffers the
+ * elements bound to the same session window. Due to such overheads, [[MergingSessionsIterator]]
+ * should be used whenever possible.
+ */
+class UpdatingSessionsIterator(
+    iter: Iterator[InternalRow],
+    groupingExpressions: Seq[NamedExpression],
+    sessionExpression: NamedExpression,
+    inputSchema: Seq[Attribute],
+    inMemoryThreshold: Int,
+    spillThreshold: Int) extends Iterator[InternalRow] {
+
+  private val groupingWithoutSession: Seq[NamedExpression] =
+    groupingExpressions.diff(Seq(sessionExpression))
+  private val groupingWithoutSessionAttributes: Seq[Attribute] =
+    groupingWithoutSession.map(_.toAttribute)
+  private[this] val groupingWithoutSessionProjection: UnsafeProjection =
+    UnsafeProjection.create(groupingWithoutSession, inputSchema)
+
+  private val valuesExpressions: Seq[Attribute] = inputSchema.diff(groupingWithoutSession)
+
+  private[this] val sessionProjection: UnsafeProjection =
+    UnsafeProjection.create(Seq(sessionExpression), inputSchema)
+
+  private var currentKeys: InternalRow = _
+  private var currentSession: UnsafeRow = _
+
+  private var currentRows: ExternalAppendOnlyUnsafeRowArray = new ExternalAppendOnlyUnsafeRowArray(
+    inMemoryThreshold, spillThreshold)
+
+  private var returnRows: ExternalAppendOnlyUnsafeRowArray = _
+  private var returnRowsIter: Iterator[InternalRow] = _
+  private var errorOnIterator: Boolean = false
+
+  private val processedKeys: mutable.HashSet[InternalRow] = new mutable.HashSet[InternalRow]()
+
+  override def hasNext: Boolean = {
+    assertIteratorNotCorrupted()
+
+    if (returnRowsIter != null && returnRowsIter.hasNext) {
+      return true
+    }
+
+    if (returnRowsIter != null) {
+      returnRowsIter = null
+      returnRows.clear()
+    }
+
+    iter.hasNext
+  }
+
+  override def next(): InternalRow = {
+    assertIteratorNotCorrupted()
+
+    if (returnRowsIter != null && returnRowsIter.hasNext) {
+      return returnRowsIter.next()
+    }
+
+    var exitCondition = false
+    while (iter.hasNext && !exitCondition) {
+      // we are going to modify the row, so we should make sure multiple objects are not
+      // referencing the same memory, which is possible when the iterator is optimized;
+      // without this, multiple rows with the same key will be returned with the same content
+      val row = iter.next().copy()
+
+      val keys = groupingWithoutSessionProjection(row)
+      val session = sessionProjection(row)
+      val sessionStruct = session.getStruct(0, 2)
+      val sessionStart = getSessionStart(sessionStruct)
+      val sessionEnd = getSessionEnd(sessionStruct)
+
+      if (currentKeys == null) {
+        startNewSession(row, keys, sessionStruct)
+      } else if (keys != currentKeys) {
+        closeCurrentSession(keyChanged = true)
+        processedKeys.add(currentKeys)
+        startNewSession(row, keys, sessionStruct)
+        exitCondition = true
+      } else {
+        if (sessionStart < getSessionStart(currentSession)) {
+          handleBrokenPreconditionForSort()
+        } else if (sessionStart <= getSessionEnd(currentSession)) {
+          // expanding session length if needed
+          expandEndOfCurrentSession(sessionEnd)
+          currentRows.add(row.asInstanceOf[UnsafeRow])
+        } else {
+          closeCurrentSession(keyChanged = false)
+          startNewSession(row, keys, sessionStruct)
+          exitCondition = true
+        }
+      }
+    }
+
+    if (!iter.hasNext) {
+      // no further row: closing session
+      closeCurrentSession(keyChanged = false)
+    }
+
+    // here returnRowsIter should be able to provide at least one row
+    require(returnRowsIter != null && returnRowsIter.hasNext)
+
+    returnRowsIter.next()
+  }
+
+  private def startNewSession(
+      currentRow: InternalRow,
+      groupingKey: UnsafeRow,
+      sessionStruct: UnsafeRow): Unit = {
+    if (processedKeys.contains(groupingKey)) {
+      handleBrokenPreconditionForSort()
+    }
+
+    currentKeys = groupingKey.copy()
+    currentSession = sessionStruct.copy()
+
+    currentRows.clear()
+    currentRows.add(currentRow.asInstanceOf[UnsafeRow])
+  }
+
+  private def getSessionStart(sessionStruct: UnsafeRow): Long = {
+    sessionStruct.getLong(0)
+  }
+
+  private def getSessionEnd(sessionStruct: UnsafeRow): Long = {
+    sessionStruct.getLong(1)
+  }
+
+  def updateSessionEnd(sessionStruct: UnsafeRow, sessionEnd: Long): Unit = {
+    sessionStruct.setLong(1, sessionEnd)
+  }
+
+  private def expandEndOfCurrentSession(sessionEnd: Long): Unit = {
+    if (sessionEnd > getSessionEnd(currentSession)) {
+      updateSessionEnd(currentSession, sessionEnd)
+    }
+  }
+
+  private def handleBrokenPreconditionForSort(): Unit = {
+    errorOnIterator = true
+    throw new IllegalStateException("The iterator must be sorted by key and session start!")
+  }
+
+  private val join = new JoinedRow
+  private val join2 = new JoinedRow
+
+  private val groupingKeyProj = GenerateUnsafeProjection.generate(groupingExpressions,
+    groupingWithoutSessionAttributes :+ sessionExpression.toAttribute)
+  private val valueProj = GenerateUnsafeProjection.generate(valuesExpressions, inputSchema)
+  private val restoreProj = GenerateUnsafeProjection.generate(inputSchema,
+    groupingExpressions.map(_.toAttribute) ++ valuesExpressions.map(_.toAttribute))
+
+  private def generateGroupingKey(): UnsafeRow = {
+    val newRow = new SpecificInternalRow(Seq(sessionExpression.toAttribute).toStructType)
+    newRow.update(0, currentSession)
+    val joined = join(currentKeys, newRow)
+
+    groupingKeyProj(joined)
+  }
+
+  private def closeCurrentSession(keyChanged: Boolean): Unit = {
+    returnRows = currentRows
+    currentRows = new ExternalAppendOnlyUnsafeRowArray(

Review comment:
       Note that we back up the current array to `returnRows` to provide the iterator. When we close the current session, we already hold a row that needs to be bound to the next session. That row and the following rows bound to the next session are buffered into the next array.
   
   The original reason for not reusing an ExternalAppendOnlyUnsafeRowArray instance is that the iterator of ExternalAppendOnlyUnsafeRowArray invalidates itself (i.e., throws an exception) if the array is modified afterwards.
   
   So we cannot simply create an array and an iterator, keep appending data to the array, and let the consumer retrieve data from the iterator at the same time. We need to stop buffering into a specific array, provide an iterator over it, and create a new array to buffer the subsequent rows.
   
   Initializing `currentRows` would probably be better placed in `startNewSession`.
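The constraint described above (an iterator that is invalidated once its backing array is modified) is the same fail-fast contract that `java.util.ArrayList` iterators follow. The sketch below uses `ArrayList` as a stand-in for `ExternalAppendOnlyUnsafeRowArray`; that substitution is an assumption for illustration only, since the Spark class has its own spill logic and error message. `BufferHandOff`, `reuseSingleBuffer`, and `handOff` are hypothetical names.

```scala
import java.util.{ArrayList => JArrayList}
import scala.util.Try

object BufferHandOff {
  // Reusing one buffer: appending the next session's first row after taking an
  // iterator invalidates that iterator (ArrayList iterators are fail-fast).
  def reuseSingleBuffer(): Try[List[Int]] = Try {
    val buf = new JArrayList[Int]()
    buf.add(1); buf.add(2) // rows of the session being closed
    val it = buf.iterator()
    buf.add(3) // first row of the next session
    val out = List.newBuilder[Int]
    while (it.hasNext) out += it.next() // throws ConcurrentModificationException
    out.result()
  }

  // Hand-off: the closed session keeps its own array; new rows go to a fresh one.
  def handOff(): (List[Int], List[Int]) = {
    var current = new JArrayList[Int]()
    current.add(1); current.add(2)
    val returned = current // back up the filled array for the consumer
    current = new JArrayList[Int]() // start buffering the next session
    val it = returned.iterator()
    current.add(3) // safe: `returned` is never modified again
    val out = List.newBuilder[Int]
    while (it.hasNext) out += it.next()
    (out.result(), List(current.get(0)))
  }
}
```

`reuseSingleBuffer()` ends in a `Failure` wrapping `ConcurrentModificationException`, while `handOff()` returns `(List(1, 2), List(3))`. Allocating a new array per session costs an allocation, but it is the simplest way to let the consumer drain one session while the producer fills the next.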






[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-817415705








[GitHub] [spark] AmplabJenkins commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-841367765


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138559/
   




[GitHub] [spark] AmplabJenkins commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-838135143


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138356/
   



[GitHub] [spark] AmplabJenkins commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-842180144


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/43141/
   



[GitHub] [spark] SparkQA removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-837775575


   **[Test build #138356 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138356/testReport)** for PR 31986 at commit [`7764c72`](https://github.com/apache/spark/commit/7764c72a932aa058f9c864c8da8a5479c2be0c68).



[GitHub] [spark] AmplabJenkins commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-809144449


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/41211/
   



[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-817393273


   Kubernetes integration test unable to build dist.
   
   exiting with code: 1
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41760/
   



[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-841213114


   **[Test build #138559 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138559/testReport)** for PR 31986 at commit [`b0d434a`](https://github.com/apache/spark/commit/b0d434a046454b1c4dd1e164cc89e1c70fa04eaf).



[GitHub] [spark] viirya commented on a change in pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #31986:
URL: https://github.com/apache/spark/pull/31986#discussion_r632348736



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2000,6 +2000,26 @@ object SQLConf {
       .intConf
       .createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
 
+  val SESSION_WINDOW_BUFFER_IN_MEMORY_THRESHOLD =
+    buildConf("spark.sql.sessionWindow.buffer.in.memory.threshold")
+      .internal()
+      .doc("Threshold for number of windows guaranteed to be held in memory by the " +
+        "session window operator. Note that the buffer is used only for the query Spark " +
+        "cannot apply aggregations on classifying session window.")

Review comment:
       classifying -> determining? 

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2000,6 +2000,26 @@ object SQLConf {
       .intConf
       .createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
 
+  val SESSION_WINDOW_BUFFER_IN_MEMORY_THRESHOLD =
+    buildConf("spark.sql.sessionWindow.buffer.in.memory.threshold")
+      .internal()
+      .doc("Threshold for number of windows guaranteed to be held in memory by the " +
+        "session window operator. Note that the buffer is used only for the query Spark " +
+        "cannot apply aggregations on classifying session window.")
+      .version("3.2.0")
+      .intConf
+      .createWithDefault(4096)
+
+  val SESSION_WINDOW_BUFFER_SPILL_THRESHOLD =
+    buildConf("spark.sql.sessionWindow.buffer.spill.threshold")
+      .internal()
+      .doc("Threshold for number of rows to be spilled by window operator. Note that " +
+        "the buffer is used only for the query Spark cannot apply aggregations on classifying " +

Review comment:
       ditto





[GitHub] [spark] HeartSaVioR commented on a change in pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31986:
URL: https://github.com/apache/spark/pull/31986#discussion_r611255893



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsIterator.scala
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.aggregate
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray
+
+/**
+ * This class calculates and updates the session window for each element in the given iterator,
+ * which makes elements in the same session window having same session spec. Downstream can apply
+ * aggregation to finally merge these elements bound to the same session window.
+ *
+ * This class works on the precondition that given iterator is sorted by "group keys + start time
+ * of session window", and this iterator still retains the characteristic of the sort.
+ *
+ * This class copies the elements to safely update on each element, as well as buffers elements
+ * which are bound to the same session window. Due to such overheads, [[MergingSessionsIterator]]
+ * should be used whenever possible.
+ */
+class UpdatingSessionsIterator(
+    iter: Iterator[InternalRow],
+    groupingExpressions: Seq[NamedExpression],
+    sessionExpression: NamedExpression,
+    inputSchema: Seq[Attribute],
+    inMemoryThreshold: Int,
+    spillThreshold: Int) extends Iterator[InternalRow] {
+
+  private val groupingWithoutSession: Seq[NamedExpression] =
+    groupingExpressions.diff(Seq(sessionExpression))
+  private val groupingWithoutSessionAttributes: Seq[Attribute] =
+    groupingWithoutSession.map(_.toAttribute)
+  private[this] val groupingWithoutSessionProjection: UnsafeProjection =
+    UnsafeProjection.create(groupingWithoutSession, inputSchema)
+
+  private val valuesExpressions: Seq[Attribute] = inputSchema.diff(groupingWithoutSession)
+
+  private[this] val sessionProjection: UnsafeProjection =
+    UnsafeProjection.create(Seq(sessionExpression), inputSchema)
+
+  private var currentKeys: InternalRow = _
+  private var currentSession: UnsafeRow = _
+
+  private var currentRows: ExternalAppendOnlyUnsafeRowArray = new ExternalAppendOnlyUnsafeRowArray(
+    inMemoryThreshold, spillThreshold)
+
+  private var returnRows: ExternalAppendOnlyUnsafeRowArray = _
+  private var returnRowsIter: Iterator[InternalRow] = _
+  private var errorOnIterator: Boolean = false
+
+  private val processedKeys: mutable.HashSet[InternalRow] = new mutable.HashSet[InternalRow]()
+
+  override def hasNext: Boolean = {
+    assertIteratorNotCorrupted()
+
+    if (returnRowsIter != null && returnRowsIter.hasNext) {
+      return true
+    }
+
+    if (returnRowsIter != null) {
+      returnRowsIter = null
+      returnRows.clear()
+    }
+
+    iter.hasNext
+  }
+
+  override def next(): InternalRow = {
+    assertIteratorNotCorrupted()
+
+    if (returnRowsIter != null && returnRowsIter.hasNext) {
+      return returnRowsIter.next()
+    }
+
+    var exitCondition = false
+    while (iter.hasNext && !exitCondition) {
+      // we are going to modify the row, so we should make sure multiple objects are not
+      // referencing same memory, which could be possible when optimizing iterator
+      // without this, multiple rows in same key will be returned with same content
+      val row = iter.next().copy()
+
+      val keys = groupingWithoutSessionProjection(row)
+      val session = sessionProjection(row)
+      val sessionStruct = session.getStruct(0, 2)
+      val sessionStart = getSessionStart(sessionStruct)
+      val sessionEnd = getSessionEnd(sessionStruct)
+
+      if (currentKeys == null) {
+        startNewSession(row, keys, sessionStruct)
+      } else if (keys != currentKeys) {
+        closeCurrentSession(keyChanged = true)
+        processedKeys.add(currentKeys)
+        startNewSession(row, keys, sessionStruct)
+        exitCondition = true
+      } else {
+        if (sessionStart < getSessionStart(currentSession)) {
+          handleBrokenPreconditionForSort()
+        } else if (sessionStart <= getSessionEnd(currentSession)) {
+          // expanding session length if needed
+          expandEndOfCurrentSession(sessionEnd)
+          currentRows.add(row.asInstanceOf[UnsafeRow])
+        } else {
+          closeCurrentSession(keyChanged = false)
+          startNewSession(row, keys, sessionStruct)
+          exitCondition = true
+        }
+      }
+    }
+
+    if (!iter.hasNext) {
+      // no further row: closing session
+      closeCurrentSession(keyChanged = false)
+    }
+
+    // here returnRowsIter should be able to provide at least one row
+    require(returnRowsIter != null && returnRowsIter.hasNext)
+
+    returnRowsIter.next()
+  }
+
+  private def startNewSession(
+      currentRow: InternalRow,
+      groupingKey: UnsafeRow,
+      sessionStruct: UnsafeRow): Unit = {
+    if (processedKeys.contains(groupingKey)) {
+      handleBrokenPreconditionForSort()
+    }
+
+    currentKeys = groupingKey.copy()
+    currentSession = sessionStruct.copy()
+
+    currentRows.clear()
+    currentRows.add(currentRow.asInstanceOf[UnsafeRow])
+  }
+
+  private def getSessionStart(sessionStruct: UnsafeRow): Long = {
+    sessionStruct.getLong(0)
+  }
+
+  private def getSessionEnd(sessionStruct: UnsafeRow): Long = {
+    sessionStruct.getLong(1)
+  }
+
+  def updateSessionEnd(sessionStruct: UnsafeRow, sessionEnd: Long): Unit = {
+    sessionStruct.setLong(1, sessionEnd)
+  }
+
+  private def expandEndOfCurrentSession(sessionEnd: Long): Unit = {
+    if (sessionEnd > getSessionEnd(currentSession)) {
+      updateSessionEnd(currentSession, sessionEnd)
+    }
+  }
+
+  private def handleBrokenPreconditionForSort(): Unit = {
+    errorOnIterator = true
+    throw new IllegalStateException("The iterator must be sorted by key and session start!")
+  }
+
+  private val join = new JoinedRow
+  private val join2 = new JoinedRow
+
+  private val groupingKeyProj = GenerateUnsafeProjection.generate(groupingExpressions,
+    groupingWithoutSessionAttributes :+ sessionExpression.toAttribute)
+  private val valueProj = GenerateUnsafeProjection.generate(valuesExpressions, inputSchema)
+  private val restoreProj = GenerateUnsafeProjection.generate(inputSchema,
+    groupingExpressions.map(_.toAttribute) ++ valuesExpressions.map(_.toAttribute))
+
+  private def generateGroupingKey(): UnsafeRow = {
+    val newRow = new SpecificInternalRow(Seq(sessionExpression.toAttribute).toStructType)
+    newRow.update(0, currentSession)
+    val joined = join(currentKeys, newRow)
+
+    groupingKeyProj(joined)
+  }
+
+  private def closeCurrentSession(keyChanged: Boolean): Unit = {
+    returnRows = currentRows
+    currentRows = new ExternalAppendOnlyUnsafeRowArray(

Review comment:
       Please note that we move the current array to `returnRows` so that it can back the iterator we hand out. When we close the current session, we already hold a row that belongs to the next session; that row and the following rows of the next session are buffered into a new array.
   
   The original reason for not reusing a single ExternalAppendOnlyUnsafeRowArray instance is that the iterator of ExternalAppendOnlyUnsafeRowArray invalidates itself (i.e. throws an exception) if the array is modified after the iterator was created.
   
   So we can't simply create one array and one iterator, keep appending to the array, and let the consumer read from the iterator. We have to stop buffering into a given array, provide an iterator over it, and create a new array for further buffering.
   
   Initializing `currentRows` would probably be better placed in `startNewSession`.
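   A minimal sketch of the hand-off described above, in plain Scala. `FailFastBuffer` is a hypothetical stand-in for ExternalAppendOnlyUnsafeRowArray that only imitates its fail-fast iterator (it does not spill), and `SessionBuffers.closeAndStartNew` is a hypothetical name for the close-session/start-session step; neither is Spark API.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for ExternalAppendOnlyUnsafeRowArray: its iterator throws
// once the buffer is modified after the iterator was created (no spilling here).
final class FailFastBuffer[T] {
  private val items = ArrayBuffer.empty[T]
  private var modCount = 0

  def add(item: T): Unit = { items += item; modCount += 1 }
  def clear(): Unit = { items.clear(); modCount += 1 }
  def length: Int = items.length

  def iterator: Iterator[T] = new Iterator[T] {
    private val expectedModCount = modCount
    private var idx = 0
    private def check(): Unit =
      if (modCount != expectedModCount)
        throw new java.util.ConcurrentModificationException(
          "buffer was modified after the iterator was created")
    def hasNext: Boolean = { check(); idx < items.length }
    def next(): T = { check(); val v = items(idx); idx += 1; v }
  }
}

// Hypothetical buffering logic: rows of the open session go into `currentRows`.
// Closing the session hands the whole buffer to the consumer and starts a fresh
// buffer, instead of clearing and reusing the old one (which would break the
// consumer's iterator).
final class SessionBuffers[T] {
  private var currentRows = new FailFastBuffer[T]

  def append(row: T): Unit = currentRows.add(row)

  def closeAndStartNew(firstRowOfNextSession: T): Iterator[T] = {
    val returnRows = currentRows          // back up the finished session's buffer
    currentRows = new FailFastBuffer[T]   // new buffer for the next session
    currentRows.add(firstRowOfNextSession)
    returnRows.iterator                   // safe: returnRows is never modified again
  }
}
```

   Appending to the new buffer after `closeAndStartNew` leaves the returned iterator valid, whereas calling `clear()` on a buffer that already has a live iterator makes that iterator throw. This is why the real iterator swaps arrays rather than reusing one.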





[GitHub] [spark] viirya commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-842057838


   Okay for me.



[GitHub] [spark] AmplabJenkins commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-842180144







[GitHub] [spark] HeartSaVioR commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-809833095


   retest this, please



[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-826629092


   **[Test build #137949 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137949/testReport)** for PR 31986 at commit [`5ce623b`](https://github.com/apache/spark/commit/5ce623be7aec9e3e9230604b485be57c993a280a).



[GitHub] [spark] HeartSaVioR commented on a change in pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31986:
URL: https://github.com/apache/spark/pull/31986#discussion_r619919017



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.aggregate
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+
+/**
+ * This node updates the session window spec of each input rows via analyzing neighbor rows and
+ * determining rows belong to the same session window. The number of input rows remains the same.
+ * This node requires sort on input rows by group keys + the start time of session window.
+ *
+ * There are lots of overhead compared to [[MergingSessionsExec]]. Use [[MergingSessionsExec]]
+ * instead whenever possible. Use this node only when we cannot apply both calculations
+ * determining session windows and aggregating rows in session window altogether.
+ *
+ * Refer [[UpdatingSessionsIterator]] for more details.
+ */
+case class UpdatingSessionsExec(
+    keyExpressions: Seq[Attribute],
+    sessionExpression: Attribute,
+    child: SparkPlan) extends UnaryExecNode {
+
+  private val groupWithoutSessionExpression = keyExpressions.filterNot {
+    p => p.semanticEquals(sessionExpression)
+  }
+  private val groupingWithoutSessionAttributes = groupWithoutSessionExpression.map(_.toAttribute)
+
+  val childOrdering = Seq((groupingWithoutSessionAttributes ++ Seq(sessionExpression))
+    .map(SortOrder(_, Ascending)))
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val inMemoryThreshold = sqlContext.conf.windowExecBufferInMemoryThreshold
+    val spillThreshold = sqlContext.conf.windowExecBufferSpillThreshold
+
+    child.execute().mapPartitions { iter =>
+      new UpdatingSessionsIterator(iter, keyExpressions, sessionExpression,
+        child.output, inMemoryThreshold, spillThreshold)
+    }
+  }
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (groupWithoutSessionExpression.isEmpty) {
+      AllTuples :: Nil

Review comment:
       Yes, for batch queries. The limitation comes from the state format of session window, and it shouldn't be imposed on batch queries as well.
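   The `UpdatingSessionsExec` docstring quoted above says the node keeps the number of input rows and requires input sorted by group keys + session start. Below is a minimal sketch of that per-row session update on plain data. It assumes a hypothetical `Event(key, start, end)` in place of `InternalRow` and an in-memory `Vector` in place of the spilling buffer. As in the quoted iterator, a row whose start is at or before the open session's end is merged into it, and a key reappearing after a different key is treated as a broken sort precondition.

```scala
import scala.collection.mutable

// Hypothetical row type: grouping key plus the row's own session window [start, end].
final case class Event(key: String, start: Long, end: Long)

// Hypothetical helper: returns one output row per input row, with each row's window
// replaced by the merged session it belongs to. Input must be sorted by (key, start).
def updateSessions(sorted: Seq[Event]): Seq[Event] = {
  val out = Vector.newBuilder[Event]
  val seenKeys = mutable.HashSet.empty[String] // keys whose sessions are already closed
  var pending = Vector.empty[Event]            // rows of the currently open session
  var curKey: String = null
  var curStart = 0L
  var curEnd = 0L

  // Emit buffered rows with the final (merged) session window.
  def flush(): Unit = {
    pending.foreach(e => out += e.copy(start = curStart, end = curEnd))
    pending = Vector.empty
  }

  def broken(): Nothing =
    throw new IllegalStateException("The input must be sorted by key and session start!")

  for (e <- sorted) {
    if (curKey != null && e.key == curKey && e.start < curStart) broken()
    if (curKey != null && e.key == curKey && e.start <= curEnd) {
      curEnd = math.max(curEnd, e.end) // overlapping: expand the open session
      pending :+= e
    } else {
      if (curKey != null) {
        flush()
        if (e.key != curKey) seenKeys += curKey
      }
      if (seenKeys.contains(e.key)) broken()
      curKey = e.key; curStart = e.start; curEnd = e.end
      pending = Vector(e)
    }
  }
  if (curKey != null) flush()
  out.result()
}
```

   Emitting one row per input row (rather than one row per session) is what lets a downstream aggregation do the actual merge. It is also why this approach costs more than merging sessions and aggregating in a single pass.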





[GitHub] [spark] SparkQA removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-842106040


   **[Test build #138618 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138618/testReport)** for PR 31986 at commit [`b0d434a`](https://github.com/apache/spark/commit/b0d434a046454b1c4dd1e164cc89e1c70fa04eaf).



[GitHub] [spark] AmplabJenkins removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-841367765


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138559/
   



[GitHub] [spark] SparkQA removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-826623402


   **[Test build #137948 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137948/testReport)** for PR 31986 at commit [`ef37887`](https://github.com/apache/spark/commit/ef37887da6454fc0d4ebfe7cb40a42bf5b23a88d).



[GitHub] [spark] AmplabJenkins commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-826832910







[GitHub] [spark] HeartSaVioR commented on a change in pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31986:
URL: https://github.com/apache/spark/pull/31986#discussion_r629787730



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2000,6 +2000,26 @@ object SQLConf {
       .intConf
       .createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
 
+  val SESSION_WINDOW_BUFFER_IN_MEMORY_THRESHOLD =
+    buildConf("spark.sql.sessionWindow.buffer.in.memory.threshold")

Review comment:
       I agree with you, but the naming follows existing configurations. Please search for `_IN_MEMORY_THRESHOLD` in the codebase.





[GitHub] [spark] AmplabJenkins removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-817393275


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/41760/
   



[GitHub] [spark] AmplabJenkins removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-817392738


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137182/
   



[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-826668332








[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-841235389








[GitHub] [spark] SparkQA removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-817391934


   **[Test build #137182 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137182/testReport)** for PR 31986 at commit [`7880b44`](https://github.com/apache/spark/commit/7880b44ba0f8f8d5f13211787de9ffd4c2a32ae6).




[GitHub] [spark] AmplabJenkins commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-826673247


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42470/
   




[GitHub] [spark] HeartSaVioR commented on a change in pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31986:
URL: https://github.com/apache/spark/pull/31986#discussion_r629830829



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.aggregate
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+
+/**
+ * This node updates the session window spec of each input rows via analyzing neighbor rows and
+ * determining rows belong to the same session window. The number of input rows remains the same.
+ * This node requires sort on input rows by group keys + the start time of session window.
+ *
+ * There are lots of overhead compared to [[MergingSessionsExec]]. Use [[MergingSessionsExec]]
+ * instead whenever possible. Use this node only when we cannot apply both calculations
+ * determining session windows and aggregating rows in session window altogether.
+ *
+ * Refer [[UpdatingSessionsIterator]] for more details.
+ */
+case class UpdatingSessionsExec(
+    keyExpressions: Seq[Attribute],
+    sessionExpression: Attribute,
+    child: SparkPlan) extends UnaryExecNode {
+
+  private val groupingWithoutSessionExpression = keyExpressions.filterNot {
+    p => p.semanticEquals(sessionExpression)
+  }
+  private val groupingWithoutSessionAttributes =
+    groupingWithoutSessionExpression.map(_.toAttribute)
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val inMemoryThreshold = sqlContext.conf.sessionWindowBufferInMemoryThreshold
+    val spillThreshold = sqlContext.conf.sessionWindowBufferSpillThreshold
+
+    child.execute().mapPartitions { iter =>
+      new UpdatingSessionsIterator(iter, keyExpressions, sessionExpression,
+        child.output, inMemoryThreshold, spillThreshold)
+    }
+  }
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (groupingWithoutSessionExpression.isEmpty) {
+      AllTuples :: Nil
+    } else {
+      ClusteredDistribution(groupingWithoutSessionExpression) :: Nil
+    }
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
+    Seq((groupingWithoutSessionAttributes ++ Seq(sessionExpression))

Review comment:
       Ah, I remember the reason now. We can't safely assume that the session expression is placed at the end of the grouping keys. Let me revert it.
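A minimal sketch of the point above, assuming columns are modeled as plain strings rather than Catalyst `Attribute`s (the names `RequiredOrderingSketch`, `requiredOrdering`, `session_window`, and `userId` are hypothetical). Deriving the ordering as "grouping keys without the session column, then the session column" stays correct wherever the session column sits among the grouping keys, whereas using the grouping keys as-is only works when the session column happens to be last.

```scala
// Hypothetical, simplified model: columns are plain names, not Catalyst attributes.
object RequiredOrderingSketch {
  // Sort order = grouping keys without the session column, followed by the session column,
  // independent of where the session column appears in `groupingKeys`.
  def requiredOrdering(groupingKeys: Seq[String], sessionColumn: String): Seq[String] =
    groupingKeys.filterNot(_ == sessionColumn) :+ sessionColumn

  def main(args: Array[String]): Unit = {
    val keys = Seq("session_window", "userId")
    // Taking the keys as-is would sort by the session column first, which is wrong here.
    println(keys)                                     // List(session_window, userId)
    println(requiredOrdering(keys, "session_window")) // List(userId, session_window)
  }
}
```

The filter-then-append form mirrors the `groupingWithoutSessionAttributes ++ Seq(sessionExpression)` expression in the hunk above.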






[GitHub] [spark] AmplabJenkins removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-841244616


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/43080/
   




[GitHub] [spark] HeartSaVioR commented on a change in pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31986:
URL: https://github.com/apache/spark/pull/31986#discussion_r629900797



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.aggregate
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+
+/**
+ * This node updates the session window spec of each input rows via analyzing neighbor rows and
+ * determining rows belong to the same session window. The number of input rows remains the same.
+ * This node requires sort on input rows by group keys + the start time of session window.
+ *
+ * There are lots of overhead compared to [[MergingSessionsExec]]. Use [[MergingSessionsExec]]
+ * instead whenever possible. Use this node only when we cannot apply both calculations
+ * determining session windows and aggregating rows in session window altogether.
+ *
+ * Refer [[UpdatingSessionsIterator]] for more details.
+ */
+case class UpdatingSessionsExec(
+    keyExpressions: Seq[Attribute],
+    sessionExpression: Attribute,
+    child: SparkPlan) extends UnaryExecNode {
+
+  private val groupingWithoutSessionExpression = keyExpressions.filterNot {
+    p => p.semanticEquals(sessionExpression)
+  }
+  private val groupingWithoutSessionAttributes =
+    groupingWithoutSessionExpression.map(_.toAttribute)
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val inMemoryThreshold = sqlContext.conf.sessionWindowBufferInMemoryThreshold
+    val spillThreshold = sqlContext.conf.sessionWindowBufferSpillThreshold
+
+    child.execute().mapPartitions { iter =>
+      new UpdatingSessionsIterator(iter, keyExpressions, sessionExpression,
+        child.output, inMemoryThreshold, spillThreshold)
+    }
+  }
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (groupingWithoutSessionExpression.isEmpty) {
+      AllTuples :: Nil
+    } else {
+      ClusteredDistribution(groupingWithoutSessionExpression) :: Nil
+    }
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
+    Seq((groupingWithoutSessionAttributes ++ Seq(sessionExpression))

Review comment:
       That said, other parts should be checked as well; they might have some logic relying on this.
   
   EDIT: it might be OK to reorder first, leverage that ordering, and reorder back later. My preference is still not to reorder, but if it brings some benefit, we could do it.
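The "reorder first, reorder back later" idea can be sketched as a column permutation plus its inverse. This is a hypothetical illustration, not code from this PR: rows are modeled as `Vector[Any]`, and `ReorderSketch`, `toEndPermutation`, `permute`, and `inverse` are made-up names.

```scala
// Hypothetical sketch: move the session column to the end of each row, then restore
// the original column order afterwards. Rows are modeled as Vector[Any].
object ReorderSketch {
  // Permutation that keeps all other columns in order and moves `sessionIdx` last.
  def toEndPermutation(width: Int, sessionIdx: Int): Vector[Int] =
    (0 until width).filterNot(_ == sessionIdx).toVector :+ sessionIdx

  // Applies a permutation: output(i) = row(perm(i)).
  def permute(row: Vector[Any], perm: Vector[Int]): Vector[Any] = perm.map(i => row(i))

  // Inverse permutation, satisfying inv(perm(i)) == i.
  def inverse(perm: Vector[Int]): Vector[Int] = {
    val inv = Array.fill(perm.length)(0)
    perm.zipWithIndex.foreach { case (src, dst) => inv(src) = dst }
    inv.toVector
  }

  def main(args: Array[String]): Unit = {
    val row: Vector[Any] = Vector("session", "user1", 42)
    val perm = toEndPermutation(row.length, sessionIdx = 0)
    val reordered = permute(row, perm)
    val restored = permute(reordered, inverse(perm))
    println(reordered) // Vector(user1, 42, session)
    assert(restored == row)
  }
}
```

The cost of this approach is an extra projection on each side; that is the trade-off against simply not reordering.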






[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-809922224


   Kubernetes integration test unable to build dist.
   
   exiting with code: 1
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41270/
   




[GitHub] [spark] SparkQA removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-809903940


   **[Test build #136688 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/136688/testReport)** for PR 31986 at commit [`9c36bde`](https://github.com/apache/spark/commit/9c36bdea0ad10b59296ed1b1d002ad0cf8420867).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-826673247


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42470/
   




[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-809930442


   **[Test build #136677 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/136677/testReport)** for PR 31986 at commit [`3e8dd5c`](https://github.com/apache/spark/commit/3e8dd5ccd8c2c136f5c3a4ff64269267edbdf81e).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds the following public classes _(experimental)_:
     * `case class UpdatingSessionsExec(`
     * `class UpdatingSessionsIterator(`




[GitHub] [spark] AmplabJenkins removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-809925236


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/41270/
   




[GitHub] [spark] AmplabJenkins commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-817469602


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137184/
   




[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-809839890


   **[Test build #136677 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/136677/testReport)** for PR 31986 at commit [`3e8dd5c`](https://github.com/apache/spark/commit/3e8dd5ccd8c2c136f5c3a4ff64269267edbdf81e).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-842180144








[GitHub] [spark] AmplabJenkins commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-837833241


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42878/
   




[GitHub] [spark] AmplabJenkins commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-817392738


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137182/
   




[GitHub] [spark] SparkQA removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-842106040








[GitHub] [spark] AmplabJenkins commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-817419976


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/41762/
   




[GitHub] [spark] SparkQA removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-842141372


   **[Test build #138621 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138621/testReport)** for PR 31986 at commit [`8c2eed8`](https://github.com/apache/spark/commit/8c2eed82d553ddacb36088954232ae704a855094).




[GitHub] [spark] HeartSaVioR commented on a change in pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31986:
URL: https://github.com/apache/spark/pull/31986#discussion_r629900130



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.aggregate
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+
+/**
+ * This node updates the session window spec of each input rows via analyzing neighbor rows and
+ * determining rows belong to the same session window. The number of input rows remains the same.
+ * This node requires sort on input rows by group keys + the start time of session window.
+ *
+ * There are lots of overhead compared to [[MergingSessionsExec]]. Use [[MergingSessionsExec]]
+ * instead whenever possible. Use this node only when we cannot apply both calculations
+ * determining session windows and aggregating rows in session window altogether.
+ *
+ * Refer [[UpdatingSessionsIterator]] for more details.
+ */
+case class UpdatingSessionsExec(
+    keyExpressions: Seq[Attribute],
+    sessionExpression: Attribute,
+    child: SparkPlan) extends UnaryExecNode {
+
+  private val groupingWithoutSessionExpression = keyExpressions.filterNot {
+    p => p.semanticEquals(sessionExpression)
+  }
+  private val groupingWithoutSessionAttributes =
+    groupingWithoutSessionExpression.map(_.toAttribute)
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val inMemoryThreshold = sqlContext.conf.sessionWindowBufferInMemoryThreshold
+    val spillThreshold = sqlContext.conf.sessionWindowBufferSpillThreshold
+
+    child.execute().mapPartitions { iter =>
+      new UpdatingSessionsIterator(iter, keyExpressions, sessionExpression,
+        child.output, inMemoryThreshold, spillThreshold)
+    }
+  }
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (groupingWithoutSessionExpression.isEmpty) {
+      AllTuples :: Nil
+    } else {
+      ClusteredDistribution(groupingWithoutSessionExpression) :: Nil
+    }
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
+    Seq((groupingWithoutSessionAttributes ++ Seq(sessionExpression))

Review comment:
       If I remember correctly, we don't reorder the columns, but I might be wrong, as the overall code is quite old even for me. At least it's more natural not to reorder; if we do reorder the columns somewhere in the overall picture, I'd rather fix that.
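For readers following the thread, here is a simplified, hypothetical model of what the node's Scaladoc describes: over rows sorted by grouping key and session start, rows whose windows overlap are buffered and then re-emitted with the merged session window, so the row count is unchanged. Assumptions: rows are a made-up case class (`UpdatingSessionsSketch.Row`), windows are half-open `[start, end)` and merge on strict overlap (Spark's exact boundary rule may differ), and the buffer is an in-memory `Vector` rather than the spillable `ExternalAppendOnlyUnsafeRowArray` the real iterator uses.

```scala
// Hypothetical, simplified model of the "updating sessions" idea; not Spark's implementation.
object UpdatingSessionsSketch {
  // One input row: grouping key, the row's own session window [start, end), and a value.
  final case class Row(key: String, start: Long, end: Long, value: Int)

  // Precondition: `sorted` is ordered by (key, start).
  // Every input row is emitted once, carrying the merged window of its session.
  def updateSessions(sorted: Seq[Row]): Seq[Row] = {
    val out = Vector.newBuilder[Row]
    var buffer = Vector.empty[Row] // rows bound to the current session
    var sessStart = 0L
    var sessEnd = 0L

    // Emit buffered rows with the final bounds of the current session.
    def flush(): Unit = {
      out ++= buffer.map(r => r.copy(start = sessStart, end = sessEnd))
      buffer = Vector.empty
    }

    sorted.foreach { r =>
      val sameSession = buffer.nonEmpty && buffer.head.key == r.key && r.start < sessEnd
      if (sameSession) {
        sessEnd = math.max(sessEnd, r.end) // extend the current session
      } else {
        flush() // key changed or gap found: close the previous session
        sessStart = r.start
        sessEnd = r.end
      }
      buffer :+= r
    }
    flush()
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(Row("a", 0, 10, 1), Row("a", 5, 15, 2), Row("a", 30, 40, 3), Row("b", 7, 17, 4))
    // Prints Row(a,0,15,1), Row(a,0,15,2), Row(a,30,40,3), Row(b,7,17,4), one per line.
    updateSessions(input).foreach(println)
  }
}
```

Because every row is kept and only its window is rewritten, a downstream aggregation still has to merge rows per session; that is the overhead the Scaladoc contrasts with `MergingSessionsExec`.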






[GitHub] [spark] viirya commented on a change in pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #31986:
URL: https://github.com/apache/spark/pull/31986#discussion_r610073162



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsIterator.scala
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.aggregate
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray
+
+/**
+ * This class calculates and updates the session window for each element in the given iterator,
+ * which makes elements in the same session window having same session spec. Downstream can apply
+ * aggregation to finally merge these elements bound to the same session window.
+ *
+ * This class works on the precondition that given iterator is sorted by "group keys + start time
+ * of session window", and this iterator still retains the characteristic of the sort.
+ *
+ * This class copies each element so that it can be updated safely, and buffers the elements
+ * bound to the same session window. Because of these overheads, [[MergingSessionsIterator]]
+ * should be preferred whenever possible.
+ */
+class UpdatingSessionsIterator(
+    iter: Iterator[InternalRow],
+    groupingExpressions: Seq[NamedExpression],
+    sessionExpression: NamedExpression,
+    inputSchema: Seq[Attribute],
+    inMemoryThreshold: Int,
+    spillThreshold: Int) extends Iterator[InternalRow] {
+
+  private val groupingWithoutSession: Seq[NamedExpression] =
+    groupingExpressions.diff(Seq(sessionExpression))
+  private val groupingWithoutSessionAttributes: Seq[Attribute] =
+    groupingWithoutSession.map(_.toAttribute)
+  private[this] val groupingWithoutSessionProjection: UnsafeProjection =
+    UnsafeProjection.create(groupingWithoutSession, inputSchema)
+
+  private val valuesExpressions: Seq[Attribute] = inputSchema.diff(groupingWithoutSession)
+
+  private[this] val sessionProjection: UnsafeProjection =
+    UnsafeProjection.create(Seq(sessionExpression), inputSchema)
+
+  private var currentKeys: InternalRow = _
+  private var currentSession: UnsafeRow = _
+
+  private var currentRows: ExternalAppendOnlyUnsafeRowArray = new ExternalAppendOnlyUnsafeRowArray(
+    inMemoryThreshold, spillThreshold)
+
+  private var returnRows: ExternalAppendOnlyUnsafeRowArray = _
+  private var returnRowsIter: Iterator[InternalRow] = _
+  private var errorOnIterator: Boolean = false
+
+  private val processedKeys: mutable.HashSet[InternalRow] = new mutable.HashSet[InternalRow]()
+
+  override def hasNext: Boolean = {
+    assertIteratorNotCorrupted()
+
+    if (returnRowsIter != null && returnRowsIter.hasNext) {
+      return true
+    }
+
+    if (returnRowsIter != null) {
+      returnRowsIter = null
+      returnRows.clear()
+    }
+
+    iter.hasNext
+  }
+
+  override def next(): InternalRow = {
+    assertIteratorNotCorrupted()
+
+    if (returnRowsIter != null && returnRowsIter.hasNext) {
+      return returnRowsIter.next()
+    }
+
+    var exitCondition = false
+    while (iter.hasNext && !exitCondition) {
+      // We are going to modify the row, so we must make sure multiple objects do not reference
+      // the same memory, which can happen when the upstream iterator reuses its row object.
+      // Without this copy, multiple rows with the same key would be returned with the same content.
+      val row = iter.next().copy()
+
+      val keys = groupingWithoutSessionProjection(row)
+      val session = sessionProjection(row)
+      val sessionStruct = session.getStruct(0, 2)
+      val sessionStart = getSessionStart(sessionStruct)
+      val sessionEnd = getSessionEnd(sessionStruct)
+
+      if (currentKeys == null) {
+        startNewSession(row, keys, sessionStruct)
+      } else if (keys != currentKeys) {
+        closeCurrentSession(keyChanged = true)
+        processedKeys.add(currentKeys)
+        startNewSession(row, keys, sessionStruct)
+        exitCondition = true
+      } else {
+        if (sessionStart < getSessionStart(currentSession)) {
+          handleBrokenPreconditionForSort()
+        } else if (sessionStart <= getSessionEnd(currentSession)) {
+          // expanding session length if needed
+          expandEndOfCurrentSession(sessionEnd)
+          currentRows.add(row.asInstanceOf[UnsafeRow])
+        } else {
+          closeCurrentSession(keyChanged = false)
+          startNewSession(row, keys, sessionStruct)
+          exitCondition = true
+        }
+      }
+    }
+
+    if (!iter.hasNext) {
+      // no further row: closing session
+      closeCurrentSession(keyChanged = false)
+    }
+
+    // here returnRowsIter should be able to provide at least one row
+    require(returnRowsIter != null && returnRowsIter.hasNext)
+
+    returnRowsIter.next()
+  }
+
+  private def startNewSession(
+      currentRow: InternalRow,
+      groupingKey: UnsafeRow,
+      sessionStruct: UnsafeRow): Unit = {
+    if (processedKeys.contains(groupingKey)) {
+      handleBrokenPreconditionForSort()
+    }
+
+    currentKeys = groupingKey.copy()
+    currentSession = sessionStruct.copy()
+
+    currentRows.clear()
+    currentRows.add(currentRow.asInstanceOf[UnsafeRow])
+  }
+
+  private def getSessionStart(sessionStruct: UnsafeRow): Long = {
+    sessionStruct.getLong(0)
+  }
+
+  private def getSessionEnd(sessionStruct: UnsafeRow): Long = {
+    sessionStruct.getLong(1)
+  }
+
+  def updateSessionEnd(sessionStruct: UnsafeRow, sessionEnd: Long): Unit = {
+    sessionStruct.setLong(1, sessionEnd)
+  }
+
+  private def expandEndOfCurrentSession(sessionEnd: Long): Unit = {
+    if (sessionEnd > getSessionEnd(currentSession)) {
+      updateSessionEnd(currentSession, sessionEnd)
+    }
+  }
+
+  private def handleBrokenPreconditionForSort(): Unit = {
+    errorOnIterator = true
+    throw new IllegalStateException("The iterator must be sorted by key and session start!")
+  }
+
+  private val join = new JoinedRow
+  private val join2 = new JoinedRow
+
+  private val groupingKeyProj = GenerateUnsafeProjection.generate(groupingExpressions,
+    groupingWithoutSessionAttributes :+ sessionExpression.toAttribute)
+  private val valueProj = GenerateUnsafeProjection.generate(valuesExpressions, inputSchema)
+  private val restoreProj = GenerateUnsafeProjection.generate(inputSchema,
+    groupingExpressions.map(_.toAttribute) ++ valuesExpressions.map(_.toAttribute))
+
+  private def generateGroupingKey(): UnsafeRow = {
+    val newRow = new SpecificInternalRow(Seq(sessionExpression.toAttribute).toStructType)
+    newRow.update(0, currentSession)
+    val joined = join(currentKeys, newRow)
+
+    groupingKeyProj(joined)
+  }
+
+  private def closeCurrentSession(keyChanged: Boolean): Unit = {
+    returnRows = currentRows
+    currentRows = new ExternalAppendOnlyUnsafeRowArray(

Review comment:
       Why do we need to create a new `ExternalAppendOnlyUnsafeRowArray`? Don't we already call `clear`?
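Judging only from the snippet above, one likely reason is aliasing. `closeCurrentSession` hands the buffered rows to `returnRows` by reference, and on a key change or session gap `startNewSession` runs right afterwards and calls `currentRows.clear()` before adding the row that opened the next session. Without a fresh buffer, both fields would point to the same array, and the rows waiting to be returned would be wiped. The minimal sketch below reproduces that sequence, using a plain `ArrayBuffer` as a hypothetical stand-in for `ExternalAppendOnlyUnsafeRowArray`. The object and method names are illustrative only.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in: ArrayBuffer plays the role of ExternalAppendOnlyUnsafeRowArray.
object BufferAliasing {
  // Replays closeCurrentSession followed by startNewSession and returns
  // (rows handed off for return, rows of the newly started session).
  def closeThenStart(reuseBuffer: Boolean): (Seq[Int], Seq[Int]) = {
    var currentRows = ArrayBuffer(1, 2, 3) // rows of the session being closed
    val returnRows = currentRows           // closeCurrentSession: returnRows = currentRows
    if (!reuseBuffer) {
      currentRows = ArrayBuffer.empty[Int] // what the patch does: allocate a fresh buffer
    }
    currentRows.clear()                    // startNewSession: currentRows.clear()
    currentRows += 4                       // startNewSession: add the row opening the new session
    (returnRows.toList, currentRows.toList)
  }
}
```

With a fresh buffer the handed-off rows survive as `(List(1, 2, 3), List(4))`; when the buffer is reused, both sides end up as `List(4)`. Alternating between two preallocated buffers would also avoid the aliasing without a new allocation per session, at the cost of tracking which buffer is currently being drained.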

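The class comment in the diff describes the core pass. On input sorted by grouping key and session start, a row whose window starts at or before the current session's end extends that session. Every buffered row is then rewritten to carry the merged bounds, so downstream aggregation can group on them. The minimal sketch below assumes a hypothetical `SketchRow` case class in place of `UnsafeRow` and an in-memory `Vector` in place of the spillable buffer. It mirrors the branches of `next()` in the snippet (extend on overlap, close on a gap or key change, fail when the sort precondition is broken), but it omits projections, row copying, and spilling, and it is not the patch's implementation.

```scala
import scala.collection.mutable

// Hypothetical simplified row: grouping key, session window [start, end], and a payload value.
final case class SketchRow(key: String, start: Long, end: Long, value: Int)

object SessionUpdateSketch {
  private def sortError(): Nothing =
    throw new IllegalStateException("The input must be sorted by key and session start!")

  // Requires `sorted` to be ordered by (key, start), like the iterator's precondition.
  // Every output row keeps its payload but carries the bounds of the merged session it joined.
  def updateSessions(sorted: Seq[SketchRow]): Seq[SketchRow] = {
    val out = Vector.newBuilder[SketchRow]
    val finishedKeys = mutable.HashSet.empty[String] // keys whose sessions were all emitted
    var buffer = Vector.empty[SketchRow]             // rows bound to the open session
    var curStart = 0L
    var curEnd = 0L

    // Emit the buffered rows, rewriting each window to the merged session bounds.
    def closeSession(): Unit = {
      out ++= buffer.map(_.copy(start = curStart, end = curEnd))
      buffer = Vector.empty
    }

    // Open a new session seeded by `row`; seeing a finished key again means the sort was broken.
    def startSession(row: SketchRow): Unit = {
      if (finishedKeys.contains(row.key)) sortError()
      curStart = row.start
      curEnd = row.end
      buffer = Vector(row)
    }

    for (row <- sorted) {
      if (buffer.isEmpty) {
        startSession(row)                  // very first row
      } else if (row.key != buffer.head.key) {
        finishedKeys += buffer.head.key    // key changed: remember the old key, then close
        closeSession()
        startSession(row)
      } else if (row.start < curStart) {
        sortError()                        // session starts went backwards within a key
      } else if (row.start <= curEnd) {
        curEnd = math.max(curEnd, row.end) // overlap: extend the open session
        buffer :+= row
      } else {
        closeSession()                     // gap: same key, but a new session
        startSession(row)
      }
    }
    if (buffer.nonEmpty) closeSession()    // no more input: flush the last session
    out.result()
  }
}
```

For example, rows for key `a` with windows `[0, 10]` and `[5, 15]` both come out with `[0, 15]`, while a later `a` row at `[20, 30]` starts its own session. Because the sketch materializes all output at once from immutable case classes, it needs neither the defensive `copy()` nor the buffer hand-off that the streaming iterator requires.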



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-809109297


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/136628/
   



[GitHub] [spark] AmplabJenkins removed a comment on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-826832910







[GitHub] [spark] AmplabJenkins commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-826668676


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42469/
   



[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-809855283


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41259/
   



[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-826673205







[GitHub] [spark] SparkQA commented on pull request #31986: [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31986:
URL: https://github.com/apache/spark/pull/31986#issuecomment-842106040





