Posted to reviews@spark.apache.org by mwws <gi...@git.apache.org> on 2015/12/10 09:42:49 UTC

[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

GitHub user mwws opened a pull request:

    https://github.com/apache/spark/pull/10252

    [Spark-12260][wip][Streaming]Graceful Shutdown with In-Memory State

    [View Detail Design DOC](https://docs.google.com/document/d/1JS9W370hNTUCtwHLa8WiKRuOuIbo8A-UwYBdbK7WGg8/edit?usp=sharing)
    
    **Motivation:**
    Users often stop and restart their streaming jobs for tasks such as maintenance, software upgrades, or even application logic updates. When a job restarts, it should pick up where it left off, i.e. any state that existed when the job stopped should be used as the initial state when the job restarts.
    
    **Proposal:**
    I create an abstract class called `DumpableDStream`, which extends `DStream`, and make the stateful `DStream`s, for instance `MapWithStateDStream` and `StateDStream`, extend it, so that these stateful `DStream`s share a common ancestor.
    
    `DumpFormat` is also an abstract class; it looks like the following:
    ```scala
    abstract class DSteamDumpFormat[K, S] extends Serializable {
      val identity: String
      private[streaming] def dump(rdd: RDD[(K, S)], path: String): Unit
      def load(sc: SparkContext, path: String): RDD[(K, S)]
    }
    ```
    Users can customize their own `DumpFormat` by extending this abstract class. It has three members. `identity` is used to recognize a dumped RDD when loading it back; it has to be provided for every `DumpableDStream` instance. The `dump` method dumps the input RDD to the given path and is called by `DumpableDStream.dump`. The `load` method loads the RDD back from the given path; the user needs to call it explicitly.
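    
    As an illustration, here is a minimal sketch of such a format backed by Hadoop object files. The class name `ObjectFileDumpFormat` and the choice of `saveAsObjectFile`/`objectFile` are assumptions for this example only, not part of the PR; it is also placed in the streaming package because `dump` is declared `private[streaming]` above.
    ```scala
    // Illustrative sketch only: state is written as Hadoop object files
    // (Java serialization) and read back with SparkContext.objectFile.
    package org.apache.spark.streaming.dstream

    import scala.reflect.ClassTag

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    class ObjectFileDumpFormat[K: ClassTag, S: ClassTag](val identity: String)
      extends DSteamDumpFormat[K, S] {

      // Called by DumpableDStream.dump with the target path for this stream's state.
      private[streaming] def dump(rdd: RDD[(K, S)], path: String): Unit =
        rdd.saveAsObjectFile(path)

      // Called explicitly by the user on restart, with the same path.
      def load(sc: SparkContext, path: String): RDD[(K, S)] =
        sc.objectFile[(K, S)](path)
    }
    ```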
    
    When an application performs a graceful shutdown by invoking `StreamingContext.stop(…)`, it calls `ssc.graph.dump`, which traverses the current `DStreamGraph`, finds every `DumpableDStream`, and invokes `DumpableDStream.dump()` on each.
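    
    Roughly, that traversal could look like the sketch below. `outputStreams` and `dependencies` follow the existing `DStreamGraph`/`DStream` fields, while the `DumpableDStream` pattern match and the `dump(time, path)` signature are assumptions based on this description rather than the final API.
    ```scala
    // Sketch of DStreamGraph.dump: depth-first walk over the graph that dumps
    // every DumpableDStream it reaches. A real implementation would also track
    // visited nodes so shared parents are not dumped twice.
    def dump(time: Time, path: String): Unit = {
      def visit(stream: DStream[_]): Unit = {
        stream match {
          case d: DumpableDStream[_, _] => d.dump(time, path)
          case _ => // not a stateful stream, nothing to dump here
        }
        stream.dependencies.foreach(visit)
      }
      outputStreams.foreach(visit)
    }
    ```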
    
    **Usage:**
    From the user's perspective, the first step is to create a new `DumpFormat` instance:
    ```scala
    val df = new CheckpointDumpFormat[String, Int]("WordCount")
    ```
    Then set this `DumpFormat` instance on the state `DStream` reference. That's all.
    ```scala
    val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc,
      new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)
    stateDstream.setDumpFormat(df)
    ```
    Once `StreamingContext.stop()` is invoked with `stopGracefully = true`, the state of WordCount is automatically saved to HDFS, and the user can easily load it back when restarting the process with the following code:
    ```scala
    val initialRDD = df.load(ssc.sc, checkpointDir + "/WordCount")
    ```
    For advanced usage, users can create their own `DumpFormat` type to specify a serializer, save the in-memory state into a database, etc.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mwws/spark SPARK-GSD

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/10252.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #10252
    
----
commit c8eacb5ce6e5887e786c91f722ead4d22909503e
Author: mwws <we...@intel.com>
Date:   2015-11-18T06:39:17Z

    Merge pull request #3 from apache/master
    
    sync latest apache spark master (11/18)

commit bda45dd48ba270326d7b042ddc71af3d8032aa02
Author: mwws <we...@intel.com>
Date:   2015-11-23T07:52:14Z

    First checkin for dumping stateful RDD
    
    For detail usage, please refer to file Demo.scala

commit 6e0fcfc33a7b1dd455a4d3f331df766bace02153
Author: mwws <we...@intel.com>
Date:   2015-12-10T07:58:18Z

    refactor implementation of dumping status DStream

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by mwws <gi...@git.apache.org>.
Github user mwws commented on the pull request:

    https://github.com/apache/spark/pull/10252#issuecomment-163536795
  
    @tdas Could you help review this PR? As we discussed before, some users need to dump in-memory state when the process shuts down and, at the same time, recover that state when restarting. This is an initial implementation to meet that requirement, and I have marked the PR as [wip]. Let me know what you think. Thank you.




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by mwws <gi...@git.apache.org>.
Github user mwws commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10252#discussion_r47458158
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala ---
    @@ -330,6 +330,20 @@ abstract class DStream[T: ClassTag] (
       }
     
       /**
    +   * compute the last valid time till input time
    +   * (validTime = zeroTime + N * slideDuration)
    +   */
    +  private[streaming] def lastValidTime(time: Time): Time = {
    +    if (time < zeroTime) {
    +      throw new IllegalArgumentException(
    +          s"Input(${time}) must bigger then zeroTime($zeroTime)")
    +    }
    +
    +    val multi: Int = ((time - zeroTime) / slideDuration).floor.toInt
    --- End diff --
    
    They do the same thing, but I would still like to keep the `floor` call explicit rather than relying on the default float-to-int conversion, to make the intent clearer and avoid potential risk.
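    
    As a concrete example (assuming millisecond-based `Time`/`Duration` arithmetic): with zeroTime = 0 ms, slideDuration = 10000 ms and time = 37000 ms, `(time - zeroTime) / slideDuration` is 3.7, `floor` gives 3, and the resulting last valid time is zeroTime + 3 * slideDuration = 30000 ms.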




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on the pull request:

    https://github.com/apache/spark/pull/10252#issuecomment-180635182
  
    @zzcclp can you please describe how you recover the streaming state? We are actually going to split this into two separate PRs, but we are still thinking about what API users want.




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by mwws <gi...@git.apache.org>.
Github user mwws commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10252#discussion_r47457824
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala ---
    @@ -18,12 +18,14 @@
     package org.apache.spark.streaming
     
     import scala.collection.mutable.ArrayBuffer
    -import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
    +import java.io.{ ObjectInputStream, IOException, ObjectOutputStream }
    --- End diff --
    
    I guess it was changed by the auto-formatter. I will remove it.




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by shimingfei <gi...@git.apache.org>.
Github user shimingfei commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10252#discussion_r47323020
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/DSteamDumpFormat.scala ---
    @@ -0,0 +1,58 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.dstream
    +
    +import scala.reflect.ClassTag
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.SerializableConfiguration
    +import org.apache.spark.rdd.ReliableCheckpointRDD
    +import org.apache.spark.SparkContext
    +
    +/**
    + * User can customize DumpFormat by extends this abstract class.
    + */
    +abstract class DSteamDumpFormat[K, S] extends Serializable{
    --- End diff --
    
    space: `Serializable {`




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10252#discussion_r47365574
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/DSteamDumpFormat.scala ---
    @@ -0,0 +1,59 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.dstream
    +
    +import scala.reflect.ClassTag
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.SerializableConfiguration
    +import org.apache.spark.rdd.ReliableCheckpointRDD
    +import org.apache.spark.SparkContext
    +
    +/**
    + * User can customize DumpFormat by extends this abstract class.
    + */
    +abstract class DSteamDumpFormat[K, S] extends Serializable {
    --- End diff --
    
    typo `DStreamDumpFormat`?




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by zzcclp <gi...@git.apache.org>.
Github user zzcclp commented on the pull request:

    https://github.com/apache/spark/pull/10252#issuecomment-174453565
  
    Hi @mwws, @chenghao-intel, is there any progress on this PR?
    When we use Spark Streaming, we lose some stateful data when we upgrade the code; I think this PR may resolve our problem. Is there a plan to continue with this PR?




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by mwws <gi...@git.apache.org>.
Github user mwws commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10252#discussion_r47463413
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/Demo.scala ---
    @@ -0,0 +1,94 @@
    +/*
    --- End diff --
    
    Yes, I will remove this file and add some unit tests if people agree that this PR is in the right direction. For now, this temp application should be enough to demonstrate the usage.   




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/10252#issuecomment-170501263
  
    That's fine too; it needs a rebase. I'm personally skeptical this can be solved in the general case, or should be, but getting more specific about the API is probably the only way forward.




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by mwws <gi...@git.apache.org>.
Github user mwws commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10252#discussion_r47457840
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/DSteamDumpFormat.scala ---
    @@ -0,0 +1,59 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.dstream
    +
    +import scala.reflect.ClassTag
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.SerializableConfiguration
    +import org.apache.spark.rdd.ReliableCheckpointRDD
    +import org.apache.spark.SparkContext
    +
    +/**
    + * User can customize DumpFormat by extends this abstract class.
    + */
    +abstract class DSteamDumpFormat[K, S] extends Serializable {
    --- End diff --
    
    nice catch!




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on the pull request:

    https://github.com/apache/spark/pull/10252#issuecomment-185005558
  
    @zzcclp we have another PR for this fix, #11101; please check whether it solves your problem.




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/10252#issuecomment-167541007
  
    Before reviewing this further, let's return to the JIRA. I do not see that this implements something that you a) can't do with updateStateByKey or trackStateByKey, or b) that Spark should be managing anyway.




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by adrian-wang <gi...@git.apache.org>.
Github user adrian-wang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10252#discussion_r48232309
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala ---
    @@ -171,6 +173,23 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
         inputStreams.map(_.rememberDuration).filter(_ != null).maxBy(_.milliseconds)
       }
     
    +  /**
    +   * recursively visit every node in DStreamGraph and invoke dump for all DumpableDStream
    +   */
    +  def dump(time: Time, path: String) {
    --- End diff --
    
    : Unit = 




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10252#issuecomment-163871326
  
    **[Test build #47576 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47576/consoleFull)** for PR 10252 at commit [`163d362`](https://github.com/apache/spark/commit/163d3627fc1c00c959729a3969906657e5e9eba8).




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10252#issuecomment-163646094
  
    **[Test build #47499 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47499/consoleFull)** for PR 10252 at commit [`6e0fcfc`](https://github.com/apache/spark/commit/6e0fcfc33a7b1dd455a4d3f331df766bace02153).




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on the pull request:

    https://github.com/apache/spark/pull/10252#issuecomment-170499432
  
    Sorry @srowen for the confusion; I was saying that this is a partially done PR. I think @mwws will update the code soon.




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/10252#issuecomment-170478682
  
    @mwws can you close this PR?




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10252#discussion_r47364512
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/Demo.scala ---
    @@ -0,0 +1,94 @@
    +/*
    --- End diff --
    
    Can this be a unit test?




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by adrian-wang <gi...@git.apache.org>.
Github user adrian-wang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10252#discussion_r48235929
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala ---
    @@ -330,6 +330,20 @@ abstract class DStream[T: ClassTag] (
       }
     
       /**
    +   * compute the last valid time till input time
    +   * (validTime = zeroTime + N * slideDuration)
    +   */
    +  private[streaming] def lastValidTime(time: Time): Time = {
    +    if (time < zeroTime) {
    +      throw new IllegalArgumentException(
    +          s"Input(${time}) must bigger then zeroTime($zeroTime)")
    --- End diff --
    
    than




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10252#discussion_r47365646
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/DSteamDumpFormat.scala ---
    @@ -0,0 +1,59 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.dstream
    +
    +import scala.reflect.ClassTag
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.SerializableConfiguration
    +import org.apache.spark.rdd.ReliableCheckpointRDD
    +import org.apache.spark.SparkContext
    +
    +/**
    + * User can customize DumpFormat by extends this abstract class.
    + */
    +abstract class DSteamDumpFormat[K, S] extends Serializable {
    +  // Identity have to be provided here for every DumpableDStream instance. Otherwise it's not able
    +  // to recognize each other during loading.
    +  val identity : String
    --- End diff --
    
    no space between `identity` and `:`




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10252#discussion_r47364366
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala ---
    @@ -22,8 +22,10 @@ import org.apache.spark.Partitioner
     import org.apache.spark.SparkContext._
     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.{Duration, Time}
    -
     import scala.reflect.ClassTag
    +import org.apache.spark.rdd.ReliableRDDCheckpointData
    --- End diff --
    
    Unnecessary imports?




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by adrian-wang <gi...@git.apache.org>.
Github user adrian-wang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10252#discussion_r48232238
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala ---
    @@ -330,6 +330,20 @@ abstract class DStream[T: ClassTag] (
       }
     
       /**
    +   * compute the last valid time till input time
    --- End diff --
    
    Computes




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10252#issuecomment-163651610
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/47499/
    Test FAILed.




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10252#issuecomment-163873125
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/47576/
    Test FAILed.




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on the pull request:

    https://github.com/apache/spark/pull/10252#issuecomment-170436089
  
    After talking with @mwws offline, we probably need to serialize the `accumulator` variables as well.
    
    I personally agree with @srowen: we need to think about providing a better API; otherwise, app developers can implement the logic themselves.




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by zzcclp <gi...@git.apache.org>.
Github user zzcclp commented on the pull request:

    https://github.com/apache/spark/pull/10252#issuecomment-184515856
  
    @chenghao-intel @mwws, sorry for my late reply.
    
    Currently, we just record the Kafka offsets and accumulators to a third-party storage system after each batch, and then restore them from the window's earliest start time. For stateful data, we have no good way to recover it for now, so we lose some statistical data.
    Also, one of our business systems must ensure data integrity after a software upgrade or even an application logic update, so we urgently hope that Spark can support this feature natively.





[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10252#discussion_r47365686
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/DSteamDumpFormat.scala ---
    @@ -0,0 +1,59 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.dstream
    +
    +import scala.reflect.ClassTag
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.SerializableConfiguration
    +import org.apache.spark.rdd.ReliableCheckpointRDD
    +import org.apache.spark.SparkContext
    +
    +/**
    + * User can customize DumpFormat by extends this abstract class.
    + */
    +abstract class DSteamDumpFormat[K, S] extends Serializable {
    +  // Identity have to be provided here for every DumpableDStream instance. Otherwise it's not able
    +  // to recognize each other during loading.
    +  val identity : String
    +
    +  // user can customize how to dump state information
    +  private[streaming] def dump(rdd : RDD[(K, S)], path: String): Unit
    --- End diff --
    
    no space between `rdd` and `:`




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10252#issuecomment-163873097
  
    **[Test build #47576 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47576/consoleFull)** for PR 10252 at commit [`163d362`](https://github.com/apache/spark/commit/163d3627fc1c00c959729a3969906657e5e9eba8).
     * This patch **fails MiMa tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `abstract class DSteamDumpFormat[K, S] extends Serializable `
       * `class CheckpointDumpFormat[K: ClassTag, S: ClassTag](val identity: String)`
       * `abstract class DumpableDStream[K: ClassTag, S: ClassTag](ssc: StreamingContext)`




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/10252#issuecomment-185164712
  
    @mwws please close this PR




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10252#discussion_r47366152
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala ---
    @@ -330,6 +330,20 @@ abstract class DStream[T: ClassTag] (
       }
     
       /**
    +   * compute the last valid time till input time
    +   * (validTime = zeroTime + N * slideDuration)
    +   */
    +  private[streaming] def lastValidTime(time: Time): Time = {
    +    if (time < zeroTime) {
    +      throw new IllegalArgumentException(
    +          s"Input(${time}) must bigger then zeroTime($zeroTime)")
    +    }
    +
    +    val multi: Int = ((time - zeroTime) / slideDuration).floor.toInt
    --- End diff --
    
    The same as `val multi: Int = ((time - zeroTime) / slideDuration)`?




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10252#discussion_r47364689
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala ---
    @@ -18,12 +18,14 @@
     package org.apache.spark.streaming
     
     import scala.collection.mutable.ArrayBuffer
    -import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
    +import java.io.{ ObjectInputStream, IOException, ObjectOutputStream }
    --- End diff --
    
    unnecessary spaces before `}`  or after `{`




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10252#issuecomment-163651602
  
    **[Test build #47499 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47499/consoleFull)** for PR 10252 at commit [`6e0fcfc`](https://github.com/apache/spark/commit/6e0fcfc33a7b1dd455a4d3f331df766bace02153).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `abstract class DSteamDumpFormat[K, S] extends Serializable`
       * `class CheckpointDumpFormat[K: ClassTag, S: ClassTag](val identity: String)`
       * `abstract class DumpableDStream[K: ClassTag, S: ClassTag](ssc: StreamingContext)`




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by mwws <gi...@git.apache.org>.
Github user mwws closed the pull request at:

    https://github.com/apache/spark/pull/10252




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10252#issuecomment-163651608
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [Spark-12260][wip][Streaming]Graceful Shutdown...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10252#issuecomment-163873124
  
    Merged build finished. Test FAILed.

