You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by StephanEwen <gi...@git.apache.org> on 2017/02/24 11:56:35 UTC

[GitHub] flink pull request #3411: [FLINK-5897] & [FLINK-5822] First step towards Gen...

GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/3411

    [FLINK-5897] &  [FLINK-5822]  First step towards Generic State Backends and Global State Cleanup Hooks

    **This is the first part of a larger parent issue: Self-contained externalized checkpoints and global cleanup hooks.**
    
    Parts of the changes may seem incomplete, because they are preparation for later changes. To avoid too large pull requests, this is the first part that by itself is stable and compatible with the prior behavior.
    
    ## High-level changes
    
      1. The Checkpoint Coordinator knows about the base state backend. That is the first step towards generic storage of checkpoints (not file system specific) and global cleanup hooks (rather than tracking for example each file cleanup individually).
      
      2. The CompletedCheckpoint is not assuming that it is stored on a FileSystem, but holds now a `StreamStateHandle` to its metadata (if externalized) and a generic external pointer. In the case of a checkpoint on a FileSystem, this pointer is the file path.
    
    
    
    ## Detailed Changed
    
      - This moves the logic to load a statebackend from the configuration out of the `StreamTask` and into the `AbstractStateBackend`, because both JobManager and TaskManager now share the same logic.
      
      - Adds tests for the loading behavior of state backends
      
      - Improves the Exception signatures of state backend loading
      
      - Allows CompletedCheckpointStores to specify whether they require externalized checkpoints.
        That is important for the next step, where the ZooKeeper store only stores pointers and does not externalize the metadata an additional time.
    
      - `CompletedCheckpoint` holds pointer to metadata
      
      - CheckpointCoordinator externalizes the checkpoint explicitly (rather than the pending checkpoint does it implicitly).
      
      - More comments and JavaDocs
      
    ## Tests
    
    Most of the functionality just made some parts more generic and needs no additional tests.
    
    Additional tests were added for the passing of state backend from program to checkpoint coordinator, and for the loading of the state backend from the configurations.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink statebackend

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3411.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3411
    
----
commit 5689aba6f2ae0363e7e36ef5d920fdae88d5b5cc
Author: Stephan Ewen <se...@apache.org>
Date:   2017-02-17T16:51:00Z

    [FLINK-5822] [state backends] Make JobManager / Checkpoint Coordinator aware of the root state backend

