Posted to reviews@spark.apache.org by tdas <gi...@git.apache.org> on 2017/10/03 02:57:11 UTC

[GitHub] spark pull request #19416: [SPARK-22187][SS] Update unsaferow format for sav...

GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/19416

    [SPARK-22187][SS] Update unsaferow format for saved state such that we can set timeouts when state is null

    ## What changes were proposed in this pull request?
    
    Currently, the group state of the user-defined type is encoded as top-level columns in the UnsafeRows stored in the state store. The timeout timestamp, when needed, is also saved as the last top-level column. Since the group state is serialized into top-level columns, there is no way to save "null" as the value of the state (setting all the top-level columns to null is not equivalent), so we do not let the user set a timeout without first initializing the state for a key. Based on user experience, this leads to confusion.
    
    This PR changes the row format so that the state is saved as a nested struct in a single nullable top-level column. This allows the state to be set to null and avoids these confusing corner cases.
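
    To make the format change concrete, here is a minimal sketch of the two layouts, using a hypothetical state type with fields `count: Int` and `sum: Long` (column names and types follow the old code in `FlatMapGroupsWithStateExec` and the new state manager in this PR):

        import org.apache.spark.sql.types._

        // Hypothetical schema of the user-defined state type.
        val userStateSchema = new StructType().add("count", IntegerType).add("sum", LongType)

        // Old format: the state fields are flattened into top-level columns of the saved row,
        // so there is no single column that can be set to null to mean "no state yet".
        val oldFormat = StructType(userStateSchema.fields)
          .add("timeoutTimestamp", IntegerType, nullable = false)

        // New format (this PR): the whole user state is one nullable nested column, so a null
        // state and a timeout timestamp can coexist in the same saved row.
        val newFormat = new StructType()
          .add("groupState", userStateSchema, nullable = true)
          .add("timeoutTimestamp", LongType)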
    
    ## How was this patch tested?
    Refactored tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-22187

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19416.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19416
    
----
commit 301e0a15b87be8cd1c71090ece3497191bbd3881
Author: Tathagata Das <ta...@gmail.com>
Date:   2017-09-29T03:10:34Z

    Refactored all state operations into separate inner class

commit 64a8d865f71a92ed9f76879eb6c5a24d1fef8cec
Author: Tathagata Das <ta...@gmail.com>
Date:   2017-10-03T02:39:05Z

    Refactored and changed state format

----


---



[GitHub] spark issue #19416: [SPARK-22187][SS] Update unsaferow format for saved stat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19416
  
    **[Test build #82407 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82407/testReport)** for PR 19416 at commit [`64a8d86`](https://github.com/apache/spark/commit/64a8d865f71a92ed9f76879eb6c5a24d1fef8cec).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class FlatMapGroupsWithState_StateManager(`
      * `case class FlatMapGroupsWithState_StateData(`


---



[GitHub] spark issue #19416: [SPARK-22187][SS] Update unsaferow format for saved stat...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/19416
  
    LGTM pending tests


---



[GitHub] spark issue #19416: [SPARK-22187][SS] Update unsaferow format for saved stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19416
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #19416: [SPARK-22187][SS] Update unsaferow format for sav...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19416#discussion_r142303228
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala ---
    @@ -397,50 +435,23 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
             timeoutConf = EventTimeTimeout,
             priorState = priorState,
             priorTimeoutTimestamp = priorTimeoutTimestamp,
    -        expectedState = Some(5),                                 // state should change
    -        expectedTimeoutTimestamp = NO_TIMESTAMP)                 // timestamp should not update
    -    }
    -  }
    -
    -  // Currently disallowed cases for StateStoreUpdater.updateStateForKeysWithData(),
    -  // Try to remove these cases in the future
    -  for (priorTimeoutTimestamp <- Seq(NO_TIMESTAMP, 1000)) {
    --- End diff --
    
    These functions test the cases where an exception used to be thrown to prevent a null state with a timeout from being saved. The exception checks have now been replaced by tests of the correct output (see the added lines above).
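
    For example, one of the previously disallowed cases is now asserted directly on the expected output (this call is copied from the added lines in the suite):

        testStateUpdateWithData(
          s"ProcessingTimeTimeout - $testName - timeout updated after state removed",
          stateUpdates = state => { state.remove(); state.setTimeoutDuration(5000) },
          timeoutConf = ProcessingTimeTimeout,
          priorState = priorState,
          priorTimeoutTimestamp = priorTimeoutTimestamp,
          expectedState = None,
          expectedTimeoutTimestamp = currentBatchTimestamp + 5000)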


---



[GitHub] spark pull request #19416: [SPARK-22187][SS] Update unsaferow format for sav...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19416#discussion_r142303057
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/FlatMapGroupsWithState_StateManager.scala ---
    @@ -0,0 +1,143 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.state
    +
    +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference, CaseWhen, CreateNamedStruct, GetStructField, IsNull, Literal, UnsafeRow}
    +import org.apache.spark.sql.execution.ObjectOperator
    +import org.apache.spark.sql.execution.streaming.GroupStateImpl
    +import org.apache.spark.sql.execution.streaming.GroupStateImpl.NO_TIMESTAMP
    +import org.apache.spark.sql.types.{IntegerType, LongType, StructType}
    +
    +
    +class FlatMapGroupsWithState_StateManager(
    +    stateEncoder: ExpressionEncoder[Any],
    +    shouldStoreTimestamp: Boolean) extends Serializable {
    +
    +  val stateSchema = {
    +    val schema = new StructType().add("groupState", stateEncoder.schema, nullable = true)
    +    if (shouldStoreTimestamp) schema.add("timeoutTimestamp", LongType) else schema
    +  }
    +
    +  def getState(store: StateStore, keyRow: UnsafeRow): FlatMapGroupsWithState_StateData = {
    +    val stateRow = store.get(keyRow)
    +    stateDataForGets.withNew(
    +      keyRow, stateRow, getStateObj(stateRow), getTimestamp(stateRow))
    +  }
    +
    +  def putState(store: StateStore, keyRow: UnsafeRow, state: Any, timestamp: Long): Unit = {
    +    val stateRow = getStateRow(state)
    +    setTimestamp(stateRow, timestamp)
    +    store.put(keyRow, stateRow)
    +  }
    +
    +  def removeState(store: StateStore, keyRow: UnsafeRow): Unit = {
    +    store.remove(keyRow)
    +  }
    +
    +  def getAllState(store: StateStore): Iterator[FlatMapGroupsWithState_StateData] = {
    +    val stateDataForGetAllState = FlatMapGroupsWithState_StateData()
    +    store.getRange(None, None).map { pair =>
    +      stateDataForGetAllState.withNew(
    +        pair.key, pair.value, getStateObjFromRow(pair.value), getTimestamp(pair.value))
    +    }
    +  }
    +
    +  private val stateAttributes: Seq[Attribute] = stateSchema.toAttributes
    +
    +  // Get the serializer for the state, taking into account whether we need to save timestamps
    +  private val stateSerializer = {
    +    val nestedStateExpr = CreateNamedStruct(
    +      stateEncoder.namedExpressions.flatMap(e => Seq(Literal(e.name), e)))
    +    if (shouldStoreTimestamp) {
    +      Seq(nestedStateExpr, Literal(GroupStateImpl.NO_TIMESTAMP))
    +    } else {
    +      Seq(nestedStateExpr)
    +    }
    +  }
    +
    +  // Get the deserializer for the state. Note that this must be done in the driver, as
    +  // resolving and binding of deserializer expressions to the encoded type can be safely done
    +  // only in the driver.
    +  private val stateDeserializer = {
    +    val boundRefToNestedState = BoundReference(nestedStateOrdinal, stateEncoder.schema, true)
    +    val deser = stateEncoder.resolveAndBind().deserializer.transformUp {
    +      case BoundReference(ordinal, _, _) => GetStructField(boundRefToNestedState, ordinal)
    +    }
    +    CaseWhen(Seq(IsNull(boundRefToNestedState) -> Literal(null)), elseValue = deser).toCodegen()
    +  }
    +
    +  private lazy val nestedStateOrdinal = 0
    +  private lazy val timeoutTimestampOrdinal = 1
    +
    +  // Converters for translating state between rows and Java objects
    +  private lazy val getStateObjFromRow = ObjectOperator.deserializeRowToObject(
    +    stateDeserializer, stateAttributes)
    +  private lazy val getStateRowFromObj = ObjectOperator.serializeObjectToRow(stateSerializer)
    +
    +  private lazy val stateDataForGets = FlatMapGroupsWithState_StateData()
    +
    +  /** Returns the state as Java object if defined */
    +  private def getStateObj(stateRow: UnsafeRow): Any = {
    +    if (stateRow == null) null
    +    // else if (stateRow.isNullAt(nestedStateOrdinal)) null
    +    else getStateObjFromRow(stateRow)
    +  }
    +
    +  /** Returns the row for an updated state */
    +  private def getStateRow(obj: Any): UnsafeRow = {
    +    val row = getStateRowFromObj(obj)
    +    if (obj == null) {
    +      row.setNullAt(nestedStateOrdinal)
    +    }
    +    row
    +  }
    +
    +  /** Returns the timeout timestamp of a state row, if set */
    +  private def getTimestamp(stateRow: UnsafeRow): Long = {
    +    if (shouldStoreTimestamp && stateRow != null) {
    +      stateRow.getLong(timeoutTimestampOrdinal)
    +    } else NO_TIMESTAMP
    +  }
    +
    +  /** Set the timestamp in a state row */
    +  private def setTimestamp(stateRow: UnsafeRow, timeoutTimestamps: Long): Unit = {
    +    if (shouldStoreTimestamp) stateRow.setLong(timeoutTimestampOrdinal, timeoutTimestamps)
    +  }
    +
    +}
    +
    +
    +case class FlatMapGroupsWithState_StateData(
    --- End diff --
    
    Add docs.


---



[GitHub] spark pull request #19416: [SPARK-22187][SS] Update unsaferow format for sav...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19416#discussion_r142304163
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/FlatMapGroupsWithState_StateManager.scala ---
    @@ -0,0 +1,143 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.state
    +
    +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference, CaseWhen, CreateNamedStruct, GetStructField, IsNull, Literal, UnsafeRow}
    +import org.apache.spark.sql.execution.ObjectOperator
    +import org.apache.spark.sql.execution.streaming.GroupStateImpl
    +import org.apache.spark.sql.execution.streaming.GroupStateImpl.NO_TIMESTAMP
    +import org.apache.spark.sql.types.{IntegerType, LongType, StructType}
    +
    +
    +class FlatMapGroupsWithState_StateManager(
    +    stateEncoder: ExpressionEncoder[Any],
    +    shouldStoreTimestamp: Boolean) extends Serializable {
    +
    +  val stateSchema = {
    +    val schema = new StructType().add("groupState", stateEncoder.schema, nullable = true)
    +    if (shouldStoreTimestamp) schema.add("timeoutTimestamp", LongType) else schema
    +  }
    +
    +  def getState(store: StateStore, keyRow: UnsafeRow): FlatMapGroupsWithState_StateData = {
    +    val stateRow = store.get(keyRow)
    +    stateDataForGets.withNew(
    +      keyRow, stateRow, getStateObj(stateRow), getTimestamp(stateRow))
    +  }
    +
    +  def putState(store: StateStore, keyRow: UnsafeRow, state: Any, timestamp: Long): Unit = {
    +    val stateRow = getStateRow(state)
    +    setTimestamp(stateRow, timestamp)
    +    store.put(keyRow, stateRow)
    +  }
    +
    +  def removeState(store: StateStore, keyRow: UnsafeRow): Unit = {
    +    store.remove(keyRow)
    +  }
    +
    +  def getAllState(store: StateStore): Iterator[FlatMapGroupsWithState_StateData] = {
    +    val stateDataForGetAllState = FlatMapGroupsWithState_StateData()
    +    store.getRange(None, None).map { pair =>
    +      stateDataForGetAllState.withNew(
    +        pair.key, pair.value, getStateObjFromRow(pair.value), getTimestamp(pair.value))
    +    }
    +  }
    +
    +  private val stateAttributes: Seq[Attribute] = stateSchema.toAttributes
    +
    +  // Get the serializer for the state, taking into account whether we need to save timestamps
    +  private val stateSerializer = {
    +    val nestedStateExpr = CreateNamedStruct(
    +      stateEncoder.namedExpressions.flatMap(e => Seq(Literal(e.name), e)))
    +    if (shouldStoreTimestamp) {
    +      Seq(nestedStateExpr, Literal(GroupStateImpl.NO_TIMESTAMP))
    +    } else {
    +      Seq(nestedStateExpr)
    +    }
    +  }
    +
    +  // Get the deserializer for the state. Note that this must be done in the driver, as
    +  // resolving and binding of deserializer expressions to the encoded type can be safely done
    +  // only in the driver.
    +  private val stateDeserializer = {
    +    val boundRefToNestedState = BoundReference(nestedStateOrdinal, stateEncoder.schema, true)
    +    val deser = stateEncoder.resolveAndBind().deserializer.transformUp {
    +      case BoundReference(ordinal, _, _) => GetStructField(boundRefToNestedState, ordinal)
    +    }
    +    CaseWhen(Seq(IsNull(boundRefToNestedState) -> Literal(null)), elseValue = deser).toCodegen()
    +  }
    +
    +  private lazy val nestedStateOrdinal = 0
    +  private lazy val timeoutTimestampOrdinal = 1
    +
    +  // Converters for translating state between rows and Java objects
    +  private lazy val getStateObjFromRow = ObjectOperator.deserializeRowToObject(
    +    stateDeserializer, stateAttributes)
    +  private lazy val getStateRowFromObj = ObjectOperator.serializeObjectToRow(stateSerializer)
    +
    +  private lazy val stateDataForGets = FlatMapGroupsWithState_StateData()
    +
    +  /** Returns the state as Java object if defined */
    +  private def getStateObj(stateRow: UnsafeRow): Any = {
    +    if (stateRow == null) null
    +    // else if (stateRow.isNullAt(nestedStateOrdinal)) null
    +    else getStateObjFromRow(stateRow)
    +  }
    +
    +  /** Returns the row for an updated state */
    +  private def getStateRow(obj: Any): UnsafeRow = {
    +    val row = getStateRowFromObj(obj)
    --- End diff --
    
    This part needs work. The current serializer expressions do not set the top-level `groupState` column (which points to the nested state columns) in the UnsafeRow to null, so I am explicitly setting it to null. However, the saved nested columns are still set to a value. It is possible that the nested UnsafeRow could be dropped in that case, thus saving space.
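
    A self-contained illustration of the row behavior described above (not part of the patch; it uses a hypothetical single-field state schema and Spark's internal UnsafeProjection directly):

        import org.apache.spark.sql.catalyst.InternalRow
        import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
        import org.apache.spark.sql.types._

        val schema = new StructType()
          .add("groupState", new StructType().add("count", IntegerType), nullable = true)
          .add("timeoutTimestamp", LongType)
        val toUnsafe = UnsafeProjection.create(schema)

        // Write a row with a populated nested struct, then null out the top-level column,
        // mirroring what getStateRow does when the state object is null.
        val row = toUnsafe(InternalRow(InternalRow(5), 1000L))
        row.setNullAt(0)
        assert(row.isNullAt(0))
        // The nested struct's bytes remain in the row's variable-length section, which is
        // the wasted space mentioned above.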


---



[GitHub] spark pull request #19416: [SPARK-22187][SS] Update unsaferow format for sav...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19416#discussion_r142303281
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala ---
    @@ -376,9 +388,35 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
             expectedTimeoutTimestamp = currentBatchTimestamp + 5000) // timestamp should change
     
           testStateUpdateWithData(
    +        s"ProcessingTimeTimeout - $testName - timeout updated after state removed",
    +        stateUpdates = state => { state.remove(); state.setTimeoutDuration(5000) },
    +        timeoutConf = ProcessingTimeTimeout,
    +        priorState = priorState,
    +        priorTimeoutTimestamp = priorTimeoutTimestamp,
    +        expectedState = None,
    +        expectedTimeoutTimestamp = currentBatchTimestamp + 5000)
    +
    +      // Tests with EventTimeTimeout
    +
    +      if (priorState == None) {
    +        testStateUpdateWithData(
    +          s"EventTimeTimeout - $testName - setting timeout without init state not allowed",
    +          stateUpdates = state => {
    --- End diff --
    
    Condense to a single line.
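
    For example:

        stateUpdates = state => { state.setTimeoutTimestamp(10000) },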


---



[GitHub] spark pull request #19416: [SPARK-22187][SS] Update unsaferow format for sav...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19416#discussion_r142583033
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/FlatMapGroupsWithState_StateManager.scala ---
    @@ -0,0 +1,143 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.state
    +
    +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference, CaseWhen, CreateNamedStruct, GetStructField, IsNull, Literal, UnsafeRow}
    +import org.apache.spark.sql.execution.ObjectOperator
    +import org.apache.spark.sql.execution.streaming.GroupStateImpl
    +import org.apache.spark.sql.execution.streaming.GroupStateImpl.NO_TIMESTAMP
    +import org.apache.spark.sql.types.{IntegerType, LongType, StructType}
    +
    +
    +class FlatMapGroupsWithState_StateManager(
    +    stateEncoder: ExpressionEncoder[Any],
    +    shouldStoreTimestamp: Boolean) extends Serializable {
    +
    +  val stateSchema = {
    +    val schema = new StructType().add("groupState", stateEncoder.schema, nullable = true)
    +    if (shouldStoreTimestamp) schema.add("timeoutTimestamp", LongType) else schema
    +  }
    +
    +  def getState(store: StateStore, keyRow: UnsafeRow): FlatMapGroupsWithState_StateData = {
    +    val stateRow = store.get(keyRow)
    +    stateDataForGets.withNew(
    +      keyRow, stateRow, getStateObj(stateRow), getTimestamp(stateRow))
    +  }
    +
    +  def putState(store: StateStore, keyRow: UnsafeRow, state: Any, timestamp: Long): Unit = {
    +    val stateRow = getStateRow(state)
    +    setTimestamp(stateRow, timestamp)
    +    store.put(keyRow, stateRow)
    +  }
    +
    +  def removeState(store: StateStore, keyRow: UnsafeRow): Unit = {
    +    store.remove(keyRow)
    +  }
    +
    +  def getAllState(store: StateStore): Iterator[FlatMapGroupsWithState_StateData] = {
    +    val stateDataForGetAllState = FlatMapGroupsWithState_StateData()
    +    store.getRange(None, None).map { pair =>
    +      stateDataForGetAllState.withNew(
    +        pair.key, pair.value, getStateObjFromRow(pair.value), getTimestamp(pair.value))
    +    }
    +  }
    +
    +  private val stateAttributes: Seq[Attribute] = stateSchema.toAttributes
    +
    +  // Get the serializer for the state, taking into account whether we need to save timestamps
    +  private val stateSerializer = {
    +    val nestedStateExpr = CreateNamedStruct(
    +      stateEncoder.namedExpressions.flatMap(e => Seq(Literal(e.name), e)))
    +    if (shouldStoreTimestamp) {
    +      Seq(nestedStateExpr, Literal(GroupStateImpl.NO_TIMESTAMP))
    +    } else {
    +      Seq(nestedStateExpr)
    +    }
    +  }
    +
    +  // Get the deserializer for the state. Note that this must be done in the driver, as
    +  // resolving and binding of deserializer expressions to the encoded type can be safely done
    +  // only in the driver.
    +  private val stateDeserializer = {
    +    val boundRefToNestedState = BoundReference(nestedStateOrdinal, stateEncoder.schema, true)
    +    val deser = stateEncoder.resolveAndBind().deserializer.transformUp {
    +      case BoundReference(ordinal, _, _) => GetStructField(boundRefToNestedState, ordinal)
    +    }
    +    CaseWhen(Seq(IsNull(boundRefToNestedState) -> Literal(null)), elseValue = deser).toCodegen()
    +  }
    +
    +  private lazy val nestedStateOrdinal = 0
    +  private lazy val timeoutTimestampOrdinal = 1
    +
    +  // Converters for translating state between rows and Java objects
    +  private lazy val getStateObjFromRow = ObjectOperator.deserializeRowToObject(
    +    stateDeserializer, stateAttributes)
    +  private lazy val getStateRowFromObj = ObjectOperator.serializeObjectToRow(stateSerializer)
    +
    +  private lazy val stateDataForGets = FlatMapGroupsWithState_StateData()
    +
    +  /** Returns the state as Java object if defined */
    +  private def getStateObj(stateRow: UnsafeRow): Any = {
    +    if (stateRow == null) null
    +    // else if (stateRow.isNullAt(nestedStateOrdinal)) null
    --- End diff --
    
    nit: remove this


---



[GitHub] spark pull request #19416: [SPARK-22187][SS] Update unsaferow format for sav...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19416#discussion_r142303254
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala ---
    @@ -376,9 +388,35 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
             expectedTimeoutTimestamp = currentBatchTimestamp + 5000) // timestamp should change
     
           testStateUpdateWithData(
    +        s"ProcessingTimeTimeout - $testName - timeout updated after state removed",
    +        stateUpdates = state => { state.remove(); state.setTimeoutDuration(5000) },
    +        timeoutConf = ProcessingTimeTimeout,
    +        priorState = priorState,
    +        priorTimeoutTimestamp = priorTimeoutTimestamp,
    +        expectedState = None,
    +        expectedTimeoutTimestamp = currentBatchTimestamp + 5000)
    +
    +      // Tests with EventTimeTimeout
    +
    +      if (priorState == None) {
    +        testStateUpdateWithData(
    +          s"EventTimeTimeout - $testName - setting timeout without init state not allowed",
    +          stateUpdates = state => {
    +            state.setTimeoutTimestamp(10000)
    +          },
    +          timeoutConf = EventTimeTimeout,
    +          priorState = None,
    +          priorTimeoutTimestamp = priorTimeoutTimestamp,
    +          expectedState = None,
    +          expectedTimeoutTimestamp = 10000)
    +      }
    +
    +      testStateUpdateWithData(
             s"EventTimeTimeout - $testName - state and timeout timestamp updated",
             stateUpdates =
    -          (state: GroupState[Int]) => { state.update(5); state.setTimeoutTimestamp(5000) },
    +          (state: GroupState[Int]) => {
    --- End diff --
    
    undo this change.
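
    That is, restore the one-liner removed in the diff above:

        (state: GroupState[Int]) => { state.update(5); state.setTimeoutTimestamp(5000) },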


---



[GitHub] spark pull request #19416: [SPARK-22187][SS] Update unsaferow format for sav...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19416#discussion_r142368186
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/FlatMapGroupsWithState_StateManager.scala ---
    @@ -0,0 +1,143 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.state
    +
    +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference, CaseWhen, CreateNamedStruct, GetStructField, IsNull, Literal, UnsafeRow}
    +import org.apache.spark.sql.execution.ObjectOperator
    +import org.apache.spark.sql.execution.streaming.GroupStateImpl
    +import org.apache.spark.sql.execution.streaming.GroupStateImpl.NO_TIMESTAMP
    +import org.apache.spark.sql.types.{IntegerType, LongType, StructType}
    +
    +
    +class FlatMapGroupsWithState_StateManager(
    --- End diff --
    
    Is the underscore in the file name for clarity?


---



[GitHub] spark issue #19416: [SPARK-22187][SS] Update unsaferow format for saved stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19416
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #19416: [SPARK-22187][SS] Update unsaferow format for sav...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19416#discussion_r142562531
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/FlatMapGroupsWithState_StateManager.scala ---
    @@ -0,0 +1,143 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.state
    +
    +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference, CaseWhen, CreateNamedStruct, GetStructField, IsNull, Literal, UnsafeRow}
    +import org.apache.spark.sql.execution.ObjectOperator
    +import org.apache.spark.sql.execution.streaming.GroupStateImpl
    +import org.apache.spark.sql.execution.streaming.GroupStateImpl.NO_TIMESTAMP
    +import org.apache.spark.sql.types.{IntegerType, LongType, StructType}
    +
    +
    +class FlatMapGroupsWithState_StateManager(
    --- End diff --
    
    Yes it is.


---



[GitHub] spark issue #19416: [SPARK-22187][SS] Update unsaferow format for saved stat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19416
  
    **[Test build #82454 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82454/testReport)** for PR 19416 at commit [`ee0b81a`](https://github.com/apache/spark/commit/ee0b81a148c892e0f27191c31dbef1c42424b6e1).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19416: [SPARK-22187][SS] Update unsaferow format for saved stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19416
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82454/
    Test PASSed.


---



[GitHub] spark issue #19416: [SPARK-22187][SS] Update unsaferow format for saved stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19416
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82407/
    Test PASSed.


---



[GitHub] spark pull request #19416: [SPARK-22187][SS] Update unsaferow format for sav...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19416#discussion_r142302989
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala ---
    @@ -62,26 +60,7 @@ case class FlatMapGroupsWithStateExec(
       import GroupStateImpl._
     
       private val isTimeoutEnabled = timeoutConf != NoTimeout
    -  private val timestampTimeoutAttribute =
    -    AttributeReference("timeoutTimestamp", dataType = IntegerType, nullable = false)()
    -  private val stateAttributes: Seq[Attribute] = {
    -    val encSchemaAttribs = stateEncoder.schema.toAttributes
    -    if (isTimeoutEnabled) encSchemaAttribs :+ timestampTimeoutAttribute else encSchemaAttribs
    -  }
    -  // Get the serializer for the state, taking into account whether we need to save timestamps
    -  private val stateSerializer = {
    -    val encoderSerializer = stateEncoder.namedExpressions
    -    if (isTimeoutEnabled) {
    -      encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP)
    -    } else {
    -      encoderSerializer
    -    }
    -  }
    -  // Get the deserializer for the state. Note that this must be done in the driver, as
    -  // resolving and binding of deserializer expressions to the encoded type can be safely done
    -  // only in the driver.
    -  private val stateDeserializer = stateEncoder.resolveAndBind().deserializer
    -
    +  val stateManager = new FlatMapGroupsWithState_StateManager(stateEncoder, isTimeoutEnabled)
    --- End diff --
    
    Refactored this class to separate the state management from the processing logic. This makes the class far simpler.
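
    A rough sketch of the resulting split, using only the manager's public methods quoted above (here `store`, `keyRow`, `newState`, and `timeoutTimestamp` are hypothetical values supplied by the exec node):

        val stateManager = new FlatMapGroupsWithState_StateManager(stateEncoder, isTimeoutEnabled)
        val data = stateManager.getState(store, keyRow)   // decodes the stored row into (key, row, state obj, timestamp)
        stateManager.putState(store, keyRow, newState, timeoutTimestamp)
        stateManager.removeState(store, keyRow)
        val allData = stateManager.getAllState(store)     // iterator over all saved state entries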


---



[GitHub] spark pull request #19416: [SPARK-22187][SS] Update unsaferow format for sav...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/19416


---



[GitHub] spark issue #19416: [SPARK-22187][SS] Update unsaferow format for saved stat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19416
  
    **[Test build #82454 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82454/testReport)** for PR 19416 at commit [`ee0b81a`](https://github.com/apache/spark/commit/ee0b81a148c892e0f27191c31dbef1c42424b6e1).


---



[GitHub] spark pull request #19416: [SPARK-22187][SS] Update unsaferow format for sav...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19416#discussion_r142303019
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/FlatMapGroupsWithState_StateManager.scala ---
    @@ -0,0 +1,143 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.state
    +
    +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference, CaseWhen, CreateNamedStruct, GetStructField, IsNull, Literal, UnsafeRow}
    +import org.apache.spark.sql.execution.ObjectOperator
    +import org.apache.spark.sql.execution.streaming.GroupStateImpl
    +import org.apache.spark.sql.execution.streaming.GroupStateImpl.NO_TIMESTAMP
    +import org.apache.spark.sql.types.{IntegerType, LongType, StructType}
    +
    +
    +class FlatMapGroupsWithState_StateManager(
    --- End diff --
    
    Add docs.


---



[GitHub] spark issue #19416: [SPARK-22187][SS] Update unsaferow format for saved stat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19416
  
    **[Test build #82407 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82407/testReport)** for PR 19416 at commit [`64a8d86`](https://github.com/apache/spark/commit/64a8d865f71a92ed9f76879eb6c5a24d1fef8cec).


---
