Posted to issues@flink.apache.org by uce <gi...@git.apache.org> on 2015/12/04 13:04:18 UTC

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/1434

    [FLINK-2976] Allow to trigger checkpoints manually

    This PR contains **documentation**: https://github.com/uce/flink/blob/2976-savepoints/docs/apis/savepoints.md
    
    **In a nutshell**, savepoints `(*)` are **manually triggered checkpoints**, which take a snapshot of a program and write it out to an external state backend. This allows you to stop and resume your program without losing intermediate state.
    
    **Why is this nice?** Because you don't have to replay everything when you redeploy your long-running streaming job after changing it or updating to a newer Flink version.
    
    `(*)` Initially I wrote it as sa**F**epoints, but then settled on sa**V**epoints after stumbling across a related feature in [an Oracle SQL reference](https://docs.oracle.com/cd/B19306_01/appdev.102/b14261/savepoint_statement.htm). What do you think? :smile: http://doodle.com/poll/2z2cp9hxu7eucdsz
    
    ## Example
    
    Start your stateful streaming program via `bin/flink run ...`.
    
    ```
    $ bin/flink list
    ------------------ Running/Restarting Jobs -------------------
    04.12.2015 13:51:10 : 46da86f25ca8daa1bbff8ccae64d53af : Flink Streaming Job (RUNNING)
    --------------------------------------------------------------
    ```
    
    Wait for some checkpoints to complete:
    
    ```
    $ tail -f log/flink-hadoop-client-uce-m.log
    ...
    13:50:59,806 INFO  org.apache.flink.runtime.jobmanager.JobManager - Status of job 46da86f25ca8daa1bbff8ccae64d53af (Flink Streaming Job) changed to RUNNING.
    ...
    13:55:37,225 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1 @ 1449150937225
    13:55:37,581 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1
    13:55:42,225 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 2 @ 1449150942225
    13:55:42,328 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 2
    ...
    13:56:27,225 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 11 @ 1449150987225
    13:56:27,237 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 11
    ...
    ```
    
    Trigger a savepoint and cancel the job:
    
    ```
    $ bin/flink savepoint 46da86f25ca8daa1bbff8ccae64d53af
    Triggering savepoint for job 46da86f25ca8daa1bbff8ccae64d53af. Waiting for response...
    Savepoint completed. Path: jobmanager://savepoints/1
    You can resume your program from this savepoint with the run command.
    
    $ bin/flink cancel 46da86f25ca8daa1bbff8ccae64d53af
    ```
    
    Now you can restart the program from the savepoint:
    
    ```
    $ bin/flink run --fromSavepoint jobmanager://savepoints/1 ...
    ```
    
    This will resume the application from the state of the savepoint.
    
    ## Changes to Flink
    
    I focussed on **not changing any major Flink component** for this. Savepoints use the same checkpointing mechanism as the periodic checkpoints with some plumbing around it.
    
    ### Savepoint coordinator
    
    In addition to the `CheckpointCoordinator`, we add another instance of the `CheckpointCoordinator` called `SavepointCoordinator`. This class extends the regular coordinator and registers some callbacks on shutdown, fully ack'ed checkpoint, and cancelled checkpoint. For this, I've added three callback methods to the checkpoint coordinator, which are overridden by the savepoint coordinator. With two separate coordinators, periodic checkpoints and savepoints don't interfere with each other.
    
    The savepoint coordinator manages a map of `checkpoint ID => future`. The futures are completed when the checkpoint is ack'ed or cancelled (or the coordinator shuts down altogether).
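    
    A minimal sketch of the `checkpoint ID => future` bookkeeping described above. The class and method names are hypothetical, and `CompletableFuture` from the JDK is an assumption made to keep the example self-contained; it is not a claim about which future type the PR uses.
    
    ```java
    import java.util.Map;
    import java.util.concurrent.CancellationException;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    
    /** Hypothetical sketch: one future per pending savepoint, keyed by checkpoint ID. */
    final class SavepointFutures {
    
        private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    
        /** Registers a future for a newly triggered savepoint. */
        CompletableFuture<String> register(long checkpointId) {
            CompletableFuture<String> future = new CompletableFuture<>();
            pending.put(checkpointId, future);
            return future;
        }
    
        /** Callback for a fully acknowledged checkpoint: complete with the savepoint path. */
        void onFullyAcknowledged(long checkpointId, String savepointPath) {
            CompletableFuture<String> future = pending.remove(checkpointId);
            if (future != null) {
                future.complete(savepointPath);
            }
        }
    
        /** Callback for a cancelled checkpoint: fail the matching future. */
        void onCancelled(long checkpointId) {
            CompletableFuture<String> future = pending.remove(checkpointId);
            if (future != null) {
                future.completeExceptionally(
                    new CancellationException("Checkpoint " + checkpointId + " was cancelled"));
            }
        }
    
        /** Callback for coordinator shutdown: fail everything that is still pending. */
        void onShutdown() {
            for (CompletableFuture<String> future : pending.values()) {
                future.completeExceptionally(
                    new IllegalStateException("Savepoint coordinator shut down"));
            }
            pending.clear();
        }
    }
    ```
    
    Removing the entry before completing the future means each savepoint is answered at most once, whichever of the three callbacks fires first.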
    
    #### Restoring
    
    Restore happens on job submission if a savepoint path is provided in the `JobSnapshottingSettings`. The restore mechanism is similar to the regular checkpoint restore, but with some further sanity checks to ensure that the state to task mapping is correct (see below). All state has to be mapped to the restored program.
    
    ### JobManagerMessages
    
    Added `TriggerSavepoint(JobID)` and `DisposeSavepoint(String)` Akka messages to the job manager. They trigger and dispose the savepoints respectively. These operations work asynchronously and respond to the request when the savepoint futures complete. The requests are triggered by the user (see CLI frontend).
     
    ### Hashing of StreamNodes
    
    The state to task mapping of checkpoints happens via `(jobVertexID, subtaskIndex)`. With this change, the jobVertexIDs of streaming programs are generated deterministically with respect to the structure of the program. This is needed to make sure that a restore with a new program can map the savepoint state to the tasks.
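    
    To illustrate why stable IDs matter, here is a hedged sketch of a restore that maps state by `(jobVertexID, subtaskIndex)` and fails when some state has no matching task. All names are hypothetical, and the vertex ID is shown as a plain string only for brevity.
    
    ```java
    import java.util.AbstractMap.SimpleImmutableEntry;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    
    /** Hypothetical sketch of mapping savepoint state back to tasks. */
    final class StateMappingSketch {
    
        /** Builds the lookup key (jobVertexID, subtaskIndex). */
        static Map.Entry<String, Integer> key(String jobVertexId, int subtaskIndex) {
            return new SimpleImmutableEntry<>(jobVertexId, subtaskIndex);
        }
    
        /**
         * Assigns each piece of savepoint state to the task with the same key.
         * Throws if any state cannot be mapped, because all state has to be mapped.
         */
        static <S> Map<Map.Entry<String, Integer>, S> restore(
                Map<Map.Entry<String, Integer>, S> savepointState,
                Set<Map.Entry<String, Integer>> tasksOfNewProgram) {
    
            Map<Map.Entry<String, Integer>, S> assigned = new HashMap<>();
            for (Map.Entry<Map.Entry<String, Integer>, S> entry : savepointState.entrySet()) {
                if (!tasksOfNewProgram.contains(entry.getKey())) {
                    throw new IllegalStateException("No task for savepoint state " + entry.getKey());
                }
                assigned.put(entry.getKey(), entry.getValue());
            }
            return assigned;
        }
    }
    ```
    
    Tasks without state (for example a stateless sink) may exist in the new program without any counterpart in the savepoint; only the opposite direction is checked.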
    
    The hash starts from the sources and takes multiple things into account:
    - parallelism
    - user function class
    - hash of the input
    - hash of the outputs
    - stream node ID
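    
    A simplified sketch of such a structure-based hash. It covers only node-local properties and input hashes and leaves out the outputs; the class name and signature are hypothetical. The PR's diff uses Guava's `Hashing.murmur3_128(0)`, whereas this sketch swaps in MD5 from the JDK purely to stay dependency-free while still producing 16 bytes.
    
    ```java
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.List;
    
    /** Hypothetical sketch of a deterministic, structure-based node hash. */
    final class NodeHashSketch {
    
        /** Hashes node ID, parallelism, user function class, and all input hashes. */
        static byte[] hash(int nodeId, int parallelism, String userFunctionClass,
                           List<byte[]> inputHashes) {
            try {
                // MD5 is a stand-in (assumption); it is not what the PR uses.
                MessageDigest digest = MessageDigest.getInstance("MD5");
                digest.update(intToBytes(nodeId));
                digest.update(intToBytes(parallelism));
                digest.update(userFunctionClass.getBytes(StandardCharsets.UTF_8));
                for (byte[] inputHash : inputHashes) {
                    digest.update(inputHash);
                }
                return digest.digest();
            } catch (NoSuchAlgorithmException e) {
                throw new IllegalStateException("MD5 must be available on every JVM", e);
            }
        }
    
        private static byte[] intToBytes(int value) {
            return ByteBuffer.allocate(4).putInt(value).array();
        }
    }
    ```
    
    Because every input hash feeds into the downstream hash, a change near the sources propagates to all nodes after it.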
    
    The automatic generation makes sure that you can just use the savepoints, but it is actually *not recommended*, because you cannot change the program in any meaningful way (except changing the user function internals).
    
    That's why the **recommended option** is to specify a unique ID as input to the hasher on the DataStream API level:
    
    ```
    DataStream<String> stream = env
      // Stateful source (e.g. Kafka) with ID
      .addSource(new StatefulSource()).uid("source-id")
      .shuffle()
      // The stateful mapper with ID
      .map(new StatefulMapper()).uid("mapper-id");
    
    // Stateless sink (no specific ID required)
    stream.print();
    ```
    
    If you give IDs to all stateful operators, you can happily re-arrange and change the topology (except for parallelism, see below).
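    
    The effect of a user-specified ID can be sketched as follows: when a uid is present, the vertex ID depends on the uid alone, so a changed structural hash no longer matters. The names are hypothetical and MD5 is again only a JDK stand-in for the hash function.
    
    ```java
    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    
    /** Hypothetical sketch: prefer the user-specified uid over the structural hash. */
    final class VertexIdSketch {
    
        /**
         * Returns the ID for a node.
         *
         * @param uid            user-specified ID, or null if none was set
         * @param structuralHash automatically generated hash of the node
         */
        static byte[] resolve(String uid, byte[] structuralHash) {
            if (uid == null) {
                // No uid: fall back to the structure-dependent hash.
                return structuralHash.clone();
            }
            try {
                // With a uid, nothing but the uid string enters the hash.
                return MessageDigest.getInstance("MD5")
                    .digest(uid.getBytes(StandardCharsets.UTF_8));
            } catch (NoSuchAlgorithmException e) {
                throw new IllegalStateException("MD5 must be available on every JVM", e);
            }
        }
    }
    ```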
    
    ### Application ID and DbStateBackend
    
    Savepoints are pairs of `(ApplicationID, CompletedCheckpoint)`. I've added a new `ApplicationID` to allow scoping tasks across multiple job submissions (which have changing job IDs). This is for example required to restore state in the `DbStateBackend`. After consulting with @gyfora I've changed all references to `JobID` in `DbStateBackend` to `ApplicationID`.
    
    The ApplicationID is assigned in the `ExecutionGraph` only and is reset to the application ID of the savepoint if there is a restore operation. This touches almost nothing of the existing code and it is only propagated to the `TaskDeploymentDescriptor` and `Environment` of the `Task` instances.
    
    ### State storage
    
    The state storage **does not** instantiate the regular state backend on the job manager. It is essentially a set of a few helper classes, which allow putting state to and getting it from the file system or the job manager's heap. I think this is fine for now, because I didn't want to make changes to the central state abstractions, which are kind of in flux right now. But we should think about it in the future.
    
    ### Configuration and CLIFrontend
    
    This works out of the box if the job is using checkpointing. The default state backend for savepoints is `jobmanager`, which allows you to stop and restore a program while the same job manager is running.
    
    For configuration, there are two new keys:
    ```
    state.backend.savepoints
    state.backend.savepoints.fs.dir
    ```
    If you don't specify these, the regular state backend configuration is used with the `jobmanager` as a fallback if no viable config is found.
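    
    The lookup order can be sketched like this. The class name is hypothetical, and treating a missing key as the only case of "no viable config" is a simplifying assumption; the config keys themselves are the ones named in this PR and its docs.
    
    ```java
    import java.util.Map;
    
    /** Hypothetical sketch of the savepoint state backend lookup order. */
    final class SavepointBackendConfig {
    
        static final String SAVEPOINT_BACKEND_KEY = "state.backend.savepoints";
        static final String REGULAR_BACKEND_KEY = "state.backend";
        static final String FALLBACK_BACKEND = "jobmanager";
    
        /** Savepoint-specific key first, then the regular backend, then jobmanager. */
        static String resolveBackend(Map<String, String> config) {
            String backend = config.get(SAVEPOINT_BACKEND_KEY);
            if (backend == null) {
                backend = config.get(REGULAR_BACKEND_KEY);
            }
            return backend != null ? backend : FALLBACK_BACKEND;
        }
    }
    ```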


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 2976-savepoints

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1434.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1434
    
----
commit d63ea457d11c89378c4d0f0173a5ac372b5a3f58
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-12-01T16:31:32Z

    [FLINK-2976] [streaming-java, streaming-scala] Set JobVertexID based on stream node hash

commit 4896bdcb0107059b2e0f57afc5d7776c26b820d7
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-12-01T16:51:44Z

    [FLINK-2976] [runtime] Add StateStore<T>

commit c748ac935edcd97a1cd7c49662420f55c9806354
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-12-01T17:33:31Z

    [FLINK-2976] [core, runtime, streaming-java] Add ApplicationID to ExecutionGraph

commit 6b20f6924df29c56783b0c0772a61a05639ef619
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-12-01T17:38:34Z

    [FLINK-2976] [runtime] Add setCount(long newCount) to CheckpointIDCounter

commit 60a0774133f460627a0d9949219299b1875d3c1f
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-12-01T17:47:07Z

    [FLINK-2976] [runtime, tests] Add SavepointCoordinator

commit 182b157cc92b09cdc5ce2867a4dd5cbc234a385d
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-12-01T17:49:14Z

    [FLINK-2976] [clients] Add savepoint commands to CliFrontend

commit a69c550967da3acb77ae2c8b5cef2982e835e6b4
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-12-02T15:21:06Z

    [FLINK-2976] [docs] Add docs about savepoints

commit b38481fe147a470127a045eb11edae3af198c134
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-12-03T10:35:46Z

    [FLINK-2976] [streaming-contrib] Use ApplicationID in DbStateBackend instead of JobID

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46936461
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -495,6 +513,76 @@ class JobManager(
         case checkpointMessage : AbstractCheckpointMessage =>
           handleCheckpointMessage(checkpointMessage)
     
    +    case TriggerSavepoint(jobId) =>
    +      currentJobs.get(jobId) match {
    +        case Some((graph, _)) =>
    +          val savepointCoordinator = graph.getSavepointCoordinator()
    +
    +          if (savepointCoordinator != null) {
    +            // Immutable copy for the future
    +            val senderRef = sender()
    +
    +            future {
    +              try {
    +                // Do this async, because checkpoint coordinator operations can
    +                // contain blocking calls to the state backend or ZooKeeper.
    +                val savepointFuture = savepointCoordinator.triggerSavepoint(
    +                  System.currentTimeMillis())
    +
    +                savepointFuture.onComplete {
    +                  // Success, respond with the savepoint path
    +                  case scala.util.Success(savepointPath) =>
    +                    senderRef ! TriggerSavepointSuccess(jobId, savepointPath)
    +
    +                  // Failure, respond with the cause
    +                  case scala.util.Failure(t) =>
    +                    senderRef ! TriggerSavepointFailure(jobId,
    --- End diff --
    
    If you break argument lists, then every argument is written on its own line.



[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46839288
  
    --- Diff: docs/apis/savepoints.md ---
    @@ -0,0 +1,108 @@
    +---
    +title: "Savepoints"
    +is_beta: false
    +---
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +
    +Programs written in the [Data Stream API]({{ site.baseurl }}/apis/streaming_guide.html) can resume execution from a **savepoint**. Savepoints allow updating both your programs and your Flink cluster without losing any state. This page covers all steps to trigger, restore, and dispose savepoints. For more details on how Flink handles state and failures, check out the [State in Streaming Programs]({{ site.baseurl }}/apis/state_backends.html) and [Fault Tolerance]({{ site.baseurl }}/apis/fault_tolerance.html) pages.
    +
    +* toc
    +{:toc}
    +
    +## Overview
    +
    +Savepoints are **manually triggered checkpoints**, which take a snapshot of the program and write it out to a state backend. They rely on the regular checkpointing mechanism for this. During execution programs are periodically snapshotted on the worker nodes and produce checkpoints. For recovery only the last completed checkpoint is needed and older checkpoints can be safely discarded as soon as a new one is completed.
    +
    +Savepoints are similar to these periodic checkpoints except that they are **triggered by the user** and **don't automatically expire** when newer checkpoints are completed.
    +
    +<img src="fig/savepoints-overview.png" class="center" />
    +
    +In the above example the workers produce checkpoints **c<sub>1</sub>**, **c<sub>2</sub>**, **c<sub>3</sub>**, and **c<sub>4</sub>** for job *0xA312Bc*. Periodic checkpoints **c<sub>1</sub>** and **c<sub>3</sub>** have already been *discarded* and **c<sub>4</sub>** is the *latest checkpoint*. **c<sub>2</sub> is special**. It is the state associated with the savepoint **s<sub>1</sub>** and has been triggered by the user and it doesn't expire automatically (as c<sub>1</sub> and c<sub>3</sub> did after the completion of newer checkpoints).
    +
    +Note that **s<sub>1</sub>** is only a **pointer to the actual checkpoint data c<sub>2</sub>**. This means that the actual state is *not copied* for the savepoint and periodic checkpoint data is kept around.
    +
    +## Configuration
    +
    +Savepoints point to regular checkpoints and store their state in a configured [state backend]({{ site.baseurl }}/apis/state_backends.html). Currently, the supported state backends are **jobmanager** and **filesystem**. The state backend configuration for the regular periodic checkpoints is **independent** of the savepoint state backend configuration. Checkpoint data is **not copied** for savepoints, but points to the configured checkpoint state backend.
    +
    +### JobManager
    +
    +This is the **default backend** for savepoints.
    +
    +Savepoints are stored on the heap of the job manager. They are *lost* after the job manager is shut down. This mode is only useful if you want to *stop* and *resume* your program while the **same cluster** keeps running. It is *not recommended* for production use. Savepoints are *not* part of the [job manager's highly available]({{ site.baseurl }}/setup/jobmanager_high_availability.html) state.
    +
    +<pre>
    +state.backend.savepoints: jobmanager
    +</pre>
    +
    +**Note**: If you don't configure a specific state backend for the savepoints, the default state backend (config key `state.backend`) will be used.
    +
    +### File system
    +
    +Savepoints are stored in the configured **file system directory**. They are available between cluster instances and allow to move your program to another cluster.
    +
    +<pre>
    +state.backend.savepoints: filesystem
    +state.backend.savepoints.fs.dir: hdfs:///flink/savepoints
    +</pre>
    +
    +**Note**: If you don't configure a specific directory, the checkpoint directory (config key `state.backend.fs.checkpointdir`) will be used.
    +
    +**Important**: A savepoint is a pointer to a completed checkpoint. That means that the state of a savepoint is not only found in the savepoint file itself, but also needs the actual checkpoint data (e.g. in a set of further files).
    +
    +## Changes to your program
    +
    +Savepoints **work out of the box**, but it is **highly recommended** that you slightly adjust your programs in order to be able to work with savepoints in future versions of your program.
    +
    +<img src="fig/savepoints-program_ids.png" class="center" />
    +
    +For savepoints **only stateful tasks matter**. In the above example, the source and map tasks are stateful whereas the sink is not stateful. Therefore, only the state of the source and map tasks is part of the savepoint.
    +
    +Each task is identified by its **generated task ID** and **subtask index**. In the above example the state of the source (**s<sub>1</sub>**, **s<sub>2</sub>**) and map tasks (**m<sub>1</sub>**, **m<sub>2</sub>**) is identified by their respective task ID (*0xC322EC* for the source tasks and *0x27B3EF* for the map tasks) and subtask index. There is no state for the sinks (**t<sub>1</sub>**, **t<sub>2</sub>**). Their IDs therefore do not matter.
    +
    +The IDs are generated **deterministically** from your program structure. This means that if your program does not change, the IDs do not change. In this case, it is straightforward to restore the state from a savepoint by mapping it back to the same task IDs and subtask indexes. This allows you to work with savepoints out of the box, but becomes problematic as soon as you make changes to your program, because they result in changed IDs and the savepoint state cannot be mapped to your program any more.
    --- End diff --
    
    But changes to the UDF are allowed, right? If so, then we should state that here more clearly.



[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1434



[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46840284
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -630,6 +638,136 @@ protected int cancel(String[] args) {
     		}
     	}
     
    +	/**
    +	 * Executes the SAVEPOINT action.
    +	 *
    +	 * @param args Command line arguments for the savepoint action.
    +	 */
    +	protected int savepoint(String[] args) {
    +		LOG.info("Running 'savepoint' command.");
    +
    +		SavepointOptions options;
    +		try {
    +			options = CliFrontendParser.parseSavepointCommand(args);
    +		}
    +		catch (CliArgsException e) {
    +			return handleArgException(e);
    +		}
    +		catch (Throwable t) {
    +			return handleError(t);
    +		}
    +
    +		// evaluate help flag
    +		if (options.isPrintHelp()) {
    +			CliFrontendParser.printHelpForCancel();
    +			return 0;
    +		}
    +
    +		if (options.isDispose()) {
    +			// Discard
    +			return disposeSavepoint(options, options.getDisposeSavepointPath());
    +		}
    +		else {
    +			// Trigger
    +			String[] cleanedArgs = options.getArgs();
    +			JobID jobId;
    +
    +			if (cleanedArgs.length > 0) {
    +				String jobIdString = cleanedArgs[0];
    +				try {
    +					jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
    +				}
    +				catch (Exception e) {
    +					LOG.error("Error: The value for the Job ID is not a valid ID.");
    --- End diff --
    
    Maybe we could use `handleError` here.



[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46940511
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---
    @@ -440,4 +478,226 @@ private void configureExecutionRetryDelay() {
     		long executionRetryDelay = streamGraph.getExecutionConfig().getExecutionRetryDelay();
     		jobGraph.setExecutionRetryDelay(executionRetryDelay);
     	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Returns a map with a hash for each {@link StreamNode} of the {@link
    +	 * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
    +	 * identify nodes across job submissions if they didn't change.
    +	 *
    +	 * <p>The complete {@link StreamGraph} is traversed. The hash is either
    +	 * computed from the transformation's user-specified id (see
    +	 * {@link StreamTransformation#getUid()}) or generated in a deterministic way.
    +	 *
    +	 * <p>The generated hash is deterministic with respect to:
    +	 * <ul>
    +	 * <li>node-local properties (like parallelism, UDF, node ID),
    +	 * <li>chained output nodes, and
    +	 * <li>input nodes hashes
    +	 * </ul>
    +	 *
    +	 * @return A map from {@link StreamNode#id} to hash as 16-byte array.
    +	 */
    +	private Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes() {
    +		// The hash function used to generate the hash
    +		final HashFunction hashFunction = Hashing.murmur3_128(0);
    +		final Map<Integer, byte[]> hashes = new HashMap<>();
    +
    +		Set<Integer> visited = new HashSet<>();
    +		Queue<StreamNode> remaining = new ArrayDeque<>();
    +
    +		// We need to make the source order deterministic. This depends on the
    +		// ordering of the sources in the Environment, e.g. if a source X is
    +		// added before source Y, X will have a lower ID than Y (assigned by a
    +		// static counter).
    +		List<Integer> sources = new ArrayList<>();
    +		for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
    +			sources.add(sourceNodeId);
    +		}
    +
    +		Collections.sort(sources);
    +
    +		// Traverse the graph in a breadth-first manner. Keep in mind that
    +		// the graph is not a tree and multiple paths to nodes can exist.
    +
    +		// Start with source nodes
    +		for (Integer sourceNodeId : sources) {
    +			remaining.add(streamGraph.getStreamNode(sourceNodeId));
    +			visited.add(sourceNodeId);
    +		}
    +
    +		StreamNode currentNode;
    +		while ((currentNode = remaining.poll()) != null) {
    +			// Generate the hash code. Because multiple paths exist to each
    +			// node, we might not have all required inputs available to
    +			// generate the hash code.
    +			if (generateNodeHash(currentNode, hashFunction, hashes)) {
    +				// Add the child nodes
    +				for (StreamEdge outEdge : currentNode.getOutEdges()) {
    --- End diff --
    
    How are the out edges defined? Are they also set according to the creation order? If this is the case, then this means that changing the order of intermediate operators will render them incompatible even though the topology stays the same.



[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46938229
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
    @@ -455,32 +490,43 @@ else if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
     			return false;
     		}
     	}
    -	
    -	public void receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws Exception {
    +
    +	/**
    +	 * Receives an AcknowledgeCheckpoint message and returns whether the
    +	 * message was associated with a pending checkpoint.
    +	 */
    --- End diff --
    
    JavaDocs not complete



[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r47777748
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/HeapStateStore.java ---
    @@ -0,0 +1,84 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.checkpoint;
    +
    +import java.io.Serializable;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +/**
    + * Java heap backed {@link StateStore}.
    + *
    + * @param <T> Type of state
    + */
    +class HeapStateStore<T extends Serializable> implements StateStore<T> {
    +
    +	private final Map<String, T> stateMap = new HashMap<>();
    +
    +	private final AtomicInteger idCounter = new AtomicInteger();
    +
    +	@Override
    +	public String putState(T state) throws Exception {
    +		checkNotNull(state, "State");
    +
    +		String key = "jobmanager://savepoints/" + idCounter.incrementAndGet();
    --- End diff --
    
    Addressed (comment refers to old commit)



[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1434#issuecomment-170558665
  
    :+1: 



[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46856784
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---
    @@ -440,4 +478,228 @@ private void configureExecutionRetryDelay() {
     		long executionRetryDelay = streamGraph.getExecutionConfig().getExecutionRetryDelay();
     		jobGraph.setExecutionRetryDelay(executionRetryDelay);
     	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Returns a map with a hash for each {@link StreamNode} of the {@link
    +	 * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
    +	 * identify nodes across job submissions if they didn't change.
    +	 *
    +	 * <p>The complete {@link StreamGraph} is traversed. The hash is either
    +	 * computed from the transformation's user-specified id (see
    +	 * {@link StreamTransformation#getUid()}) or generated in a deterministic way.
    +	 *
    +	 * <p>The generated hash is deterministic with respect to:
    +	 * <ul>
    +	 * <li>node-local properties (like parallelism, UDF, node ID),
    +	 * <li>chained output nodes, and
    +	 * <li>input nodes hashes
    +	 * </ul>
    +	 *
    +	 * @return A map from {@link StreamNode#id} to hash as 16-byte array.
    +	 */
    +	private Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes() {
    +		// The hash function used to generate the hash
    +		final HashFunction hashFunction = Hashing.murmur3_128(0);
    +		final Map<Integer, byte[]> hashes = new HashMap<>();
    +
    +		Set<Integer> visited = new HashSet<>();
    +		Queue<StreamNode> remaining = new ArrayDeque<>();
    +
    +		// We need to make the source order deterministic. This depends on the
    +		// ordering of the sources in the Environment, e.g. if a source X is
    +		// added before source Y, X will have a lower ID than Y (assigned by a
    +		// static counter).
    --- End diff --
    
    For the intermediate nodes it's used to circumvent a collision when you add the same operator to the same source twice (rather artificial?).
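
The collision mentioned here can be sketched in a few lines. This is only an illustration under stated assumptions: `NodeHashSketch` and its `hash` helper are hypothetical names, not Flink classes, and SHA-256 truncated to 16 bytes stands in for the Guava `murmur3_128` function used in the diff. It hashes the node-local properties listed in the Javadoc (UDF, parallelism, optionally the node ID) together with the input node's hash.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

// Hypothetical sketch; none of these names are Flink classes.
final class NodeHashSketch {

    // Hashes node-local properties together with the input node's hash.
    // Pass nodeId = null to leave the node ID out of the hash input.
    static byte[] hash(String udfName, int parallelism, Integer nodeId, byte[] inputHash) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            String local = udfName + "|" + parallelism + "|"
                    + (nodeId == null ? "" : nodeId.toString());
            digest.update(local.getBytes(StandardCharsets.UTF_8));
            digest.update(inputHash);
            // Truncate to 16 bytes to mirror the 16-byte hashes in the Javadoc.
            return Arrays.copyOf(digest.digest(), 16);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] source = hash("MySource", 1, 1, new byte[0]);

        // The same operator added to the same source twice, node ID left out.
        boolean collides = Arrays.equals(
                hash("MyMapper", 4, null, source),
                hash("MyMapper", 4, null, source));

        // With the node ID (2 and 3 here) the two nodes get different hashes.
        boolean separated = !Arrays.equals(
                hash("MyMapper", 4, 2, source),
                hash("MyMapper", 4, 3, source));

        System.out.println("collision without node ID: " + collides);
        System.out.println("distinct with node ID: " + separated);
    }
}
```

The trade-off in this sketch is that a hash which includes the node ID also changes whenever the IDs change, for example when nodes are created in a different order.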


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46857083
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---
    @@ -440,4 +478,228 @@ private void configureExecutionRetryDelay() {
     		long executionRetryDelay = streamGraph.getExecutionConfig().getExecutionRetryDelay();
     		jobGraph.setExecutionRetryDelay(executionRetryDelay);
     	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Returns a map with a hash for each {@link StreamNode} of the {@link
    +	 * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
    +	 * identify nodes across job submissions if they didn't change.
    +	 *
    +	 * <p>The complete {@link StreamGraph} is traversed. The hash is either
    +	 * computed from the transformation's user-specified id (see
    +	 * {@link StreamTransformation#getUid()}) or generated in a deterministic way.
    +	 *
    +	 * <p>The generated hash is deterministic with respect to:
    +	 * <ul>
    +	 * <li>node-local properties (like parallelism, UDF, node ID),
    +	 * <li>chained output nodes, and
    +	 * <li>input nodes hashes
    +	 * </ul>
    +	 *
    +	 * @return A map from {@link StreamNode#id} to hash as 16-byte array.
    +	 */
    +	private Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes() {
    +		// The hash function used to generate the hash
    +		final HashFunction hashFunction = Hashing.murmur3_128(0);
    +		final Map<Integer, byte[]> hashes = new HashMap<>();
    +
    +		Set<Integer> visited = new HashSet<>();
    +		Queue<StreamNode> remaining = new ArrayDeque<>();
    +
    +		// We need to make the source order deterministic. This depends on the
    +		// ordering of the sources in the Environment, e.g. if a source X is
    +		// added before source Y, X will have a lower ID than Y (assigned by a
    +		// static counter).
    --- End diff --
    
    I just looked into it again and I don't think that this is necessary at all (for the sources).


---

[GitHub] flink issue #1434: [FLINK-2976] Allow to trigger checkpoints manually

Posted by coveralls <gi...@git.apache.org>.
Github user coveralls commented on the issue:

    https://github.com/apache/flink/pull/1434
  
    
    [![Coverage Status](https://coveralls.io/builds/13763854/badge)](https://coveralls.io/builds/13763854)
    
    Changes Unknown when pulling **d9743343ec0c268a99c46ec7324a603506827c78 on uce:2976-savepoints** into ** on apache:master**.



---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1434#issuecomment-165116732
  
    I think this is good behavior, it is conservative and if users want it differently they can manually specify the ID.


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1434#issuecomment-170477891
  
    Thanks Gyula! I will rebase and merge this if there are no objections.



---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1434#issuecomment-164555368
  
    I think we should first check whether the savepoint exists or not before trying to restore from it. Currently an invalid savepoint will send the job into a restart loop.
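
A fail-fast check of this kind could look roughly like the sketch below. It is an assumption-laden illustration, not the PR's code: `SavepointPrecheckSketch` and `requireSavepoint` are hypothetical names, and a local file path stands in for the savepoint path, which in a real setup may live in HDFS or on the job manager heap. The point is only that the check runs once before the job starts, so a bad path is rejected instead of being retried on every restart.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch; a local file stands in for the savepoint.
final class SavepointPrecheckSketch {

    // Rejects the submission up front if the savepoint cannot be read.
    static void requireSavepoint(Path savepointPath) {
        if (!Files.isReadable(savepointPath)) {
            throw new IllegalArgumentException(
                    "Savepoint not found or not readable: " + savepointPath);
        }
    }

    public static void main(String[] args) throws IOException {
        Path savepoint = Files.createTempFile("savepoint-", ".bin");

        // An existing savepoint passes the check.
        requireSavepoint(savepoint);

        // After removal the same path is rejected before any restore attempt.
        Files.delete(savepoint);
        try {
            requireSavepoint(savepoint);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```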


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1434#issuecomment-170550295
  
    I've rebased this. Waiting for Travis and then merging...


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1434#issuecomment-162526253
  
    I think save point is the way to go, see here: https://en.wikipedia.org/wiki/Saved_game#Save_point :smile: 


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46839601
  
    --- Diff: docs/apis/savepoints.md ---
    @@ -0,0 +1,108 @@
    +---
    +title: "Savepoints"
    +is_beta: false
    +---
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +
    +Programs written in the [Data Stream API]({{ site.baseurl }}/apis/streaming_guide.html) can resume execution from a **savepoint**. Savepoints allow both updating your programs and your Flink cluster without loosing any state. This page covers all steps to trigger, restore, and dispose savepoints. For more details on how Flink handles state and failures, check out the [State in Streaming Programs]({{ site.baseurl }}/apis/state_backends.html) and [Fault Tolerance]({{ site.baseurl }}/apis/fault_tolerance.html) pages.
    +
    +* toc
    +{:toc}
    +
    +## Overview
    +
    +Savepoints are **manually triggered checkpoints**, which take a snapshot of the program and write it out to a state backend. They rely on the regular checkpointing mechanism for this. During execution programs are periodically snapshotted on the worker nodes and produce checkpoints. For recovery only the last completed checkpoint is needed and older checkpoints can be safely discarded as soon as a new one is completed.
    +
    +Savepoints are similar to these periodic checkpoints except that they are **triggered by the user** and **don't automatically expire** when newer checkpoints are completed.
    +
    +<img src="fig/savepoints-overview.png" class="center" />
    +
    +In the above example the workers produce checkpoints **c<sub>1</sub>**, **c<sub>2</sub>**, **c<sub>3</sub>**, and **c<sub>4</sub>** for job *0xA312Bc*. Periodic checkpoints **c<sub>1</sub>** and **c<sub>3</sub>** have already been *discarded* and **c<sub>4</sub>** is the *latest checkpoint*. **c<sub>2</sub> is special**. It is the state associated with the savepoint **s<sub>1</sub>** and has been triggered by the user and it doesn't expire automatically (as c<sub>1</sub> and c<sub>3</sub> did after the completion of newer checkpoints).
    +
    +Note that **s<sub>1</sub>** is only a **pointer to the actual checkpoint data c<sub>2</sub>**. This means that the actual state is *not copied* for the savepoint and periodic checkpoint data is kept around.
    +
    +## Configuration
    +
    +Savepoints point to regular checkpoints and store their state in a configured [state backend]({{ site.baseurl }}/apis/state_backends.html). Currently, the supported state backends are **jobmanager** and **filesystem**. The state backend configuration for the regular periodic checkpoints is **independent** of the savepoint state backend configuration. Checkpoint data is **not copied** for savepoints, but points to the configured checkpoint state backend.
    +
    +### JobManager
    +
    +This is the **default backend** for savepoints.
    +
    +Savepoints are stored on the heap of the job manager. They are *lost* after the job manager is shut down. This mode is only useful if you want to *stop* and *resume* your program while the **same cluster** keeps running. It is *not recommended* for production use. Savepoints are *not* part the [job manager's highly availabile]({{ site.baseurl }}/setup/jobmanager_high_availability.html) state.
    +
    +<pre>
    +state.backend.savepoints: jobmanager
    +</pre>
    +
    +**Note**: If you don't configure a specific state backend for the savepoints, the default state backend (config key `state.backend`) will be used.
    +
    +### File system
    +
    +Savepoints are stored in the configured **file system directory**. They are available between cluster instances and allow to move your program to another cluster.
    +
    +<pre>
    +state.backend.savepoints: filesystem
    +state.backend.savepoints.fs.dir: hdfs:///flink/savepoints
    +</pre>
    +
    +**Note**: If you don't configure a specific directory, the checkpoint directory (config key `state.backend.fs.checkpointdir`) will be used.
    +
    +**Important**: A savepoint is a pointer to completed checkpoint. That means that the state of a savepoint is not only found in the savepoint file itself, but also needs the actual checkpoint data (e.g. in a set of further files).
    +
    +## Changes to your program
    +
    +Savepoints **work out of the box**, but it is **highly recommended** that you slightly adjust your programs in order to be able to work with savepoints in future versions of your program.
    +
    +<img src="fig/savepoints-program_ids.png" class="center" />'
    +
    +For savepoints **only stateful tasks matter**. In the above example, the source and map tasks are stateful whereas the sink is not stateful. Therefore, only the state of the source and map tasks are part of the savepoint.
    +
    +Each task is identified by its **generated task IDs** and **subtask index**. In the above example the state of the source (**s<sub>1</sub>**, **s<sub>2</sub>**) and map tasks (**m<sub>1</sub>**, **m<sub>2</sub>**) is identified by their respective task ID (*0xC322EC* for the source tasks and *0x27B3EF* for the map tasks) and subtask index. There is no state for the sinks (**t<sub>1</sub>**, **t<sub>2</sub>**). Their IDs therefore do not matter.
    +
    +The IDs are generated **deterministically** from your program structure. This means that if your program does not change, the IDs do not change. In this case, it is straight forward to restore the state from a savepoint by mapping it back to the same task IDs and subtask indexes. This allows you to work with savepoints out of the box, but gets problematic as soon as you make changes to your program, because they result in changed IDs and the savepoint state cannot be mapped to your program any more.
    +
    +In order to be able to change your program and **have fixed IDs**, the *DataStream* API provides a method to manually specify the task IDs. Each operator provides a **`uid(String)`** method to override the generated ID. The ID is a String, which will be deterministically hashed to a 16-byte hash value. It is **important** that the specified IDs are **unique per transformation and job**. If this is not the case, job submission will fail.
    +
    +{% highlight scala %}
    +DataStream<String> stream = env
    +  // Stateful source (e.g. Kafka) with ID
    +  .addSource(new StatefulSource())
    +  .uid("source-id")
    +  .shuffle()
    +  // The stateful mapper with ID
    +  .map(new StatefulMapper())
    +  .uid("mapper-id")
    +
    +// Stateless sink (no specific ID required)
    +stream.print()
    +{% endhighlight %}
    +
    +## Command-line client
    +
    +You control the savepoints via the [command line client]({{site.baseurl}}/apis/cli.html#savepoints).
    +
    +## Current limitations
    +
    +- **Parallelism**: When restoring a savepoint, the parallelism of the program has to match the parallelism of the original program from which the savepoint was drawn. There is no mechanism to re-partition the savepoint's state yet.
    +
    +- **Chaining**: Chained operators are identified by the ID of the first task. It's not possible to manually assign an ID to an intermediate chained task, e.g. in the chain `[  a -> b -> c ]` only **a** can have its ID assigned manually, but not **b** or **c**.
    --- End diff --
    
    What are the implications for the user?
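
For context on the `uid(String)` paragraph in the quoted docs: it says the ID string is deterministically hashed to a 16-byte value and that IDs must be unique per transformation and job, otherwise submission fails. A minimal sketch of those two rules follows. `UidHashSketch`, `hashUid`, and `requireUniqueUids` are hypothetical names, and SHA-256 truncated to 16 bytes is a stand-in for whatever hash function the implementation actually uses.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; none of these names are Flink classes.
final class UidHashSketch {

    // Maps a user-specified ID to 16 bytes, independent of the program structure.
    static byte[] hashUid(String uid) {
        try {
            byte[] full = MessageDigest.getInstance("SHA-256")
                    .digest(uid.getBytes(StandardCharsets.UTF_8));
            return Arrays.copyOf(full, 16);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Fails, as job submission would, when an ID is used more than once.
    static void requireUniqueUids(List<String> uids) {
        Set<String> seen = new HashSet<>();
        for (String uid : uids) {
            if (!seen.add(uid)) {
                throw new IllegalArgumentException("Duplicate uid in job: " + uid);
            }
        }
    }

    public static void main(String[] args) {
        requireUniqueUids(Arrays.asList("source-id", "mapper-id"));

        byte[] first = hashUid("mapper-id");
        byte[] second = hashUid("mapper-id");
        System.out.println(first.length + " bytes, stable: " + Arrays.equals(first, second));
    }
}
```

Because the hash depends only on the string, it stays the same when the surrounding program changes, which is what lets savepoint state be mapped back to the operator.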


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46936527
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -495,6 +513,76 @@ class JobManager(
         case checkpointMessage : AbstractCheckpointMessage =>
           handleCheckpointMessage(checkpointMessage)
     
    +    case TriggerSavepoint(jobId) =>
    +      currentJobs.get(jobId) match {
    +        case Some((graph, _)) =>
    +          val savepointCoordinator = graph.getSavepointCoordinator()
    +
    +          if (savepointCoordinator != null) {
    +            // Immutable copy for the future
    +            val senderRef = sender()
    +
    +            future {
    +              try {
    +                // Do this async, because checkpoint coordinator operations can
    +                // contain blocking calls to the state backend or ZooKeeper.
    +                val savepointFuture = savepointCoordinator.triggerSavepoint(
    +                  System.currentTimeMillis())
    +
    +                savepointFuture.onComplete {
    +                  // Success, respond with the savepoint path
    +                  case scala.util.Success(savepointPath) =>
    +                    senderRef ! TriggerSavepointSuccess(jobId, savepointPath)
    +
    +                  // Failure, respond with the cause
    +                  case scala.util.Failure(t) =>
    +                    senderRef ! TriggerSavepointFailure(jobId,
    +                      new Exception("Failed to complete savepoint", t))
    +                }(context.dispatcher)
    +              }
    +              catch {
    +                case e: Exception =>
    +                  senderRef ! TriggerSavepointFailure(jobId, new Exception(
    +                    "Failed to trigger savepoint", e))
    +              }
    +            }(context.dispatcher)
    +          }
    +          else {
    +            sender() ! TriggerSavepointFailure(jobId, new IllegalStateException(
    +              "Checkpointing disabled. You can enable it via the execution environment of " +
    --- End diff --
    
    This is also breaking the argument list.


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46838672
  
    --- Diff: docs/apis/savepoints.md ---
    @@ -0,0 +1,108 @@
    +---
    +title: "Savepoints"
    +is_beta: false
    +---
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +
    +Programs written in the [Data Stream API]({{ site.baseurl }}/apis/streaming_guide.html) can resume execution from a **savepoint**. Savepoints allow both updating your programs and your Flink cluster without loosing any state. This page covers all steps to trigger, restore, and dispose savepoints. For more details on how Flink handles state and failures, check out the [State in Streaming Programs]({{ site.baseurl }}/apis/state_backends.html) and [Fault Tolerance]({{ site.baseurl }}/apis/fault_tolerance.html) pages.
    +
    +* toc
    +{:toc}
    +
    +## Overview
    +
    +Savepoints are **manually triggered checkpoints**, which take a snapshot of the program and write it out to a state backend. They rely on the regular checkpointing mechanism for this. During execution programs are periodically snapshotted on the worker nodes and produce checkpoints. For recovery only the last completed checkpoint is needed and older checkpoints can be safely discarded as soon as a new one is completed.
    +
    +Savepoints are similar to these periodic checkpoints except that they are **triggered by the user** and **don't automatically expire** when newer checkpoints are completed.
    +
    +<img src="fig/savepoints-overview.png" class="center" />
    +
    +In the above example the workers produce checkpoints **c<sub>1</sub>**, **c<sub>2</sub>**, **c<sub>3</sub>**, and **c<sub>4</sub>** for job *0xA312Bc*. Periodic checkpoints **c<sub>1</sub>** and **c<sub>3</sub>** have already been *discarded* and **c<sub>4</sub>** is the *latest checkpoint*. **c<sub>2</sub> is special**. It is the state associated with the savepoint **s<sub>1</sub>** and has been triggered by the user and it doesn't expire automatically (as c<sub>1</sub> and c<sub>3</sub> did after the completion of newer checkpoints).
    +
    +Note that **s<sub>1</sub>** is only a **pointer to the actual checkpoint data c<sub>2</sub>**. This means that the actual state is *not copied* for the savepoint and periodic checkpoint data is kept around.
    +
    +## Configuration
    +
    +Savepoints point to regular checkpoints and store their state in a configured [state backend]({{ site.baseurl }}/apis/state_backends.html). Currently, the supported state backends are **jobmanager** and **filesystem**. The state backend configuration for the regular periodic checkpoints is **independent** of the savepoint state backend configuration. Checkpoint data is **not copied** for savepoints, but points to the configured checkpoint state backend.
    +
    +### JobManager
    +
    +This is the **default backend** for savepoints.
    +
    +Savepoints are stored on the heap of the job manager. They are *lost* after the job manager is shut down. This mode is only useful if you want to *stop* and *resume* your program while the **same cluster** keeps running. It is *not recommended* for production use. Savepoints are *not* part the [job manager's highly availabile]({{ site.baseurl }}/setup/jobmanager_high_availability.html) state.
    --- End diff --
    
    Typos: *not* part **of** the
    
    highly available


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1434#issuecomment-162852179
  
    Really good work @uce. I only had a comment concerning the hash generation and compatibility between topologies and some minor style comments. Once these are addressed +1 for merging.


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46845825
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java ---
    @@ -0,0 +1,359 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.checkpoint;
    +
    +import akka.actor.ActorSystem;
    +import akka.actor.Props;
    +import org.apache.flink.api.common.ApplicationID;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.execution.ExecutionState;
    +import org.apache.flink.runtime.executiongraph.Execution;
    +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
    +import org.apache.flink.runtime.executiongraph.ExecutionVertex;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.AkkaActorGateway;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.jobmanager.RecoveryMode;
    +import org.apache.flink.types.IntValue;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.concurrent.Future;
    +import scala.concurrent.Promise;
    +
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +/**
    + * The savepoint coordinator is a slightly modified variant of the regular
    + * checkpoint coordinator. Checkpoints are not triggered periodically, but
    + * manually. The actual checkpointing mechanism is the same as for periodic
    + * checkpoints, only the control flow is modified.
    + *
    + * <p>The savepoint coordinator is meant to be used as a separate coordinator
    + * instance. Otherwise, there can be unwanted queueing effects like discarding
    + * savepoints, because of in-progress periodic checkpoints.
    + *
    + * <p>The savepoint coordnator registers callbacks on the regular checkpoint
    --- End diff --
    
    typo: coordinator


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1434#issuecomment-162168898
  
    Awesome change! I think it's a feature many users are asking for because it's crucial for production use!
    I'll try to review it soon.


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46850418
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -495,6 +513,76 @@ class JobManager(
         case checkpointMessage : AbstractCheckpointMessage =>
           handleCheckpointMessage(checkpointMessage)
     
    +    case TriggerSavepoint(jobId) =>
    +      currentJobs.get(jobId) match {
    +        case Some((graph, _)) =>
    +          val savepointCoordinator = graph.getSavepointCoordinator()
    +
    +          if (savepointCoordinator != null) {
    +            // Immutable copy for the future
    +            val senderRef = sender()
    +
    +            future {
    +              try {
    +                // Do this async, because checkpoint coordinator operations can
    +                // contain blocking calls to the state backend or ZooKeeper.
    +                val savepointFuture = savepointCoordinator.triggerSavepoint(
    +                  System.currentTimeMillis())
    +
    +                savepointFuture.onComplete {
    +                  // Success, respond with the savepoint path
    +                  case scala.util.Success(savepointPath) =>
    +                    senderRef ! TriggerSavepointSuccess(jobId, savepointPath)
    +
    +                  // Failure, respond with the cause
    +                  case scala.util.Failure(t) =>
    +                    senderRef ! TriggerSavepointFailure(jobId,
    +                      new Exception("Failed to complete savepoint", t))
    +                }(context.dispatcher)
    +              }
    +              catch {
    +                case e: Exception =>
    +                  senderRef ! TriggerSavepointFailure(jobId, new Exception(
    +                    "Failed to trigger savepoint", e))
    +              }
    +            }(context.dispatcher)
    +          }
    +          else {
    --- End diff --
    
    The same for `else`. 


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46855123
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---
    @@ -440,4 +478,228 @@ private void configureExecutionRetryDelay() {
     		long executionRetryDelay = streamGraph.getExecutionConfig().getExecutionRetryDelay();
     		jobGraph.setExecutionRetryDelay(executionRetryDelay);
     	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Returns a map with a hash for each {@link StreamNode} of the {@link
    +	 * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
    +	 * identify nodes across job submissions if they didn't change.
    +	 *
    +	 * <p>The complete {@link StreamGraph} is traversed. The hash is either
    +	 * computed from the transformation's user-specified id (see
    +	 * {@link StreamTransformation#getUid()}) or generated in a deterministic way.
    +	 *
    +	 * <p>The generated hash is deterministic with respect to:
    +	 * <ul>
    +	 * <li>node-local properties (like parallelism, UDF, node ID),
    +	 * <li>chained output nodes, and
    +	 * <li>input nodes hashes
    +	 * </ul>
    +	 *
    +	 * @return A map from {@link StreamNode#id} to hash as 16-byte array.
    +	 */
    +	private Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes() {
    +		// The hash function used to generate the hash
    +		final HashFunction hashFunction = Hashing.murmur3_128(0);
    +		final Map<Integer, byte[]> hashes = new HashMap<>();
    +
    +		Set<Integer> visited = new HashSet<>();
    +		Queue<StreamNode> remaining = new ArrayDeque<>();
    +
    +		// We need to make the source order deterministic. This depends on the
    +		// ordering of the sources in the Environment, e.g. if a source X is
    +		// added before source Y, X will have a lower ID than Y (assigned by a
    +		// static counter).
    --- End diff --
    
    Does this mean that if we change the creation order of the sources, the program becomes incompatible? Even if the rest stays the same?


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1434#issuecomment-164296923
  
    How do I increase the timeout for the savepoint? I rebased on the latest version, but it still times out after 100000 ms.
    
    Otherwise awesome work :)


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r47777988
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---
    @@ -440,4 +478,226 @@ private void configureExecutionRetryDelay() {
     		long executionRetryDelay = streamGraph.getExecutionConfig().getExecutionRetryDelay();
     		jobGraph.setExecutionRetryDelay(executionRetryDelay);
     	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Returns a map with a hash for each {@link StreamNode} of the {@link
    +	 * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
    +	 * identify nodes across job submissions if they didn't change.
    +	 *
    +	 * <p>The complete {@link StreamGraph} is traversed. The hash is either
    +	 * computed from the transformation's user-specified id (see
    +	 * {@link StreamTransformation#getUid()}) or generated in a deterministic way.
    +	 *
    +	 * <p>The generated hash is deterministic with respect to:
    +	 * <ul>
    +	 * <li>node-local properties (like parallelism, UDF, node ID),
    +	 * <li>chained output nodes, and
    +	 * <li>input nodes hashes
    +	 * </ul>
    +	 *
    +	 * @return A map from {@link StreamNode#id} to hash as 16-byte array.
    +	 */
    +	private Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes() {
    +		// The hash function used to generate the hash
    +		final HashFunction hashFunction = Hashing.murmur3_128(0);
    +		final Map<Integer, byte[]> hashes = new HashMap<>();
    +
    +		Set<Integer> visited = new HashSet<>();
    +		Queue<StreamNode> remaining = new ArrayDeque<>();
    +
    +		// We need to make the source order deterministic. This depends on the
    +		// ordering of the sources in the Environment, e.g. if a source X is
    +		// added before source Y, X will have a lower ID than Y (assigned by a
    +		// static counter).
    +		List<Integer> sources = new ArrayList<>();
    +		for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
    +			sources.add(sourceNodeId);
    +		}
    +
    +		Collections.sort(sources);
    +
    +		// Traverse the graph in a breadth-first manner. Keep in mind that
    +		// the graph is not a tree and multiple paths to nodes can exist.
    +
    +		// Start with source nodes
    +		for (Integer sourceNodeId : sources) {
    +			remaining.add(streamGraph.getStreamNode(sourceNodeId));
    +			visited.add(sourceNodeId);
    +		}
    +
    +		StreamNode currentNode;
    +		while ((currentNode = remaining.poll()) != null) {
    +			// Generate the hash code. Because multiple path exist to each
    +			// node, we might not have all required inputs available to
    +			// generate the hash code.
    +			if (generateNodeHash(currentNode, hashFunction, hashes)) {
    +				// Add the child nodes
    +				for (StreamEdge outEdge : currentNode.getOutEdges()) {
    --- End diff --
    
    This is correct. Changing the order changes the hash code. The ordering question really depends on whether we include some form of ID to solve collisions.
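    
    To illustrate the point, here is a minimal sketch (not the code from this PR) of why including the node ID makes the hash depend on creation order. It uses the JDK's MD5 as a 16-byte stand-in for Guava's `murmur3_128`; the class `NodeHashOrderSketch`, the `hash` helper and its parameters are hypothetical.
    
    ```java
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.Arrays;
    
    final class NodeHashOrderSketch {
    
        // 16-byte hash over node-local properties. MD5 is only a JDK stand-in
        // for murmur3_128; pass nodeId == null to leave the ID out of the hash.
        static byte[] hash(String udfClassName, int parallelism, Integer nodeId) {
            try {
                MessageDigest md = MessageDigest.getInstance("MD5");
                md.update(udfClassName.getBytes(StandardCharsets.UTF_8));
                md.update(ByteBuffer.allocate(4).putInt(parallelism).array());
                if (nodeId != null) {
                    md.update(ByteBuffer.allocate(4).putInt(nodeId).array());
                }
                return md.digest();
            } catch (NoSuchAlgorithmException e) {
                // MD5 is available on every standard Java platform.
                throw new IllegalStateException(e);
            }
        }
    
        public static void main(String[] args) {
            // Submission 1 creates the source first (ID 1), submission 2 creates it second (ID 2).
            boolean sameWithId = Arrays.equals(hash("SourceX", 4, 1), hash("SourceX", 4, 2));
            boolean sameWithoutId = Arrays.equals(hash("SourceX", 4, null), hash("SourceX", 4, null));
            System.out.println("same hash with ID: " + sameWithId + ", without ID: " + sameWithoutId);
        }
    }
    ```
    
    With the ID mixed in, the same source gets a different hash as soon as its position in the creation order changes. Without the ID the hash is stable, but two otherwise identical nodes can no longer be told apart.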


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1434#issuecomment-162534463
  
    @aljoscha I like your proposal. I will rename it accordingly.


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1434#issuecomment-168802546
  
    I rebased your branch on the current master, if you want you can use this :)
    
    https://github.com/gyfora/flink/commits/ufuk-2976-rebased


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46846360
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/HeapStateStore.java ---
    @@ -0,0 +1,84 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.checkpoint;
    +
    +import java.io.Serializable;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +/**
    + * Java heap backed {@link StateStore}.
    + *
    + * @param <T> Type of state
    + */
    +class HeapStateStore<T extends Serializable> implements StateStore<T> {
    +
    +	private final Map<String, T> stateMap = new HashMap<>();
    +
    +	private final AtomicInteger idCounter = new AtomicInteger();
    +
    +	@Override
    +	public String putState(T state) throws Exception {
    +		checkNotNull(state, "State");
    +
    +		String key = "jobmanager://savepoints/" + idCounter.incrementAndGet();
    --- End diff --
    
    That was an accident. There is only one single instance of this per job manager (at the moment) and it can be accessed by different threads. That's why the counter is an atomic integer. The map was a concurrent map, but it looks like I accidentally changed it to a regular map.
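    
    For illustration, a minimal sketch of how the store could look with a concurrent map again. `ConcurrentHeapStoreSketch` and its methods are simplified stand-ins, not the actual `HeapStateStore` from this PR, and `Objects.requireNonNull` replaces Guava's `checkNotNull` to keep the example JDK-only.
    
    ```java
    import java.util.Map;
    import java.util.Objects;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;
    
    // Illustrative stand-in, not the class from the pull request.
    final class ConcurrentHeapStoreSketch<T> {
    
        // ConcurrentHashMap allows concurrent put/get without external locking.
        private final Map<String, T> stateMap = new ConcurrentHashMap<>();
    
        // Hands out a distinct number to every caller, even under contention.
        private final AtomicInteger idCounter = new AtomicInteger();
    
        String putState(T state) {
            Objects.requireNonNull(state, "State");
            String key = "jobmanager://savepoints/" + idCounter.incrementAndGet();
            stateMap.put(key, state);
            return key;
        }
    
        T getState(String key) {
            return stateMap.get(key);
        }
    
        int size() {
            return stateMap.size();
        }
    }
    ```
    
    The counter alone only guarantees unique keys. The map itself has to be safe for concurrent writers as well, which a plain `HashMap` is not.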


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46840304
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -630,6 +638,136 @@ protected int cancel(String[] args) {
     		}
     	}
     
    +	/**
    +	 * Executes the SAVEPOINT action.
    +	 *
    +	 * @param args Command line arguments for the cancel action.
    +	 */
    +	protected int savepoint(String[] args) {
    +		LOG.info("Running 'savepoint' command.");
    +
    +		SavepointOptions options;
    +		try {
    +			options = CliFrontendParser.parseSavepointCommand(args);
    +		}
    +		catch (CliArgsException e) {
    +			return handleArgException(e);
    +		}
    +		catch (Throwable t) {
    +			return handleError(t);
    +		}
    +
    +		// evaluate help flag
    +		if (options.isPrintHelp()) {
    +			CliFrontendParser.printHelpForCancel();
    +			return 0;
    +		}
    +
    +		if (options.isDispose()) {
    +			// Discard
    +			return disposeSavepoint(options, options.getDisposeSavepointPath());
    +		}
    +		else {
    +			// Trigger
    +			String[] cleanedArgs = options.getArgs();
    +			JobID jobId;
    +
    +			if (cleanedArgs.length > 0) {
    +				String jobIdString = cleanedArgs[0];
    +				try {
    +					jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
    +				}
    +				catch (Exception e) {
    +					LOG.error("Error: The value for the Job ID is not a valid ID.");
    +					System.out.println("Error: The value for the Job ID is not a valid ID.");
    +					return 1;
    +				}
    +			}
    +			else {
    +				LOG.error("Missing JobID in the command line arguments.");
    --- End diff --
    
    Same here.


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46858130
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---
    @@ -440,4 +478,228 @@ private void configureExecutionRetryDelay() {
     		long executionRetryDelay = streamGraph.getExecutionConfig().getExecutionRetryDelay();
     		jobGraph.setExecutionRetryDelay(executionRetryDelay);
     	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Returns a map with a hash for each {@link StreamNode} of the {@link
    +	 * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
    +	 * identify nodes across job submissions if they didn't change.
    +	 *
    +	 * <p>The complete {@link StreamGraph} is traversed. The hash is either
    +	 * computed from the transformation's user-specified id (see
    +	 * {@link StreamTransformation#getUid()}) or generated in a deterministic way.
    +	 *
    +	 * <p>The generated hash is deterministic with respect to:
    +	 * <ul>
    +	 * <li>node-local properties (like parallelism, UDF, node ID),
    +	 * <li>chained output nodes, and
    +	 * <li>input nodes hashes
    +	 * </ul>
    +	 *
    +	 * @return A map from {@link StreamNode#id} to hash as 16-byte array.
    +	 */
    +	private Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes() {
    +		// The hash function used to generate the hash
    +		final HashFunction hashFunction = Hashing.murmur3_128(0);
    +		final Map<Integer, byte[]> hashes = new HashMap<>();
    +
    +		Set<Integer> visited = new HashSet<>();
    +		Queue<StreamNode> remaining = new ArrayDeque<>();
    +
    +		// We need to make the source order deterministic. This depends on the
    +		// ordering of the sources in the Environment, e.g. if a source X is
    +		// added before source Y, X will have a lower ID than Y (assigned by a
    +		// static counter).
    +		List<Integer> sources = new ArrayList<>();
    +		for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
    +			sources.add(sourceNodeId);
    +		}
    +
    +		Collections.sort(sources);
    +
    +		// Traverse the graph in a breadth-first manner. Keep in mind that
    +		// the graph is not a tree and multiple paths to nodes can exist.
    +
    +		// Start with source nodes
    +		for (Integer sourceNodeId : sources) {
    +			remaining.add(streamGraph.getStreamNode(sourceNodeId));
    +			visited.add(sourceNodeId);
    +		}
    +
    +		StreamNode currentNode;
    +		while ((currentNode = remaining.poll()) != null) {
    +			// Generate the hash code. Because multiple path exist to each
    +			// node, we might not have all required inputs available to
    +			// generate the hash code.
    +			if (generateNodeHash(currentNode, hashFunction, hashes, visited)) {
    +				// Add the child nodes
    +				for (StreamEdge outEdge : currentNode.getOutEdges()) {
    +					StreamNode child = outEdge.getTargetVertex();
    +
    +					if (!visited.contains(child.getId())) {
    +						remaining.add(child);
    +						visited.add(child.getId());
    +					}
    +				}
    +			}
    +			else {
    +				// We will revisit this later.
    +				visited.remove(currentNode.getId());
    +			}
    +		}
    +
    +		return hashes;
    +	}
    +
    +	/**
    +	 * Generates a hash for the node and returns whether the operation was
    +	 * successful.
    +	 *
    +	 * @param node         The node to generate the hash for
    +	 * @param hashFunction The hash function to use
    +	 * @param hashes       The current state of generated hashes
    +	 * @param visited      The current state of visited nodes
    +	 * @return <code>true</code> if the node hash has been generated.
    +	 * <code>false</code>, otherwise. If the operation is not successful, the
    +	 * hash needs be generated at a later point when all input is available.
    +	 * @throws IllegalStateException If node has user-specified hash and is
    +	 *                               intermediate node of a chain
    +	 */
    +	private boolean generateNodeHash(
    +			StreamNode node,
    +			HashFunction hashFunction,
    +			Map<Integer, byte[]> hashes,
    +			Set<Integer> visited) {
    --- End diff --
    
    Just saw that this is unused. Remove it.


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46847734
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Savepoint.java ---
    @@ -0,0 +1,55 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.checkpoint;
    +
    +import org.apache.flink.api.common.ApplicationID;
    +
    +import java.io.Serializable;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link CompletedCheckpoint} instance with the {@link ApplicationID} of the program it belongs
    + * to.
    + */
    +public class Savepoint implements Serializable {
    --- End diff --
    
    `serialVersionUID` is missing.
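    
    As a reminder of what this looks like, a minimal sketch with a hypothetical `SavepointSketch` class (the field and the `roundTrip` helper are made up for the example and are not part of the PR):
    
    ```java
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    
    final class SavepointSketch implements Serializable {
    
        // Explicit version of the serialized form. Without it the JVM derives
        // one from the class structure, so harmless changes can break reads.
        private static final long serialVersionUID = 1L;
    
        private final String applicationId;
    
        SavepointSketch(String applicationId) {
            this.applicationId = applicationId;
        }
    
        String getApplicationId() {
            return applicationId;
        }
    
        // Serializes and deserializes the instance via Java serialization.
        static SavepointSketch roundTrip(SavepointSketch in) {
            try {
                ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                    out.writeObject(in);
                }
                try (ObjectInputStream is =
                        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                    return (SavepointSketch) is.readObject();
                }
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }
    }
    ```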


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1434#issuecomment-164407020
  
    @tillrohrmann Thanks for the review. I will address the remaining points and get back.
    
    @gyfora I changed the client timeout to INF, but it somehow got back in. I will address this. I think it's fine to have it as INF and let the checkpoint timeout handle it (default 10 mins). The user can just break out of the client (which will not cancel the savepoint though – something we can address as a follow up).


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46940166
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---
    @@ -440,4 +478,228 @@ private void configureExecutionRetryDelay() {
     		long executionRetryDelay = streamGraph.getExecutionConfig().getExecutionRetryDelay();
     		jobGraph.setExecutionRetryDelay(executionRetryDelay);
     	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Returns a map with a hash for each {@link StreamNode} of the {@link
    +	 * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
    +	 * identify nodes across job submissions if they didn't change.
    +	 *
    +	 * <p>The complete {@link StreamGraph} is traversed. The hash is either
    +	 * computed from the transformation's user-specified id (see
    +	 * {@link StreamTransformation#getUid()}) or generated in a deterministic way.
    +	 *
    +	 * <p>The generated hash is deterministic with respect to:
    +	 * <ul>
    +	 * <li>node-local properties (like parallelism, UDF, node ID),
    +	 * <li>chained output nodes, and
    +	 * <li>input nodes hashes
    +	 * </ul>
    +	 *
    +	 * @return A map from {@link StreamNode#id} to hash as 16-byte array.
    +	 */
    +	private Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes() {
    +		// The hash function used to generate the hash
    +		final HashFunction hashFunction = Hashing.murmur3_128(0);
    +		final Map<Integer, byte[]> hashes = new HashMap<>();
    +
    +		Set<Integer> visited = new HashSet<>();
    +		Queue<StreamNode> remaining = new ArrayDeque<>();
    +
    +		// We need to make the source order deterministic. This depends on the
    +		// ordering of the sources in the Environment, e.g. if a source X is
    +		// added before source Y, X will have a lower ID than Y (assigned by a
    +		// static counter).
    --- End diff --
    
    Can't we use another feature to establish a deterministic order independent of the creation order of the sources? From a user perspective, topologies which contain the same operators and the same connections should behave identically, regardless of the order in which the operators were created.
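    
    A rough sketch of one such approach, assuming each source can be described by stable properties such as its UDF class name and parallelism. The `Source` type, its `description` field and `traversalOrder` are hypothetical and only serve the example:
    
    ```java
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    
    final class SourceOrderSketch {
    
        // Hypothetical, stripped-down view of a source node.
        static final class Source {
            final int creationId;      // assigned by a counter, depends on creation order
            final String description;  // stable properties, e.g. UDF class name and parallelism
    
            Source(int creationId, String description) {
                this.creationId = creationId;
                this.description = description;
            }
        }
    
        // Orders sources by their stable description instead of the creation ID.
        static List<String> traversalOrder(List<Source> sources) {
            List<Source> sorted = new ArrayList<>(sources);
            sorted.sort(Comparator.comparing((Source s) -> s.description));
            List<String> order = new ArrayList<>();
            for (Source s : sorted) {
                order.add(s.description);
            }
            return order;
        }
    }
    ```
    
    This only gives a total order as long as the descriptions differ. Two sources with identical properties tie, which is exactly the case where some form of ID is still needed.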


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1434#issuecomment-168033367
  
    Should we merge this?


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46845520
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/HeapStateStore.java ---
    @@ -0,0 +1,84 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.checkpoint;
    +
    +import java.io.Serializable;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +/**
    + * Java heap backed {@link StateStore}.
    + *
    + * @param <T> Type of state
    + */
    +class HeapStateStore<T extends Serializable> implements StateStore<T> {
    +
    +	private final Map<String, T> stateMap = new HashMap<>();
    +
    +	private final AtomicInteger idCounter = new AtomicInteger();
    +
    +	@Override
    +	public String putState(T state) throws Exception {
    +		checkNotNull(state, "State");
    +
    +		String key = "jobmanager://savepoints/" + idCounter.incrementAndGet();
    --- End diff --
    
    Why do we use an `AtomicInteger` here? Does this mean that `putState` will be accessed concurrently? If this is the case, then `stateMap.put` is problematic, since a `HashMap` is not thread-safe.


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46850082
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -336,7 +354,7 @@ class JobManager(
           val client = sender()
     
           val jobInfo = new JobInfo(client, listeningBehaviour, System.currentTimeMillis(),
    -        jobGraph.getSessionTimeout)
    +            jobGraph.getSessionTimeout)
    --- End diff --
    
    wrong indentation


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46850717
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -753,7 +841,11 @@ class JobManager(
        * @param jobInfo the job info
        * @param isRecovery Flag indicating whether this is a recovery or initial submission
        */
    -  private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false): Unit = {
    +  private def submitJob(
    +    jobGraph: JobGraph,
    +    jobInfo: JobInfo,
    +    isRecovery: Boolean = false): Unit = {
    --- End diff --
    
    Two-level indentation for the parameters and one-level indentation for the return type:
    
    ```
    private def submitJob(
        a: B,
        c: D)
      : R = {
    
    }
    ```


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46924434
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---
    @@ -440,4 +478,228 @@ private void configureExecutionRetryDelay() {
     		long executionRetryDelay = streamGraph.getExecutionConfig().getExecutionRetryDelay();
     		jobGraph.setExecutionRetryDelay(executionRetryDelay);
     	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Returns a map with a hash for each {@link StreamNode} of the {@link
    +	 * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
    +	 * identify nodes across job submissions if they didn't change.
    +	 *
    +	 * <p>The complete {@link StreamGraph} is traversed. The hash is either
    +	 * computed from the transformation's user-specified id (see
    +	 * {@link StreamTransformation#getUid()}) or generated in a deterministic way.
    +	 *
    +	 * <p>The generated hash is deterministic with respect to:
    +	 * <ul>
    +	 * <li>node-local properties (like parallelism, UDF, node ID),
    +	 * <li>chained output nodes, and
    +	 * <li>input nodes hashes
    +	 * </ul>
    +	 *
    +	 * @return A map from {@link StreamNode#id} to hash as 16-byte array.
    +	 */
    +	private Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes() {
    +		// The hash function used to generate the hash
    +		final HashFunction hashFunction = Hashing.murmur3_128(0);
    +		final Map<Integer, byte[]> hashes = new HashMap<>();
    +
    +		Set<Integer> visited = new HashSet<>();
    +		Queue<StreamNode> remaining = new ArrayDeque<>();
    +
    +		// We need to make the source order deterministic. This depends on the
    +		// ordering of the sources in the Environment, e.g. if a source X is
    +		// added before source Y, X will have a lower ID than Y (assigned by a
    +		// static counter).
    --- End diff --
    
    I decided to leave everything as is for now. The main reason for the IDs is to prevent collisions from otherwise identical nodes. The reasoning behind it is that collisions will fail the submission of the job graph. But as you said, the resulting behaviour seems also rather aggressive.
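    
    To make the collision case concrete, a minimal sketch under simplifying assumptions: every node is reduced to a description string, MD5 from the JDK stands in for `murmur3_128`, and `HashCollisionSketch`, `hashHex` and `checkUnique` are hypothetical names, not code from this PR.
    
    ```java
    import java.math.BigInteger;
    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    
    final class HashCollisionSketch {
    
        // 128-bit hash of the node description as a 32-character hex string.
        static String hashHex(String nodeDescription) {
            try {
                byte[] digest = MessageDigest.getInstance("MD5")
                        .digest(nodeDescription.getBytes(StandardCharsets.UTF_8));
                return String.format("%032x", new BigInteger(1, digest));
            } catch (NoSuchAlgorithmException e) {
                throw new IllegalStateException(e);
            }
        }
    
        // Returns normally if all node hashes are unique, fails the "submission" otherwise.
        static void checkUnique(List<String> nodeDescriptions) {
            Set<String> seen = new HashSet<>();
            for (String description : nodeDescriptions) {
                if (!seen.add(hashHex(description))) {
                    throw new IllegalStateException("Hash collision for node: " + description);
                }
            }
        }
    }
    ```
    
    Two nodes described as `Map|p=4` collide and fail the check, while `Map|p=4|id=1` and `Map|p=4|id=2` pass. That is the trade-off: the ID avoids the failure but ties the hash to the creation order.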


---

[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46856258
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---
    @@ -440,4 +478,228 @@ private void configureExecutionRetryDelay() {
     		long executionRetryDelay = streamGraph.getExecutionConfig().getExecutionRetryDelay();
     		jobGraph.setExecutionRetryDelay(executionRetryDelay);
     	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Returns a map with a hash for each {@link StreamNode} of the {@link
    +	 * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
    +	 * identify nodes across job submissions if they didn't change.
    +	 *
    +	 * <p>The complete {@link StreamGraph} is traversed. The hash is either
    +	 * computed from the transformation's user-specified id (see
    +	 * {@link StreamTransformation#getUid()}) or generated in a deterministic way.
    +	 *
    +	 * <p>The generated hash is deterministic with respect to:
    +	 * <ul>
    +	 * <li>node-local properties (like parallelism, UDF, node ID),
    +	 * <li>chained output nodes, and
    +	 * <li>input nodes hashes
    +	 * </ul>
    +	 *
    +	 * @return A map from {@link StreamNode#id} to hash as 16-byte array.
    +	 */
    +	private Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes() {
    +		// The hash function used to generate the hash
    +		final HashFunction hashFunction = Hashing.murmur3_128(0);
    +		final Map<Integer, byte[]> hashes = new HashMap<>();
    +
    +		Set<Integer> visited = new HashSet<>();
    +		Queue<StreamNode> remaining = new ArrayDeque<>();
    +
    +		// We need to make the source order deterministic. This depends on the
    +		// ordering of the sources in the Environment, e.g. if a source X is
    +		// added before source Y, X will have a lower ID than Y (assigned by a
    +		// static counter).
    --- End diff --
    
    I think this is not even necessary if we don't include the StreamNode ID in the hash for the sources.
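
For context, the traversal the quoted comment refers to can be sketched roughly as follows. This is a minimal illustration, not the PR's code: the class name `TraversalSketch`, the method `hashOrder`, and the plain-map graph representation are all hypothetical, and the rule "only hash a node once all of its inputs are hashed" is an assumption derived from the Javadoc statement that the hash depends on the input node hashes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/** Hypothetical sketch of a deterministic breadth-first traversal over node IDs. */
public class TraversalSketch {

    /**
     * Returns the order in which nodes would be hashed. Sources are sorted by ID
     * first, so the result does not depend on the iteration order of the input.
     */
    public static List<Integer> hashOrder(
            Collection<Integer> sourceIds,
            Map<Integer, List<Integer>> inputs,
            Map<Integer, List<Integer>> outputs) {

        List<Integer> sortedSources = new ArrayList<>(sourceIds);
        Collections.sort(sortedSources);

        Set<Integer> visited = new HashSet<>(sortedSources);
        Queue<Integer> remaining = new ArrayDeque<>(sortedSources);
        Set<Integer> hashed = new HashSet<>();
        List<Integer> order = new ArrayList<>();

        Integer node;
        while ((node = remaining.poll()) != null) {
            List<Integer> in = inputs.getOrDefault(node, Collections.<Integer>emptyList());
            if (hashed.containsAll(in)) {
                // All inputs are done: this node can be hashed now.
                hashed.add(node);
                order.add(node);
                for (Integer out : outputs.getOrDefault(node, Collections.<Integer>emptyList())) {
                    if (visited.add(out)) {
                        remaining.add(out);
                    }
                }
            } else {
                // Some input is still missing: revisit this node later.
                remaining.add(node);
            }
        }
        return order;
    }

    public static void main(String[] args) {
        // Graph: 1 -> 3, 2 -> 4, 4 -> 3 (node 3 has to wait for node 4).
        Map<Integer, List<Integer>> inputs = new HashMap<>();
        inputs.put(3, Arrays.asList(1, 4));
        inputs.put(4, Arrays.asList(2));
        Map<Integer, List<Integer>> outputs = new HashMap<>();
        outputs.put(1, Arrays.asList(3));
        outputs.put(2, Arrays.asList(4));
        outputs.put(4, Arrays.asList(3));

        System.out.println(hashOrder(Arrays.asList(2, 1), inputs, outputs));
    }
}
```

If the source hashes did not include the node ID, the sort would only affect the order of the visit and not the resulting hashes, which is the point made in the comment above.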



[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46839369
  
    --- Diff: docs/apis/savepoints.md ---
    @@ -0,0 +1,108 @@
    +---
    +title: "Savepoints"
    +is_beta: false
    +---
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +
    +Programs written in the [Data Stream API]({{ site.baseurl }}/apis/streaming_guide.html) can resume execution from a **savepoint**. Savepoints allow both updating your programs and your Flink cluster without loosing any state. This page covers all steps to trigger, restore, and dispose savepoints. For more details on how Flink handles state and failures, check out the [State in Streaming Programs]({{ site.baseurl }}/apis/state_backends.html) and [Fault Tolerance]({{ site.baseurl }}/apis/fault_tolerance.html) pages.
    +
    +* toc
    +{:toc}
    +
    +## Overview
    +
    +Savepoints are **manually triggered checkpoints**, which take a snapshot of the program and write it out to a state backend. They rely on the regular checkpointing mechanism for this. During execution programs are periodically snapshotted on the worker nodes and produce checkpoints. For recovery only the last completed checkpoint is needed and older checkpoints can be safely discarded as soon as a new one is completed.
    +
    +Savepoints are similar to these periodic checkpoints except that they are **triggered by the user** and **don't automatically expire** when newer checkpoints are completed.
    +
    +<img src="fig/savepoints-overview.png" class="center" />
    +
    +In the above example the workers produce checkpoints **c<sub>1</sub>**, **c<sub>2</sub>**, **c<sub>3</sub>**, and **c<sub>4</sub>** for job *0xA312Bc*. Periodic checkpoints **c<sub>1</sub>** and **c<sub>3</sub>** have already been *discarded* and **c<sub>4</sub>** is the *latest checkpoint*. **c<sub>2</sub> is special**. It is the state associated with the savepoint **s<sub>1</sub>** and has been triggered by the user and it doesn't expire automatically (as c<sub>1</sub> and c<sub>3</sub> did after the completion of newer checkpoints).
    +
    +Note that **s<sub>1</sub>** is only a **pointer to the actual checkpoint data c<sub>2</sub>**. This means that the actual state is *not copied* for the savepoint and periodic checkpoint data is kept around.
    +
    +## Configuration
    +
    +Savepoints point to regular checkpoints and store their state in a configured [state backend]({{ site.baseurl }}/apis/state_backends.html). Currently, the supported state backends are **jobmanager** and **filesystem**. The state backend configuration for the regular periodic checkpoints is **independent** of the savepoint state backend configuration. Checkpoint data is **not copied** for savepoints, but points to the configured checkpoint state backend.
    +
    +### JobManager
    +
    +This is the **default backend** for savepoints.
    +
    +Savepoints are stored on the heap of the job manager. They are *lost* after the job manager is shut down. This mode is only useful if you want to *stop* and *resume* your program while the **same cluster** keeps running. It is *not recommended* for production use. Savepoints are *not* part the [job manager's highly availabile]({{ site.baseurl }}/setup/jobmanager_high_availability.html) state.
    +
    +<pre>
    +state.backend.savepoints: jobmanager
    +</pre>
    +
    +**Note**: If you don't configure a specific state backend for the savepoints, the default state backend (config key `state.backend`) will be used.
    +
    +### File system
    +
    +Savepoints are stored in the configured **file system directory**. They are available between cluster instances and allow to move your program to another cluster.
    +
    +<pre>
    +state.backend.savepoints: filesystem
    +state.backend.savepoints.fs.dir: hdfs:///flink/savepoints
    +</pre>
    +
    +**Note**: If you don't configure a specific directory, the checkpoint directory (config key `state.backend.fs.checkpointdir`) will be used.
    +
    +**Important**: A savepoint is a pointer to completed checkpoint. That means that the state of a savepoint is not only found in the savepoint file itself, but also needs the actual checkpoint data (e.g. in a set of further files).
    --- End diff --
    
    +1
    
    Let's also add it as a sanity check in the code



[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r47777758
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Savepoint.java ---
    @@ -0,0 +1,55 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.checkpoint;
    +
    +import org.apache.flink.api.common.ApplicationID;
    +
    +import java.io.Serializable;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link CompletedCheckpoint} instance with the {@link ApplicationID} of the program it belongs
    + * to.
    + */
    +public class Savepoint implements Serializable {
    --- End diff --
    
    Addressed (comment refers to old commit)



[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1434#issuecomment-165115785
  
    I've addressed the last comments.
    
    Regarding the deterministic hashing and ordering of the graph: changing anything but the user function changes the hash (if not specified manually).
    
    If we don't want this, we will run into collisions when you have the same transformations more than once in the graph. I think it is OK to be aggressive with this in the beginning. I've tried to make this point clearer in the docs.
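
The collision concern can be illustrated with a small sketch. It is a simplification under stated assumptions: MD5 from `java.security.MessageDigest` stands in for the `murmur3_128` function used in the PR (both yield 16 bytes), and the class `NodeHashSketch`, its `hash` method, and the choice of hashed properties are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch: hashing a node from local properties and input hashes. */
public class NodeHashSketch {

    /**
     * Hashes the UDF class name, the parallelism, optionally the node ID,
     * and the hashes of all input nodes. Pass null as nodeId to leave it out.
     */
    public static byte[] hash(
            String udfClass, int parallelism, Integer nodeId, List<byte[]> inputHashes) {
        try {
            // MD5 is only a stand-in for murmur3_128 here.
            MessageDigest md = MessageDigest.getInstance("MD5");
            put(md, udfClass);
            put(md, Integer.toString(parallelism));
            if (nodeId != null) {
                put(md, Integer.toString(nodeId));
            }
            for (byte[] in : inputHashes) {
                md.update(in);
            }
            return md.digest();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    private static void put(MessageDigest md, String field) {
        md.update(field.getBytes(StandardCharsets.UTF_8));
        md.update((byte) 0); // separator between fields
    }

    public static void main(String[] args) {
        byte[] source = hash("MySource", 1, null, Collections.<byte[]>emptyList());
        List<byte[]> in = Collections.singletonList(source);

        // The same map transformation applied twice to the same input.
        byte[] mapA = hash("MyMap", 4, null, in);
        byte[] mapB = hash("MyMap", 4, null, in);
        System.out.println("without node ID, equal: " + Arrays.equals(mapA, mapB));

        // Including the node ID separates the two nodes.
        byte[] mapA2 = hash("MyMap", 4, 2, in);
        byte[] mapB2 = hash("MyMap", 4, 3, in);
        System.out.println("with node ID, equal: " + Arrays.equals(mapA2, mapB2));
    }
}
```

Without the node ID the two map nodes hash to the same value, which is the collision described above. Including the ID avoids it, at the price that any change shifting the IDs also changes the hashes.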




[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46837969
  
    --- Diff: docs/apis/savepoints.md ---
    @@ -0,0 +1,108 @@
    +---
    +title: "Savepoints"
    +is_beta: false
    +---
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +
    +Programs written in the [Data Stream API]({{ site.baseurl }}/apis/streaming_guide.html) can resume execution from a **savepoint**. Savepoints allow both updating your programs and your Flink cluster without loosing any state. This page covers all steps to trigger, restore, and dispose savepoints. For more details on how Flink handles state and failures, check out the [State in Streaming Programs]({{ site.baseurl }}/apis/state_backends.html) and [Fault Tolerance]({{ site.baseurl }}/apis/fault_tolerance.html) pages.
    --- End diff --
    
    typo: loosing --> losing



[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1434#issuecomment-162532182
  
    Very nice. I haven't gone through the code in detail yet, but from the description and the docs it looks very good.
    
    One thing I was confused about is the naming of the config options:
    ```
    state.backend.savepoints: filesystem
    state.backend.savepoints.fs.dir: hdfs:///flink/savepoints
    ```
    
    I think this could confuse users into thinking that a different state backend is used on the task manager to store the actual checkpoint data. If I'm not mistaken, though, this is purely a JobManager thing. Also, the naming scheme for options under `state.backend` used to be that `state.backend.x.y` sets option `y` for state backend `x`. Now it looks like there is a state backend called `savepoints`. Maybe we could change this to:
    ```
    savepoints.state.backend: filesystem
    savepoints.state.backend.fs.dir: hdfs:///flink/savepoints
    ```
    
    to clarify that this is changing the state backend settings for the savepoints on the JobManager.



[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46839071
  
    --- Diff: docs/apis/savepoints.md ---
    @@ -0,0 +1,108 @@
    +---
    +title: "Savepoints"
    +is_beta: false
    +---
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +
    +Programs written in the [Data Stream API]({{ site.baseurl }}/apis/streaming_guide.html) can resume execution from a **savepoint**. Savepoints allow both updating your programs and your Flink cluster without loosing any state. This page covers all steps to trigger, restore, and dispose savepoints. For more details on how Flink handles state and failures, check out the [State in Streaming Programs]({{ site.baseurl }}/apis/state_backends.html) and [Fault Tolerance]({{ site.baseurl }}/apis/fault_tolerance.html) pages.
    +
    +* toc
    +{:toc}
    +
    +## Overview
    +
    +Savepoints are **manually triggered checkpoints**, which take a snapshot of the program and write it out to a state backend. They rely on the regular checkpointing mechanism for this. During execution programs are periodically snapshotted on the worker nodes and produce checkpoints. For recovery only the last completed checkpoint is needed and older checkpoints can be safely discarded as soon as a new one is completed.
    +
    +Savepoints are similar to these periodic checkpoints except that they are **triggered by the user** and **don't automatically expire** when newer checkpoints are completed.
    +
    +<img src="fig/savepoints-overview.png" class="center" />
    +
    +In the above example the workers produce checkpoints **c<sub>1</sub>**, **c<sub>2</sub>**, **c<sub>3</sub>**, and **c<sub>4</sub>** for job *0xA312Bc*. Periodic checkpoints **c<sub>1</sub>** and **c<sub>3</sub>** have already been *discarded* and **c<sub>4</sub>** is the *latest checkpoint*. **c<sub>2</sub> is special**. It is the state associated with the savepoint **s<sub>1</sub>** and has been triggered by the user and it doesn't expire automatically (as c<sub>1</sub> and c<sub>3</sub> did after the completion of newer checkpoints).
    +
    +Note that **s<sub>1</sub>** is only a **pointer to the actual checkpoint data c<sub>2</sub>**. This means that the actual state is *not copied* for the savepoint and periodic checkpoint data is kept around.
    +
    +## Configuration
    +
    +Savepoints point to regular checkpoints and store their state in a configured [state backend]({{ site.baseurl }}/apis/state_backends.html). Currently, the supported state backends are **jobmanager** and **filesystem**. The state backend configuration for the regular periodic checkpoints is **independent** of the savepoint state backend configuration. Checkpoint data is **not copied** for savepoints, but points to the configured checkpoint state backend.
    +
    +### JobManager
    +
    +This is the **default backend** for savepoints.
    +
    +Savepoints are stored on the heap of the job manager. They are *lost* after the job manager is shut down. This mode is only useful if you want to *stop* and *resume* your program while the **same cluster** keeps running. It is *not recommended* for production use. Savepoints are *not* part the [job manager's highly availabile]({{ site.baseurl }}/setup/jobmanager_high_availability.html) state.
    +
    +<pre>
    +state.backend.savepoints: jobmanager
    +</pre>
    +
    +**Note**: If you don't configure a specific state backend for the savepoints, the default state backend (config key `state.backend`) will be used.
    +
    +### File system
    +
    +Savepoints are stored in the configured **file system directory**. They are available between cluster instances and allow to move your program to another cluster.
    +
    +<pre>
    +state.backend.savepoints: filesystem
    +state.backend.savepoints.fs.dir: hdfs:///flink/savepoints
    +</pre>
    +
    +**Note**: If you don't configure a specific directory, the checkpoint directory (config key `state.backend.fs.checkpointdir`) will be used.
    +
    +**Important**: A savepoint is a pointer to completed checkpoint. That means that the state of a savepoint is not only found in the savepoint file itself, but also needs the actual checkpoint data (e.g. in a set of further files).
    --- End diff --
    
    Maybe we can say more prominently here that combining `state.backend.savepoints: filesystem` with `state.backend: jobmanager` does not make sense.
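
A check for this combination could look roughly like the sketch below. It is only an illustration: it reads the two keys from a plain `Map` rather than from Flink's configuration classes, the class `SavepointConfigCheck` and its `validate` method are hypothetical, and the fallback to `jobmanager` when `state.backend` is unset is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a sanity check for the savepoint backend configuration. */
public class SavepointConfigCheck {

    /**
     * Rejects a file system savepoint backend combined with a JobManager
     * checkpoint backend. In that setup the savepoint file would survive a
     * cluster shutdown, but the checkpoint data it points to would not.
     */
    public static void validate(Map<String, String> config) {
        // Assumption: "jobmanager" is the default when state.backend is unset.
        String checkpointBackend = config.getOrDefault("state.backend", "jobmanager");
        // Per the quoted docs, savepoints fall back to the regular state backend.
        String savepointBackend =
                config.getOrDefault("state.backend.savepoints", checkpointBackend);

        if ("filesystem".equals(savepointBackend) && "jobmanager".equals(checkpointBackend)) {
            throw new IllegalArgumentException(
                    "state.backend.savepoints is 'filesystem', but state.backend is "
                            + "'jobmanager'. The savepoint would point to checkpoint data "
                            + "that is lost when the job manager shuts down.");
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("state.backend", "jobmanager");
        config.put("state.backend.savepoints", "filesystem");
        try {
            validate(config);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```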



[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46850340
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -495,6 +513,76 @@ class JobManager(
         case checkpointMessage : AbstractCheckpointMessage =>
           handleCheckpointMessage(checkpointMessage)
     
    +    case TriggerSavepoint(jobId) =>
    +      currentJobs.get(jobId) match {
    +        case Some((graph, _)) =>
    +          val savepointCoordinator = graph.getSavepointCoordinator()
    +
    +          if (savepointCoordinator != null) {
    +            // Immutable copy for the future
    +            val senderRef = sender()
    +
    +            future {
    +              try {
    +                // Do this async, because checkpoint coordinator operations can
    +                // contain blocking calls to the state backend or ZooKeeper.
    +                val savepointFuture = savepointCoordinator.triggerSavepoint(
    +                  System.currentTimeMillis())
    +
    +                savepointFuture.onComplete {
    +                  // Success, respond with the savepoint path
    +                  case scala.util.Success(savepointPath) =>
    +                    senderRef ! TriggerSavepointSuccess(jobId, savepointPath)
    +
    +                  // Failure, respond with the cause
    +                  case scala.util.Failure(t) =>
    +                    senderRef ! TriggerSavepointFailure(jobId,
    +                      new Exception("Failed to complete savepoint", t))
    +                }(context.dispatcher)
    +              }
    +              catch {
    --- End diff --
    
    In Scala, `catch` is written on the same line as the closing curly brace.



[GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46850538
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -495,6 +513,76 @@ class JobManager(
         case checkpointMessage : AbstractCheckpointMessage =>
           handleCheckpointMessage(checkpointMessage)
     
    +    case TriggerSavepoint(jobId) =>
    +      currentJobs.get(jobId) match {
    +        case Some((graph, _)) =>
    +          val savepointCoordinator = graph.getSavepointCoordinator()
    +
    +          if (savepointCoordinator != null) {
    +            // Immutable copy for the future
    +            val senderRef = sender()
    +
    +            future {
    +              try {
    +                // Do this async, because checkpoint coordinator operations can
    +                // contain blocking calls to the state backend or ZooKeeper.
    +                val savepointFuture = savepointCoordinator.triggerSavepoint(
    +                  System.currentTimeMillis())
    +
    +                savepointFuture.onComplete {
    +                  // Success, respond with the savepoint path
    +                  case scala.util.Success(savepointPath) =>
    +                    senderRef ! TriggerSavepointSuccess(jobId, savepointPath)
    +
    +                  // Failure, respond with the cause
    +                  case scala.util.Failure(t) =>
    +                    senderRef ! TriggerSavepointFailure(jobId,
    +                      new Exception("Failed to complete savepoint", t))
    +                }(context.dispatcher)
    +              }
    +              catch {
    +                case e: Exception =>
    +                  senderRef ! TriggerSavepointFailure(jobId, new Exception(
    +                    "Failed to trigger savepoint", e))
    +              }
    +            }(context.dispatcher)
    +          }
    +          else {
    +            sender() ! TriggerSavepointFailure(jobId, new IllegalStateException(
    +              "Checkpointing disabled. You can enable it via the execution environment of " +
    +                "your job."))
    +          }
    +
    +        case None =>
    +          sender() ! TriggerSavepointFailure(jobId, new IllegalArgumentException("Unknown job."))
    +      }
    +
    +    case DisposeSavepoint(savepointPath) =>
    +      val senderRef = sender()
    +      future {
    +        try {
    +          log.info(s"Disposing savepoint at '$savepointPath'.")
    +
    +          val savepoint = savepointStore.getState(savepointPath)
    +
    +          log.debug(s"$savepoint")
    +
    +          // Discard the associated checkpoint
    +          savepoint.getCompletedCheckpoint.discard(getClass.getClassLoader)
    +
    +          // Dispose the savepoint
    +          savepointStore.disposeState(savepointPath)
    +
    +          senderRef ! DisposeSavepointSuccess
    +        }
    +        catch {
    --- End diff --
    
    no line break before `catch`.

