Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2018/05/02 11:24:42 UTC
[GitHub] flink pull request #5934: [FLINK-9269][state] fix concurrency problem when p...
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5934#discussion_r185464970
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---
@@ -3594,6 +3599,58 @@ public String fold(String acc, Integer value) throws Exception {
}
}
+    @Test
+    public void testCheckConcurrencyProblemWhenPerformingCheckpointAsync() throws Exception {
+
+        CheckpointStreamFactory streamFactory = createStreamFactory();
+        Environment env = new DummyEnvironment();
+        AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+        ExecutorService executorService = Executors.newScheduledThreadPool(1);
+        try {
+            long checkpointID = 0;
+            List<Future> futureList = new ArrayList();
+            for (int i = 0; i < 10; ++i) {
+                ValueStateDescriptor<Integer> kvId = new ValueStateDescriptor<>("id" + i, IntSerializer.INSTANCE);
+                ValueState<Integer> state = backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
+                ((InternalValueState) state).setCurrentNamespace(VoidNamespace.INSTANCE);
+                backend.setCurrentKey(i);
+                state.update(i);
+
+                futureList.add(runSnapshotAsync(executorService,
+                    backend.snapshot(checkpointID++, System.currentTimeMillis(), streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())));
+            }
+
+            for (Future future : futureList) {
+                future.get(10, TimeUnit.SECONDS);
+            }
+        } catch (Exception e) {
+            fail();
+        } finally {
+            backend.dispose();
+            executorService.shutdown();
+        }
+    }
+
+    protected Future<SnapshotResult<KeyedStateHandle>> runSnapshotAsync(
+        ExecutorService executorService,
+        RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture) throws Exception {
+
+        if (!snapshotRunnableFuture.isDone()) {
--- End diff --
I think you could replace this code with `CompletableFuture.runAsync(...)`.
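
For illustration, a minimal sketch of what that could look like. The body of `runSnapshotAsync` is cut off in the hunk above, so this assumes the helper only needs to run the `RunnableFuture` on the given executor and hand back its result. The class name `RunAsyncSketch` and the `String` payload are placeholders for this example and are not part of the PR; `CompletableFuture.runAsync(Runnable, Executor)` and `FutureTask` are standard `java.util.concurrent` APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the test helper; not the code from the PR.
class RunAsyncSketch {

    // Runs the given RunnableFuture on the executor and exposes its result
    // as a CompletableFuture.
    static <T> CompletableFuture<T> runSnapshotAsync(
            ExecutorService executor,
            RunnableFuture<T> snapshot) {

        // A RunnableFuture is a Runnable, so it can be passed to runAsync directly.
        return CompletableFuture
            .runAsync(snapshot, executor)
            .thenApply(ignored -> {
                try {
                    // The task has already run here, so get() does not block.
                    // FutureTask.run() stores exceptions instead of throwing them,
                    // so a failure only becomes visible through get().
                    return snapshot.get();
                } catch (InterruptedException | ExecutionException e) {
                    throw new CompletionException(e);
                }
            });
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Placeholder for the RunnableFuture returned by backend.snapshot(...).
            RunnableFuture<String> snapshot = new FutureTask<>(() -> "snapshot-result");
            CompletableFuture<String> result = runSnapshotAsync(executor, snapshot);
            System.out.println(result.get(10, TimeUnit.SECONDS));
        } finally {
            executor.shutdown();
        }
    }
}
```

Because `CompletableFuture` implements `Future`, the test loop could presumably keep calling `future.get(10, TimeUnit.SECONDS)` on the returned values unchanged.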
---