Posted to issues@flink.apache.org by sihuazhou <gi...@git.apache.org> on 2018/04/28 03:03:48 UTC

[GitHub] flink pull request #5934: [FLINK-9269][state] fix concurrency problem when p...

GitHub user sihuazhou opened a pull request:

    https://github.com/apache/flink/pull/5934

    [FLINK-9269][state]  fix concurrency problem when performing checkpoint in HeapKeyedStateBackend

    ## What is the purpose of the change
    
    *This PR fixes the concurrency problem in `HeapKeyedStateBackend`, and contains an additional hotfix for `RocksDBKeyedStateBackend` that duplicates the `KeySerializer` when performing a full checkpoint.*
    
    
    ## Brief change log
    
      - *fix concurrency problem when performing checkpoint in `HeapKeyedStateBackend`*
      - *[hotfix] duplicate the key serializer when performing a full checkpoint in `RocksDBKeyedStateBackend`*
    
    
    ## Verifying this change
    
    This change is a trivial rework / code cleanup without any test coverage.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
    
    ## Documentation
    
    no

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sihuazhou/flink FLINK-9269

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5934.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5934
    
----
commit f60d4d1fcd85612c792bec4d0c0d9ba1b142e1e9
Author: sihuazhou <su...@...>
Date:   2018-04-28T02:50:41Z

    fix concurrency problem when performing checkpoint in HeapKeyedStateBackend.

commit c998dbfc78b657bcbd47a836d23b819ce61778b9
Author: sihuazhou <su...@...>
Date:   2018-04-28T02:57:11Z

    [hotfix] duplicate the key serializer when performing full checkpoint in RocksDBKeyedBackend.

----


---

[GitHub] flink pull request #5934: [FLINK-9269][state] fix concurrency problem when p...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5934#discussion_r185464970
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---
    @@ -3594,6 +3599,58 @@ public String fold(String acc, Integer value) throws Exception {
     		}
     	}
     
    +	@Test
    +	public void testCheckConcurrencyProblemWhenPerformingCheckpointAsync() throws Exception {
    +
    +		CheckpointStreamFactory streamFactory = createStreamFactory();
    +		Environment env = new DummyEnvironment();
    +		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
    +
    +		ExecutorService executorService = Executors.newScheduledThreadPool(1);
    +		try {
    +			long checkpointID = 0;
    +			List<Future> futureList = new ArrayList();
    +			for (int i = 0; i < 10; ++i) {
    +				ValueStateDescriptor<Integer> kvId = new ValueStateDescriptor<>("id" + i, IntSerializer.INSTANCE);
    +				ValueState<Integer> state = backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
    +				((InternalValueState) state).setCurrentNamespace(VoidNamespace.INSTANCE);
    +				backend.setCurrentKey(i);
    +				state.update(i);
    +
    +				futureList.add(runSnapshotAsync(executorService,
    +					backend.snapshot(checkpointID++, System.currentTimeMillis(), streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())));
    +			}
    +
    +			for (Future future : futureList) {
    +				future.get(10, TimeUnit.SECONDS);
    +			}
    +		} catch (Exception e) {
    +			fail();
    +		} finally {
    +			backend.dispose();
    +			executorService.shutdown();
    +		}
    +	}
    +
    +	protected Future<SnapshotResult<KeyedStateHandle>> runSnapshotAsync(
    +		ExecutorService executorService,
    +		RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture) throws Exception {
    +
    +		if (!snapshotRunnableFuture.isDone()) {
    --- End diff --
    
    I think you could replace this code with `CompletableFuture.runAsync(...)`.


---

[GitHub] flink issue #5934: [FLINK-9269][state] fix concurrency problem when performi...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5934
  
    Hi @StefanRRichter, sorry for the unclear description. What this PR mainly tries to fix relates to the code below, which runs asynchronously:
    ```java
    for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
       // do something    								
    } // this will just close the outer compression stream
    ```
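    Iterating over `stateTables` here can cause a concurrency problem: the asynchronous snapshot thread reads the map while the task thread may still modify it, for example by registering a new state.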
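    A minimal Java sketch of the hazard and of the general fix idea, assuming a simplified registry that maps state names to tables (the values are plain `String`s here; `StateRegistrySnapshotSketch` and its methods are hypothetical, not Flink code). The synchronous part takes a private copy of the registry, and the asynchronous part iterates only that copy, so states registered afterwards are never observed. The single-threaded simulation of the unsafe path makes `HashMap`'s fail-fast iterator throw deterministically; with real concurrent threads the outcome is not guaranteed, which is part of why such a bug is hard to observe. Only the registry is copied here; the per-table copy-on-write snapshot of the data itself is out of scope.

    ```java
    import java.util.ArrayList;
    import java.util.ConcurrentModificationException;
    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class StateRegistrySnapshotSketch {

        /** Unsafe pattern: the asynchronous part iterates the live registry. */
        static List<String> unsafeAsyncPart(Map<String, String> liveStateTables, Runnable registerNewState) {
            List<String> written = new ArrayList<>();
            for (Map.Entry<String, String> entry : liveStateTables.entrySet()) {
                written.add(entry.getKey());
                // Simulates the task thread lazily registering a new state mid-iteration.
                registerNewState.run();
            }
            return written;
        }

        /** Safer pattern, step 1 (synchronous part): copy the registry before handing off to another thread. */
        static Map<String, String> synchronousPart(Map<String, String> liveStateTables) {
            return new LinkedHashMap<>(liveStateTables);
        }

        /** Step 2 (asynchronous part): iterate only the private copy. */
        static List<String> asyncPart(Map<String, String> copiedStateTables) {
            return new ArrayList<>(copiedStateTables.keySet());
        }

        public static void main(String[] args) {
            Map<String, String> stateTables = new HashMap<>();
            stateTables.put("id0", "table0");
            stateTables.put("id1", "table1");
            stateTables.put("id2", "table2");

            Map<String, String> copy = synchronousPart(stateTables);
            stateTables.put("id3", "table3"); // registered after the synchronous part
            System.out.println(asyncPart(copy).size()); // 3

            try {
                unsafeAsyncPart(stateTables, () -> stateTables.put("late" + stateTables.size(), "t"));
                System.out.println("no exception");
            } catch (ConcurrentModificationException e) {
                System.out.println("ConcurrentModificationException");
            }
        }
    }
    ```

    The copy is cheap (one map of references per checkpoint) compared with the data it protects, which is why copying the registry in the synchronous part is an easy trade.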


---

[GitHub] flink issue #5934: [FLINK-9269][state] fix concurrency problem when performi...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/5934
  
    Thanks for your work! Besides my comments, this looks good 👍 


---

[GitHub] flink issue #5934: [FLINK-9269][state] fix concurrency problem when performi...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/5934
  
    Thanks! I will merge once my Travis run is green.


---

[GitHub] flink pull request #5934: [FLINK-9269][state] fix concurrency problem when p...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5934#discussion_r185485401
  
    
    👍 


---

[GitHub] flink issue #5934: [FLINK-9269][state] fix concurrency problem when performi...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/5934
  
    Yes, there is a theoretical problem if serialization were not thread-safe. I think the silent assumption that currently holds is that serializers are immutable w.r.t. serialization once they have been created and passed in to create a state. I think Stephan's suggestion would be the right approach to materialize that assumption in code and also gain a bit of efficiency.
    I think the other concurrency problem comes from lazy state registration. I assume most jobs de facto use eager state registration, i.e. all state is registered in `open()` or `initializeState(...)` and never changes at runtime. For the few remaining jobs, the problem only shows up if they happen to register a new state while a checkpoint is running. Overall, I think those are the reasons why no problem from this has been reported in practice. But I agree this should be addressed, and it is also very easy to fix.


---

[GitHub] flink issue #5934: [FLINK-9269][state] fix concurrency problem when performi...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5934
  
    Hi @StefanRRichter, I feel lost again... While writing the comment about why we don't duplicate the serializer, I found a loophole. In theory, even though the serializer is only written here, if the serializer is stateful (e.g. it maintains a variable internally, serializes according to it, and may also update that variable based on the serialized result), not duplicating it could still lead to a concurrency problem. In practice, though, I can't find any use case that needs a stateful serializer. So I am torn between duplicating it to be on the safe side in theory, and just adding comments without duplicating it to avoid the cost of duplication. What do you think?
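    To illustrate the loophole, a sketch with a hypothetical `SketchSerializer` interface (not Flink's API). It mirrors the contract of Flink's `TypeSerializer#duplicate()`, which may return the same instance for a stateless serializer and must return a deep copy for a stateful one. The stateless serializer below can hand itself out; the buffering one reuses a mutable field on every call, so sharing one instance between the task thread and an async snapshot thread could interleave writes, and the fresh instance it returns is exactly the duplication cost being weighed here.

    ```java
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class SerializerDuplicationSketch {

        /** Hypothetical serializer contract, loosely modeled on TypeSerializer#duplicate(). */
        interface SketchSerializer<T> {
            byte[] serialize(T value);

            /** Returns an instance that another thread may use without sharing mutable state. */
            SketchSerializer<T> duplicate();
        }

        /** Stateless: no mutable fields, so handing out the same instance is safe. */
        static final class StatelessStringSerializer implements SketchSerializer<String> {
            @Override
            public byte[] serialize(String value) {
                return value.getBytes(StandardCharsets.UTF_8);
            }

            @Override
            public SketchSerializer<String> duplicate() {
                return this; // nothing to protect, so duplication is free
            }
        }

        /** Stateful: reuses an internal buffer, so concurrent use of one instance is unsafe. */
        static final class BufferingStringSerializer implements SketchSerializer<String> {
            private byte[] buffer = new byte[16]; // mutable state touched by every call

            @Override
            public byte[] serialize(String value) {
                byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
                if (buffer.length < bytes.length) {
                    buffer = new byte[bytes.length];
                }
                System.arraycopy(bytes, 0, buffer, 0, bytes.length);
                return Arrays.copyOf(buffer, bytes.length);
            }

            @Override
            public SketchSerializer<String> duplicate() {
                return new BufferingStringSerializer(); // fresh buffer per copy; this allocation is the cost
            }
        }

        public static void main(String[] args) {
            SketchSerializer<String> stateless = new StatelessStringSerializer();
            SketchSerializer<String> stateful = new BufferingStringSerializer();
            System.out.println(stateless.duplicate() == stateless); // true
            System.out.println(stateful.duplicate() == stateful);   // false
        }
    }
    ```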


---

[GitHub] flink issue #5934: [FLINK-9269][state] fix concurrency problem when performi...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/5934
  
    I am also a bit torn. Sometimes I think we might just keep a pool with as many serializer copies as we can have concurrent checkpoints + savepoints. But then again, that borders on premature optimization. For this particular case, I think your suggestion sounds good.
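    The pool idea could look roughly like the following sketch. `SerializerPoolSketch` is hypothetical: the serializer type is left generic, the `duplicate` function stands in for whatever copy method the real serializer offers, and sizing the pool by the maximum number of concurrent checkpoints and savepoints is taken from the comment above. Each copy is created once, up front, and lent exclusively to one snapshot at a time.

    ```java
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.function.Function;
    import java.util.function.UnaryOperator;

    public class SerializerPoolSketch<S> {

        private final BlockingQueue<S> copies;

        public SerializerPoolSketch(S original, UnaryOperator<S> duplicate, int maxConcurrentSnapshots) {
            this.copies = new ArrayBlockingQueue<>(maxConcurrentSnapshots);
            for (int i = 0; i < maxConcurrentSnapshots; i++) {
                // The duplication cost is paid once here, not in every synchronous snapshot part.
                copies.add(duplicate.apply(original));
            }
        }

        /** Runs a snapshot body with an exclusively borrowed copy. */
        public <R> R withSerializer(Function<S, R> snapshotBody) throws InterruptedException {
            S copy = copies.take(); // blocks if every copy is currently in use
            try {
                return snapshotBody.apply(copy);
            } finally {
                copies.add(copy); // hand the copy back; capacity cannot be exceeded
            }
        }

        public static void main(String[] args) throws InterruptedException {
            Object original = new Object();
            SerializerPoolSketch<Object> pool = new SerializerPoolSketch<>(original, s -> new Object(), 2);
            Object used = pool.withSerializer(copy -> copy);
            System.out.println(used != original); // true
        }
    }
    ```

    The tradeoff is that duplication moves out of the synchronous part, at the price of keeping `maxConcurrentSnapshots` copies in memory and blocking a snapshot when all copies are busy.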


---

[GitHub] flink issue #5934: [FLINK-9269][state] fix concurrency problem when performi...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5934
  
    Thanks for your comments, @StephanEwen. If I understand correctly, we don't need to duplicate the serializer now, because there will be a dedicated optimization for it in the near future; I am `+1` for that. Then what about the concurrency problem caused by `stateTables`? It's an obvious bug: multiple threads can access `stateTables` concurrently, and one of them can modify it. So far no users have reported the problem, maybe because most users run the `RocksDBKeyedStateBackend` in production rather than the `HeapKeyedStateBackend`. So I don't think this bug is urgent, but it is still a bug. Should it be fixed for 1.5?


---

[GitHub] flink pull request #5934: [FLINK-9269][state] fix concurrency problem when p...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5934#discussion_r185486196
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1987,6 +1987,8 @@ private void writeKVStateMetaData() throws IOException {
     
     			KeyedBackendSerializationProxy<K> serializationProxy =
     				new KeyedBackendSerializationProxy<>(
    +					// we use the statBackend's keySerializer directly here, because it just be written here.
    +					// NOTE: There's a loophole when the serializer is stateful, but that rarely occur in reality use case.
    --- End diff --
    
    👍 


---

[GitHub] flink pull request #5934: [FLINK-9269][state] fix concurrency problem when p...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5934#discussion_r185465142
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1987,6 +1987,8 @@ private void writeKVStateMetaData() throws IOException {
     
     			KeyedBackendSerializationProxy<K> serializationProxy =
     				new KeyedBackendSerializationProxy<>(
    +					// we use the statBackend's keySerializer directly here, because it just be written here.
    --- End diff --
    
    `statBackend's ` -> `stateBackend's`


---

[GitHub] flink pull request #5934: [FLINK-9269][state] fix concurrency problem when p...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5934#discussion_r185486283
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---
    @@ -622,20 +622,23 @@ public HeapSnapshotStrategy(
     
     			final Map<String, Integer> kVStateToId = new HashMap<>(stateTables.size());
     
    -			final Map<StateTable<K, ?, ?>, StateTableSnapshot> cowStateStableSnapshots =
    +			final Map<String, StateTableSnapshot> cowStateStableSnapshots =
     				new HashedMap(stateTables.size());
    --- End diff --
    
    I will correct it in this PR.


---

[GitHub] flink pull request #5934: [FLINK-9269][state] fix concurrency problem when p...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5934#discussion_r185508430
  
    
    Sorry, my bad, I overlooked that you are using the return value. I will revert this to your first approach before merging because this does not really improve it. 
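    Why `CompletableFuture.runAsync(...)` does not help here can be shown with a small, self-contained sketch. The helper names are hypothetical, and the PR's `runSnapshotAsync` body is cut off in the diff above, so this is not a reproduction of it. `runAsync` returns a `CompletableFuture<Void>`, so the snapshot result would still have to be read from the original `RunnableFuture`; executing the `RunnableFuture` on the executor and returning it keeps the result in a single future.

    ```java
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.RunnableFuture;
    import java.util.concurrent.TimeUnit;

    public class SnapshotAsyncSketch {

        /** Hypothetical helper: run the task on the executor unless it already completed synchronously. */
        static <T> Future<T> runViaExecute(ExecutorService executor, RunnableFuture<T> task) {
            if (!task.isDone()) {
                executor.execute(task); // the RunnableFuture stores its own result
            }
            return task; // the caller reads the snapshot result from this same future
        }

        /** The suggested alternative: the returned future only signals completion. */
        static CompletableFuture<Void> runViaRunAsync(ExecutorService executor, RunnableFuture<?> task) {
            return CompletableFuture.runAsync(task, executor);
        }

        public static void main(String[] args) throws Exception {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            try {
                FutureTask<String> first = new FutureTask<>(() -> "handle-1");
                Future<String> viaExecute = runViaExecute(executor, first);
                System.out.println(viaExecute.get(10, TimeUnit.SECONDS)); // handle-1

                FutureTask<String> second = new FutureTask<>(() -> "handle-2");
                CompletableFuture<Void> viaRunAsync = runViaRunAsync(executor, second);
                System.out.println(viaRunAsync.get(10, TimeUnit.SECONDS)); // null
                System.out.println(second.get(10, TimeUnit.SECONDS));      // handle-2
            } finally {
                executor.shutdown();
            }
        }
    }
    ```

    A further subtlety: `FutureTask.run()` captures exceptions internally, so the `CompletableFuture<Void>` from `runAsync` completes normally even when the snapshot fails; the failure only surfaces through the original task's `get()`.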


---

[GitHub] flink issue #5934: [FLINK-9269][state] fix concurrency problem when performi...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5934
  
    Concerning serializer snapshots:
      - We need to move away from Java Serializing the serializers into the config snapshots anyways and should do that in the near future.
      - I think the config snapshot should be created once when the state is created, encoded as `byte[]`, and then we only write the bytes. That saves us from repeating work on every checkpoint and would also prevent concurrent access to the serializer when creating the snapshot.
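    A minimal sketch of that suggestion, with hypothetical names (`RegisteredStateMeta`, `encodeSerializerConfig`) and a placeholder byte payload standing in for a real serializer config snapshot. The encoding runs once at registration; every checkpoint only copies the cached, never-mutated `byte[]`, so the asynchronous part never needs to touch the serializer.

    ```java
    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.function.Supplier;

    public class PrecomputedConfigSnapshotSketch {

        /** Hypothetical per-state metadata holding a pre-encoded serializer config snapshot. */
        static final class RegisteredStateMeta {
            private final String stateName;
            private final byte[] serializerConfigBytes; // written once, never mutated afterwards

            RegisteredStateMeta(String stateName, Supplier<byte[]> encodeSerializerConfig) {
                this.stateName = stateName;
                // The (possibly expensive) encoding runs once, when the state is registered.
                this.serializerConfigBytes = encodeSerializerConfig.get();
            }

            /** Called for every checkpoint: copies the cached bytes and never touches the serializer. */
            void writeTo(DataOutputStream out) throws IOException {
                out.writeUTF(stateName);
                out.writeInt(serializerConfigBytes.length);
                out.write(serializerConfigBytes);
            }
        }

        /** Simulates writing the metadata section of one checkpoint. */
        static byte[] checkpoint(RegisteredStateMeta meta) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                meta.writeTo(out);
            }
            return bytes.toByteArray();
        }

        public static void main(String[] args) throws IOException {
            int[] encodeCalls = {0};
            RegisteredStateMeta meta = new RegisteredStateMeta("id0", () -> {
                encodeCalls[0]++;
                return "placeholder-config".getBytes(StandardCharsets.UTF_8);
            });
            byte[] first = checkpoint(meta);
            byte[] second = checkpoint(meta);
            System.out.println(Arrays.equals(first, second)); // true
            System.out.println(encodeCalls[0]);               // 1
        }
    }
    ```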


---

[GitHub] flink issue #5934: [FLINK-9269][state] fix concurrency problem when performi...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5934
  
    About the serializer duplication problem, I think you are right: duplicating a serializer is not always super cheap. So maybe the best tradeoff is to not duplicate the serializer, saving the performance cost, and to add dedicated comments describing why we don't duplicate it, to make the code more "defensive". What do you think?


---

[GitHub] flink pull request #5934: [FLINK-9269][state] fix concurrency problem when p...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5934#discussion_r185465407
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1987,6 +1987,8 @@ private void writeKVStateMetaData() throws IOException {
     
     			KeyedBackendSerializationProxy<K> serializationProxy =
     				new KeyedBackendSerializationProxy<>(
    +					// we use the statBackend's keySerializer directly here, because it just be written here.
    +					// NOTE: There's a loophole when the serializer is stateful, but that rarely occur in reality use case.
    --- End diff --
    
    I wonder if this note sounds a bit too scary. Maybe we could just add a TODO saying that this code assumes writing a serializer is thread-safe, and that in the future we could obtain a serialized form already at state registration time?


---

[GitHub] flink pull request #5934: [FLINK-9269][state] fix concurrency problem when p...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5934#discussion_r185470576
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---
    @@ -622,20 +622,23 @@ public HeapSnapshotStrategy(
     
     			final Map<String, Integer> kVStateToId = new HashMap<>(stateTables.size());
     
    -			final Map<StateTable<K, ?, ?>, StateTableSnapshot> cowStateStableSnapshots =
    +			final Map<String, StateTableSnapshot> cowStateStableSnapshots =
     				new HashedMap(stateTables.size());
    --- End diff --
    
    Seems like I accidentally used `new HashedMap(...)` here instead of Java's `new HashMap<>(...)`. As you touch this spot already, maybe you can just also correct that?


---

[GitHub] flink issue #5934: [FLINK-9269][state] fix concurrency problem when performi...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5934
  
    Hi @StefanRRichter, thanks for your reply. I have updated the PR and added a test to guard this, but I am not sure whether the test is really required, because it looks a bit weird to me...


---

[GitHub] flink issue #5934: [FLINK-9269][state] fix concurrency problem when performi...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/5934
  
    Hi, can you give some more detail about the actual problem you are trying to fix here? To me it looks like duplicating the serializer just for the metadata should not be required, because the serializer is only written, and the getter is only used during restore, which is never async. You can argue that this just makes the code more defensive, which is a good thing. But I want to raise awareness that duplicating a serializer is not always super cheap, and that cost counts toward the time spent in the synchronous part. So there is a tradeoff, and that is why I would like to discuss whether this is really a benefit.


---

[GitHub] flink pull request #5934: [FLINK-9269][state] fix concurrency problem when p...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5934


---

[GitHub] flink pull request #5934: [FLINK-9269][state] fix concurrency problem when p...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5934#discussion_r185485532
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1987,6 +1987,8 @@ private void writeKVStateMetaData() throws IOException {
     
     			KeyedBackendSerializationProxy<K> serializationProxy =
     				new KeyedBackendSerializationProxy<>(
    +					// we use the statBackend's keySerializer directly here, because it just be written here.
    --- End diff --
    
    👍 


---