You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by EronWright <gi...@git.apache.org> on 2018/02/08 03:46:31 UTC

[GitHub] flink pull request #5427: [FLINK-8533] [checkpointing] Support MasterTrigger...

GitHub user EronWright opened a pull request:

    https://github.com/apache/flink/pull/5427

    [FLINK-8533] [checkpointing] Support MasterTriggerRestoreHook state reinitialization

    
    Signed-off-by: Eron Wright <er...@gmail.com>
    
    ## What is the purpose of the change
    Support MasterTriggerRestoreHook state re-initialization, to eliminate an edge case involving execution restarts where no checkpoint state is available.
    
    ## Brief change log
    - extend `MasterTriggerRestoreHook` with `initializeState` method.
    - invoke `initializeState` upon initial execution and upon global restart.
    
    ## Verifying this change
    - Revised test: `CheckpointCoordinatorMasterHooksTest`
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency):  no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`:  **yes**
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **yes**
      - The S3 file system connector:  no
    
    ## Documentation
      - Does this pull request introduce a new feature?  no
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/EronWright/flink FLINK-8533-hook-initialization

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5427.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5427
    
----
commit 9ad0e03c1aa81012ae14f598acdcf3eb76c9ec9f
Author: Eron Wright <er...@...>
Date:   2018-02-08T03:38:47Z

    [FLINK-8533] Support MasterTriggerRestoreHook state reinitialization
    
    - extend `MasterTriggerRestoreHook` with `initializeState` method.
    - invoke `initializeState` upon initial execution and upon global
    restart.
    
    Signed-off-by: Eron Wright <er...@gmail.com>

----


---

[GitHub] flink issue #5427: [FLINK-8533] [checkpointing] Support MasterTriggerRestore...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on the issue:

    https://github.com/apache/flink/pull/5427
  
    @StephanEwen thanks for taking a look, I agree with trying to avoid a new lifecycle method.
    
    The `initializeState` method on the hook interface gives the hook an unconditional initialization point. 
    In the Pravega case, we would move reader-group (RG) initialization from client to server, and always reset the RG to its initial conditions.   A subsequent restore may or may not occur.
    
    Assuming we like this approach, let's discuss how to make it work purely with `restoreLatestCheckpointedState`.   The `restoreLatestCheckpointedState` method is not called by the ExecutionGraph (EG) upon initial execution, which we would want to support the new `initializeState` method.   Would there be any issue with calling `restoreLatestCheckpointedState` on initial execution? Such symmetry would seem desirable.  
    
    **Existing approach**:
    ```
    === initial ===
    \-- JM.submitJob
    |   \-- EG.scheduleForExecution
    
    === restart===
    \-- RestartCallback.triggerFullRecovery
    |   \-- EG.restart
    |   |   \-- CC.restoreLatestCheckpointedState
    |   |   \-- EG.scheduleForExecution
    ```
    
    **Suggested approach**:
    ```
    === initial ===
    \-- JM.submitJob
    |   \-- EG.start  (** new method **)
    |   |   \-- CC.restoreLatestCheckpointedState
    |   |   |   \-- Hook.initializeState
    |   |   \-- EG.scheduleForExecution
    
    === restart===
    \-- RestartCallback.triggerFullRecovery
    |   \-- EG.restart
    |   |   \-- CC.restoreLatestCheckpointedState
    |   |   |   \-- Hook.initializeState
    |   |   \-- EG.scheduleForExecution
    ```


---

[GitHub] flink issue #5427: [FLINK-8533] [checkpointing] Support MasterTriggerRestore...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/5427
  
    @EronWright, you're right that on initial submission we don't call `restoreLatestCheckpointedState` in the old code. With Flip-6 this will be the case. See #5444.
    
    The underlying assumption to make this work, though, is that a user won't submit a new job with the a job id to a cluster with a cluster id for which ZooKeeper already contains persisted checkpoints from a previous run. So either the cluster id or the job id must be different. 
    
    I think so far, when using the Flink client this should be the case. However, when generating the `JobGraph` yourself and keeping it around to submit it to a standalone cluster, then this assumption will break because both the `JobID` and the cluster id will be the same.


---

[GitHub] flink issue #5427: [FLINK-8533] [checkpointing] Support MasterTriggerRestore...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5427
  
    Given the already non-trivial complexity of the `CheckpointCoordinator`, I am wondering if there is a way to do this without adding the `resetForNewExecution()` method.
    
    Adding another life cycle method (even if it is not exploited for other purposes currently) would need more involved tests and make future maintenance harder.
    
    Can we reset all hooks in the `restoreLatestCheckpointedState()` method? Would we miss any cases if we do only that? 


---

[GitHub] flink issue #5427: [FLINK-8533] [checkpointing] Support MasterTriggerRestore...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on the issue:

    https://github.com/apache/flink/pull/5427
  
    @tillrohrmann @StephanEwen sorry about the long delay here, would you please take another look?
    
    I followed Stephan's suggestion of not introducing a new method.   However, the semantics that I was shooting for with `initializeState` is that it would be called on both _start_ and _restart_.  I adjusted `JobManager` to call `restoreLatestCheckpointedState` on first execution (as does `JobMaster`).  Are you OK with that?


---

[GitHub] flink issue #5427: [FLINK-8533] [checkpointing] Support MasterTriggerRestore...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on the issue:

    https://github.com/apache/flink/pull/5427
  
    I feel that we're not addressing the core issue that we're trying to fix.   
    1. A new job starts up with checkpointing enabled and a hook-based source.
    2. The source begins to consume events, causing some external state to become mutated.
    3. _Before the first checkpoint_, a task throws an exception, causing a global restart.
    4. Since the hook has no opportunity to rewind the external state to initial conditions, data loss occurs.
    
    The above is a special case.  In the normal case, one or more checkpoints have occurred before the restart occurs, and so the hook's `restore` method is effective.



---

[GitHub] flink issue #5427: [FLINK-8533] [checkpointing] Support MasterTriggerRestore...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5427
  
    I think we may get around adding a new method. I checked with @tillrohrmann , here are the thoughts:
    
      - Submitting a job initially as a new reader group, no need to reset here
      - Recovering at any point calls the `restoreLatestCheckpointedState()` method
      - Also recovering from a JobManager failover basically "resubmits" the job with a special flag causing the JM to call the `restoreLatestCheckpointedState()` for the job.


---

[GitHub] flink issue #5427: [FLINK-8533] [checkpointing] Support MasterTriggerRestore...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on the issue:

    https://github.com/apache/flink/pull/5427
  
    @StephanEwen thanks again for the feedback, which I took to heart and simplified the hook.  It now has a `reset ` method that is called only in the special case.
    
    I will refactor the thread context code in a separate PR.



---

[GitHub] flink issue #5427: [FLINK-8533] [checkpointing] Support MasterTriggerRestore...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5427
  
    Thanks a lot, looks good, can merge this now.
    
    One quick question: You decided to have empty default implementations for the new methods in the master hook interface. Given that Pravega is currently the only known user of that interface, I would be okay with breaking the interface (no default methods) if you think that would be cleaner.


---

[GitHub] flink issue #5427: [FLINK-8533] [checkpointing] Support MasterTriggerRestore...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on the issue:

    https://github.com/apache/flink/pull/5427
  
    @tzulitai @StephanEwen please take a look, thanks.


---

[GitHub] flink issue #5427: [FLINK-8533] [checkpointing] Support MasterTriggerRestore...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5427
  
    I think we are talking about two different things here:
    
      1. We DO need a new method on the hook interface to reinitialize the reader group. I think the one suggested by Eron works.
      2. We DO NOT need a new method on the CheckpointCoordinator, but calling `Hook.initializeState()` within `CC.restoreLatestCheckpointedState()` (whenever there is no checkpoint to be restored) should work.
    
    ==> Let's add the Hook method, but not add an additional method to the CheckpointCoordinator.


---

[GitHub] flink pull request #5427: [FLINK-8533] [checkpointing] Support MasterTrigger...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5427#discussion_r184478068
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java ---
    @@ -138,4 +158,11 @@
     		 */
     		<V> MasterTriggerRestoreHook<V> create();
     	}
    +
    +	/**
    +	 * The hook initialization context.
    +	 */
    +	interface HookInitializationContext {
    --- End diff --
    
    I assume this is for ease of future evolvability?


---

[GitHub] flink pull request #5427: [FLINK-8533] [checkpointing] Support MasterTrigger...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5427#discussion_r184482881
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -1358,6 +1358,10 @@ class JobManager(
                       throw new SuppressRestartsException(e)
                   }
                 }
    +            else {
    --- End diff --
    
    If we did not have the unconditional initialization logic, we should also be able to drop this part. That would also make this work with the flip-6 code.


---

[GitHub] flink issue #5427: [FLINK-8533] [checkpointing] Support MasterTriggerRestore...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on the issue:

    https://github.com/apache/flink/pull/5427
  
    @StephanEwen do you mean that we could avoid adding an initialization method to the hook interface?  We need to somehow call into the hook to reset its state, even when there's no checkpoint data to work with.   Could you rephrase your suggestion?  Thanks.


---

[GitHub] flink pull request #5427: [FLINK-8533] [checkpointing] Support MasterTrigger...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5427#discussion_r184477785
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
    @@ -1009,6 +1013,11 @@ public boolean restoreLatestCheckpointedState(
     
     			LOG.debug("Status of the shared state registry after restore: {}.", sharedStateRegistry);
     
    +			// Instruct the master hooks to initialize their state (unconditionally)
    +			LOG.debug("Initializing the master hooks.");
    --- End diff --
    
    Can you elaborate a bit why this initialization is happening in all cases?
    An alternative would be to have a `reset()` method or so on the master hook that is called further below, in the `if (latest == null)` code block.
    
    Initializing the state seems a tad bit unituitive to me here - I somehow assume that an init function is called once, while this one here is called on every restore.


---

[GitHub] flink pull request #5427: [FLINK-8533] [checkpointing] Support MasterTrigger...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5427


---

[GitHub] flink pull request #5427: [FLINK-8533] [checkpointing] Support MasterTrigger...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5427#discussion_r184481704
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java ---
    @@ -291,6 +341,34 @@ else if (!allowUnmatchedState) {
     			this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
     		}
     
    +		@Override
    +		public void initializeState(HookInitializationContext context) throws Exception {
    +			final Thread thread = Thread.currentThread();
    --- End diff --
    
    We could use a utility like:
    ```java
    public static void withContextClassLoader(ClassLoader cl, Runnable r) {
        final Thread thread = Thread.currentThread();
        final ClassLoader originalClassLoader = thread.getContextClassLoader();
    
        try {
            thread.setContextClassLoader(userClassLoader);
            r.run();
        }
        finally {
            thread.setContextClassLoader(originalClassLoader);
        }
    }
    
    withContextClassLoader(userClassLoader, () -> { hook.initializeState(context); });
    ```
    
    How about adding that to `org.apache.flink.util.LambdaUtil`?


---