You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2018/05/02 11:24:42 UTC

[GitHub] flink pull request #5934: [FLINK-9269][state] fix concurrency problem when p...

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5934#discussion_r185464970
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---
    @@ -3594,6 +3599,58 @@ public String fold(String acc, Integer value) throws Exception {
     		}
     	}
     
    +	@Test
    +	public void testCheckConcurrencyProblemWhenPerformingCheckpointAsync() throws Exception {
    +
    +		CheckpointStreamFactory streamFactory = createStreamFactory();
    +		Environment env = new DummyEnvironment();
    +		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
    +
    +		ExecutorService executorService = Executors.newScheduledThreadPool(1);
    +		try {
    +			long checkpointID = 0;
    +			List<Future> futureList = new ArrayList();
    +			for (int i = 0; i < 10; ++i) {
    +				ValueStateDescriptor<Integer> kvId = new ValueStateDescriptor<>("id" + i, IntSerializer.INSTANCE);
    +				ValueState<Integer> state = backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
    +				((InternalValueState) state).setCurrentNamespace(VoidNamespace.INSTANCE);
    +				backend.setCurrentKey(i);
    +				state.update(i);
    +
    +				futureList.add(runSnapshotAsync(executorService,
    +					backend.snapshot(checkpointID++, System.currentTimeMillis(), streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())));
    +			}
    +
    +			for (Future future : futureList) {
    +				future.get(10, TimeUnit.SECONDS);
    +			}
    +		} catch (Exception e) {
    +			fail();
    +		} finally {
    +			backend.dispose();
    +			executorService.shutdown();
    +		}
    +	}
    +
    +	protected Future<SnapshotResult<KeyedStateHandle>> runSnapshotAsync(
    +		ExecutorService executorService,
    +		RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture) throws Exception {
    +
    +		if (!snapshotRunnableFuture.isDone()) {
    --- End diff --
    
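    I think you could replace this code (the body of the `runSnapshotAsync` helper) with `CompletableFuture.runAsync(...)`, passing `snapshotRunnableFuture` as the runnable and `executorService` as the executor.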


---