You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/05/14 12:10:36 UTC

[13/14] flink git commit: [FLINK-6534] [checkpoint] Use async IO to dispose state in SharedStateRegistry

[FLINK-6534] [checkpoint] Use async IO to dispose state in SharedStateRegistry


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/471263cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/471263cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/471263cf

Branch: refs/heads/release-1.3
Commit: 471263cfe493dc1bbbd5a5733dbf918cc0872b9b
Parents: 7c6f348
Author: Stefan Richter <s....@data-artisans.com>
Authored: Fri May 12 16:01:05 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Sun May 14 14:07:26 2017 +0200

----------------------------------------------------------------------
 .../runtime/checkpoint/AbstractCompletedCheckpointStore.java   | 6 ++++++
 .../runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java  | 2 ++
 .../org/apache/flink/runtime/state/SharedStateRegistry.java    | 6 +++++-
 3 files changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/471263cf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
index f42fd06..bf70501 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.state.SharedStateRegistry;
 
+import java.util.concurrent.Executor;
+
 /**
  * This is the base class that provides implementation of some aspects common for all
  * {@link CompletedCheckpointStore}s.
@@ -34,4 +36,8 @@ public abstract class AbstractCompletedCheckpointStore implements CompletedCheck
 	public AbstractCompletedCheckpointStore() {
 		this.sharedStateRegistry = new SharedStateRegistry();
 	}
+
+	public AbstractCompletedCheckpointStore(Executor asyncIOExecutor) {
+		this.sharedStateRegistry = new SharedStateRegistry(asyncIOExecutor);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/471263cf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 52a4eea..c8c68bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -106,6 +106,8 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 			RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage,
 			Executor executor) throws Exception {
 
+		super(executor);
+
 		checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");
 		checkNotNull(stateStorage, "State storage");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/471263cf/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index 9cfdec7..f9161b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -45,8 +45,12 @@ public class SharedStateRegistry {
 	private final Executor asyncDisposalExecutor;
 
 	public SharedStateRegistry() {
+		this(Executors.directExecutor());
+	}
+
+	public SharedStateRegistry(Executor asyncDisposalExecutor) {
 		this.registeredStates = new HashMap<>();
-		this.asyncDisposalExecutor = Executors.directExecutor(); //TODO: FLINK-6534
+		this.asyncDisposalExecutor = Preconditions.checkNotNull(asyncDisposalExecutor);
 	}
 
 	/**