You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/05/14 12:10:36 UTC
[13/14] flink git commit: [FLINK-6534] [checkpoint] Use async IO to dispose state in SharedStateRegistry
[FLINK-6534] [checkpoint] Use async IO to dispose state in SharedStateRegistry
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/471263cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/471263cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/471263cf
Branch: refs/heads/release-1.3
Commit: 471263cfe493dc1bbbd5a5733dbf918cc0872b9b
Parents: 7c6f348
Author: Stefan Richter <s....@data-artisans.com>
Authored: Fri May 12 16:01:05 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Sun May 14 14:07:26 2017 +0200
----------------------------------------------------------------------
.../runtime/checkpoint/AbstractCompletedCheckpointStore.java | 6 ++++++
.../runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java | 2 ++
.../org/apache/flink/runtime/state/SharedStateRegistry.java | 6 +++++-
3 files changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/471263cf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
index f42fd06..bf70501 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.state.SharedStateRegistry;
+import java.util.concurrent.Executor;
+
/**
* This is the base class that provides implementation of some aspects common for all
* {@link CompletedCheckpointStore}s.
@@ -34,4 +36,8 @@ public abstract class AbstractCompletedCheckpointStore implements CompletedCheck
public AbstractCompletedCheckpointStore() {
this.sharedStateRegistry = new SharedStateRegistry();
}
+
+ public AbstractCompletedCheckpointStore(Executor asyncIOExecutor) {
+ this.sharedStateRegistry = new SharedStateRegistry(asyncIOExecutor);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/471263cf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 52a4eea..c8c68bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -106,6 +106,8 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage,
Executor executor) throws Exception {
+ super(executor);
+
checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");
checkNotNull(stateStorage, "State storage");
http://git-wip-us.apache.org/repos/asf/flink/blob/471263cf/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index 9cfdec7..f9161b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -45,8 +45,12 @@ public class SharedStateRegistry {
private final Executor asyncDisposalExecutor;
public SharedStateRegistry() {
+ this(Executors.directExecutor());
+ }
+
+ public SharedStateRegistry(Executor asyncDisposalExecutor) {
this.registeredStates = new HashMap<>();
- this.asyncDisposalExecutor = Executors.directExecutor(); //TODO: FLINK-6534
+ this.asyncDisposalExecutor = Preconditions.checkNotNull(asyncDisposalExecutor);
}
/**