commit 16c1f5afaabd0cff58afe5086ae0aabc82441072
Author: Stephan Ewen <se...@apache.org>
Date:   2017-02-22T21:18:50Z

    [FLINK-5897] [checkpoints] Make checkpoint externalization not depend strictly on FileSystems
    
    That is the first step towards checkpoints that can be externalized to other stores as well,
    like k/v stores and databases, if supported by the state backend.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3411: [FLINK-5897] & [FLINK-5822] First step towards Generic St...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/3411
  
    Thanks for the review, @uce - addressing the comments and merging this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3411: [FLINK-5897] & [FLINK-5822] First step towards Gen...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3411#discussion_r103169078
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
    @@ -100,7 +107,9 @@
     	 * accessing this don't block the job manager actor and run asynchronously. */
     	private final CompletedCheckpointStore completedCheckpointStore;
     
    -	/** Default directory for persistent checkpoints; <code>null</code> if none configured. */
    +	/** Default directory for persistent checkpoints; <code>null</code> if none configured.
    +	 * THIS WILL BE REPLACED BY PROPER STATE-BACKEND METADATA WRITING */
    --- End diff --
    
    Very good! :+1: Will you file follow up JIRAs for this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3411: [FLINK-5897] & [FLINK-5822] First step towards Generic St...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the issue:

    https://github.com/apache/flink/pull/3411
  
    The test failure is an unrelated instability that I've seen over the week end as well (https://issues.apache.org/jira/browse/FLINK-5923).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3411: [FLINK-5897] & [FLINK-5822] First step towards Gen...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3411#discussion_r103192397
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ---
    @@ -203,48 +208,67 @@ void setStatsCallback(@Nullable PendingCheckpointStats trackerCallback) {
     		return onCompletionPromise;
     	}
     
    -	public CompletedCheckpoint finalizeCheckpoint() {
    +	public CompletedCheckpoint finalizeCheckpointExternalized() throws IOException {
    +
     		synchronized (lock) {
    -			Preconditions.checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
    -
    -			// Persist if required
    -			String externalPath = null;
    -			if (props.externalizeCheckpoint()) {
    -				try {
    -					Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
    -					externalPath = SavepointStore.storeSavepoint(
    -							targetDirectory,
    -							savepoint
    -					);
    -				} catch (IOException e) {
    -					LOG.error("Failed to persist checkpoint {}.",checkpointId, e);
    -				}
    -			}
    +			checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
     
    -			CompletedCheckpoint completed = new CompletedCheckpoint(
    -					jobId,
    -					checkpointId,
    -					checkpointTimestamp,
    -					System.currentTimeMillis(),
    -					new HashMap<>(taskStates),
    -					props,
    -					externalPath);
    +			// externalize the metadata
    +			final Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
     
    -			onCompletionPromise.complete(completed);
    +			// TEMP FIX - The savepoint store is strictly typed to file systems currently
    +			//            but the checkpoints think more generic. we need to work with file handles
    +			//            here until the savepoint serializer accepts a generic stream factory
     
    -			if (statsCallback != null) {
    -				// Finalize the statsCallback and give the completed checkpoint a
    -				// callback for discards.
    -				CompletedCheckpointStats.DiscardCallback discardCallback = statsCallback.reportCompletedCheckpoint(externalPath);
    -				completed.setDiscardCallback(discardCallback);
    -			}
    +			final FileStateHandle metadataHandle = SavepointStore.storeSavepointToHandle(targetDirectory, savepoint);
    +			final String externalPointer = metadataHandle.getFilePath().getParent().toString();
     
    -			dispose(false);
    +			return finalizeInternal(metadataHandle, externalPointer);
    +		}
    +	}
    +
    +	public CompletedCheckpoint finalizeCheckpointNonExternalized() {
    +		synchronized (lock) {
    +			checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
     
    -			return completed;
    +			// finalize without external metadata
    +			return finalizeInternal(null, null);
     		}
     	}
     
    +	@GuardedBy("lock")
    +	private CompletedCheckpoint finalizeInternal(
    +			@Nullable StreamStateHandle externalMetadata,
    +			@Nullable String externalPointer) {
    +
    +		assert(Thread.holdsLock(lock));
    +
    +		CompletedCheckpoint completed = new CompletedCheckpoint(
    +				jobId,
    +				checkpointId,
    +				checkpointTimestamp,
    +				System.currentTimeMillis(),
    +				new HashMap<>(taskStates),
    +				props,
    +				externalMetadata,
    +				externalPointer);
    +
    +		onCompletionPromise.complete(completed);
    --- End diff --
    
    If the creation `CompletedCheckpoint` fails (for example because it the external metadata is null although the properties say the checkpoint should have been externalized), the promise is never completed. I think we should do a try catch and fail the promise in that case.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3411: [FLINK-5897] & [FLINK-5822] First step towards Gen...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3411


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3411: [FLINK-5897] & [FLINK-5822] First step towards Gen...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3411#discussion_r103169593
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java ---
    @@ -72,4 +72,5 @@
     	 */
     	int getNumberOfRetainedCheckpoints();
     
    +	boolean requiresExternalizedCheckpoints();
    --- End diff --
    
    Should we add a short JavaDocs comment?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3411: [FLINK-5897] & [FLINK-5822] First step towards Gen...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3411#discussion_r103170089
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java ---
    @@ -21,20 +21,50 @@
     import org.apache.flink.annotation.PublicEvolving;
     import org.apache.flink.api.common.JobID;
     import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.CoreOptions;
    +import org.apache.flink.configuration.IllegalConfigurationException;
     import org.apache.flink.runtime.execution.Environment;
     import org.apache.flink.runtime.query.TaskKvStateRegistry;
    +import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
    +import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    +import org.apache.flink.util.DynamicCodeLoadingException;
    +
    +import org.slf4j.Logger;
     
     import javax.annotation.Nullable;
     import java.io.IOException;
     
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
     /**
      * An abstract base implementation of the {@link StateBackend} interface.
    + * 
    + * <p>
    --- End diff --
    
    Left over comment


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---