You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/16 21:06:30 UTC
[1/8] flink git commit: [hotfix] Harden
RocksDBAsyncSnapshotTest#testCancelFullyAsyncCheckpoints
Repository: flink
Updated Branches:
refs/heads/master 0bbc91eb1 -> 7a91f3071
[hotfix] Harden RocksDBAsyncSnapshotTest#testCancelFullyAsyncCheckpoints
Depending on RocksDBOptions#TIMER_SERVICE_IMPL we have to adapt the testCancelFullyAsyncCheckpoints wrt
how many checkpointing streams we skip.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a91f307
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a91f307
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a91f307
Branch: refs/heads/master
Commit: 7a91f30711473d670173107d7517c9b5acaa9d3f
Parents: 6bba0e7
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 16 16:55:49 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jul 16 22:11:57 2018 +0200
----------------------------------------------------------------------
.../streaming/state/RocksDBAsyncSnapshotTest.java | 18 +++++++++++++++---
1 file changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7a91f307/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index fe1a625..a8e5a36 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -256,16 +256,28 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
File dbDir = temporaryFolder.newFolder();
+ final RocksDBStateBackend.PriorityQueueStateType timerServicePriorityQueueType = RocksDBStateBackend.PriorityQueueStateType.valueOf(RocksDBOptions.TIMER_SERVICE_IMPL.defaultValue());
+
+ final int skipStreams;
+
+ if (timerServicePriorityQueueType == RocksDBStateBackend.PriorityQueueStateType.HEAP) {
+ // we skip the first created stream, because it is used to checkpoint the timer service, which is
+ // currently not asynchronous.
+ skipStreams = 1;
+ } else if (timerServicePriorityQueueType == RocksDBStateBackend.PriorityQueueStateType.ROCKSDB) {
+ skipStreams = 0;
+ } else {
+ throw new AssertionError(String.format("Unknown timer service priority queue type %s.", timerServicePriorityQueueType));
+ }
+
// this is the proper instance that we need to call.
BlockerCheckpointStreamFactory blockerCheckpointStreamFactory =
new BlockerCheckpointStreamFactory(4 * 1024 * 1024) {
- int count = 1;
+ int count = skipStreams;
@Override
public CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException {
- // we skip the first created stream, because it is used to checkpoint the timer service, which is
- // currently not asynchronous.
if (count > 0) {
--count;
return new BlockingCheckpointOutputStream(
[8/8] flink git commit: [hotfix] Consolidate RocksDB configuration
options in RocksDBOptions
Posted by tr...@apache.org.
[hotfix] Consolidate RocksDB configuration options in RocksDBOptions
Rename from backend.rocksdb.priority_queue_state_type into state.backend.rocksdb.timer-service.impl
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4b4cb70
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4b4cb70
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4b4cb70
Branch: refs/heads/master
Commit: a4b4cb70a5a26acd31294041c8a15479e69ef3bc
Parents: a88d6ef
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 16 09:29:37 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jul 16 22:11:57 2018 +0200
----------------------------------------------------------------------
.../generated/checkpointing_configuration.html | 5 ---
.../generated/rocks_db_configuration.html | 21 +++++++++
.../configuration/CheckpointingOptions.java | 11 -----
flink-docs/pom.xml | 5 +++
.../ConfigOptionsDocGenerator.java | 3 +-
.../streaming/state/RockDBBackendOptions.java | 38 ----------------
.../contrib/streaming/state/RocksDBOptions.java | 47 ++++++++++++++++++++
.../streaming/state/RocksDBStateBackend.java | 6 +--
.../state/RocksDBStateBackendFactoryTest.java | 6 +--
9 files changed, 81 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/docs/_includes/generated/checkpointing_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/checkpointing_configuration.html b/docs/_includes/generated/checkpointing_configuration.html
index 7a2791f..8f5ce7b 100644
--- a/docs/_includes/generated/checkpointing_configuration.html
+++ b/docs/_includes/generated/checkpointing_configuration.html
@@ -33,11 +33,6 @@
<td></td>
</tr>
<tr>
- <td><h5>state.backend.rocksdb.localdir</h5></td>
- <td style="word-wrap: break-word;">(none)</td>
- <td>The local directory (on the TaskManager) where RocksDB puts its files.</td>
- </tr>
- <tr>
<td><h5>state.checkpoints.dir</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers).</td>
http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/docs/_includes/generated/rocks_db_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/rocks_db_configuration.html b/docs/_includes/generated/rocks_db_configuration.html
new file mode 100644
index 0000000..57b9511
--- /dev/null
+++ b/docs/_includes/generated/rocks_db_configuration.html
@@ -0,0 +1,21 @@
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Key</th>
+ <th class="text-left" style="width: 15%">Default</th>
+ <th class="text-left" style="width: 65%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>state.backend.rocksdb.localdir</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>The local directory (on the TaskManager) where RocksDB puts its files.</td>
+ </tr>
+ <tr>
+ <td><h5>state.backend.rocksdb.timer-service.impl</h5></td>
+ <td style="word-wrap: break-word;">"HEAP"</td>
+ <td>This determines the timer service implementation. Options are either HEAP (heap-based, default) or ROCKSDB for an implementation based on RocksDB.</td>
+ </tr>
+ </tbody>
+</table>
http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
index 60b7613..6557a9f 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
@@ -115,15 +115,4 @@ public class CheckpointingOptions {
.defaultValue(1024)
.withDescription("The minimum size of state data files. All state chunks smaller than that are stored" +
" inline in the root checkpoint metadata file.");
-
- // ------------------------------------------------------------------------
- // Options specific to the RocksDB state backend
- // ------------------------------------------------------------------------
-
- /** The local directory (on the TaskManager) where RocksDB puts its files. */
- public static final ConfigOption<String> ROCKSDB_LOCAL_DIRECTORIES = ConfigOptions
- .key("state.backend.rocksdb.localdir")
- .noDefaultValue()
- .withDeprecatedKeys("state.backend.rocksdb.checkpointdir")
- .withDescription("The local directory (on the TaskManager) where RocksDB puts its files.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-docs/pom.xml
----------------------------------------------------------------------
diff --git a/flink-docs/pom.xml b/flink-docs/pom.xml
index 2135dea..b7709f5 100644
--- a/flink-docs/pom.xml
+++ b/flink-docs/pom.xml
@@ -83,6 +83,11 @@ under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-jackson-module-jsonSchema</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.slf4j</groupId>
http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
index 953122f..d333719 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
@@ -59,7 +59,8 @@ public class ConfigOptionsDocGenerator {
new OptionsClassLocation("flink-yarn", "org.apache.flink.yarn.configuration"),
new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.configuration"),
new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.runtime.clusterframework"),
- new OptionsClassLocation("flink-metrics/flink-metrics-prometheus", "org.apache.flink.metrics.prometheus")
+ new OptionsClassLocation("flink-metrics/flink-metrics-prometheus", "org.apache.flink.metrics.prometheus"),
+ new OptionsClassLocation("flink-state-backends/flink-statebackend-rocksdb", "org.apache.flink.contrib.streaming.state")
};
static final Set<String> EXCLUSIONS = new HashSet<>(Arrays.asList(
http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java
deleted file mode 100644
index ede45e3..0000000
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-
-/**
- * Configuration options for the RocksDB backend.
- */
-public class RockDBBackendOptions {
-
- /**
- * Choice of implementation for priority queue state (e.g. timers).
- */
- public static final ConfigOption<String> PRIORITY_QUEUE_STATE_TYPE = ConfigOptions
- .key("backend.rocksdb.priority_queue_state_type")
- .defaultValue(RocksDBStateBackend.PriorityQueueStateType.HEAP.name())
- .withDescription("This determines the implementation for the priority queue state (e.g. timers). Options are" +
- "either " + RocksDBStateBackend.PriorityQueueStateType.HEAP.name() + " (heap-based, default) or " +
- RocksDBStateBackend.PriorityQueueStateType.ROCKS.name() + " for in implementation based on RocksDB.");
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
new file mode 100644
index 0000000..37eb6cf
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import static org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType.HEAP;
+import static org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType.ROCKS;
+
+/**
+ * Configuration options for the RocksDB backend.
+ */
+public class RocksDBOptions {
+
+ /** The local directory (on the TaskManager) where RocksDB puts its files. */
+ public static final ConfigOption<String> LOCAL_DIRECTORIES = ConfigOptions
+ .key("state.backend.rocksdb.localdir")
+ .noDefaultValue()
+ .withDeprecatedKeys("state.backend.rocksdb.checkpointdir")
+ .withDescription("The local directory (on the TaskManager) where RocksDB puts its files.");
+
+ /**
+ * Choice of timer service implementation.
+ */
+ public static final ConfigOption<String> TIMER_SERVICE_IMPL = ConfigOptions
+ .key("state.backend.rocksdb.timer-service.impl")
+ .defaultValue(HEAP.name())
+ .withDescription(String.format("This determines the timer service implementation. Options are either %s " +
+ "(heap-based, default) or %s for an implementation based on RocksDB.", HEAP.name(), ROCKS.name()));
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 1794e17..58e8de6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -60,7 +60,7 @@ import java.util.List;
import java.util.Random;
import java.util.UUID;
-import static org.apache.flink.contrib.streaming.state.RockDBBackendOptions.PRIORITY_QUEUE_STATE_TYPE;
+import static org.apache.flink.contrib.streaming.state.RocksDBOptions.TIMER_SERVICE_IMPL;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -271,7 +271,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
this.enableIncrementalCheckpointing = original.enableIncrementalCheckpointing.resolveUndefined(
config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS));
- final String priorityQueueTypeString = config.getString(PRIORITY_QUEUE_STATE_TYPE.key(), "");
+ final String priorityQueueTypeString = config.getString(TIMER_SERVICE_IMPL.key(), "");
this.priorityQueueStateType = priorityQueueTypeString.length() > 0 ?
PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : original.priorityQueueStateType;
@@ -281,7 +281,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
this.localRocksDbDirectories = original.localRocksDbDirectories;
}
else {
- final String rocksdbLocalPaths = config.getString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES);
+ final String rocksdbLocalPaths = config.getString(RocksDBOptions.LOCAL_DIRECTORIES);
if (rocksdbLocalPaths != null) {
String[] directories = rocksdbLocalPaths.split(",|" + File.pathSeparator);
http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
index 7612c4c..9e16dea 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
@@ -82,14 +82,14 @@ public class RocksDBStateBackendFactoryTest {
config1.setString(backendKey, "rocksdb");
config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
- config1.setString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES, localDirs);
+ config1.setString(RocksDBOptions.LOCAL_DIRECTORIES, localDirs);
config1.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental);
final Configuration config2 = new Configuration();
config2.setString(backendKey, RocksDBStateBackendFactory.class.getName());
config2.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
config2.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
- config2.setString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES, localDirs);
+ config2.setString(RocksDBOptions.LOCAL_DIRECTORIES, localDirs);
config2.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental);
StateBackend backend1 = StateBackendLoader.loadStateBackendFromConfig(config1, cl, null);
@@ -143,7 +143,7 @@ public class RocksDBStateBackendFactoryTest {
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); // this should not be picked up
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
config.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, !incremental); // this should not be picked up
- config.setString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES, localDir3 + ":" + localDir4); // this should not be picked up
+ config.setString(RocksDBOptions.LOCAL_DIRECTORIES, localDir3 + ":" + localDir4); // this should not be picked up
final StateBackend loadedBackend =
StateBackendLoader.fromApplicationOrConfigOrDefault(backend, config, cl, null);
[7/8] flink git commit: [hotfix] Use default value of
RocksDBOptions#TIMER_SERVICE_IMPL
Posted by tr...@apache.org.
[hotfix] Use default value of RocksDBOptions#TIMER_SERVICE_IMPL
In order to fully enable the RocksDBOptions#TIMER_SERVICE_IMPL we should use the default value
of this config option instead of "".
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bba0e77
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bba0e77
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bba0e77
Branch: refs/heads/master
Commit: 6bba0e77ed0d98b8de24f457605b82c69980a0d4
Parents: 8db5ca6
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 16 15:07:31 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jul 16 22:11:57 2018 +0200
----------------------------------------------------------------------
.../apache/flink/contrib/streaming/state/RocksDBStateBackend.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6bba0e77/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index c6b50cd..0564849 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -271,7 +271,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
this.enableIncrementalCheckpointing = original.enableIncrementalCheckpointing.resolveUndefined(
config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS));
- final String priorityQueueTypeString = config.getString(TIMER_SERVICE_IMPL.key(), "");
+ final String priorityQueueTypeString = config.getString(TIMER_SERVICE_IMPL);
this.priorityQueueStateType = priorityQueueTypeString.length() > 0 ?
PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : original.priorityQueueStateType;
[5/8] flink git commit: [FLINK-9489] Checkpoint timers as part of
managed keyed state instead of raw keyed state
Posted by tr...@apache.org.
[FLINK-9489] Checkpoint timers as part of managed keyed state instead of raw keyed state
Optimization for relaxed bulk polls
Deactivate optimization for now because it still contains a bug
This closes #6333.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dbddf00b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dbddf00b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dbddf00b
Branch: refs/heads/master
Commit: dbddf00b75032c20df6e7aef26814da392347194
Parents: 0bbc91e
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Jun 13 11:56:16 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jul 16 22:11:57 2018 +0200
----------------------------------------------------------------------
.../state/AbstractKeyedStateBackend.java | 5 +
.../state/BackendWritableBroadcastState.java | 4 +-
.../state/DefaultOperatorStateBackend.java | 32 +--
.../flink/runtime/state/HeapBroadcastState.java | 10 +-
.../runtime/state/KeyExtractorFunction.java | 13 ++
.../runtime/state/KeyGroupPartitioner.java | 72 +++++-
.../org/apache/flink/runtime/state/Keyed.java | 32 +++
.../flink/runtime/state/PriorityComparable.java | 33 +++
.../flink/runtime/state/PriorityComparator.java | 7 +
.../runtime/state/PriorityQueueSetFactory.java | 6 +-
...RegisteredBroadcastBackendStateMetaInfo.java | 155 -------------
...RegisteredBroadcastStateBackendMetaInfo.java | 169 ++++++++++++++
.../RegisteredKeyValueStateBackendMetaInfo.java | 224 +++++++++++++++++++
.../RegisteredKeyedBackendStateMetaInfo.java | 212 ------------------
.../RegisteredOperatorBackendStateMetaInfo.java | 142 ------------
.../RegisteredOperatorStateBackendMetaInfo.java | 153 +++++++++++++
...steredPriorityQueueStateBackendMetaInfo.java | 87 +++++++
.../state/RegisteredStateMetaInfoBase.java | 17 ++
.../flink/runtime/state/StateSnapshot.java | 27 ++-
.../state/StateSnapshotKeyGroupReader.java | 44 ++++
.../runtime/state/StateSnapshotRestore.java | 47 ++++
.../state/TieBreakingPriorityComparator.java | 5 -
.../heap/CachingInternalPriorityQueueSet.java | 46 ++++
.../state/heap/CopyOnWriteStateTable.java | 13 +-
.../heap/CopyOnWriteStateTableSnapshot.java | 12 +-
.../state/heap/HeapKeyedStateBackend.java | 163 +++++++++-----
.../runtime/state/heap/HeapPriorityQueue.java | 3 +-
.../state/heap/HeapPriorityQueueSetFactory.java | 18 +-
...HeapPriorityQueueSnapshotRestoreWrapper.java | 102 +++++++++
.../heap/HeapPriorityQueueStateSnapshot.java | 118 ++++++++++
.../heap/KeyGroupPartitionedPriorityQueue.java | 18 ++
.../state/heap/NestedMapsStateTable.java | 38 ++--
.../flink/runtime/state/heap/StateTable.java | 25 ++-
.../state/heap/StateTableByKeyGroupReader.java | 38 ----
.../state/heap/StateTableByKeyGroupReaders.java | 89 +++-----
.../state/metainfo/StateMetaInfoSnapshot.java | 5 +-
.../StateMetaInfoSnapshotReadersWriters.java | 38 +++-
.../state/KeyGroupPartitionerTestBase.java | 4 +-
.../runtime/state/SerializationProxiesTest.java | 54 ++---
.../state/StateSnapshotCompressionTest.java | 9 +-
.../state/heap/CopyOnWriteStateTableTest.java | 32 +--
.../StateTableSnapshotCompatibilityTest.java | 17 +-
.../StateMetaInfoSnapshotEnumConstantsTest.java | 4 +-
.../state/ttl/mock/MockKeyedStateBackend.java | 19 +-
.../state/RocksDBAggregatingState.java | 4 +-
.../streaming/state/RocksDBFoldingState.java | 4 +-
.../state/RocksDBKeyedStateBackend.java | 188 +++++++---------
.../streaming/state/RocksDBListState.java | 4 +-
.../streaming/state/RocksDBMapState.java | 4 +-
.../streaming/state/RocksDBOrderedSetStore.java | 1 -
.../streaming/state/RocksDBReducingState.java | 4 +-
.../streaming/state/RocksDBValueState.java | 4 +-
.../state/RocksDBWriteBatchWrapper.java | 4 +
.../api/operators/AbstractStreamOperator.java | 6 +-
.../HeapPriorityQueueStateSnapshot.java | 112 ----------
.../operators/InternalTimeServiceManager.java | 31 ++-
.../streaming/api/operators/InternalTimer.java | 15 +-
.../InternalTimersSnapshotReaderWriters.java | 126 ++++++++++-
.../StreamTaskStateInitializerImpl.java | 3 +-
.../api/operators/TimerHeapInternalTimer.java | 125 +----------
.../api/operators/TimerSerializer.java | 31 ++-
.../operators/HeapInternalTimerServiceTest.java | 4 +-
62 files changed, 1807 insertions(+), 1224 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index c7f1bd9..17d24f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -315,4 +315,9 @@ public abstract class AbstractKeyedStateBackend<K> implements
@VisibleForTesting
public abstract int numStateEntries();
+ // TODO remove this once heap-based timers are working with RocksDB incremental snapshots!
+ public boolean requiresLegacySynchronousTimerSnapshots() {
+ return false;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java
index 8daf07c..ba3985b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java
@@ -36,7 +36,7 @@ public interface BackendWritableBroadcastState<K, V> extends BroadcastState<K, V
long write(FSDataOutputStream out) throws IOException;
- void setStateMetaInfo(RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo);
+ void setStateMetaInfo(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo);
- RegisteredBroadcastBackendStateMetaInfo<K, V> getStateMetaInfo();
+ RegisteredBroadcastStateBackendMetaInfo<K, V> getStateMetaInfo();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index dc9b75f..f1d0b57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -209,7 +209,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
if (broadcastState == null) {
broadcastState = new HeapBroadcastState<>(
- new RegisteredBroadcastBackendStateMetaInfo<>(
+ new RegisteredBroadcastStateBackendMetaInfo<>(
name,
OperatorStateHandle.Mode.BROADCAST,
broadcastStateKeySerializer,
@@ -227,7 +227,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
final StateMetaInfoSnapshot metaInfoSnapshot = restoredBroadcastStateMetaInfos.get(name);
@SuppressWarnings("unchecked")
- RegisteredBroadcastBackendStateMetaInfo<K, V> restoredMetaInfo = new RegisteredBroadcastBackendStateMetaInfo<K, V>(metaInfoSnapshot);
+ RegisteredBroadcastStateBackendMetaInfo<K, V> restoredMetaInfo = new RegisteredBroadcastStateBackendMetaInfo<K, V>(metaInfoSnapshot);
// check compatibility to determine if state migration is required
CompatibilityResult<K> keyCompatibility = CompatibilityUtil.resolveCompatibilityResult(
@@ -247,7 +247,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
if (!keyCompatibility.isRequiresMigration() && !valueCompatibility.isRequiresMigration()) {
// new serializer is compatible; use it to replace the old serializer
broadcastState.setStateMetaInfo(
- new RegisteredBroadcastBackendStateMetaInfo<>(
+ new RegisteredBroadcastStateBackendMetaInfo<>(
name,
OperatorStateHandle.Mode.BROADCAST,
broadcastStateKeySerializer,
@@ -510,8 +510,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
// Recreate all PartitionableListStates from the meta info
for (StateMetaInfoSnapshot restoredSnapshot : restoredOperatorMetaInfoSnapshots) {
- final RegisteredOperatorBackendStateMetaInfo<?> restoredMetaInfo =
- new RegisteredOperatorBackendStateMetaInfo<>(restoredSnapshot);
+ final RegisteredOperatorStateBackendMetaInfo<?> restoredMetaInfo =
+ new RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot);
if (restoredMetaInfo.getPartitionStateSerializer() == null ||
restoredMetaInfo.getPartitionStateSerializer() instanceof UnloadableDummyTypeSerializer) {
@@ -546,8 +546,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
for (StateMetaInfoSnapshot restoredSnapshot : restoredBroadcastMetaInfoSnapshots) {
- final RegisteredBroadcastBackendStateMetaInfo<?, ?> restoredMetaInfo =
- new RegisteredBroadcastBackendStateMetaInfo<>(restoredSnapshot);
+ final RegisteredBroadcastStateBackendMetaInfo<?, ?> restoredMetaInfo =
+ new RegisteredBroadcastStateBackendMetaInfo<>(restoredSnapshot);
if (restoredMetaInfo.getKeySerializer() == null || restoredMetaInfo.getValueSerializer() == null ||
restoredMetaInfo.getKeySerializer() instanceof UnloadableDummyTypeSerializer ||
@@ -613,7 +613,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
/**
* Meta information of the state, including state name, assignment mode, and serializer
*/
- private RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo;
+ private RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo;
/**
* The internal list the holds the elements of the state
@@ -625,12 +625,12 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
*/
private final ArrayListSerializer<S> internalListCopySerializer;
- PartitionableListState(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) {
+ PartitionableListState(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) {
this(stateMetaInfo, new ArrayList<S>());
}
private PartitionableListState(
- RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo,
+ RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo,
ArrayList<S> internalList) {
this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
@@ -643,11 +643,11 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
this(toCopy.stateMetaInfo.deepCopy(), toCopy.internalListCopySerializer.copy(toCopy.internalList));
}
- public void setStateMetaInfo(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) {
+ public void setStateMetaInfo(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) {
this.stateMetaInfo = stateMetaInfo;
}
- public RegisteredOperatorBackendStateMetaInfo<S> getStateMetaInfo() {
+ public RegisteredOperatorStateBackendMetaInfo<S> getStateMetaInfo() {
return stateMetaInfo;
}
@@ -741,7 +741,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
// no restored state for the state name; simply create new state holder
partitionableListState = new PartitionableListState<>(
- new RegisteredOperatorBackendStateMetaInfo<>(
+ new RegisteredOperatorStateBackendMetaInfo<>(
name,
partitionStateSerializer,
mode));
@@ -757,8 +757,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
mode);
StateMetaInfoSnapshot restoredSnapshot = restoredOperatorStateMetaInfos.get(name);
- RegisteredOperatorBackendStateMetaInfo<S> metaInfo =
- new RegisteredOperatorBackendStateMetaInfo<>(restoredSnapshot);
+ RegisteredOperatorStateBackendMetaInfo<S> metaInfo =
+ new RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot);
// check compatibility to determine if state migration is required
TypeSerializer<S> newPartitionStateSerializer = partitionStateSerializer.duplicate();
@@ -772,7 +772,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
if (!stateCompatibility.isRequiresMigration()) {
// new serializer is compatible; use it to replace the old serializer
partitionableListState.setStateMetaInfo(
- new RegisteredOperatorBackendStateMetaInfo<>(name, newPartitionStateSerializer, mode));
+ new RegisteredOperatorStateBackendMetaInfo<>(name, newPartitionStateSerializer, mode));
} else {
// TODO state migration currently isn't possible.
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
index 7ebf1ce..a262103 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
@@ -42,7 +42,7 @@ public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K
/**
* Meta information of the state, including state name, assignment mode, and serializer.
*/
- private RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo;
+ private RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo;
/**
* The internal map the holds the elements of the state.
@@ -54,11 +54,11 @@ public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K
*/
private final MapSerializer<K, V> internalMapCopySerializer;
- HeapBroadcastState(RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo) {
+ HeapBroadcastState(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo) {
this(stateMetaInfo, new HashMap<>());
}
- private HeapBroadcastState(final RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo, final Map<K, V> internalMap) {
+ private HeapBroadcastState(final RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo, final Map<K, V> internalMap) {
this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
this.backingMap = Preconditions.checkNotNull(internalMap);
@@ -70,12 +70,12 @@ public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K
}
@Override
- public void setStateMetaInfo(RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo) {
+ public void setStateMetaInfo(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo) {
this.stateMetaInfo = stateMetaInfo;
}
@Override
- public RegisteredBroadcastBackendStateMetaInfo<K, V> getStateMetaInfo() {
+ public RegisteredBroadcastStateBackendMetaInfo<K, V> getStateMetaInfo() {
return stateMetaInfo;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java
index a3ce11c..79fafc5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java
@@ -28,9 +28,22 @@ import javax.annotation.Nonnull;
@FunctionalInterface
public interface KeyExtractorFunction<T> {
+ KeyExtractorFunction<? extends Keyed<?>> FOR_KEYED_OBJECTS = new KeyExtractorFunction<Keyed<?>>() {
+ @Nonnull
+ @Override
+ public Object extractKeyFromElement(@Nonnull Keyed<?> element) {
+ return element.getKey();
+ }
+ };
+
/**
* Returns the key for the given element by which the key-group can be computed.
*/
@Nonnull
Object extractKeyFromElement(@Nonnull T element);
+
+ @SuppressWarnings("unchecked")
+ static <T extends Keyed<?>> KeyExtractorFunction<T> forKeyedObjects() {
+ return (KeyExtractorFunction<T>) FOR_KEYED_OBJECTS;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
index 6a9dfb5..27d411c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state;
+import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.Preconditions;
@@ -28,7 +29,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
/**
- * Abstract class that contains the base algorithm for partitioning data into key-groups. This algorithm currently works
+ * Class that contains the base algorithm for partitioning data into key-groups. This algorithm currently works
* with two array (input, output) for optimal algorithmic complexity. Notice that this could also be implemented over a
* single array, using some cuckoo-hashing-style element replacement. This would have worse algorithmic complexity but
* better space efficiency. We currently prefer the trade-off in favor of better algorithmic complexity.
@@ -89,7 +90,7 @@ public class KeyGroupPartitioner<T> {
/** Cached result. */
@Nullable
- protected StateSnapshot.KeyGroupPartitionedSnapshot computedResult;
+ protected StateSnapshot.StateKeyGroupWriter computedResult;
/**
* Creates a new {@link KeyGroupPartitioner}.
@@ -131,7 +132,7 @@ public class KeyGroupPartitioner<T> {
/**
* Partitions the data into key-groups and returns the result via {@link PartitioningResult}.
*/
- public StateSnapshot.KeyGroupPartitionedSnapshot partitionByKeyGroup() {
+ public StateSnapshot.StateKeyGroupWriter partitionByKeyGroup() {
if (computedResult == null) {
reportAllElementKeyGroups();
buildHistogramByAccumulatingCounts();
@@ -198,7 +199,7 @@ public class KeyGroupPartitioner<T> {
* This represents the result of key-group partitioning. The data in {@link #partitionedElements} is partitioned
* w.r.t. {@link KeyGroupPartitioner#keyGroupRange}.
*/
- public static class PartitioningResult<T> implements StateSnapshot.KeyGroupPartitionedSnapshot {
+ private static class PartitioningResult<T> implements StateSnapshot.StateKeyGroupWriter {
/**
* Function to write one element to a {@link DataOutputView}.
@@ -249,7 +250,7 @@ public class KeyGroupPartitioner<T> {
}
@Override
- public void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) throws IOException {
+ public void writeStateInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) throws IOException {
int startOffset = getKeyGroupStartOffsetInclusive(keyGroupId);
int endOffset = getKeyGroupEndOffsetExclusive(keyGroupId);
@@ -264,6 +265,43 @@ public class KeyGroupPartitioner<T> {
}
}
+ public static <T> StateSnapshotKeyGroupReader createKeyGroupPartitionReader(
+ @Nonnull ElementReaderFunction<T> readerFunction,
+ @Nonnull KeyGroupElementsConsumer<T> elementConsumer) {
+ return new PartitioningResultKeyGroupReader<>(readerFunction, elementConsumer);
+ }
+
+ /**
+ * General algorithm to read key-grouped state that was written from a {@link PartitioningResult}
+ *
+ * @param <T> type of the elements to read.
+ */
+ private static class PartitioningResultKeyGroupReader<T> implements StateSnapshotKeyGroupReader {
+
+ @Nonnull
+ private final ElementReaderFunction<T> readerFunction;
+
+ @Nonnull
+ private final KeyGroupElementsConsumer<T> elementConsumer;
+
+ public PartitioningResultKeyGroupReader(
+ @Nonnull ElementReaderFunction<T> readerFunction,
+ @Nonnull KeyGroupElementsConsumer<T> elementConsumer) {
+
+ this.readerFunction = readerFunction;
+ this.elementConsumer = elementConsumer;
+ }
+
+ @Override
+ public void readMappingsInKeyGroup(@Nonnull DataInputView in, @Nonnegative int keyGroupId) throws IOException {
+ int numElements = in.readInt();
+ for (int i = 0; i < numElements; i++) {
+ T element = readerFunction.readElement(in);
+ elementConsumer.consume(element, keyGroupId);
+ }
+ }
+ }
+
/**
* This functional interface defines how one element is written to a {@link DataOutputView}.
*
@@ -281,4 +319,28 @@ public class KeyGroupPartitioner<T> {
*/
void writeElement(@Nonnull T element, @Nonnull DataOutputView dov) throws IOException;
}
+
+ /**
+ * This functional interface defines how one element is read from a {@link DataInputView}.
+ *
+ * @param <T> type of the read elements.
+ */
+ @FunctionalInterface
+ public interface ElementReaderFunction<T> {
+
+ @Nonnull
+ T readElement(@Nonnull DataInputView div) throws IOException;
+ }
+
+ /**
+ * Functional interface to consume elements from a key group.
+ *
+ * @param <T> type of the consumed elements.
+ */
+ @FunctionalInterface
+ public interface KeyGroupElementsConsumer<T> {
+
+
+ void consume(@Nonnull T element, @Nonnegative int keyGroupId) throws IOException;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Keyed.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Keyed.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Keyed.java
new file mode 100644
index 0000000..4320b0b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Keyed.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+/**
+ * Interface for objects that have a key attribute.
+ *
+ * @param <K> type of the key.
+ */
+public interface Keyed<K> {
+
+ /**
+ * Returns the key attribute.
+ */
+ K getKey();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparable.java
new file mode 100644
index 0000000..4d6cce0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparable.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Interface for objects that can be compared by priority.
+ * @param <T> type of the compared objects.
+ */
+public interface PriorityComparable<T> {
+
+ /**
+ * @see PriorityComparator#comparePriority(Object, Object).
+ */
+ int comparePriorityTo(@Nonnull T other);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java
index 2f6f5a7..ec36924 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java
@@ -30,6 +30,8 @@ package org.apache.flink.runtime.state;
@FunctionalInterface
public interface PriorityComparator<T> {
+ PriorityComparator<? extends PriorityComparable<Object>> FOR_PRIORITY_COMPARABLE_OBJECTS = PriorityComparable::comparePriorityTo;
+
/**
* Compares two objects for priority. Returns a negative integer, zero, or a positive integer as the first
* argument has lower, equal to, or higher priority than the second.
@@ -39,4 +41,9 @@ public interface PriorityComparator<T> {
* priority than the second.
*/
int comparePriority(T left, T right);
+
+ @SuppressWarnings("unchecked")
+ static <T extends PriorityComparable<?>> PriorityComparator<T> forPriorityComparableObjects() {
+ return (PriorityComparator<T>) FOR_PRIORITY_COMPARABLE_OBJECTS;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
index 6f509c0..2245e72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
@@ -38,9 +38,7 @@ public interface PriorityQueueSetFactory {
* @return the queue with the specified unique name.
*/
@Nonnull
- <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
+ <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T> create(
@Nonnull String stateName,
- @Nonnull TypeSerializer<T> byteOrderedElementSerializer,
- @Nonnull PriorityComparator<T> elementPriorityComparator,
- @Nonnull KeyExtractorFunction<T> keyExtractor);
+ @Nonnull TypeSerializer<T> byteOrderedElementSerializer);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
deleted file mode 100644
index 98a8195..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nonnull;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-
-public class RegisteredBroadcastBackendStateMetaInfo<K, V> extends RegisteredStateMetaInfoBase {
-
- /** The mode how elements in this state are assigned to tasks during restore. */
- private final OperatorStateHandle.Mode assignmentMode;
-
- /** The type serializer for the keys in the map state. */
- private final TypeSerializer<K> keySerializer;
-
- /** The type serializer for the values in the map state. */
- private final TypeSerializer<V> valueSerializer;
-
- public RegisteredBroadcastBackendStateMetaInfo(
- final String name,
- final OperatorStateHandle.Mode assignmentMode,
- final TypeSerializer<K> keySerializer,
- final TypeSerializer<V> valueSerializer) {
-
- super(name);
- Preconditions.checkArgument(assignmentMode != null && assignmentMode == OperatorStateHandle.Mode.BROADCAST);
- this.assignmentMode = assignmentMode;
- this.keySerializer = Preconditions.checkNotNull(keySerializer);
- this.valueSerializer = Preconditions.checkNotNull(valueSerializer);
- }
-
- public RegisteredBroadcastBackendStateMetaInfo(RegisteredBroadcastBackendStateMetaInfo<K, V> copy) {
- this(
- Preconditions.checkNotNull(copy).name,
- copy.assignmentMode,
- copy.keySerializer.duplicate(),
- copy.valueSerializer.duplicate());
- }
-
- @SuppressWarnings("unchecked")
- public RegisteredBroadcastBackendStateMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
- this(
- snapshot.getName(),
- OperatorStateHandle.Mode.valueOf(
- snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)),
- (TypeSerializer<K>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER),
- (TypeSerializer<V>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
- Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.BROADCAST == snapshot.getBackendStateType());
- }
-
- /**
- * Creates a deep copy of the itself.
- */
- public RegisteredBroadcastBackendStateMetaInfo<K, V> deepCopy() {
- return new RegisteredBroadcastBackendStateMetaInfo<>(this);
- }
-
- @Nonnull
- @Override
- public StateMetaInfoSnapshot snapshot() {
- Map<String, String> optionsMap = Collections.singletonMap(
- StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
- assignmentMode.toString());
- Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2);
- Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap = new HashMap<>(2);
- String keySerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString();
- String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
- serializerMap.put(keySerializerKey, keySerializer.duplicate());
- serializerConfigSnapshotsMap.put(keySerializerKey, keySerializer.snapshotConfiguration());
- serializerMap.put(valueSerializerKey, valueSerializer.duplicate());
- serializerConfigSnapshotsMap.put(valueSerializerKey, valueSerializer.snapshotConfiguration());
-
- return new StateMetaInfoSnapshot(
- name,
- StateMetaInfoSnapshot.BackendStateType.BROADCAST,
- optionsMap,
- serializerConfigSnapshotsMap,
- serializerMap);
- }
-
- public TypeSerializer<K> getKeySerializer() {
- return keySerializer;
- }
-
- public TypeSerializer<V> getValueSerializer() {
- return valueSerializer;
- }
-
- public OperatorStateHandle.Mode getAssignmentMode() {
- return assignmentMode;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
-
- if (!(obj instanceof RegisteredBroadcastBackendStateMetaInfo)) {
- return false;
- }
-
- final RegisteredBroadcastBackendStateMetaInfo other =
- (RegisteredBroadcastBackendStateMetaInfo) obj;
-
- return Objects.equals(name, other.getName())
- && Objects.equals(assignmentMode, other.getAssignmentMode())
- && Objects.equals(keySerializer, other.getKeySerializer())
- && Objects.equals(valueSerializer, other.getValueSerializer());
- }
-
- @Override
- public int hashCode() {
- int result = name.hashCode();
- result = 31 * result + assignmentMode.hashCode();
- result = 31 * result + keySerializer.hashCode();
- result = 31 * result + valueSerializer.hashCode();
- return result;
- }
-
- @Override
- public String toString() {
- return "RegisteredBroadcastBackendStateMetaInfo{" +
- "name='" + name + '\'' +
- ", keySerializer=" + keySerializer +
- ", valueSerializer=" + valueSerializer +
- ", assignmentMode=" + assignmentMode +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
new file mode 100644
index 0000000..02ab8ef
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+public class RegisteredBroadcastStateBackendMetaInfo<K, V> extends RegisteredStateMetaInfoBase {
+
+ /** The mode how elements in this state are assigned to tasks during restore. */
+ @Nonnull
+ private final OperatorStateHandle.Mode assignmentMode;
+
+ /** The type serializer for the keys in the map state. */
+ @Nonnull
+ private final TypeSerializer<K> keySerializer;
+
+ /** The type serializer for the values in the map state. */
+ @Nonnull
+ private final TypeSerializer<V> valueSerializer;
+
+ public RegisteredBroadcastStateBackendMetaInfo(
+ @Nonnull final String name,
+ @Nonnull final OperatorStateHandle.Mode assignmentMode,
+ @Nonnull final TypeSerializer<K> keySerializer,
+ @Nonnull final TypeSerializer<V> valueSerializer) {
+
+ super(name);
+ Preconditions.checkArgument(assignmentMode == OperatorStateHandle.Mode.BROADCAST);
+ this.assignmentMode = assignmentMode;
+ this.keySerializer = keySerializer;
+ this.valueSerializer = valueSerializer;
+ }
+
+ public RegisteredBroadcastStateBackendMetaInfo(@Nonnull RegisteredBroadcastStateBackendMetaInfo<K, V> copy) {
+ this(
+ Preconditions.checkNotNull(copy).name,
+ copy.assignmentMode,
+ copy.keySerializer.duplicate(),
+ copy.valueSerializer.duplicate());
+ }
+
+ @SuppressWarnings("unchecked")
+ public RegisteredBroadcastStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
+ this(
+ snapshot.getName(),
+ OperatorStateHandle.Mode.valueOf(
+ snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)),
+ (TypeSerializer<K>) Preconditions.checkNotNull(
+ snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER)),
+ (TypeSerializer<V>) Preconditions.checkNotNull(
+ snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)));
+ Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.BROADCAST == snapshot.getBackendStateType());
+ }
+
+ /**
+ * Creates a deep copy of the itself.
+ */
+ @Nonnull
+ public RegisteredBroadcastStateBackendMetaInfo<K, V> deepCopy() {
+ return new RegisteredBroadcastStateBackendMetaInfo<>(this);
+ }
+
+ @Nonnull
+ @Override
+ public StateMetaInfoSnapshot snapshot() {
+ return computeSnapshot();
+ }
+
+ @Nonnull
+ public TypeSerializer<K> getKeySerializer() {
+ return keySerializer;
+ }
+
+ @Nonnull
+ public TypeSerializer<V> getValueSerializer() {
+ return valueSerializer;
+ }
+
+ @Nonnull
+ public OperatorStateHandle.Mode getAssignmentMode() {
+ return assignmentMode;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+
+ if (!(obj instanceof RegisteredBroadcastStateBackendMetaInfo)) {
+ return false;
+ }
+
+ final RegisteredBroadcastStateBackendMetaInfo other =
+ (RegisteredBroadcastStateBackendMetaInfo) obj;
+
+ return Objects.equals(name, other.getName())
+ && Objects.equals(assignmentMode, other.getAssignmentMode())
+ && Objects.equals(keySerializer, other.getKeySerializer())
+ && Objects.equals(valueSerializer, other.getValueSerializer());
+ }
+
+ @Override
+ public int hashCode() {
+ int result = name.hashCode();
+ result = 31 * result + assignmentMode.hashCode();
+ result = 31 * result + keySerializer.hashCode();
+ result = 31 * result + valueSerializer.hashCode();
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "RegisteredBroadcastBackendStateMetaInfo{" +
+ "name='" + name + '\'' +
+ ", keySerializer=" + keySerializer +
+ ", valueSerializer=" + valueSerializer +
+ ", assignmentMode=" + assignmentMode +
+ '}';
+ }
+
+ @Nonnull
+ private StateMetaInfoSnapshot computeSnapshot() {
+ Map<String, String> optionsMap = Collections.singletonMap(
+ StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
+ assignmentMode.toString());
+ Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2);
+ Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap = new HashMap<>(2);
+ String keySerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString();
+ String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
+ serializerMap.put(keySerializerKey, keySerializer.duplicate());
+ serializerConfigSnapshotsMap.put(keySerializerKey, keySerializer.snapshotConfiguration());
+ serializerMap.put(valueSerializerKey, valueSerializer.duplicate());
+ serializerConfigSnapshotsMap.put(valueSerializerKey, valueSerializer.snapshotConfiguration());
+
+ return new StateMetaInfoSnapshot(
+ name,
+ StateMetaInfoSnapshot.BackendStateType.BROADCAST,
+ optionsMap,
+ serializerConfigSnapshotsMap,
+ serializerMap);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
new file mode 100644
index 0000000..d49a05c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StateMigrationException;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Compound meta information for a registered state in a keyed state backend. This combines all serializers and the
+ * state name.
+ *
+ * @param <N> Type of namespace
+ * @param <S> Type of state value
+ */
+public class RegisteredKeyValueStateBackendMetaInfo<N, S> extends RegisteredStateMetaInfoBase {
+
+ @Nonnull
+ private final StateDescriptor.Type stateType;
+ @Nonnull
+ private final TypeSerializer<N> namespaceSerializer;
+ @Nonnull
+ private final TypeSerializer<S> stateSerializer;
+
+ public RegisteredKeyValueStateBackendMetaInfo(
+ @Nonnull StateDescriptor.Type stateType,
+ @Nonnull String name,
+ @Nonnull TypeSerializer<N> namespaceSerializer,
+ @Nonnull TypeSerializer<S> stateSerializer) {
+
+ super(name);
+ this.stateType = stateType;
+ this.namespaceSerializer = namespaceSerializer;
+ this.stateSerializer = stateSerializer;
+ }
+
+ @SuppressWarnings("unchecked")
+ public RegisteredKeyValueStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
+ this(
+ StateDescriptor.Type.valueOf(snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)),
+ snapshot.getName(),
+ (TypeSerializer<N>) Preconditions.checkNotNull(
+ snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER)),
+ (TypeSerializer<S>) Preconditions.checkNotNull(
+ snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)));
+ Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.KEY_VALUE == snapshot.getBackendStateType());
+ }
+
+ @Nonnull
+ public StateDescriptor.Type getStateType() {
+ return stateType;
+ }
+
+ @Nonnull
+ public TypeSerializer<N> getNamespaceSerializer() {
+ return namespaceSerializer;
+ }
+
+ @Nonnull
+ public TypeSerializer<S> getStateSerializer() {
+ return stateSerializer;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ RegisteredKeyValueStateBackendMetaInfo<?, ?> that = (RegisteredKeyValueStateBackendMetaInfo<?, ?>) o;
+
+ if (!stateType.equals(that.stateType)) {
+ return false;
+ }
+
+ if (!getName().equals(that.getName())) {
+ return false;
+ }
+
+ return getStateSerializer().equals(that.getStateSerializer())
+ && getNamespaceSerializer().equals(that.getNamespaceSerializer());
+ }
+
+ @Override
+ public String toString() {
+ return "RegisteredKeyedBackendStateMetaInfo{" +
+ "stateType=" + stateType +
+ ", name='" + name + '\'' +
+ ", namespaceSerializer=" + namespaceSerializer +
+ ", stateSerializer=" + stateSerializer +
+ '}';
+ }
+
+ @Override
+ public int hashCode() {
+ int result = getName().hashCode();
+ result = 31 * result + getStateType().hashCode();
+ result = 31 * result + getNamespaceSerializer().hashCode();
+ result = 31 * result + getStateSerializer().hashCode();
+ return result;
+ }
+
+ /**
+ * Checks compatibility of a restored k/v state, with the new {@link StateDescriptor} provided to it.
+ * This checks that the descriptor specifies identical names and state types, as well as
+ * serializers that are compatible for the restored k/v state bytes.
+ */
+ @Nonnull
+ public static <N, S> RegisteredKeyValueStateBackendMetaInfo<N, S> resolveKvStateCompatibility(
+ StateMetaInfoSnapshot restoredStateMetaInfoSnapshot,
+ TypeSerializer<N> newNamespaceSerializer,
+ StateDescriptor<?, S> newStateDescriptor) throws StateMigrationException {
+
+ Preconditions.checkState(
+ Objects.equals(newStateDescriptor.getName(), restoredStateMetaInfoSnapshot.getName()),
+ "Incompatible state names. " +
+ "Was [" + restoredStateMetaInfoSnapshot.getName() + "], " +
+ "registered with [" + newStateDescriptor.getName() + "].");
+
+ final StateDescriptor.Type restoredType =
+ StateDescriptor.Type.valueOf(
+ restoredStateMetaInfoSnapshot.getOption(
+ StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
+
+ if (!Objects.equals(newStateDescriptor.getType(), StateDescriptor.Type.UNKNOWN)
+ && !Objects.equals(restoredType, StateDescriptor.Type.UNKNOWN)) {
+
+ Preconditions.checkState(
+ newStateDescriptor.getType() == restoredType,
+ "Incompatible state types. " +
+ "Was [" + restoredType + "], " +
+ "registered with [" + newStateDescriptor.getType() + "].");
+ }
+
+ // check compatibility results to determine if state migration is required
+ CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
+ restoredStateMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
+ null,
+ restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
+ StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
+ newNamespaceSerializer);
+
+ TypeSerializer<S> newStateSerializer = newStateDescriptor.getSerializer();
+ CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
+ restoredStateMetaInfoSnapshot.getTypeSerializer(
+ StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
+ UnloadableDummyTypeSerializer.class,
+ restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
+ StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
+ newStateSerializer);
+
+ if (namespaceCompatibility.isRequiresMigration() || stateCompatibility.isRequiresMigration()) {
+ // TODO state migration currently isn't possible.
+ throw new StateMigrationException("State migration isn't supported, yet.");
+ } else {
+ return new RegisteredKeyValueStateBackendMetaInfo<>(
+ newStateDescriptor.getType(),
+ newStateDescriptor.getName(),
+ newNamespaceSerializer,
+ newStateSerializer);
+ }
+ }
+
+ @Nonnull
+ @Override
+ public StateMetaInfoSnapshot snapshot() {
+ return computeSnapshot();
+ }
+
+ @Nonnull
+ private StateMetaInfoSnapshot computeSnapshot() {
+ Map<String, String> optionsMap = Collections.singletonMap(
+ StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE.toString(),
+ stateType.toString());
+ Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2);
+ Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap = new HashMap<>(2);
+ String namespaceSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString();
+ String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
+ serializerMap.put(namespaceSerializerKey, namespaceSerializer.duplicate());
+ serializerConfigSnapshotsMap.put(namespaceSerializerKey, namespaceSerializer.snapshotConfiguration());
+ serializerMap.put(valueSerializerKey, stateSerializer.duplicate());
+ serializerConfigSnapshotsMap.put(valueSerializerKey, stateSerializer.snapshotConfiguration());
+
+ return new StateMetaInfoSnapshot(
+ name,
+ StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
+ optionsMap,
+ serializerConfigSnapshotsMap,
+ serializerMap);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
deleted file mode 100644
index e9b230a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.StateMigrationException;
-
-import javax.annotation.Nonnull;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Compound meta information for a registered state in a keyed state backend. This combines all serializers and the
- * state name.
- *
- * @param <N> Type of namespace
- * @param <S> Type of state value
- */
-public class RegisteredKeyedBackendStateMetaInfo<N, S> extends RegisteredStateMetaInfoBase {
-
- private final StateDescriptor.Type stateType;
- private final TypeSerializer<N> namespaceSerializer;
- private final TypeSerializer<S> stateSerializer;
-
- public RegisteredKeyedBackendStateMetaInfo(
- StateDescriptor.Type stateType,
- String name,
- TypeSerializer<N> namespaceSerializer,
- TypeSerializer<S> stateSerializer) {
-
- super(name);
- this.stateType = checkNotNull(stateType);
- this.namespaceSerializer = checkNotNull(namespaceSerializer);
- this.stateSerializer = checkNotNull(stateSerializer);
- }
-
- @SuppressWarnings("unchecked")
- public RegisteredKeyedBackendStateMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
- this(
- StateDescriptor.Type.valueOf(snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)),
- snapshot.getName(),
- (TypeSerializer<N>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
- (TypeSerializer<S>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
- Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.KEY_VALUE == snapshot.getBackendStateType());
- }
-
- public StateDescriptor.Type getStateType() {
- return stateType;
- }
-
- public TypeSerializer<N> getNamespaceSerializer() {
- return namespaceSerializer;
- }
-
- public TypeSerializer<S> getStateSerializer() {
- return stateSerializer;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
-
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- RegisteredKeyedBackendStateMetaInfo<?, ?> that = (RegisteredKeyedBackendStateMetaInfo<?, ?>) o;
-
- if (!stateType.equals(that.stateType)) {
- return false;
- }
-
- if (!getName().equals(that.getName())) {
- return false;
- }
-
- return getStateSerializer().equals(that.getStateSerializer())
- && getNamespaceSerializer().equals(that.getNamespaceSerializer());
- }
-
- @Override
- public String toString() {
- return "RegisteredKeyedBackendStateMetaInfo{" +
- "stateType=" + stateType +
- ", name='" + name + '\'' +
- ", namespaceSerializer=" + namespaceSerializer +
- ", stateSerializer=" + stateSerializer +
- '}';
- }
-
- @Override
- public int hashCode() {
- int result = getName().hashCode();
- result = 31 * result + getStateType().hashCode();
- result = 31 * result + getNamespaceSerializer().hashCode();
- result = 31 * result + getStateSerializer().hashCode();
- return result;
- }
-
- /**
- * Checks compatibility of a restored k/v state, with the new {@link StateDescriptor} provided to it.
- * This checks that the descriptor specifies identical names and state types, as well as
- * serializers that are compatible for the restored k/v state bytes.
- */
- public static <N, S> RegisteredKeyedBackendStateMetaInfo<N, S> resolveKvStateCompatibility(
- StateMetaInfoSnapshot restoredStateMetaInfoSnapshot,
- TypeSerializer<N> newNamespaceSerializer,
- StateDescriptor<?, S> newStateDescriptor) throws StateMigrationException {
-
- Preconditions.checkState(
- Objects.equals(newStateDescriptor.getName(), restoredStateMetaInfoSnapshot.getName()),
- "Incompatible state names. " +
- "Was [" + restoredStateMetaInfoSnapshot.getName() + "], " +
- "registered with [" + newStateDescriptor.getName() + "].");
-
- final StateDescriptor.Type restoredType =
- StateDescriptor.Type.valueOf(
- restoredStateMetaInfoSnapshot.getOption(
- StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
-
- if (!Objects.equals(newStateDescriptor.getType(), StateDescriptor.Type.UNKNOWN)
- && !Objects.equals(restoredType, StateDescriptor.Type.UNKNOWN)) {
-
- Preconditions.checkState(
- newStateDescriptor.getType() == restoredType,
- "Incompatible state types. " +
- "Was [" + restoredType + "], " +
- "registered with [" + newStateDescriptor.getType() + "].");
- }
-
- // check compatibility results to determine if state migration is required
- CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
- restoredStateMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
- null,
- restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
- StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
- newNamespaceSerializer);
-
- TypeSerializer<S> newStateSerializer = newStateDescriptor.getSerializer();
- CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
- restoredStateMetaInfoSnapshot.getTypeSerializer(
- StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
- UnloadableDummyTypeSerializer.class,
- restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
- StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
- newStateSerializer);
-
- if (namespaceCompatibility.isRequiresMigration() || stateCompatibility.isRequiresMigration()) {
- // TODO state migration currently isn't possible.
- throw new StateMigrationException("State migration isn't supported, yet.");
- } else {
- return new RegisteredKeyedBackendStateMetaInfo<>(
- newStateDescriptor.getType(),
- newStateDescriptor.getName(),
- newNamespaceSerializer,
- newStateSerializer);
- }
- }
-
- @Nonnull
- @Override
- public StateMetaInfoSnapshot snapshot() {
- Map<String, String> optionsMap = Collections.singletonMap(
- StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE.toString(),
- stateType.toString());
- Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2);
- Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap = new HashMap<>(2);
- String namespaceSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString();
- String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
- serializerMap.put(namespaceSerializerKey, namespaceSerializer.duplicate());
- serializerConfigSnapshotsMap.put(namespaceSerializerKey, namespaceSerializer.snapshotConfiguration());
- serializerMap.put(valueSerializerKey, stateSerializer.duplicate());
- serializerConfigSnapshotsMap.put(valueSerializerKey, stateSerializer.snapshotConfiguration());
-
- return new StateMetaInfoSnapshot(
- name,
- StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
- optionsMap,
- serializerConfigSnapshotsMap,
- serializerMap);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
deleted file mode 100644
index f314add..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nonnull;
-
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * Compound meta information for a registered state in an operator state backend.
- * This contains the state name, assignment mode, and state partition serializer.
- *
- * @param <S> Type of the state.
- */
-public class RegisteredOperatorBackendStateMetaInfo<S> extends RegisteredStateMetaInfoBase {
-
- /**
- * The mode how elements in this state are assigned to tasks during restore
- */
- private final OperatorStateHandle.Mode assignmentMode;
-
- /**
- * The type serializer for the elements in the state list
- */
- private final TypeSerializer<S> partitionStateSerializer;
-
- public RegisteredOperatorBackendStateMetaInfo(
- String name,
- TypeSerializer<S> partitionStateSerializer,
- OperatorStateHandle.Mode assignmentMode) {
- super(Preconditions.checkNotNull(name));
- this.partitionStateSerializer = Preconditions.checkNotNull(partitionStateSerializer);
- this.assignmentMode = Preconditions.checkNotNull(assignmentMode);
- }
-
- private RegisteredOperatorBackendStateMetaInfo(RegisteredOperatorBackendStateMetaInfo<S> copy) {
- this(
- Preconditions.checkNotNull(copy).name,
- copy.partitionStateSerializer.duplicate(),
- copy.assignmentMode);
- }
-
- @SuppressWarnings("unchecked")
- public RegisteredOperatorBackendStateMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
- this(
- snapshot.getName(),
- (TypeSerializer<S>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
- OperatorStateHandle.Mode.valueOf(
- snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)));
- Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.OPERATOR == snapshot.getBackendStateType());
- }
-
- /**
- * Creates a deep copy of the itself.
- */
- public RegisteredOperatorBackendStateMetaInfo<S> deepCopy() {
- return new RegisteredOperatorBackendStateMetaInfo<>(this);
- }
-
- @Nonnull
- @Override
- public StateMetaInfoSnapshot snapshot() {
- Map<String, String> optionsMap = Collections.singletonMap(
- StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
- assignmentMode.toString());
- String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
- Map<String, TypeSerializer<?>> serializerMap =
- Collections.singletonMap(valueSerializerKey, partitionStateSerializer.duplicate());
- Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap =
- Collections.singletonMap(valueSerializerKey, partitionStateSerializer.snapshotConfiguration());
-
- return new StateMetaInfoSnapshot(
- name,
- StateMetaInfoSnapshot.BackendStateType.OPERATOR,
- optionsMap,
- serializerConfigSnapshotsMap,
- serializerMap);
- }
-
- public OperatorStateHandle.Mode getAssignmentMode() {
- return assignmentMode;
- }
-
- public TypeSerializer<S> getPartitionStateSerializer() {
- return partitionStateSerializer;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
-
- if (obj == null) {
- return false;
- }
-
- return (obj instanceof RegisteredOperatorBackendStateMetaInfo)
- && name.equals(((RegisteredOperatorBackendStateMetaInfo) obj).getName())
- && assignmentMode.equals(((RegisteredOperatorBackendStateMetaInfo) obj).getAssignmentMode())
- && partitionStateSerializer.equals(((RegisteredOperatorBackendStateMetaInfo) obj).getPartitionStateSerializer());
- }
-
- @Override
- public int hashCode() {
- int result = getName().hashCode();
- result = 31 * result + getAssignmentMode().hashCode();
- result = 31 * result + getPartitionStateSerializer().hashCode();
- return result;
- }
-
- @Override
- public String toString() {
- return "RegisteredOperatorBackendStateMetaInfo{" +
- "name='" + name + "\'" +
- ", assignmentMode=" + assignmentMode +
- ", partitionStateSerializer=" + partitionStateSerializer +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
new file mode 100644
index 0000000..b65671e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Compound meta information for a registered state in an operator state backend.
+ * This contains the state name, assignment mode, and state partition serializer.
+ *
+ * @param <S> Type of the state.
+ */
+public class RegisteredOperatorStateBackendMetaInfo<S> extends RegisteredStateMetaInfoBase {
+
+ /**
+ * The mode how elements in this state are assigned to tasks during restore
+ */
+ @Nonnull
+ private final OperatorStateHandle.Mode assignmentMode;
+
+ /**
+ * The type serializer for the elements in the state list
+ */
+ @Nonnull
+ private final TypeSerializer<S> partitionStateSerializer;
+
+ public RegisteredOperatorStateBackendMetaInfo(
+ @Nonnull String name,
+ @Nonnull TypeSerializer<S> partitionStateSerializer,
+ @Nonnull OperatorStateHandle.Mode assignmentMode) {
+ super(name);
+ this.partitionStateSerializer = partitionStateSerializer;
+ this.assignmentMode = assignmentMode;
+ }
+
+ private RegisteredOperatorStateBackendMetaInfo(@Nonnull RegisteredOperatorStateBackendMetaInfo<S> copy) {
+ this(
+ Preconditions.checkNotNull(copy).name,
+ copy.partitionStateSerializer.duplicate(),
+ copy.assignmentMode);
+ }
+
+ @SuppressWarnings("unchecked")
+ public RegisteredOperatorStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
+ this(
+ snapshot.getName(),
+ (TypeSerializer<S>) Preconditions.checkNotNull(
+ snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)),
+ OperatorStateHandle.Mode.valueOf(
+ snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)));
+ Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.OPERATOR == snapshot.getBackendStateType());
+ }
+
+ /**
+ * Creates a deep copy of the itself.
+ */
+ @Nonnull
+ public RegisteredOperatorStateBackendMetaInfo<S> deepCopy() {
+ return new RegisteredOperatorStateBackendMetaInfo<>(this);
+ }
+
+ @Nonnull
+ @Override
+ public StateMetaInfoSnapshot snapshot() {
+ return computeSnapshot();
+ }
+
+ @Nonnull
+ public OperatorStateHandle.Mode getAssignmentMode() {
+ return assignmentMode;
+ }
+
+ @Nonnull
+ public TypeSerializer<S> getPartitionStateSerializer() {
+ return partitionStateSerializer;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+
+ if (obj == null) {
+ return false;
+ }
+
+ return (obj instanceof RegisteredOperatorStateBackendMetaInfo)
+ && name.equals(((RegisteredOperatorStateBackendMetaInfo) obj).getName())
+ && assignmentMode.equals(((RegisteredOperatorStateBackendMetaInfo) obj).getAssignmentMode())
+ && partitionStateSerializer.equals(((RegisteredOperatorStateBackendMetaInfo) obj).getPartitionStateSerializer());
+ }
+
+ @Override
+ public int hashCode() {
+ int result = getName().hashCode();
+ result = 31 * result + getAssignmentMode().hashCode();
+ result = 31 * result + getPartitionStateSerializer().hashCode();
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "RegisteredOperatorBackendStateMetaInfo{" +
+ "name='" + name + "\'" +
+ ", assignmentMode=" + assignmentMode +
+ ", partitionStateSerializer=" + partitionStateSerializer +
+ '}';
+ }
+
+ @Nonnull
+ private StateMetaInfoSnapshot computeSnapshot() {
+ Map<String, String> optionsMap = Collections.singletonMap(
+ StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
+ assignmentMode.toString());
+ String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
+ Map<String, TypeSerializer<?>> serializerMap =
+ Collections.singletonMap(valueSerializerKey, partitionStateSerializer.duplicate());
+ Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap =
+ Collections.singletonMap(valueSerializerKey, partitionStateSerializer.snapshotConfiguration());
+
+ return new StateMetaInfoSnapshot(
+ name,
+ StateMetaInfoSnapshot.BackendStateType.OPERATOR,
+ optionsMap,
+ serializerConfigSnapshotsMap,
+ serializerMap);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
new file mode 100644
index 0000000..9ef23ed
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Meta information about a priority queue state in a backend.
+ */
+public class RegisteredPriorityQueueStateBackendMetaInfo<T> extends RegisteredStateMetaInfoBase {
+
+ @Nonnull
+ private final TypeSerializer<T> elementSerializer;
+
+ public RegisteredPriorityQueueStateBackendMetaInfo(
+ @Nonnull String name,
+ @Nonnull TypeSerializer<T> elementSerializer) {
+
+ super(name);
+ this.elementSerializer = elementSerializer;
+ }
+
+ @SuppressWarnings("unchecked")
+ public RegisteredPriorityQueueStateBackendMetaInfo(StateMetaInfoSnapshot snapshot) {
+ this(snapshot.getName(),
+ (TypeSerializer<T>) Preconditions.checkNotNull(
+ snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)));
+ Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE == snapshot.getBackendStateType());
+ }
+
+ @Nonnull
+ @Override
+ public StateMetaInfoSnapshot snapshot() {
+ return computeSnapshot();
+ }
+
+ @Nonnull
+ public TypeSerializer<T> getElementSerializer() {
+ return elementSerializer;
+ }
+
+ private StateMetaInfoSnapshot computeSnapshot() {
+ Map<String, TypeSerializer<?>> serializerMap =
+ Collections.singletonMap(
+ StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
+ elementSerializer.duplicate());
+ Map<String, TypeSerializerConfigSnapshot> serializerSnapshotMap =
+ Collections.singletonMap(
+ StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
+ elementSerializer.snapshotConfiguration());
+
+ return new StateMetaInfoSnapshot(
+ name,
+ StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE,
+ Collections.emptyMap(),
+ serializerSnapshotMap,
+ serializerMap);
+ }
+
+ public RegisteredPriorityQueueStateBackendMetaInfo deepCopy() {
+ return new RegisteredPriorityQueueStateBackendMetaInfo<>(name, elementSerializer.duplicate());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
index 4132d14..b7dff59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
@@ -42,4 +42,21 @@ public abstract class RegisteredStateMetaInfoBase {
@Nonnull
public abstract StateMetaInfoSnapshot snapshot();
+
+ public static RegisteredStateMetaInfoBase fromMetaInfoSnapshot(@Nonnull StateMetaInfoSnapshot snapshot) {
+
+ final StateMetaInfoSnapshot.BackendStateType backendStateType = snapshot.getBackendStateType();
+ switch (backendStateType) {
+ case KEY_VALUE:
+ return new RegisteredKeyValueStateBackendMetaInfo<>(snapshot);
+ case OPERATOR:
+ return new RegisteredOperatorStateBackendMetaInfo<>(snapshot);
+ case BROADCAST:
+ return new RegisteredBroadcastStateBackendMetaInfo<>(snapshot);
+ case PRIORITY_QUEUE:
+ return new RegisteredPriorityQueueStateBackendMetaInfo<>(snapshot);
+ default:
+ throw new IllegalArgumentException("Unknown backend state type: " + backendStateType);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshot.java
index 1fcac5c..54180f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshot.java
@@ -18,7 +18,9 @@
package org.apache.flink.runtime.state;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
@@ -30,38 +32,45 @@ import java.io.IOException;
* All snapshots should be released after usage. This interface outlines the asynchronous snapshot life-cycle, which
* typically looks as follows. In the synchronous part of a checkpoint, an instance of {@link StateSnapshot} is produced
* for a state and captures the state at this point in time. Then, in the asynchronous part of the checkpoint, the user
- * calls {@link #partitionByKeyGroup()} to ensure that the snapshot is partitioned into key-groups. For state that is
- * already partitioned, this can be a NOP. The returned {@link KeyGroupPartitionedSnapshot} can be used by the caller
+ * calls {@link #getKeyGroupWriter()} to ensure that the snapshot is partitioned into key-groups. For state that is
+ * already partitioned, this can be a NOP. The returned {@link StateKeyGroupWriter} can be used by the caller
* to write the state by key-group. As a last step, when the state is completely written, the user calls
* {@link #release()}.
*/
+@Internal
public interface StateSnapshot {
/**
- * This method partitions the snapshot by key-group and then returns a {@link KeyGroupPartitionedSnapshot}.
+ * This method returns {@link StateKeyGroupWriter} and should be called in the asynchronous part of the snapshot.
*/
@Nonnull
- KeyGroupPartitionedSnapshot partitionByKeyGroup();
+ StateKeyGroupWriter getKeyGroupWriter();
+
+ /**
+ * Returns a snapshot of the state's meta data.
+ */
+ @Nonnull
+ StateMetaInfoSnapshot getMetaInfoSnapshot();
/**
* Release the snapshot. All snapshots should be released when they are no longer used because some implementation
- * can only release resources after a release. Produced {@link KeyGroupPartitionedSnapshot} should no longer be used
+ * can only release resources after a release. Produced {@link StateKeyGroupWriter} should no longer be used
* after calling this method.
*/
void release();
/**
- * Interface for writing a snapshot after it is partitioned into key-groups.
+ * Interface for writing a snapshot that is partitioned into key-groups.
*/
- interface KeyGroupPartitionedSnapshot {
+ interface StateKeyGroupWriter {
/**
- * Writes the data for the specified key-group to the output. You must call {@link #partitionByKeyGroup()} once
+ * Writes the data for the specified key-group to the output. You must call {@link #getKeyGroupWriter()} once
* before first calling this method.
*
* @param dov the output.
* @param keyGroupId the key-group to write.
* @throws IOException on write-related problems.
*/
- void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, @Nonnegative int keyGroupId) throws IOException;
+ void writeStateInKeyGroup(@Nonnull DataOutputView dov, @Nonnegative int keyGroupId) throws IOException;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotKeyGroupReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotKeyGroupReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotKeyGroupReader.java
new file mode 100644
index 0000000..b6b91ee
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotKeyGroupReader.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.heap.StateTable;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+/**
+ * Interface for state de-serialization into {@link StateTable}s by key-group.
+ */
+@Internal
+public interface StateSnapshotKeyGroupReader {
+
+ /**
+ * Read the data for the specified key-group from the input.
+ *
+ * @param div the input
+ * @param keyGroupId the key-group to write
+ * @throws IOException on write related problems
+ */
+ void readMappingsInKeyGroup(@Nonnull DataInputView div, @Nonnegative int keyGroupId) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotRestore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotRestore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotRestore.java
new file mode 100644
index 0000000..6fe50ca
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotRestore.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Interface to deal with state snapshot and restore of state.
+ * TODO find better name?
+ */
+@Internal
+public interface StateSnapshotRestore {
+
+ /**
+ * Returns a snapshot of the state.
+ */
+ @Nonnull
+ StateSnapshot stateSnapshot();
+
+ /**
+ * This method returns a {@link StateSnapshotKeyGroupReader} that can be used to restore the state on a
+ * per-key-group basis. This method tries to return a reader for the given version hint.
+ *
+ * @param readVersionHint the required version of the state to read.
+ * @return a reader that reads state by key-groups, for the given read version.
+ */
+ @Nonnull
+ StateSnapshotKeyGroupReader keyGroupReader(int readVersionHint);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
index 4384eb7..94aed45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
@@ -85,11 +85,6 @@ public class TieBreakingPriorityComparator<T> implements Comparator<T>, Priority
return ((Comparable<T>) o1).compareTo(o2);
}
- // we catch this case before moving to more expensive tie breaks.
- if (o1.equals(o2)) {
- return 0;
- }
-
// if objects are not equal, their serialized form should somehow differ as well. this can be costly, and...
// TODO we should have an alternative approach in the future, e.g. a cache that does not rely on compare to check equality.
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
index 6dc8cf3..b1ad0df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
@@ -30,6 +30,8 @@ import java.util.Collection;
import java.util.function.Consumer;
import java.util.function.Predicate;
+import static org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION;
+
/**
* This class is an implementation of a {@link InternalPriorityQueue} with set semantics that internally consists of
* two different storage types. The first storage is a (potentially slow) ordered set store manages the ground truth
@@ -80,6 +82,30 @@ public class CachingInternalPriorityQueueSet<E> implements InternalPriorityQueue
@Override
public void bulkPoll(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) {
+ if (ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION) {
+ bulkPollRelaxedOrder(canConsume, consumer);
+ } else {
+ bulkPollStrictOrder(canConsume, consumer);
+ }
+ }
+
+ private void bulkPollRelaxedOrder(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) {
+ if (orderedCache.isEmpty()) {
+ bulkPollStore(canConsume, consumer);
+ } else {
+ while (!orderedCache.isEmpty() && canConsume.test(orderedCache.peekFirst())) {
+ final E next = orderedCache.removeFirst();
+ orderedStore.remove(next);
+ consumer.accept(next);
+ }
+
+ if (orderedCache.isEmpty()) {
+ bulkPollStore(canConsume, consumer);
+ }
+ }
+ }
+
+ private void bulkPollStrictOrder(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) {
E element;
while ((element = peek()) != null && canConsume.test(element)) {
poll();
@@ -87,6 +113,26 @@ public class CachingInternalPriorityQueueSet<E> implements InternalPriorityQueue
}
}
+ private void bulkPollStore(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) {
+ try (CloseableIterator<E> iterator = orderedStore.orderedIterator()) {
+ while (iterator.hasNext()) {
+ final E next = iterator.next();
+ if (canConsume.test(next)) {
+ orderedStore.remove(next);
+ consumer.accept(next);
+ } else {
+ orderedCache.add(next);
+ while (iterator.hasNext() && !orderedCache.isFull()) {
+ orderedCache.add(iterator.next());
+ }
+ break;
+ }
+ }
+ } catch (Exception e) {
+ throw new FlinkRuntimeException("Exception while bulk polling store.", e);
+ }
+ }
+
@Nullable
@Override
public E poll() {
[2/8] flink git commit: [hotfix] Rename PriorityQueueStateType.ROCKS
into PriorityQueueStateType.ROCKSDB
Posted by tr...@apache.org.
[hotfix] Rename PriorityQueueStateType.ROCKS into PriorityQueueStateType.ROCKSDB
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8db5ca6b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8db5ca6b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8db5ca6b
Branch: refs/heads/master
Commit: 8db5ca6b0da13f7bd8dab3edee127c50cf26109b
Parents: a4b4cb7
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 16 09:31:06 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jul 16 22:11:57 2018 +0200
----------------------------------------------------------------------
.../flink/contrib/streaming/state/RocksDBKeyedStateBackend.java | 2 +-
.../org/apache/flink/contrib/streaming/state/RocksDBOptions.java | 4 ++--
.../flink/contrib/streaming/state/RocksDBStateBackend.java | 2 +-
3 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8db5ca6b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 0697463..72aa4fb 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -324,7 +324,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
case HEAP:
this.priorityQueueFactory = new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
break;
- case ROCKS:
+ case ROCKSDB:
this.priorityQueueFactory = new RocksDBPriorityQueueSetFactory();
break;
default:
http://git-wip-us.apache.org/repos/asf/flink/blob/8db5ca6b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
index 37eb6cf..18f9ec9 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
@@ -22,7 +22,7 @@ import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import static org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType.HEAP;
-import static org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType.ROCKS;
+import static org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType.ROCKSDB;
/**
* Configuration options for the RocksDB backend.
@@ -43,5 +43,5 @@ public class RocksDBOptions {
.key("state.backend.rocksdb.timer-service.impl")
.defaultValue(HEAP.name())
.withDescription(String.format("This determines the timer service implementation. Options are either %s " +
- "(heap-based, default) or %s for an implementation based on RocksDB.", HEAP.name(), ROCKS.name()));
+ "(heap-based, default) or %s for an implementation based on RocksDB.", HEAP.name(), ROCKSDB.name()));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8db5ca6b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 58e8de6..c6b50cd 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -83,7 +83,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
*/
public enum PriorityQueueStateType {
HEAP,
- ROCKS
+ ROCKSDB
}
private static final long serialVersionUID = 1L;
[6/8] flink git commit: [hotfix] Add EXCLUSIONS set to
ConfigOptionsDocGenerator
Posted by tr...@apache.org.
[hotfix] Add EXCLUSIONS set to ConfigOptionsDocGenerator
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a88d6efc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a88d6efc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a88d6efc
Branch: refs/heads/master
Commit: a88d6efc5d2280367e27c3df46451bd161a4d100
Parents: dbddf00
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 16 13:11:14 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jul 16 22:11:57 2018 +0200
----------------------------------------------------------------------
.../ConfigOptionsDocGenerator.java | 23 +++++++++++++++-----
1 file changed, 17 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a88d6efc/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
index 743f49f..953122f 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
@@ -26,8 +26,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.util.function.ThrowingConsumer;
-import static org.apache.flink.docs.util.Utils.escapeCharacters;
-
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
@@ -36,15 +34,20 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import static org.apache.flink.docs.util.Utils.escapeCharacters;
+
/**
* Class used for generating code based documentation of configuration parameters.
*/
@@ -56,9 +59,13 @@ public class ConfigOptionsDocGenerator {
new OptionsClassLocation("flink-yarn", "org.apache.flink.yarn.configuration"),
new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.configuration"),
new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.runtime.clusterframework"),
- new OptionsClassLocation("flink-metrics/flink-metrics-prometheus", "org.apache.flink.metrics.prometheus"),
+ new OptionsClassLocation("flink-metrics/flink-metrics-prometheus", "org.apache.flink.metrics.prometheus")
};
+ static final Set<String> EXCLUSIONS = new HashSet<>(Arrays.asList(
+ "org.apache.flink.configuration.ConfigOptions",
+ "org.apache.flink.contrib.streaming.state.PredefinedOptions"));
+
static final String DEFAULT_PATH_PREFIX = "src/main/java";
@VisibleForTesting
@@ -149,9 +156,13 @@ public class ConfigOptionsDocGenerator {
for (Path entry : stream) {
String fileName = entry.getFileName().toString();
Matcher matcher = CLASS_NAME_PATTERN.matcher(fileName);
- if (!fileName.equals("ConfigOptions.java") && matcher.matches()) {
- Class<?> optionsClass = Class.forName(packageName + '.' + matcher.group(CLASS_NAME_GROUP));
- classConsumer.accept(optionsClass);
+ if (matcher.matches()) {
+ final String className = packageName + '.' + matcher.group(CLASS_NAME_GROUP);
+
+ if (!EXCLUSIONS.contains(className)) {
+ Class<?> optionsClass = Class.forName(className);
+ classConsumer.accept(optionsClass);
+ }
}
}
}
[4/8] flink git commit: [FLINK-9489] Checkpoint timers as part of
managed keyed state instead of raw keyed state
Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
index 3a348a9..72c70bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.state.heap;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.Preconditions;
@@ -204,7 +204,7 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen
* @param keyContext the key context.
* @param metaInfo the meta information, including the type serializer for state copy-on-write.
*/
- CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) {
+ CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo) {
this(keyContext, metaInfo, 1024);
}
@@ -217,7 +217,7 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen
* @throws IllegalArgumentException when the capacity is less than zero.
*/
@SuppressWarnings("unchecked")
- private CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo, int capacity) {
+ private CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo, int capacity) {
super(keyContext, metaInfo);
// initialized tables to EMPTY_TABLE.
@@ -547,12 +547,12 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen
}
@Override
- public RegisteredKeyedBackendStateMetaInfo<N, S> getMetaInfo() {
+ public RegisteredKeyValueStateBackendMetaInfo<N, S> getMetaInfo() {
return metaInfo;
}
@Override
- public void setMetaInfo(RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) {
+ public void setMetaInfo(RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo) {
this.metaInfo = metaInfo;
}
@@ -871,8 +871,9 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen
*
* @return a snapshot from this {@link CopyOnWriteStateTable}, for checkpointing.
*/
+ @Nonnull
@Override
- public CopyOnWriteStateTableSnapshot<K, N, S> createSnapshot() {
+ public CopyOnWriteStateTableSnapshot<K, N, S> stateSnapshot() {
return new CopyOnWriteStateTableSnapshot<>(this);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
index 4c0ab6f..f3f21dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.KeyGroupPartitioner;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.StateSnapshot;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
@@ -91,7 +91,7 @@ public class CopyOnWriteStateTableSnapshot<K, N, S>
* to an output as part of checkpointing.
*/
@Nullable
- private StateSnapshot.KeyGroupPartitionedSnapshot partitionedStateTableSnapshot;
+ private StateKeyGroupWriter partitionedStateTableSnapshot;
/**
* Creates a new {@link CopyOnWriteStateTableSnapshot}.
@@ -135,7 +135,7 @@ public class CopyOnWriteStateTableSnapshot<K, N, S>
@Nonnull
@SuppressWarnings("unchecked")
@Override
- public KeyGroupPartitionedSnapshot partitionByKeyGroup() {
+ public StateKeyGroupWriter getKeyGroupWriter() {
if (partitionedStateTableSnapshot == null) {
@@ -160,6 +160,12 @@ public class CopyOnWriteStateTableSnapshot<K, N, S>
return partitionedStateTableSnapshot;
}
+ @Nonnull
+ @Override
+ public StateMetaInfoSnapshot getMetaInfoSnapshot() {
+ return owningStateTable.metaInfo.snapshot();
+ }
+
@Override
public void release() {
owningStateTable.releaseSnapshot(this);
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 495dfe0..2c6101e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -49,22 +49,25 @@ import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.Keyed;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
-import org.apache.flink.runtime.state.PriorityComparator;
-import org.apache.flink.runtime.state.PriorityQueueSetFactory;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StateSnapshot;
+import org.apache.flink.runtime.state.StateSnapshotKeyGroupReader;
+import org.apache.flink.runtime.state.StateSnapshotRestore;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
-import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
@@ -108,19 +111,47 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
Tuple2.of(FoldingStateDescriptor.class, (StateFactory) HeapFoldingState::create)
).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+ @SuppressWarnings("unchecked")
@Nonnull
@Override
- public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
+ public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T> create(
@Nonnull String stateName,
- @Nonnull TypeSerializer<T> byteOrderedElementSerializer,
- @Nonnull PriorityComparator<T> elementPriorityComparator,
- @Nonnull KeyExtractorFunction<T> keyExtractor) {
+ @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+
+ final StateSnapshotRestore snapshotRestore = registeredStates.get(stateName);
+
+ if (snapshotRestore instanceof HeapPriorityQueueSnapshotRestoreWrapper) {
+ //TODO Serializer upgrade story!?
+ return ((HeapPriorityQueueSnapshotRestoreWrapper<T>) snapshotRestore).getPriorityQueue();
+ } else if (snapshotRestore != null) {
+ throw new IllegalStateException("Already found a different state type registered under this name: " + snapshotRestore.getClass());
+ }
+
+ final RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo =
+ new RegisteredPriorityQueueStateBackendMetaInfo<>(stateName, byteOrderedElementSerializer);
- return priorityQueueSetFactory.create(
+ return createInternal(metaInfo);
+ }
+
+ @Nonnull
+ private <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T> createInternal(
+ RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo) {
+
+ final String stateName = metaInfo.getName();
+ final HeapPriorityQueueSet<T> priorityQueue = priorityQueueSetFactory.create(
stateName,
- byteOrderedElementSerializer,
- elementPriorityComparator,
- keyExtractor);
+ metaInfo.getElementSerializer());
+
+ HeapPriorityQueueSnapshotRestoreWrapper<T> wrapper =
+ new HeapPriorityQueueSnapshotRestoreWrapper<>(
+ priorityQueue,
+ metaInfo,
+ KeyExtractorFunction.forKeyedObjects(),
+ keyGroupRange,
+ numberOfKeyGroups);
+
+ registeredStates.put(stateName, wrapper);
+ return priorityQueue;
}
private interface StateFactory {
@@ -131,14 +162,9 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
/**
- * Map of state tables that stores all state of key/value states. We store it centrally so
- * that we can easily checkpoint/restore it.
- *
- * <p>The actual parameters of StateTable are {@code StateTable<NamespaceT, Map<KeyT, StateT>>}
- * but we can't put them here because different key/value states with different types and
- * namespace types share this central list of tables.
+ * Map of registered states for snapshot/restore.
*/
- private final Map<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();
+ private final Map<String, StateSnapshotRestore> registeredStates = new HashMap<>();
/**
* Map of state names to their corresponding restored state meta info.
@@ -161,7 +187,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/**
* Factory for state that is organized as priority queue.
*/
- private final PriorityQueueSetFactory priorityQueueSetFactory;
+ private final HeapPriorityQueueSetFactory priorityQueueSetFactory;
public HeapKeyedStateBackend(
TaskKvStateRegistry kvStateRegistry,
@@ -172,7 +198,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
boolean asynchronousSnapshots,
ExecutionConfig executionConfig,
LocalRecoveryConfig localRecoveryConfig,
- PriorityQueueSetFactory priorityQueueSetFactory,
+ HeapPriorityQueueSetFactory priorityQueueSetFactory,
TtlTimeProvider ttlTimeProvider) {
super(kvStateRegistry, keySerializer, userCodeClassLoader,
@@ -197,9 +223,9 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
TypeSerializer<N> namespaceSerializer, StateDescriptor<?, V> stateDesc) throws StateMigrationException {
@SuppressWarnings("unchecked")
- StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(stateDesc.getName());
+ StateTable<K, N, V> stateTable = (StateTable<K, N, V>) registeredStates.get(stateDesc.getName());
- RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo;
+ RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo;
if (stateTable != null) {
@SuppressWarnings("unchecked")
StateMetaInfoSnapshot restoredMetaInfoSnapshot =
@@ -210,37 +236,43 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
" but its corresponding restored snapshot cannot be found.");
- newMetaInfo = RegisteredKeyedBackendStateMetaInfo.resolveKvStateCompatibility(
+ newMetaInfo = RegisteredKeyValueStateBackendMetaInfo.resolveKvStateCompatibility(
restoredMetaInfoSnapshot,
namespaceSerializer,
stateDesc);
stateTable.setMetaInfo(newMetaInfo);
} else {
- newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
+ newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
stateDesc.getType(),
stateDesc.getName(),
namespaceSerializer,
stateDesc.getSerializer());
stateTable = snapshotStrategy.newStateTable(newMetaInfo);
- stateTables.put(stateDesc.getName(), stateTable);
+ registeredStates.put(stateDesc.getName(), stateTable);
}
return stateTable;
}
+ @SuppressWarnings("unchecked")
@Override
public <N> Stream<K> getKeys(String state, N namespace) {
- if (!stateTables.containsKey(state)) {
+ if (!registeredStates.containsKey(state)) {
return Stream.empty();
}
- StateTable<K, N, ?> table = (StateTable<K, N, ?>) stateTables.get(state);
+
+ final StateSnapshotRestore stateSnapshotRestore = registeredStates.get(state);
+ if (!(stateSnapshotRestore instanceof StateTable)) {
+ return Stream.empty();
+ }
+ StateTable<K, N, ?> table = (StateTable<K, N, ?>) stateSnapshotRestore;
return table.getKeys(namespace);
}
private boolean hasRegisteredState() {
- return !stateTables.isEmpty();
+ return !registeredStates.isEmpty();
}
@Override
@@ -288,7 +320,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
final Map<Integer, String> kvStatesById = new HashMap<>();
int numRegisteredKvStates = 0;
- stateTables.clear();
+ registeredStates.clear();
boolean keySerializerRestored = false;
@@ -342,16 +374,20 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
for (StateMetaInfoSnapshot restoredMetaInfo : restoredMetaInfos) {
restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
- StateTable<K, ?, ?> stateTable = stateTables.get(restoredMetaInfo.getName());
+ StateSnapshotRestore snapshotRestore = registeredStates.get(restoredMetaInfo.getName());
//important: only create a new table we did not already create it previously
- if (null == stateTable) {
+ if (null == snapshotRestore) {
- RegisteredKeyedBackendStateMetaInfo<?, ?> registeredKeyedBackendStateMetaInfo =
- new RegisteredKeyedBackendStateMetaInfo<>(restoredMetaInfo);
+ if (restoredMetaInfo.getBackendStateType() == StateMetaInfoSnapshot.BackendStateType.KEY_VALUE) {
+ RegisteredKeyValueStateBackendMetaInfo<?, ?> registeredKeyedBackendStateMetaInfo =
+ new RegisteredKeyValueStateBackendMetaInfo<>(restoredMetaInfo);
- stateTable = snapshotStrategy.newStateTable(registeredKeyedBackendStateMetaInfo);
- stateTables.put(restoredMetaInfo.getName(), stateTable);
+ snapshotRestore = snapshotStrategy.newStateTable(registeredKeyedBackendStateMetaInfo);
+ registeredStates.put(restoredMetaInfo.getName(), snapshotRestore);
+ } else {
+ createInternal(new RegisteredPriorityQueueStateBackendMetaInfo<>(restoredMetaInfo));
+ }
kvStatesById.put(numRegisteredKvStates, restoredMetaInfo.getName());
++numRegisteredKvStates;
} else {
@@ -384,12 +420,10 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
for (int i = 0; i < restoredMetaInfos.size(); i++) {
int kvStateId = kgCompressionInView.readShort();
- StateTable<K, ?, ?> stateTable = stateTables.get(kvStatesById.get(kvStateId));
+ StateSnapshotRestore registeredState = registeredStates.get(kvStatesById.get(kvStateId));
- StateTableByKeyGroupReader keyGroupReader =
- StateTableByKeyGroupReaders.readerForVersion(
- stateTable,
- serializationProxy.getReadVersion());
+ StateSnapshotKeyGroupReader keyGroupReader =
+ registeredState.keyGroupReader(serializationProxy.getReadVersion());
keyGroupReader.readMappingsInKeyGroup(kgCompressionInView, keyGroupIndex);
}
@@ -446,8 +480,10 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
@Override
public int numStateEntries() {
int sum = 0;
- for (StateTable<K, ?, ?> stateTable : stateTables.values()) {
- sum += stateTable.size();
+ for (StateSnapshotRestore state : registeredStates.values()) {
+ if (state instanceof StateTable) {
+ sum += ((StateTable<?, ?, ?>) state).size();
+ }
}
return sum;
}
@@ -458,8 +494,10 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
@VisibleForTesting
public int numStateEntries(Object namespace) {
int sum = 0;
- for (StateTable<K, ?, ?> stateTable : stateTables.values()) {
- sum += stateTable.sizeOfNamespace(namespace);
+ for (StateSnapshotRestore state : registeredStates.values()) {
+ if (state instanceof StateTable) {
+ sum += ((StateTable<?, ?, ?>) state).sizeOfNamespace(namespace);
+ }
}
return sum;
}
@@ -486,7 +524,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
boolean isAsynchronous();
- <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo);
+ <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo);
}
private class AsyncSnapshotStrategySynchronicityBehavior implements SnapshotStrategySynchronicityBehavior<K> {
@@ -503,7 +541,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
@Override
- public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo) {
+ public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo) {
return new CopyOnWriteStateTable<>(HeapKeyedStateBackend.this, newMetaInfo);
}
}
@@ -522,7 +560,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
@Override
- public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo) {
+ public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo) {
return new NestedMapsStateTable<>(HeapKeyedStateBackend.this, newMetaInfo);
}
}
@@ -554,25 +592,26 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
long syncStartTime = System.currentTimeMillis();
- Preconditions.checkState(stateTables.size() <= Short.MAX_VALUE,
- "Too many KV-States: " + stateTables.size() +
+ Preconditions.checkState(registeredStates.size() <= Short.MAX_VALUE,
+ "Too many KV-States: " + registeredStates.size() +
". Currently at most " + Short.MAX_VALUE + " states are supported");
List<StateMetaInfoSnapshot> metaInfoSnapshots =
- new ArrayList<>(stateTables.size());
+ new ArrayList<>(registeredStates.size());
- final Map<String, Integer> kVStateToId = new HashMap<>(stateTables.size());
+ final Map<String, Integer> kVStateToId = new HashMap<>(registeredStates.size());
final Map<String, StateSnapshot> cowStateStableSnapshots =
- new HashMap<>(stateTables.size());
+ new HashMap<>(registeredStates.size());
- for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
+ for (Map.Entry<String, StateSnapshotRestore> kvState : registeredStates.entrySet()) {
String stateName = kvState.getKey();
kVStateToId.put(stateName, kVStateToId.size());
- StateTable<K, ?, ?> stateTable = kvState.getValue();
- if (null != stateTable) {
- metaInfoSnapshots.add(stateTable.getMetaInfo().snapshot());
- cowStateStableSnapshots.put(stateName, stateTable.createSnapshot());
+ StateSnapshotRestore state = kvState.getValue();
+ if (null != state) {
+ final StateSnapshot stateSnapshot = state.stateSnapshot();
+ metaInfoSnapshots.add(stateSnapshot.getMetaInfoSnapshot());
+ cowStateStableSnapshots.put(stateName, stateSnapshot);
}
}
@@ -654,13 +693,13 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
outView.writeInt(keyGroupId);
for (Map.Entry<String, StateSnapshot> kvState : cowStateStableSnapshots.entrySet()) {
- StateSnapshot.KeyGroupPartitionedSnapshot partitionedSnapshot =
- kvState.getValue().partitionByKeyGroup();
+ StateSnapshot.StateKeyGroupWriter partitionedSnapshot =
+ kvState.getValue().getKeyGroupWriter();
try (OutputStream kgCompressionOut = keyGroupCompressionDecorator.decorateWithCompression(localStream)) {
String stateName = kvState.getKey();
DataOutputViewStreamWrapper kgCompressionView = new DataOutputViewStreamWrapper(kgCompressionOut);
kgCompressionView.writeShort(kVStateToId.get(stateName));
- partitionedSnapshot.writeMappingsInKeyGroup(kgCompressionView, keyGroupId);
+ partitionedSnapshot.writeStateInKeyGroup(kgCompressionView, keyGroupId);
} // this will just close the outer compression stream
}
}
@@ -705,7 +744,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
@Override
- public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo) {
+ public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo) {
return snapshotStrategySynchronicityTrait.newStateTable(newMetaInfo);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
index e5f610e..22b2419 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
@@ -50,7 +50,8 @@ import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE;
*
* @param <T> type of the contained elements.
*/
-public class HeapPriorityQueue<T extends HeapPriorityQueueElement> implements InternalPriorityQueue<T> {
+public class HeapPriorityQueue<T extends HeapPriorityQueueElement>
+ implements InternalPriorityQueue<T> {
/**
* The index of the head element in the array that represents the heap.
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java
index ee6fda9..b0255d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java
@@ -21,7 +21,8 @@ package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.KeyExtractorFunction;
import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.PriorityComparator;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
@@ -29,7 +30,7 @@ import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
/**
- *
+ * Factory for {@link HeapPriorityQueueSet}.
*/
public class HeapPriorityQueueSetFactory implements PriorityQueueSetFactory {
@@ -54,14 +55,13 @@ public class HeapPriorityQueueSetFactory implements PriorityQueueSetFactory {
@Nonnull
@Override
- public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
+ public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> HeapPriorityQueueSet<T> create(
@Nonnull String stateName,
- @Nonnull TypeSerializer<T> byteOrderedElementSerializer,
- @Nonnull PriorityComparator<T> elementPriorityComparator,
- @Nonnull KeyExtractorFunction<T> keyExtractor) {
- return new HeapPriorityQueueSet<>(
- elementPriorityComparator,
- keyExtractor,
+ @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+
+ return new HeapPriorityQueueSet<T>(
+ PriorityComparator.forPriorityComparableObjects(),
+ KeyExtractorFunction.forKeyedObjects(),
minimumCapacity,
keyGroupRange,
totalKeyGroups);
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSnapshotRestoreWrapper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSnapshotRestoreWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSnapshotRestoreWrapper.java
new file mode 100644
index 0000000..5fd67f0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSnapshotRestoreWrapper.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupPartitioner;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.StateSnapshot;
+import org.apache.flink.runtime.state.StateSnapshotKeyGroupReader;
+import org.apache.flink.runtime.state.StateSnapshotRestore;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+/**
+ * This wrapper combines a HeapPriorityQueue with backend meta data.
+ *
+ * @param <T> type of the queue elements.
+ */
+public class HeapPriorityQueueSnapshotRestoreWrapper<T extends HeapPriorityQueueElement>
+ implements StateSnapshotRestore {
+
+ @Nonnull
+ private final HeapPriorityQueueSet<T> priorityQueue;
+ @Nonnull
+ private final KeyExtractorFunction<T> keyExtractorFunction;
+ @Nonnull
+ private final RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo;
+ @Nonnull
+ private final KeyGroupRange localKeyGroupRange;
+ @Nonnegative
+ private final int totalKeyGroups;
+
+ public HeapPriorityQueueSnapshotRestoreWrapper(
+ @Nonnull HeapPriorityQueueSet<T> priorityQueue,
+ @Nonnull RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo,
+ @Nonnull KeyExtractorFunction<T> keyExtractorFunction,
+ @Nonnull KeyGroupRange localKeyGroupRange,
+ int totalKeyGroups) {
+
+ this.priorityQueue = priorityQueue;
+ this.keyExtractorFunction = keyExtractorFunction;
+ this.metaInfo = metaInfo;
+ this.localKeyGroupRange = localKeyGroupRange;
+ this.totalKeyGroups = totalKeyGroups;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Nonnull
+ @Override
+ public StateSnapshot stateSnapshot() {
+ final T[] queueDump = (T[]) priorityQueue.toArray(new HeapPriorityQueueElement[priorityQueue.size()]);
+
+ final TypeSerializer<T> elementSerializer = metaInfo.getElementSerializer();
+
+ // turn the flat copy into a deep copy if required.
+ if (!elementSerializer.isImmutableType()) {
+ for (int i = 0; i < queueDump.length; ++i) {
+ queueDump[i] = elementSerializer.copy(queueDump[i]);
+ }
+ }
+
+ return new HeapPriorityQueueStateSnapshot<>(
+ queueDump,
+ keyExtractorFunction,
+ metaInfo.deepCopy(),
+ localKeyGroupRange,
+ totalKeyGroups);
+ }
+
+ @Nonnull
+ @Override
+ public StateSnapshotKeyGroupReader keyGroupReader(int readVersionHint) {
+ final TypeSerializer<T> elementSerializer = metaInfo.getElementSerializer();
+ return KeyGroupPartitioner.createKeyGroupPartitionReader(
+ elementSerializer::deserialize, //we know that this does not deliver nulls, because we never write nulls
+ (element, keyGroupId) -> priorityQueue.add(element));
+ }
+
+ @Nonnull
+ public HeapPriorityQueueSet<T> getPriorityQueue() {
+ return priorityQueue;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueStateSnapshot.java
new file mode 100644
index 0000000..18e7d54
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueStateSnapshot.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupPartitioner;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.StateSnapshot;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Array;
+
+/**
+ * This class represents the snapshot of an {@link HeapPriorityQueueSet}.
+ *
+ * @param <T> type of the state elements.
+ */
+public class HeapPriorityQueueStateSnapshot<T> implements StateSnapshot {
+
+ /** Function that extracts keys from elements. */
+ @Nonnull
+ private final KeyExtractorFunction<T> keyExtractor;
+
+ /** Copy of the heap array containing all the (immutable or deeply copied) elements. */
+ @Nonnull
+ private final T[] heapArrayCopy;
+
+ /** The meta info of the state. */
+ @Nonnull
+ private final RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo;
+
+ /** The key-group range covered by this snapshot. */
+ @Nonnull
+ private final KeyGroupRange keyGroupRange;
+
+ /** The total number of key-groups in the job. */
+ @Nonnegative
+ private final int totalKeyGroups;
+
+ /** Result of partitioning the snapshot by key-group. */
+ @Nullable
+ private StateKeyGroupWriter stateKeyGroupWriter;
+
+ HeapPriorityQueueStateSnapshot(
+ @Nonnull T[] heapArrayCopy,
+ @Nonnull KeyExtractorFunction<T> keyExtractor,
+ @Nonnull RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo,
+ @Nonnull KeyGroupRange keyGroupRange,
+ @Nonnegative int totalKeyGroups) {
+
+ this.keyExtractor = keyExtractor;
+ this.heapArrayCopy = heapArrayCopy;
+ this.metaInfo = metaInfo;
+ this.keyGroupRange = keyGroupRange;
+ this.totalKeyGroups = totalKeyGroups;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Nonnull
+ @Override
+ public StateKeyGroupWriter getKeyGroupWriter() {
+
+ if (stateKeyGroupWriter == null) {
+
+ T[] partitioningOutput = (T[]) Array.newInstance(
+ heapArrayCopy.getClass().getComponentType(),
+ heapArrayCopy.length);
+
+ final TypeSerializer<T> elementSerializer = metaInfo.getElementSerializer();
+
+ KeyGroupPartitioner<T> keyGroupPartitioner =
+ new KeyGroupPartitioner<>(
+ heapArrayCopy,
+ heapArrayCopy.length,
+ partitioningOutput,
+ keyGroupRange,
+ totalKeyGroups,
+ keyExtractor,
+ elementSerializer::serialize);
+
+ stateKeyGroupWriter = keyGroupPartitioner.partitionByKeyGroup();
+ }
+
+ return stateKeyGroupWriter;
+ }
+
+ @Nonnull
+ @Override
+ public StateMetaInfoSnapshot getMetaInfoSnapshot() {
+ return metaInfo.snapshot();
+ }
+
+ @Override
+ public void release() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
index 6f4f911..d8b0a5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
@@ -49,6 +49,8 @@ import java.util.function.Predicate;
public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueue<T> & HeapPriorityQueueElement>
implements InternalPriorityQueue<T>, KeyGroupedInternalPriorityQueue<T> {
+ static final boolean ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION = false;
+
/** A heap of heap sets. Each sub-heap represents the partition for a key-group.*/
@Nonnull
private final HeapPriorityQueue<PQ> heapOfkeyGroupedHeaps;
@@ -94,6 +96,22 @@ public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueu
@Override
public void bulkPoll(@Nonnull Predicate<T> canConsume, @Nonnull Consumer<T> consumer) {
+ if (ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION) {
+ bulkPollRelaxedOrder(canConsume, consumer);
+ } else {
+ bulkPollStrictOrder(canConsume, consumer);
+ }
+ }
+
+ private void bulkPollRelaxedOrder(@Nonnull Predicate<T> canConsume, @Nonnull Consumer<T> consumer) {
+ PQ headList = heapOfkeyGroupedHeaps.peek();
+ while (headList.peek() != null && canConsume.test(headList.peek())) {
+ headList.bulkPoll(canConsume, consumer);
+ heapOfkeyGroupedHeaps.adjustModifiedElement(headList);
+ }
+ }
+
+ private void bulkPollStrictOrder(@Nonnull Predicate<T> canConsume, @Nonnull Consumer<T> consumer) {
T element;
while ((element = peek()) != null && canConsume.test(element)) {
poll();
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
index 18551b5..efed1cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
@@ -22,9 +22,10 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.StateSnapshot;
import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
@@ -34,6 +35,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.stream.Stream;
/**
@@ -69,7 +71,7 @@ public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> {
* @param keyContext the key context.
* @param metaInfo the meta information for this state table.
*/
- public NestedMapsStateTable(InternalKeyContext<K> keyContext, RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) {
+ public NestedMapsStateTable(InternalKeyContext<K> keyContext, RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo) {
super(keyContext, metaInfo);
this.keyGroupOffset = keyContext.getKeyGroupRange().getStartKeyGroup();
@@ -175,7 +177,7 @@ public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> {
@Override
public Stream<K> getKeys(N namespace) {
return Arrays.stream(state)
- .filter(namespaces -> namespaces != null)
+ .filter(Objects::nonNull)
.map(namespaces -> namespaces.getOrDefault(namespace, Collections.emptyMap()))
.flatMap(namespaceSate -> namespaceSate.keySet().stream());
}
@@ -232,12 +234,7 @@ public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> {
setMapForKeyGroup(keyGroupIndex, namespaceMap);
}
- Map<K, S> keyedMap = namespaceMap.get(namespace);
-
- if (keyedMap == null) {
- keyedMap = new HashMap<>();
- namespaceMap.put(namespace, keyedMap);
- }
+ Map<K, S> keyedMap = namespaceMap.computeIfAbsent(namespace, k -> new HashMap<>());
return keyedMap.put(key, value);
}
@@ -302,13 +299,7 @@ public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> {
setMapForKeyGroup(keyGroupIndex, namespaceMap);
}
- Map<K, S> keyedMap = namespaceMap.get(namespace);
-
- if (keyedMap == null) {
- keyedMap = new HashMap<>();
- namespaceMap.put(namespace, keyedMap);
- }
-
+ Map<K, S> keyedMap = namespaceMap.computeIfAbsent(namespace, k -> new HashMap<>());
keyedMap.put(key, transformation.apply(keyedMap.get(key), value));
}
@@ -323,8 +314,9 @@ public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> {
return count;
}
+ @Nonnull
@Override
- public NestedMapsStateTableSnapshot<K, N, S> createSnapshot() {
+ public NestedMapsStateTableSnapshot<K, N, S> stateSnapshot() {
return new NestedMapsStateTableSnapshot<>(this);
}
@@ -337,7 +329,7 @@ public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> {
*/
static class NestedMapsStateTableSnapshot<K, N, S>
extends AbstractStateTableSnapshot<K, N, S, NestedMapsStateTable<K, N, S>>
- implements StateSnapshot.KeyGroupPartitionedSnapshot {
+ implements StateSnapshot.StateKeyGroupWriter {
NestedMapsStateTableSnapshot(NestedMapsStateTable<K, N, S> owningTable) {
super(owningTable);
@@ -345,10 +337,16 @@ public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> {
@Nonnull
@Override
- public KeyGroupPartitionedSnapshot partitionByKeyGroup() {
+ public StateKeyGroupWriter getKeyGroupWriter() {
return this;
}
+ @Nonnull
+ @Override
+ public StateMetaInfoSnapshot getMetaInfoSnapshot() {
+ return owningStateTable.metaInfo.snapshot();
+ }
+
/**
* Implementation note: we currently chose the same format between {@link NestedMapsStateTable} and
* {@link CopyOnWriteStateTable}.
@@ -359,7 +357,7 @@ public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> {
* implementations).
*/
@Override
- public void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) throws IOException {
+ public void writeStateInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) throws IOException {
final Map<N, Map<K, S>> keyGroupMap = owningStateTable.getMapForKeyGroup(keyGroupId);
if (null != keyGroupMap) {
TypeSerializer<K> keySerializer = owningStateTable.keyContext.getKeySerializer();
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
index de2290a..58a2f97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
@@ -20,11 +20,14 @@ package org.apache.flink.runtime.state.heap;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
-import org.apache.flink.runtime.state.StateSnapshot;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.StateSnapshotKeyGroupReader;
+import org.apache.flink.runtime.state.StateSnapshotRestore;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.util.Preconditions;
+import javax.annotation.Nonnull;
+
import java.util.stream.Stream;
/**
@@ -35,7 +38,7 @@ import java.util.stream.Stream;
* @param <N> type of namespace
* @param <S> type of state
*/
-public abstract class StateTable<K, N, S> {
+public abstract class StateTable<K, N, S> implements StateSnapshotRestore {
/**
* The key context view on the backend. This provides information, such as the currently active key.
@@ -45,14 +48,14 @@ public abstract class StateTable<K, N, S> {
/**
* Combined meta information such as name and serializers for this state
*/
- protected RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo;
+ protected RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo;
/**
*
* @param keyContext the key context provides the key scope for all put/get/delete operations.
* @param metaInfo the meta information, including the type serializer for state copy-on-write.
*/
- public StateTable(InternalKeyContext<K> keyContext, RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) {
+ public StateTable(InternalKeyContext<K> keyContext, RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo) {
this.keyContext = Preconditions.checkNotNull(keyContext);
this.metaInfo = Preconditions.checkNotNull(metaInfo);
}
@@ -173,22 +176,26 @@ public abstract class StateTable<K, N, S> {
return metaInfo.getNamespaceSerializer();
}
- public RegisteredKeyedBackendStateMetaInfo<N, S> getMetaInfo() {
+ public RegisteredKeyValueStateBackendMetaInfo<N, S> getMetaInfo() {
return metaInfo;
}
- public void setMetaInfo(RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) {
+ public void setMetaInfo(RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo) {
this.metaInfo = metaInfo;
}
// Snapshot / Restore -------------------------------------------------------------------------
- abstract StateSnapshot createSnapshot();
-
public abstract void put(K key, int keyGroup, N namespace, S state);
// For testing --------------------------------------------------------------------------------
@VisibleForTesting
public abstract int sizeOfNamespace(Object namespace);
+
+ @Nonnull
+ @Override
+ public StateSnapshotKeyGroupReader keyGroupReader(int readVersion) {
+ return StateTableByKeyGroupReaders.readerForVersion(this, readVersion);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReader.java
deleted file mode 100644
index 659c174..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReader.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.heap;
-
-import org.apache.flink.core.memory.DataInputView;
-
-import java.io.IOException;
-
-/**
- * Interface for state de-serialization into {@link StateTable}s by key-group.
- */
-interface StateTableByKeyGroupReader {
-
- /**
- * Read the data for the specified key-group from the input.
- *
- * @param div the input
- * @param keyGroupId the key-group to write
- * @throws IOException on write related problems
- */
- void readMappingsInKeyGroup(DataInputView div, int keyGroupId) throws IOException;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
index e08e90e..2f83857 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
@@ -19,12 +19,18 @@
package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.KeyGroupPartitioner;
+import org.apache.flink.runtime.state.StateSnapshotKeyGroupReader;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
import java.io.IOException;
/**
- * This class provides a static factory method to create different implementations of {@link StateTableByKeyGroupReader}
+ * This class provides a static factory method to create different implementations of {@link StateSnapshotKeyGroupReader}
* depending on the provided serialization format version.
* <p>
* The implementations are also located here as inner classes.
@@ -35,69 +41,58 @@ class StateTableByKeyGroupReaders {
* Creates a new StateTableByKeyGroupReader that inserts de-serialized mappings into the given table, using the
* de-serialization algorithm that matches the given version.
*
- * @param table the {@link StateTable} into which de-serialized mappings are inserted.
+ * @param stateTable the {@link StateTable} into which de-serialized mappings are inserted.
* @param version version for the de-serialization algorithm.
* @param <K> type of key.
* @param <N> type of namespace.
* @param <S> type of state.
* @return the appropriate reader.
*/
- static <K, N, S> StateTableByKeyGroupReader readerForVersion(StateTable<K, N, S> table, int version) {
+ static <K, N, S> StateSnapshotKeyGroupReader readerForVersion(StateTable<K, N, S> stateTable, int version) {
switch (version) {
case 1:
- return new StateTableByKeyGroupReaderV1<>(table);
+ return new StateTableByKeyGroupReaderV1<>(stateTable);
case 2:
case 3:
case 4:
case 5:
- return new StateTableByKeyGroupReaderV2V3<>(table);
+ return createV2PlusReader(stateTable);
default:
throw new IllegalArgumentException("Unknown version: " + version);
}
}
- static abstract class AbstractStateTableByKeyGroupReader<K, N, S>
- implements StateTableByKeyGroupReader {
-
- protected final StateTable<K, N, S> stateTable;
-
- AbstractStateTableByKeyGroupReader(StateTable<K, N, S> stateTable) {
- this.stateTable = stateTable;
- }
-
- @Override
- public abstract void readMappingsInKeyGroup(DataInputView div, int keyGroupId) throws IOException;
-
- protected TypeSerializer<K> getKeySerializer() {
- return stateTable.keyContext.getKeySerializer();
- }
-
- protected TypeSerializer<N> getNamespaceSerializer() {
- return stateTable.getNamespaceSerializer();
- }
-
- protected TypeSerializer<S> getStateSerializer() {
- return stateTable.getStateSerializer();
- }
+ private static <K, N, S> StateSnapshotKeyGroupReader createV2PlusReader(StateTable<K, N, S> stateTable) {
+ final TypeSerializer<K> keySerializer = stateTable.keyContext.getKeySerializer();
+ final TypeSerializer<N> namespaceSerializer = stateTable.getNamespaceSerializer();
+ final TypeSerializer<S> stateSerializer = stateTable.getStateSerializer();
+ final Tuple3<N, K, S> buffer = new Tuple3<>();
+ return KeyGroupPartitioner.createKeyGroupPartitionReader((in) -> {
+ buffer.f0 = namespaceSerializer.deserialize(in);
+ buffer.f1 = keySerializer.deserialize(in);
+ buffer.f2 = stateSerializer.deserialize(in);
+ return buffer;
+ }, (element, keyGroupId1) -> stateTable.put(element.f1, keyGroupId1, element.f0, element.f2));
}
- static final class StateTableByKeyGroupReaderV1<K, N, S>
- extends AbstractStateTableByKeyGroupReader<K, N, S> {
+ static final class StateTableByKeyGroupReaderV1<K, N, S> implements StateSnapshotKeyGroupReader {
+
+ protected final StateTable<K, N, S> stateTable;
StateTableByKeyGroupReaderV1(StateTable<K, N, S> stateTable) {
- super(stateTable);
+ this.stateTable = stateTable;
}
@Override
- public void readMappingsInKeyGroup(DataInputView inView, int keyGroupId) throws IOException {
+ public void readMappingsInKeyGroup(@Nonnull DataInputView inView, @Nonnegative int keyGroupId) throws IOException {
if (inView.readByte() == 0) {
return;
}
- final TypeSerializer<K> keySerializer = getKeySerializer();
- final TypeSerializer<N> namespaceSerializer = getNamespaceSerializer();
- final TypeSerializer<S> stateSerializer = getStateSerializer();
+ final TypeSerializer<K> keySerializer = stateTable.keyContext.getKeySerializer();
+ final TypeSerializer<N> namespaceSerializer = stateTable.getNamespaceSerializer();
+ final TypeSerializer<S> stateSerializer = stateTable.getStateSerializer();
// V1 uses kind of namespace compressing format
int numNamespaces = inView.readInt();
@@ -112,28 +107,4 @@ class StateTableByKeyGroupReaders {
}
}
}
-
- private static final class StateTableByKeyGroupReaderV2V3<K, N, S>
- extends AbstractStateTableByKeyGroupReader<K, N, S> {
-
- StateTableByKeyGroupReaderV2V3(StateTable<K, N, S> stateTable) {
- super(stateTable);
- }
-
- @Override
- public void readMappingsInKeyGroup(DataInputView inView, int keyGroupId) throws IOException {
-
- final TypeSerializer<K> keySerializer = getKeySerializer();
- final TypeSerializer<N> namespaceSerializer = getNamespaceSerializer();
- final TypeSerializer<S> stateSerializer = getStateSerializer();
-
- int numKeys = inView.readInt();
- for (int i = 0; i < numKeys; ++i) {
- N namespace = namespaceSerializer.deserialize(inView);
- K key = keySerializer.deserialize(inView);
- S state = stateSerializer.deserialize(inView);
- stateTable.put(key, keyGroupId, namespace, state);
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java
index 9341a5a..5a3190c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state.metainfo;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -30,7 +31,7 @@ import java.util.Map;
/**
* Generalized snapshot for meta information about one state in a state backend
- * (e.g. {@link org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo}).
+ * (e.g. {@link RegisteredKeyValueStateBackendMetaInfo}).
*/
public class StateMetaInfoSnapshot {
@@ -41,7 +42,7 @@ public class StateMetaInfoSnapshot {
KEY_VALUE,
OPERATOR,
BROADCAST,
- TIMER
+ PRIORITY_QUEUE
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java
index ce535ef..926e75f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java
@@ -39,6 +39,8 @@ import java.util.Map;
*/
public class StateMetaInfoSnapshotReadersWriters {
+ private StateMetaInfoSnapshotReadersWriters() {}
+
/**
* Current version for the serialization format of {@link StateMetaInfoSnapshotReadersWriters}.
* - v5: Flink 1.6.x
@@ -74,23 +76,35 @@ public class StateMetaInfoSnapshotReadersWriters {
@Nonnull
public static StateMetaInfoReader getReader(int readVersion, @Nonnull StateTypeHint stateTypeHint) {
+ if (readVersion < CURRENT_STATE_META_INFO_SNAPSHOT_VERSION) {
+ switch (stateTypeHint) {
+ case KEYED_STATE:
+ return getLegacyKeyedStateMetaInfoReader(readVersion);
+ case OPERATOR_STATE:
+ return getLegacyOperatorStateMetaInfoReader(readVersion);
+ default:
+ throw new IllegalArgumentException("Unsupported state type hint: " + stateTypeHint +
+ " with version " + readVersion);
+ }
+ } else {
+ return getReader(readVersion);
+ }
+ }
+
+ /**
+ * Returns a reader for {@link StateMetaInfoSnapshot} with the requested state type and version number.
+ *
+ * @param readVersion the format version to read.
+ * @return the requested reader.
+ */
+ @Nonnull
+ public static StateMetaInfoReader getReader(int readVersion) {
if (readVersion == CURRENT_STATE_META_INFO_SNAPSHOT_VERSION) {
// latest version shortcut
return CurrentReaderImpl.INSTANCE;
- }
-
- if (readVersion > CURRENT_STATE_META_INFO_SNAPSHOT_VERSION) {
+ } else {
throw new IllegalArgumentException("Unsupported read version for state meta info: " + readVersion);
}
-
- switch (stateTypeHint) {
- case KEYED_STATE:
- return getLegacyKeyedStateMetaInfoReader(readVersion);
- case OPERATOR_STATE:
- return getLegacyOperatorStateMetaInfoReader(readVersion);
- default:
- throw new IllegalArgumentException("Unsupported state type hint: " + stateTypeHint);
- }
}
@Nonnull
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupPartitionerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupPartitionerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupPartitionerTestBase.java
index e6b7739..3756187 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupPartitionerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupPartitionerTestBase.java
@@ -85,11 +85,11 @@ public abstract class KeyGroupPartitionerTestBase<T> extends TestLogger {
new ValidatingElementWriterDummy<>(keyExtractorFunction, numberOfKeyGroups, allElementsIdentitySet);
final KeyGroupPartitioner<T> testInstance = createPartitioner(data, testSize, range, numberOfKeyGroups, validatingElementWriter);
- final StateSnapshot.KeyGroupPartitionedSnapshot result = testInstance.partitionByKeyGroup();
+ final StateSnapshot.StateKeyGroupWriter result = testInstance.partitionByKeyGroup();
for (int keyGroup = 0; keyGroup < numberOfKeyGroups; ++keyGroup) {
validatingElementWriter.setCurrentKeyGroup(keyGroup);
- result.writeMappingsInKeyGroup(DUMMY_OUT_VIEW, keyGroup);
+ result.writeStateInKeyGroup(DUMMY_OUT_VIEW, keyGroup);
}
validatingElementWriter.validateAllElementsSeen();
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 5241dd8..9c487a4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -55,11 +55,11 @@ public class SerializationProxiesTest {
List<StateMetaInfoSnapshot> stateMetaInfoList = new ArrayList<>();
- stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
+ stateMetaInfoList.add(new RegisteredKeyValueStateBackendMetaInfo<>(
StateDescriptor.Type.VALUE, "a", namespaceSerializer, stateSerializer).snapshot());
- stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
+ stateMetaInfoList.add(new RegisteredKeyValueStateBackendMetaInfo<>(
StateDescriptor.Type.VALUE, "b", namespaceSerializer, stateSerializer).snapshot());
- stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
+ stateMetaInfoList.add(new RegisteredKeyValueStateBackendMetaInfo<>(
StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot());
KeyedBackendSerializationProxy<?> serializationProxy =
@@ -93,11 +93,11 @@ public class SerializationProxiesTest {
List<StateMetaInfoSnapshot> stateMetaInfoList = new ArrayList<>();
- stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
+ stateMetaInfoList.add(new RegisteredKeyValueStateBackendMetaInfo<>(
StateDescriptor.Type.VALUE, "a", namespaceSerializer, stateSerializer).snapshot());
- stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
+ stateMetaInfoList.add(new RegisteredKeyValueStateBackendMetaInfo<>(
StateDescriptor.Type.VALUE, "b", namespaceSerializer, stateSerializer).snapshot());
- stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
+ stateMetaInfoList.add(new RegisteredKeyValueStateBackendMetaInfo<>(
StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot());
KeyedBackendSerializationProxy<?> serializationProxy =
@@ -132,7 +132,7 @@ public class SerializationProxiesTest {
Assert.assertEquals(keySerializer.snapshotConfiguration(), serializationProxy.getKeySerializerConfigSnapshot());
for (StateMetaInfoSnapshot snapshot : serializationProxy.getStateMetaInfoSnapshots()) {
- final RegisteredKeyedBackendStateMetaInfo<?, ?> restoredMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(snapshot);
+ final RegisteredKeyValueStateBackendMetaInfo<?, ?> restoredMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(snapshot);
Assert.assertTrue(restoredMetaInfo.getNamespaceSerializer() instanceof UnloadableDummyTypeSerializer);
Assert.assertTrue(restoredMetaInfo.getStateSerializer() instanceof UnloadableDummyTypeSerializer);
Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER));
@@ -147,7 +147,7 @@ public class SerializationProxiesTest {
TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
- StateMetaInfoSnapshot metaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
+ StateMetaInfoSnapshot metaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
StateDescriptor.Type.VALUE, name, namespaceSerializer, stateSerializer).snapshot();
byte[] serialized;
@@ -173,7 +173,7 @@ public class SerializationProxiesTest {
TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
- StateMetaInfoSnapshot snapshot = new RegisteredKeyedBackendStateMetaInfo<>(
+ StateMetaInfoSnapshot snapshot = new RegisteredKeyValueStateBackendMetaInfo<>(
StateDescriptor.Type.VALUE, name, namespaceSerializer, stateSerializer).snapshot();
byte[] serialized;
@@ -198,7 +198,7 @@ public class SerializationProxiesTest {
new DataInputViewStreamWrapper(in), classLoader);
}
- RegisteredKeyedBackendStateMetaInfo<?, ?> restoredMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(snapshot);
+ RegisteredKeyValueStateBackendMetaInfo<?, ?> restoredMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(snapshot);
Assert.assertEquals(name, restoredMetaInfo.getName());
Assert.assertTrue(restoredMetaInfo.getNamespaceSerializer() instanceof UnloadableDummyTypeSerializer);
@@ -216,18 +216,18 @@ public class SerializationProxiesTest {
List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = new ArrayList<>();
- stateMetaInfoSnapshots.add(new RegisteredOperatorBackendStateMetaInfo<>(
+ stateMetaInfoSnapshots.add(new RegisteredOperatorStateBackendMetaInfo<>(
"a", stateSerializer, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE).snapshot());
- stateMetaInfoSnapshots.add(new RegisteredOperatorBackendStateMetaInfo<>(
+ stateMetaInfoSnapshots.add(new RegisteredOperatorStateBackendMetaInfo<>(
"b", stateSerializer, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE).snapshot());
- stateMetaInfoSnapshots.add(new RegisteredOperatorBackendStateMetaInfo<>(
+ stateMetaInfoSnapshots.add(new RegisteredOperatorStateBackendMetaInfo<>(
"c", stateSerializer, OperatorStateHandle.Mode.UNION).snapshot());
List<StateMetaInfoSnapshot> broadcastStateMetaInfoSnapshots = new ArrayList<>();
- broadcastStateMetaInfoSnapshots.add(new RegisteredBroadcastBackendStateMetaInfo<>(
+ broadcastStateMetaInfoSnapshots.add(new RegisteredBroadcastStateBackendMetaInfo<>(
"d", OperatorStateHandle.Mode.BROADCAST, keySerializer, valueSerializer).snapshot());
- broadcastStateMetaInfoSnapshots.add(new RegisteredBroadcastBackendStateMetaInfo<>(
+ broadcastStateMetaInfoSnapshots.add(new RegisteredBroadcastStateBackendMetaInfo<>(
"e", OperatorStateHandle.Mode.BROADCAST, valueSerializer, keySerializer).snapshot());
OperatorBackendSerializationProxy serializationProxy =
@@ -257,7 +257,7 @@ public class SerializationProxiesTest {
TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
StateMetaInfoSnapshot snapshot =
- new RegisteredOperatorBackendStateMetaInfo<>(
+ new RegisteredOperatorStateBackendMetaInfo<>(
name, stateSerializer, OperatorStateHandle.Mode.UNION).snapshot();
byte[] serialized;
@@ -274,8 +274,8 @@ public class SerializationProxiesTest {
new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
}
- RegisteredOperatorBackendStateMetaInfo<?> restoredMetaInfo =
- new RegisteredOperatorBackendStateMetaInfo<>(snapshot);
+ RegisteredOperatorStateBackendMetaInfo<?> restoredMetaInfo =
+ new RegisteredOperatorStateBackendMetaInfo<>(snapshot);
Assert.assertEquals(name, restoredMetaInfo.getName());
Assert.assertEquals(OperatorStateHandle.Mode.UNION, restoredMetaInfo.getAssignmentMode());
@@ -290,7 +290,7 @@ public class SerializationProxiesTest {
TypeSerializer<?> valueSerializer = StringSerializer.INSTANCE;
StateMetaInfoSnapshot snapshot =
- new RegisteredBroadcastBackendStateMetaInfo<>(
+ new RegisteredBroadcastStateBackendMetaInfo<>(
name, OperatorStateHandle.Mode.BROADCAST, keySerializer, valueSerializer).snapshot();
byte[] serialized;
@@ -308,8 +308,8 @@ public class SerializationProxiesTest {
new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
}
- RegisteredBroadcastBackendStateMetaInfo<?, ?> restoredMetaInfo =
- new RegisteredBroadcastBackendStateMetaInfo<>(snapshot);
+ RegisteredBroadcastStateBackendMetaInfo<?, ?> restoredMetaInfo =
+ new RegisteredBroadcastStateBackendMetaInfo<>(snapshot);
Assert.assertEquals(name, restoredMetaInfo.getName());
Assert.assertEquals(
@@ -325,7 +325,7 @@ public class SerializationProxiesTest {
TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
StateMetaInfoSnapshot snapshot =
- new RegisteredOperatorBackendStateMetaInfo<>(
+ new RegisteredOperatorStateBackendMetaInfo<>(
name, stateSerializer, OperatorStateHandle.Mode.UNION).snapshot();
byte[] serialized;
@@ -348,8 +348,8 @@ public class SerializationProxiesTest {
snapshot = reader.readStateMetaInfoSnapshot(new DataInputViewStreamWrapper(in), classLoader);
}
- RegisteredOperatorBackendStateMetaInfo<?> restoredMetaInfo =
- new RegisteredOperatorBackendStateMetaInfo<>(snapshot);
+ RegisteredOperatorStateBackendMetaInfo<?> restoredMetaInfo =
+ new RegisteredOperatorStateBackendMetaInfo<>(snapshot);
Assert.assertEquals(name, restoredMetaInfo.getName());
Assert.assertTrue(restoredMetaInfo.getPartitionStateSerializer() instanceof UnloadableDummyTypeSerializer);
@@ -365,7 +365,7 @@ public class SerializationProxiesTest {
TypeSerializer<?> valueSerializer = StringSerializer.INSTANCE;
StateMetaInfoSnapshot snapshot =
- new RegisteredBroadcastBackendStateMetaInfo<>(
+ new RegisteredBroadcastStateBackendMetaInfo<>(
broadcastName, OperatorStateHandle.Mode.BROADCAST, keySerializer, valueSerializer).snapshot();
byte[] serialized;
@@ -393,8 +393,8 @@ public class SerializationProxiesTest {
snapshot = reader.readStateMetaInfoSnapshot(new DataInputViewStreamWrapper(in), classLoader);
}
- RegisteredBroadcastBackendStateMetaInfo<?, ?> restoredMetaInfo =
- new RegisteredBroadcastBackendStateMetaInfo<>(snapshot);
+ RegisteredBroadcastStateBackendMetaInfo<?, ?> restoredMetaInfo =
+ new RegisteredBroadcastStateBackendMetaInfo<>(snapshot);
Assert.assertEquals(broadcastName, restoredMetaInfo.getName());
Assert.assertEquals(OperatorStateHandle.Mode.BROADCAST, restoredMetaInfo.getAssignmentMode());
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
index 558f629..355387d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
@@ -55,7 +56,7 @@ public class StateSnapshotCompressionTest extends TestLogger {
true,
executionConfig,
TestLocalRecoveryConfig.disabled(),
- mock(PriorityQueueSetFactory.class),
+ mock(HeapPriorityQueueSetFactory.class),
TtlTimeProvider.DEFAULT);
try {
@@ -79,7 +80,7 @@ public class StateSnapshotCompressionTest extends TestLogger {
true,
executionConfig,
TestLocalRecoveryConfig.disabled(),
- mock(PriorityQueueSetFactory.class),
+ mock(HeapPriorityQueueSetFactory.class),
TtlTimeProvider.DEFAULT);
try {
@@ -121,7 +122,7 @@ public class StateSnapshotCompressionTest extends TestLogger {
true,
executionConfig,
TestLocalRecoveryConfig.disabled(),
- mock(PriorityQueueSetFactory.class),
+ mock(HeapPriorityQueueSetFactory.class),
TtlTimeProvider.DEFAULT);
try {
@@ -164,7 +165,7 @@ public class StateSnapshotCompressionTest extends TestLogger {
true,
executionConfig,
TestLocalRecoveryConfig.disabled(),
- mock(PriorityQueueSetFactory.class),
+ mock(HeapPriorityQueueSetFactory.class),
TtlTimeProvider.DEFAULT);
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
index cf6bcc8..2c48e4b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.ArrayListSerializer;
import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.StateSnapshot;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.util.TestLogger;
@@ -53,8 +53,8 @@ public class CopyOnWriteStateTableTest extends TestLogger {
*/
@Test
public void testPutGetRemoveContainsTransform() throws Exception {
- RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
- new RegisteredKeyedBackendStateMetaInfo<>(
+ RegisteredKeyValueStateBackendMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+ new RegisteredKeyValueStateBackendMetaInfo<>(
StateDescriptor.Type.UNKNOWN,
"test",
IntSerializer.INSTANCE,
@@ -125,8 +125,8 @@ public class CopyOnWriteStateTableTest extends TestLogger {
*/
@Test
public void testIncrementalRehash() {
- RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
- new RegisteredKeyedBackendStateMetaInfo<>(
+ RegisteredKeyValueStateBackendMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+ new RegisteredKeyValueStateBackendMetaInfo<>(
StateDescriptor.Type.UNKNOWN,
"test",
IntSerializer.INSTANCE,
@@ -170,8 +170,8 @@ public class CopyOnWriteStateTableTest extends TestLogger {
@Test
public void testRandomModificationsAndCopyOnWriteIsolation() throws Exception {
- final RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
- new RegisteredKeyedBackendStateMetaInfo<>(
+ final RegisteredKeyValueStateBackendMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+ new RegisteredKeyValueStateBackendMetaInfo<>(
StateDescriptor.Type.UNKNOWN,
"test",
IntSerializer.INSTANCE,
@@ -325,8 +325,8 @@ public class CopyOnWriteStateTableTest extends TestLogger {
*/
@Test
public void testCopyOnWriteContracts() {
- RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
- new RegisteredKeyedBackendStateMetaInfo<>(
+ RegisteredKeyValueStateBackendMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+ new RegisteredKeyValueStateBackendMetaInfo<>(
StateDescriptor.Type.UNKNOWN,
"test",
IntSerializer.INSTANCE,
@@ -356,7 +356,7 @@ public class CopyOnWriteStateTableTest extends TestLogger {
// no snapshot taken, we get the original back
Assert.assertTrue(stateTable.get(1, 1) == originalState1);
- CopyOnWriteStateTableSnapshot<Integer, Integer, ArrayList<Integer>> snapshot1 = stateTable.createSnapshot();
+ CopyOnWriteStateTableSnapshot<Integer, Integer, ArrayList<Integer>> snapshot1 = stateTable.stateSnapshot();
// after snapshot1 is taken, we get a copy...
final ArrayList<Integer> copyState = stateTable.get(1, 1);
Assert.assertFalse(copyState == originalState1);
@@ -370,7 +370,7 @@ public class CopyOnWriteStateTableTest extends TestLogger {
Assert.assertTrue(copyState == stateTable.get(1, 1));
// we take snapshot2
- CopyOnWriteStateTableSnapshot<Integer, Integer, ArrayList<Integer>> snapshot2 = stateTable.createSnapshot();
+ CopyOnWriteStateTableSnapshot<Integer, Integer, ArrayList<Integer>> snapshot2 = stateTable.stateSnapshot();
// after the second snapshot, copy-on-write is active again for old entries
Assert.assertFalse(copyState == stateTable.get(1, 1));
// and equality still holds
@@ -400,8 +400,8 @@ public class CopyOnWriteStateTableTest extends TestLogger {
final TestDuplicateSerializer stateSerializer = new TestDuplicateSerializer();
final TestDuplicateSerializer keySerializer = new TestDuplicateSerializer();
- RegisteredKeyedBackendStateMetaInfo<Integer, Integer> metaInfo =
- new RegisteredKeyedBackendStateMetaInfo<>(
+ RegisteredKeyValueStateBackendMetaInfo<Integer, Integer> metaInfo =
+ new RegisteredKeyValueStateBackendMetaInfo<>(
StateDescriptor.Type.VALUE,
"test",
namespaceSerializer,
@@ -443,15 +443,15 @@ public class CopyOnWriteStateTableTest extends TestLogger {
table.put(2, 0, 1, 2);
- final CopyOnWriteStateTableSnapshot<Integer, Integer, Integer> snapshot = table.createSnapshot();
+ final CopyOnWriteStateTableSnapshot<Integer, Integer, Integer> snapshot = table.stateSnapshot();
try {
- final StateSnapshot.KeyGroupPartitionedSnapshot partitionedSnapshot = snapshot.partitionByKeyGroup();
+ final StateSnapshot.StateKeyGroupWriter partitionedSnapshot = snapshot.getKeyGroupWriter();
namespaceSerializer.disable();
keySerializer.disable();
stateSerializer.disable();
- partitionedSnapshot.writeMappingsInKeyGroup(
+ partitionedSnapshot.writeStateInKeyGroup(
new DataOutputViewStreamWrapper(
new ByteArrayOutputStreamWithPos(1024)), 0);
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
index 45c86c7..41ef0c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
@@ -27,8 +27,9 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.ArrayListSerializer;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.StateSnapshot;
+import org.apache.flink.runtime.state.StateSnapshotKeyGroupReader;
import org.junit.Assert;
import org.junit.Test;
@@ -46,8 +47,8 @@ public class StateTableSnapshotCompatibilityTest {
@Test
public void checkCompatibleSerializationFormats() throws IOException {
final Random r = new Random(42);
- RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
- new RegisteredKeyedBackendStateMetaInfo<>(
+ RegisteredKeyValueStateBackendMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+ new RegisteredKeyValueStateBackendMetaInfo<>(
StateDescriptor.Type.UNKNOWN,
"test",
IntSerializer.INSTANCE,
@@ -69,7 +70,7 @@ public class StateTableSnapshotCompatibilityTest {
cowStateTable.put(r.nextInt(10), r.nextInt(2), list);
}
- StateSnapshot snapshot = cowStateTable.createSnapshot();
+ StateSnapshot snapshot = cowStateTable.stateSnapshot();
final NestedMapsStateTable<Integer, Integer, ArrayList<Integer>> nestedMapsStateTable =
new NestedMapsStateTable<>(keyContext, metaInfo);
@@ -83,7 +84,7 @@ public class StateTableSnapshotCompatibilityTest {
Assert.assertEquals(entry.getState(), nestedMapsStateTable.get(entry.getKey(), entry.getNamespace()));
}
- snapshot = nestedMapsStateTable.createSnapshot();
+ snapshot = nestedMapsStateTable.stateSnapshot();
cowStateTable = new CopyOnWriteStateTable<>(keyContext, metaInfo);
restoreStateTableFromSnapshot(cowStateTable, snapshot, keyContext.getKeyGroupRange());
@@ -102,15 +103,15 @@ public class StateTableSnapshotCompatibilityTest {
final ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos(1024 * 1024);
final DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out);
- final StateSnapshot.KeyGroupPartitionedSnapshot keyGroupPartitionedSnapshot = snapshot.partitionByKeyGroup();
+ final StateSnapshot.StateKeyGroupWriter keyGroupPartitionedSnapshot = snapshot.getKeyGroupWriter();
for (Integer keyGroup : keyGroupRange) {
- keyGroupPartitionedSnapshot.writeMappingsInKeyGroup(dov, keyGroup);
+ keyGroupPartitionedSnapshot.writeStateInKeyGroup(dov, keyGroup);
}
final ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(out.getBuf());
final DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(in);
- final StateTableByKeyGroupReader keyGroupReader =
+ final StateSnapshotKeyGroupReader keyGroupReader =
StateTableByKeyGroupReaders.readerForVersion(stateTable, KeyedBackendSerializationProxy.VERSION);
for (Integer keyGroup : keyGroupRange) {
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotEnumConstantsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotEnumConstantsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotEnumConstantsTest.java
index 409c796..e196b1a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotEnumConstantsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotEnumConstantsTest.java
@@ -32,11 +32,11 @@ public class StateMetaInfoSnapshotEnumConstantsTest {
Assert.assertEquals(0, StateMetaInfoSnapshot.BackendStateType.KEY_VALUE.ordinal());
Assert.assertEquals(1, StateMetaInfoSnapshot.BackendStateType.OPERATOR.ordinal());
Assert.assertEquals(2, StateMetaInfoSnapshot.BackendStateType.BROADCAST.ordinal());
- Assert.assertEquals(3, StateMetaInfoSnapshot.BackendStateType.TIMER.ordinal());
+ Assert.assertEquals(3, StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE.ordinal());
Assert.assertEquals("KEY_VALUE", StateMetaInfoSnapshot.BackendStateType.KEY_VALUE.toString());
Assert.assertEquals("OPERATOR", StateMetaInfoSnapshot.BackendStateType.OPERATOR.toString());
Assert.assertEquals("BROADCAST", StateMetaInfoSnapshot.BackendStateType.BROADCAST.toString());
- Assert.assertEquals("TIMER", StateMetaInfoSnapshot.BackendStateType.TIMER.toString());
+ Assert.assertEquals("PRIORITY_QUEUE", StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE.toString());
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
index 363ecf8..9e9328b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
@@ -36,8 +36,10 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyExtractorFunction;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
import org.apache.flink.runtime.state.KeyedStateFactory;
import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.PriorityComparator;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SnapshotResult;
@@ -118,6 +120,11 @@ public class MockKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
@Override
+ public boolean requiresLegacySynchronousTimerSnapshots() {
+ return false;
+ }
+
+ @Override
public void notifyCheckpointComplete(long checkpointId) {
// noop
}
@@ -167,15 +174,13 @@ public class MockKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
@Nonnull
@Override
- public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T>
+ public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T>
create(
@Nonnull String stateName,
- @Nonnull TypeSerializer<T> byteOrderedElementSerializer,
- @Nonnull PriorityComparator<T> elementPriorityComparator,
- @Nonnull KeyExtractorFunction<T> keyExtractor) {
- return new HeapPriorityQueueSet<>(
- elementPriorityComparator,
- keyExtractor,
+ @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+ return new HeapPriorityQueueSet<T>(
+ PriorityComparator.forPriorityComparableObjects(),
+ KeyExtractorFunction.forKeyedObjects(),
0,
keyGroupRange,
0);
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index ceae3e1..209d18f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.util.FlinkRuntimeException;
@@ -177,7 +177,7 @@ class RocksDBAggregatingState<K, N, T, ACC, R>
@SuppressWarnings("unchecked")
static <K, N, SV, S extends State, IS extends S> IS create(
StateDescriptor<S, SV> stateDesc,
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+ Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
RocksDBKeyedStateBackend<K> backend) {
return (IS) new RocksDBAggregatingState<>(
registerResult.f0,
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index cf7974f..4d66357 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
import org.rocksdb.ColumnFamilyHandle;
@@ -103,7 +103,7 @@ class RocksDBFoldingState<K, N, T, ACC>
@SuppressWarnings("unchecked")
static <K, N, SV, S extends State, IS extends S> IS create(
StateDescriptor<S, SV> stateDesc,
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+ Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
RocksDBKeyedStateBackend<K> backend) {
return (IS) new RocksDBFoldingState<>(
registerResult.f0,
[3/8] flink git commit: [FLINK-9489] Checkpoint timers as part of
managed keyed state instead of raw keyed state
Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 622dd0c..0697463 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -63,14 +63,18 @@ import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.Keyed;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.PriorityComparator;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.SnapshotDirectory;
import org.apache.flink.runtime.state.SnapshotResult;
@@ -177,7 +181,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private interface StateFactory {
<K, N, SV, S extends State, IS extends S> IS createState(
StateDescriptor<S, SV> stateDesc,
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+ Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
RocksDBKeyedStateBackend<K> backend) throws Exception;
}
@@ -224,7 +228,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* Information about the k/v states as we create them. This is used to retrieve the
* column family that is used for a state and also for sanity checks when restoring.
*/
- private final Map<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> kvStateInformation;
+ private final Map<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation;
/**
* Map of state names to their corresponding restored state meta info.
@@ -256,7 +260,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> snapshotStrategy;
/** Factory for priority queue state. */
- private PriorityQueueSetFactory priorityQueueFactory;
+ private final PriorityQueueSetFactory priorityQueueFactory;
+
+ private RocksDBWriteBatchWrapper writeBatchWrapper;
public RocksDBKeyedStateBackend(
String operatorIdentifier,
@@ -322,7 +328,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
this.priorityQueueFactory = new RocksDBPriorityQueueSetFactory();
break;
default:
- break;
+ throw new IllegalArgumentException("Unknown priority queue state type: " + priorityQueueStateType);
}
LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID);
@@ -344,12 +350,15 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
@SuppressWarnings("unchecked")
@Override
public <N> Stream<K> getKeys(String state, N namespace) {
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> columnInfo = kvStateInformation.get(state);
- if (columnInfo == null) {
+ Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> columnInfo = kvStateInformation.get(state);
+ if (columnInfo == null || !(columnInfo.f1 instanceof RegisteredKeyValueStateBackendMetaInfo)) {
return Stream.empty();
}
- final TypeSerializer<N> namespaceSerializer = (TypeSerializer<N>) columnInfo.f1.getNamespaceSerializer();
+ RegisteredKeyValueStateBackendMetaInfo<N, ?> registeredKeyValueStateBackendMetaInfo =
+ (RegisteredKeyValueStateBackendMetaInfo<N, ?>) columnInfo.f1;
+
+ final TypeSerializer<N> namespaceSerializer = registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
final ByteArrayOutputStreamWithPos namespaceOutputStream = new ByteArrayOutputStreamWithPos(8);
boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer);
final byte[] nameSpaceBytes;
@@ -396,6 +405,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// working on the disposed object results in SEGFAULTS.
if (db != null) {
+ IOUtils.closeQuietly(writeBatchWrapper);
+
// RocksDB's native memory management requires that *all* CFs (including default) are closed before the
// DB is closed. See:
// https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families
@@ -403,16 +414,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
IOUtils.closeQuietly(defaultColumnFamily);
// ... continue with the ones created by Flink...
- for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> columnMetaData :
+ for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> columnMetaData :
kvStateInformation.values()) {
IOUtils.closeQuietly(columnMetaData.f0);
}
- // ... then close the priority queue related resources ...
- if (priorityQueueFactory instanceof AutoCloseable) {
- IOUtils.closeQuietly((AutoCloseable) priorityQueueFactory);
- }
-
// ... and finally close the DB instance ...
IOUtils.closeQuietly(db);
@@ -431,13 +437,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
@Nonnull
@Override
- public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
+ public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T>
+ create(
@Nonnull String stateName,
- @Nonnull TypeSerializer<T> byteOrderedElementSerializer,
- @Nonnull PriorityComparator<T> elementComparator,
- @Nonnull KeyExtractorFunction<T> keyExtractor) {
-
- return priorityQueueFactory.create(stateName, byteOrderedElementSerializer, elementComparator, keyExtractor);
+ @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+ return priorityQueueFactory.create(stateName, byteOrderedElementSerializer);
}
private void cleanInstanceBasePath() {
@@ -482,6 +486,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
@Override
public void restore(Collection<KeyedStateHandle> restoreState) throws Exception {
+
LOG.info("Initializing RocksDB keyed state backend.");
if (LOG.isDebugEnabled()) {
@@ -534,6 +539,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private void createDB() throws IOException {
List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
this.db = openDB(instanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles);
+ this.writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions);
this.defaultColumnFamily = columnFamilyHandles.get(0);
}
@@ -679,7 +685,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
for (StateMetaInfoSnapshot restoredMetaInfo : restoredMetaInfos) {
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn =
+ Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> registeredColumn =
rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
if (registeredColumn == null) {
@@ -689,8 +695,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
nameBytes,
rocksDBKeyedStateBackend.columnOptions);
- RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
- new RegisteredKeyedBackendStateMetaInfo<>(restoredMetaInfo);
+ RegisteredStateMetaInfoBase stateMetaInfo =
+ RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(restoredMetaInfo);
rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
@@ -987,12 +993,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
ColumnFamilyHandle columnFamilyHandle,
StateMetaInfoSnapshot stateMetaInfoSnapshot) throws RocksDBException {
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry =
+ Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> registeredStateMetaInfoEntry =
stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
if (null == registeredStateMetaInfoEntry) {
- RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
- new RegisteredKeyedBackendStateMetaInfo<>(stateMetaInfoSnapshot);
+ RegisteredStateMetaInfoBase stateMetaInfo =
+ RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
registeredStateMetaInfoEntry =
new Tuple2<>(
@@ -1037,6 +1043,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
stateBackend.db = restoreDBInfo.db;
stateBackend.defaultColumnFamily = restoreDBInfo.defaultColumnFamilyHandle;
+ stateBackend.writeBatchWrapper =
+ new RocksDBWriteBatchWrapper(stateBackend.db, stateBackend.writeOptions);
for (int i = 0; i < restoreDBInfo.stateMetaInfoSnapshots.size(); ++i) {
getOrRegisterColumnFamilyHandle(
@@ -1061,6 +1069,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
Collections.emptyList(),
columnFamilyHandles);
stateBackend.defaultColumnFamily = columnFamilyHandles.get(0);
+ stateBackend.writeBatchWrapper =
+ new RocksDBWriteBatchWrapper(stateBackend.db, stateBackend.writeOptions);
}
}
@@ -1115,13 +1125,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// extract and store the default column family which is located at the first index
stateBackend.defaultColumnFamily = columnFamilyHandles.remove(0);
+ stateBackend.writeBatchWrapper = new RocksDBWriteBatchWrapper(stateBackend.db, stateBackend.writeOptions);
for (int i = 0; i < columnFamilyDescriptors.size(); ++i) {
StateMetaInfoSnapshot stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
- RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
- new RegisteredKeyedBackendStateMetaInfo<>(stateMetaInfoSnapshot);
+ RegisteredStateMetaInfoBase stateMetaInfo =
+ RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
stateBackend.kvStateInformation.put(
stateMetaInfoSnapshot.getName(),
@@ -1290,14 +1301,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* already have a registered entry for that and return it (after some necessary state compatibility checks)
* or create a new one if it does not exist.
*/
- private <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, S>> tryRegisterKvStateInformation(
+ private <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, S>> tryRegisterKvStateInformation(
StateDescriptor<?, S> stateDesc,
TypeSerializer<N> namespaceSerializer) throws StateMigrationException {
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
+ Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> stateInfo =
kvStateInformation.get(stateDesc.getName());
- RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo;
+ RegisteredKeyValueStateBackendMetaInfo<N, S> newMetaInfo;
if (stateInfo != null) {
@SuppressWarnings("unchecked")
@@ -1308,7 +1319,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
" but its corresponding restored snapshot cannot be found.");
- newMetaInfo = RegisteredKeyedBackendStateMetaInfo.resolveKvStateCompatibility(
+ newMetaInfo = RegisteredKeyValueStateBackendMetaInfo.resolveKvStateCompatibility(
restoredMetaInfoSnapshot,
namespaceSerializer,
stateDesc);
@@ -1317,13 +1328,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
} else {
String stateName = stateDesc.getName();
- newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
+ newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
stateDesc.getType(),
stateName,
namespaceSerializer,
stateDesc.getSerializer());
- ColumnFamilyHandle columnFamily = createColumnFamily(stateName, db);
+ ColumnFamilyHandle columnFamily = createColumnFamily(stateName);
stateInfo = Tuple2.of(columnFamily, newMetaInfo);
kvStateInformation.put(stateDesc.getName(), stateInfo);
@@ -1335,7 +1346,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/**
* Creates a column family handle for use with a k/v state.
*/
- private ColumnFamilyHandle createColumnFamily(String stateName, RocksDB db) {
+ private ColumnFamilyHandle createColumnFamily(String stateName) {
byte[] nameBytes = stateName.getBytes(ConfigConstants.DEFAULT_CHARSET);
Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),
"The chosen state name 'default' collides with the name of the default column family!");
@@ -1359,7 +1370,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
stateDesc.getClass(), this.getClass());
throw new FlinkRuntimeException(message);
}
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult =
+ Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult =
tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
return stateFactory.createState(stateDesc, registerResult, RocksDBKeyedStateBackend.this);
}
@@ -1382,7 +1393,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
public int numStateEntries() {
int count = 0;
- for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column : kvStateInformation.values()) {
+ for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> column : kvStateInformation.values()) {
+ //TODO maybe filter only for k/v states
try (RocksIteratorWrapper rocksIterator = getRocksIterator(db, column.f0)) {
rocksIterator.seekToFirst();
@@ -1405,7 +1417,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
@VisibleForTesting
static final class RocksDBMergeIterator implements AutoCloseable {
- private final PriorityQueue<MergeIterator> heap;
+ private final PriorityQueue<RocksDBKeyedStateBackend.MergeIterator> heap;
private final int keyGroupPrefixByteCount;
private boolean newKeyGroup;
private boolean newKVState;
@@ -1763,6 +1775,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry();
+ // flush everything into db before taking a snapshot
+ writeBatchWrapper.flush();
+
final RocksDBFullSnapshotOperation<K> snapshotOperation =
new RocksDBFullSnapshotOperation<>(
RocksDBKeyedStateBackend.this,
@@ -1982,7 +1997,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
this.copiedColumnFamilyHandles = new ArrayList<>(stateBackend.kvStateInformation.size());
- for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> tuple2 :
+ for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> tuple2 :
stateBackend.kvStateInformation.values()) {
// snapshot meta info
this.stateMetaInfoSnapshots.add(tuple2.f1.snapshot());
@@ -2433,7 +2448,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
"assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
// save meta data
- for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
+ for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> stateMetaInfoEntry
: stateBackend.kvStateInformation.entrySet()) {
stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot());
}
@@ -2625,10 +2640,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/**
* Encapsulates the logic and resources in connection with creating priority queue state structures.
*/
- class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory, AutoCloseable {
+ class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
/** Default cache size per key-group. */
- private static final int DEFAULT_CACHES_SIZE = 8 * 1024;
+ private static final int DEFAULT_CACHES_SIZE = 1024;
/** A shared buffer to serialize elements for the priority queue. */
@Nonnull
@@ -2638,68 +2653,47 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
@Nonnull
private final DataOutputViewStreamWrapper elementSerializationOutView;
- /** A shared {@link RocksDBWriteBatchWrapper} to batch modifications to priority queues. */
- @Nonnull
- private final RocksDBWriteBatchWrapper writeBatchWrapper;
-
- /** Map to track all column families created to back priority queues. */
- @Nonnull
- private final Map<String, ColumnFamilyHandle> priorityQueueColumnFamilies;
-
- /** The mandatory default column family, so that we can close it later. */
- @Nonnull
- private final ColumnFamilyHandle defaultColumnFamily;
-
- /** Path of the RocksDB instance that holds the priority queues. */
- @Nonnull
- private final File pqInstanceRocksDBPath;
-
- /** RocksDB instance that holds the priority queues. */
- @Nonnull
- private final RocksDB pqDb;
-
- RocksDBPriorityQueueSetFactory() throws IOException {
- this.pqInstanceRocksDBPath = new File(instanceBasePath, "pqdb");
- if (pqInstanceRocksDBPath.exists()) {
- try {
- FileUtils.deleteDirectory(pqInstanceRocksDBPath);
- } catch (IOException ex) {
- LOG.warn("Could not delete instance path for PQ RocksDB: " + pqInstanceRocksDBPath, ex);
- }
- }
- List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
- this.pqDb = openDB(pqInstanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles);
+ RocksDBPriorityQueueSetFactory() {
this.elementSerializationOutStream = new ByteArrayOutputStreamWithPos();
this.elementSerializationOutView = new DataOutputViewStreamWrapper(elementSerializationOutStream);
- this.writeBatchWrapper = new RocksDBWriteBatchWrapper(pqDb, writeOptions);
- this.defaultColumnFamily = columnFamilyHandles.get(0);
- this.priorityQueueColumnFamilies = new HashMap<>();
}
@Nonnull
@Override
- public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
+ public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T>
+ create(
@Nonnull String stateName,
- @Nonnull TypeSerializer<T> byteOrderedElementSerializer,
- @Nonnull PriorityComparator<T> elementPriorityComparator,
- @Nonnull KeyExtractorFunction<T> keyExtractor) {
+ @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+
+ final PriorityComparator<T> priorityComparator =
+ PriorityComparator.forPriorityComparableObjects();
+
+ Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> entry =
+ kvStateInformation.get(stateName);
- final ColumnFamilyHandle columnFamilyHandle =
- priorityQueueColumnFamilies.computeIfAbsent(
- stateName,
- (name) -> RocksDBKeyedStateBackend.this.createColumnFamily(name, pqDb));
+ if (entry == null) {
+ RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo =
+ new RegisteredPriorityQueueStateBackendMetaInfo<>(stateName, byteOrderedElementSerializer);
+
+ final ColumnFamilyHandle columnFamilyHandle = createColumnFamily(stateName);
+
+ entry = new Tuple2<>(columnFamilyHandle, metaInfo);
+ kvStateInformation.put(stateName, entry);
+ }
+
+ final ColumnFamilyHandle columnFamilyHandle = entry.f0;
@Nonnull
TieBreakingPriorityComparator<T> tieBreakingComparator =
new TieBreakingPriorityComparator<>(
- elementPriorityComparator,
+ priorityComparator,
byteOrderedElementSerializer,
elementSerializationOutStream,
elementSerializationOutView);
return new KeyGroupPartitionedPriorityQueue<>(
- keyExtractor,
- elementPriorityComparator,
+ KeyExtractorFunction.forKeyedObjects(),
+ priorityComparator,
new KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<T, CachingInternalPriorityQueueSet<T>>() {
@Nonnull
@Override
@@ -2714,7 +2708,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
new RocksDBOrderedSetStore<>(
keyGroupId,
keyGroupPrefixBytes,
- pqDb,
+ db,
columnFamilyHandle,
byteOrderedElementSerializer,
elementSerializationOutStream,
@@ -2727,20 +2721,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
keyGroupRange,
numberOfKeyGroups);
}
+ }
- @Override
- public void close() {
- IOUtils.closeQuietly(writeBatchWrapper);
- for (ColumnFamilyHandle columnFamilyHandle : priorityQueueColumnFamilies.values()) {
- IOUtils.closeQuietly(columnFamilyHandle);
- }
- IOUtils.closeQuietly(defaultColumnFamily);
- IOUtils.closeQuietly(pqDb);
- try {
- FileUtils.deleteDirectory(pqInstanceRocksDBPath);
- } catch (IOException ex) {
- LOG.warn("Could not delete instance path for PQ RocksDB: " + pqInstanceRocksDBPath, ex);
- }
- }
+ @Override
+ public boolean requiresLegacySynchronousTimerSnapshots() {
+ return priorityQueueFactory instanceof HeapPriorityQueueSetFactory;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index 03faa44..aa5e93a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
@@ -257,7 +257,7 @@ class RocksDBListState<K, N, V>
@SuppressWarnings("unchecked")
static <E, K, N, SV, S extends State, IS extends S> IS create(
StateDescriptor<S, SV> stateDesc,
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+ Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
RocksDBKeyedStateBackend<K> backend) {
return (IS) new RocksDBListState<>(
registerResult.f0,
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 9d00a67..4ec1f77 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -30,7 +30,7 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
@@ -616,7 +616,7 @@ class RocksDBMapState<K, N, UK, UV>
@SuppressWarnings("unchecked")
static <UK, UV, K, N, SV, S extends State, IS extends S> IS create(
StateDescriptor<S, SV> stateDesc,
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+ Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
RocksDBKeyedStateBackend<K> backend) {
return (IS) new RocksDBMapState<>(
registerResult.f0,
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
index 5284314..4068c50 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
@@ -159,7 +159,6 @@ public class RocksDBOrderedSetStore<T> implements CachingInternalPriorityQueueSe
public RocksToJavaIteratorAdapter orderedIterator() {
flushWriteBatch();
-
return new RocksToJavaIteratorAdapter(
new RocksIteratorWrapper(
db.newIterator(columnFamilyHandle)));
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index d138045..490960e 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.util.FlinkRuntimeException;
@@ -172,7 +172,7 @@ class RocksDBReducingState<K, N, V>
@SuppressWarnings("unchecked")
static <K, N, SV, S extends State, IS extends S> IS create(
StateDescriptor<S, SV> stateDesc,
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+ Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
RocksDBKeyedStateBackend<K> backend) {
return (IS) new RocksDBReducingState<>(
registerResult.f0,
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 2b60fc1..5ae894e 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.util.FlinkRuntimeException;
@@ -116,7 +116,7 @@ class RocksDBValueState<K, N, V>
@SuppressWarnings("unchecked")
static <K, N, SV, S extends State, IS extends S> IS create(
StateDescriptor<S, SV> stateDesc,
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+ Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
RocksDBKeyedStateBackend<K> backend) {
return (IS) new RocksDBValueState<>(
registerResult.f0,
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
index d54f122..05adf4f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
@@ -102,6 +102,10 @@ public class RocksDBWriteBatchWrapper implements AutoCloseable {
batch.clear();
}
+ public WriteOptions getOptions() {
+ return options;
+ }
+
@Override
public void close() throws RocksDBException {
if (batch.count() != 0) {
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 797a26a..52ba3e4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -402,7 +402,11 @@ public abstract class AbstractStreamOperator<OUT>
* @param context context that provides information and means required for taking a snapshot
*/
public void snapshotState(StateSnapshotContext context) throws Exception {
- if (getKeyedStateBackend() != null) {
+ final KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
+ //TODO all of this can be removed once heap-based timers are integrated with RocksDB incremental snapshots
+ if (keyedStateBackend instanceof AbstractKeyedStateBackend &&
+ ((AbstractKeyedStateBackend<?>) keyedStateBackend).requiresLegacySynchronousTimerSnapshots()) {
+
KeyedStateCheckpointOutputStream out;
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapPriorityQueueStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapPriorityQueueStateSnapshot.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapPriorityQueueStateSnapshot.java
deleted file mode 100644
index cc7fbc4..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapPriorityQueueStateSnapshot.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.KeyExtractorFunction;
-import org.apache.flink.runtime.state.KeyGroupPartitioner;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.StateSnapshot;
-import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet;
-
-import javax.annotation.Nonnegative;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.lang.reflect.Array;
-
-/**
- * This class represents the snapshot of an {@link HeapPriorityQueueSet}.
- *
- * @param <T> type of the state elements.
- */
-public class HeapPriorityQueueStateSnapshot<T> implements StateSnapshot {
-
- /** Function that extracts keys from elements. */
- @Nonnull
- private final KeyExtractorFunction<T> keyExtractor;
-
- /** Copy of the heap array containing all the (immutable or deeply copied) elements. */
- @Nonnull
- private final T[] heapArrayCopy;
-
- /** The element serializer. */
- @Nonnull
- private final TypeSerializer<T> elementSerializer;
-
- /** The key-group range covered by this snapshot. */
- @Nonnull
- private final KeyGroupRange keyGroupRange;
-
- /** The total number of key-groups in the job. */
- @Nonnegative
- private final int totalKeyGroups;
-
- /** Result of partitioning the snapshot by key-group. */
- @Nullable
- private KeyGroupPartitionedSnapshot partitionedSnapshot;
-
- HeapPriorityQueueStateSnapshot(
- @Nonnull T[] heapArrayCopy,
- @Nonnull KeyExtractorFunction<T> keyExtractor,
- @Nonnull TypeSerializer<T> elementSerializer,
- @Nonnull KeyGroupRange keyGroupRange,
- @Nonnegative int totalKeyGroups) {
-
- // TODO ensure that the array contains a deep copy of elements if we are *not* dealing with immutable types.
- assert elementSerializer.isImmutableType();
-
- this.keyExtractor = keyExtractor;
- this.heapArrayCopy = heapArrayCopy;
- this.elementSerializer = elementSerializer;
- this.keyGroupRange = keyGroupRange;
- this.totalKeyGroups = totalKeyGroups;
- }
-
- @SuppressWarnings("unchecked")
- @Nonnull
- @Override
- public KeyGroupPartitionedSnapshot partitionByKeyGroup() {
-
- if (partitionedSnapshot == null) {
-
- T[] partitioningOutput = (T[]) Array.newInstance(
- heapArrayCopy.getClass().getComponentType(),
- heapArrayCopy.length);
-
- KeyGroupPartitioner<T> keyGroupPartitioner =
- new KeyGroupPartitioner<>(
- heapArrayCopy,
- heapArrayCopy.length,
- partitioningOutput,
- keyGroupRange,
- totalKeyGroups,
- keyExtractor,
- elementSerializer::serialize);
-
- partitionedSnapshot = keyGroupPartitioner.partitionByKeyGroup();
- }
-
- return partitionedSnapshot;
- }
-
- @Override
- public void release() {
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
index ad1617e..29b89c2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
@@ -46,6 +46,11 @@ import java.util.Map;
@Internal
public class InternalTimeServiceManager<K> {
+ //TODO guard these constants with a test
+ private static final String TIMER_STATE_PREFIX = "_timer_state";
+ private static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";
+ private static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";
+
private final int totalKeyGroups;
private final KeyGroupRange localKeyGroupRange;
private final KeyContext keyContext;
@@ -55,12 +60,14 @@ public class InternalTimeServiceManager<K> {
private final Map<String, HeapInternalTimerService<K, ?>> timerServices;
+ private final boolean useLegacySynchronousSnapshots;
+
InternalTimeServiceManager(
- int totalKeyGroups,
- KeyGroupRange localKeyGroupRange,
- KeyContext keyContext,
- PriorityQueueSetFactory priorityQueueSetFactory,
- ProcessingTimeService processingTimeService) {
+ int totalKeyGroups,
+ KeyGroupRange localKeyGroupRange,
+ KeyContext keyContext,
+ PriorityQueueSetFactory priorityQueueSetFactory,
+ ProcessingTimeService processingTimeService, boolean useLegacySynchronousSnapshots) {
Preconditions.checkArgument(totalKeyGroups > 0);
this.totalKeyGroups = totalKeyGroups;
@@ -68,6 +75,7 @@ public class InternalTimeServiceManager<K> {
this.priorityQueueSetFactory = Preconditions.checkNotNull(priorityQueueSetFactory);
this.keyContext = Preconditions.checkNotNull(keyContext);
this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
+ this.useLegacySynchronousSnapshots = useLegacySynchronousSnapshots;
this.timerServices = new HashMap<>();
}
@@ -97,8 +105,8 @@ public class InternalTimeServiceManager<K> {
localKeyGroupRange,
keyContext,
processingTimeService,
- createTimerPriorityQueue("__ts_" + name + "/processing_timers", timerSerializer),
- createTimerPriorityQueue("__ts_" + name + "/event_timers", timerSerializer));
+ createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),
+ createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer));
timerServices.put(name, timerService);
}
@@ -114,9 +122,7 @@ public class InternalTimeServiceManager<K> {
TimerSerializer<K, N> timerSerializer) {
return priorityQueueSetFactory.create(
name,
- timerSerializer,
- InternalTimer.getTimerComparator(),
- InternalTimer.getKeyExtractorFunction());
+ timerSerializer);
}
public void advanceWatermark(Watermark watermark) throws Exception {
@@ -128,6 +134,7 @@ public class InternalTimeServiceManager<K> {
////////////////// Fault Tolerance Methods ///////////////////
public void snapshotStateForKeyGroup(DataOutputView stream, int keyGroupIdx) throws IOException {
+ Preconditions.checkState(useLegacySynchronousSnapshots);
InternalTimerServiceSerializationProxy<K> serializationProxy =
new InternalTimerServiceSerializationProxy<>(this, keyGroupIdx);
@@ -148,6 +155,10 @@ public class InternalTimeServiceManager<K> {
serializationProxy.read(stream);
}
+ public boolean isUseLegacySynchronousSnapshots() {
+ return useLegacySynchronousSnapshots;
+ }
+
//////////////////// Methods used ONLY IN TESTS ////////////////////
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
index f88b4fb..324f3fc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.PriorityComparator;
import javax.annotation.Nonnull;
@@ -31,7 +33,7 @@ import javax.annotation.Nonnull;
* @param <N> Type of the namespace to which timers are scoped.
*/
@Internal
-public interface InternalTimer<K, N> {
+public interface InternalTimer<K, N> extends PriorityComparable<InternalTimer<?, ?>>, Keyed<K> {
/** Function to extract the key from a {@link InternalTimer}. */
KeyExtractorFunction<InternalTimer<?, ?>> KEY_EXTRACTOR_FUNCTION = InternalTimer::getKey;
@@ -48,6 +50,7 @@ public interface InternalTimer<K, N> {
* Returns the key that is bound to this timer.
*/
@Nonnull
+ @Override
K getKey();
/**
@@ -55,14 +58,4 @@ public interface InternalTimer<K, N> {
*/
@Nonnull
N getNamespace();
-
- @SuppressWarnings("unchecked")
- static <T extends InternalTimer> PriorityComparator<T> getTimerComparator() {
- return (PriorityComparator<T>) TIMER_COMPARATOR;
- }
-
- @SuppressWarnings("unchecked")
- static <T extends InternalTimer> KeyExtractorFunction<T> getKeyExtractorFunction() {
- return (KeyExtractorFunction<T>) KEY_EXTRACTOR_FUNCTION;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
index dbb74e5..4f762e2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
@@ -19,9 +19,11 @@
package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
@@ -29,6 +31,8 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.InstantiationUtil;
+import javax.annotation.Nonnull;
+
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
@@ -96,7 +100,7 @@ public class InternalTimersSnapshotReaderWriters {
public final void writeTimersSnapshot(DataOutputView out) throws IOException {
writeKeyAndNamespaceSerializers(out);
- TimerHeapInternalTimer.TimerSerializer<K, N> timerSerializer = new TimerHeapInternalTimer.TimerSerializer<>(
+ LegacyTimerSerializer<K, N> timerSerializer = new LegacyTimerSerializer<>(
timersSnapshot.getKeySerializer(),
timersSnapshot.getNamespaceSerializer());
@@ -215,8 +219,8 @@ public class InternalTimersSnapshotReaderWriters {
restoreKeyAndNamespaceSerializers(restoredTimersSnapshot, in);
- TimerHeapInternalTimer.TimerSerializer<K, N> timerSerializer =
- new TimerHeapInternalTimer.TimerSerializer<>(
+ LegacyTimerSerializer<K, N> timerSerializer =
+ new LegacyTimerSerializer<>(
restoredTimersSnapshot.getKeySerializer(),
restoredTimersSnapshot.getNamespaceSerializer());
@@ -289,4 +293,120 @@ public class InternalTimersSnapshotReaderWriters {
restoredTimersSnapshot.setNamespaceSerializerConfigSnapshot(serializersAndConfigs.get(1).f1);
}
}
+
+ /**
+ * A {@link TypeSerializer} used to serialize/deserialize a {@link TimerHeapInternalTimer}.
+ */
+ public static class LegacyTimerSerializer<K, N> extends TypeSerializer<TimerHeapInternalTimer<K, N>> {
+
+ private static final long serialVersionUID = 1119562170939152304L;
+
+ @Nonnull
+ private final TypeSerializer<K> keySerializer;
+
+ @Nonnull
+ private final TypeSerializer<N> namespaceSerializer;
+
+ LegacyTimerSerializer(@Nonnull TypeSerializer<K> keySerializer, @Nonnull TypeSerializer<N> namespaceSerializer) {
+ this.keySerializer = keySerializer;
+ this.namespaceSerializer = namespaceSerializer;
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<TimerHeapInternalTimer<K, N>> duplicate() {
+
+ final TypeSerializer<K> keySerializerDuplicate = keySerializer.duplicate();
+ final TypeSerializer<N> namespaceSerializerDuplicate = namespaceSerializer.duplicate();
+
+ if (keySerializerDuplicate == keySerializer &&
+ namespaceSerializerDuplicate == namespaceSerializer) {
+ // all delegate serializers seem stateless, so this is also stateless.
+ return this;
+ } else {
+ // at least one delegate serializer seems to be stateful, so we return a new instance.
+ return new LegacyTimerSerializer<>(keySerializerDuplicate, namespaceSerializerDuplicate);
+ }
+ }
+
+ @Override
+ public TimerHeapInternalTimer<K, N> createInstance() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TimerHeapInternalTimer<K, N> copy(TimerHeapInternalTimer<K, N> from) {
+ return new TimerHeapInternalTimer<>(from.getTimestamp(), from.getKey(), from.getNamespace());
+ }
+
+ @Override
+ public TimerHeapInternalTimer<K, N> copy(TimerHeapInternalTimer<K, N> from, TimerHeapInternalTimer<K, N> reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ // we do not have fixed length
+ return -1;
+ }
+
+ @Override
+ public void serialize(TimerHeapInternalTimer<K, N> record, DataOutputView target) throws IOException {
+ keySerializer.serialize(record.getKey(), target);
+ namespaceSerializer.serialize(record.getNamespace(), target);
+ LongSerializer.INSTANCE.serialize(record.getTimestamp(), target);
+ }
+
+ @Override
+ public TimerHeapInternalTimer<K, N> deserialize(DataInputView source) throws IOException {
+ K key = keySerializer.deserialize(source);
+ N namespace = namespaceSerializer.deserialize(source);
+ Long timestamp = LongSerializer.INSTANCE.deserialize(source);
+ return new TimerHeapInternalTimer<>(timestamp, key, namespace);
+ }
+
+ @Override
+ public TimerHeapInternalTimer<K, N> deserialize(TimerHeapInternalTimer<K, N> reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ keySerializer.copy(source, target);
+ namespaceSerializer.copy(source, target);
+ LongSerializer.INSTANCE.copy(source, target);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj == this ||
+ (obj != null && obj.getClass() == getClass() &&
+ keySerializer.equals(((LegacyTimerSerializer) obj).keySerializer) &&
+ namespaceSerializer.equals(((LegacyTimerSerializer) obj).namespaceSerializer));
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return getClass().hashCode();
+ }
+
+ @Override
+ public TypeSerializerConfigSnapshot snapshotConfiguration() {
+ throw new UnsupportedOperationException("This serializer is not registered for managed state.");
+ }
+
+ @Override
+ public CompatibilityResult<TimerHeapInternalTimer<K, N>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+ throw new UnsupportedOperationException("This serializer is not registered for managed state.");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
index ca9cb0b..a6bee4c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
@@ -209,7 +209,8 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize
keyGroupRange,
keyContext,
keyedStatedBackend,
- processingTimeService);
+ processingTimeService,
+ keyedStatedBackend.requiresLegacySynchronousTimerSnapshots());
// and then initialize the timer services
for (KeyGroupStatePartitionStreamProvider streamProvider : rawKeyedStates) {
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
index b9ef88e..a6194ed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
@@ -19,19 +19,11 @@
package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet;
import javax.annotation.Nonnull;
-import java.io.IOException;
-
/**
* Implementation of {@link InternalTimer} to use with a {@link HeapPriorityQueueSet}.
*
@@ -133,119 +125,8 @@ public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N>,
'}';
}
- /**
- * A {@link TypeSerializer} used to serialize/deserialize a {@link TimerHeapInternalTimer}.
- */
- public static class TimerSerializer<K, N> extends TypeSerializer<TimerHeapInternalTimer<K, N>> {
-
- private static final long serialVersionUID = 1119562170939152304L;
-
- @Nonnull
- private final TypeSerializer<K> keySerializer;
-
- @Nonnull
- private final TypeSerializer<N> namespaceSerializer;
-
- TimerSerializer(@Nonnull TypeSerializer<K> keySerializer, @Nonnull TypeSerializer<N> namespaceSerializer) {
- this.keySerializer = keySerializer;
- this.namespaceSerializer = namespaceSerializer;
- }
-
- @Override
- public boolean isImmutableType() {
- return false;
- }
-
- @Override
- public TypeSerializer<TimerHeapInternalTimer<K, N>> duplicate() {
-
- final TypeSerializer<K> keySerializerDuplicate = keySerializer.duplicate();
- final TypeSerializer<N> namespaceSerializerDuplicate = namespaceSerializer.duplicate();
-
- if (keySerializerDuplicate == keySerializer &&
- namespaceSerializerDuplicate == namespaceSerializer) {
- // all delegate serializers seem stateless, so this is also stateless.
- return this;
- } else {
- // at least one delegate serializer seems to be stateful, so we return a new instance.
- return new TimerSerializer<>(keySerializerDuplicate, namespaceSerializerDuplicate);
- }
- }
-
- @Override
- public TimerHeapInternalTimer<K, N> createInstance() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public TimerHeapInternalTimer<K, N> copy(TimerHeapInternalTimer<K, N> from) {
- return new TimerHeapInternalTimer<>(from.getTimestamp(), from.getKey(), from.getNamespace());
- }
-
- @Override
- public TimerHeapInternalTimer<K, N> copy(TimerHeapInternalTimer<K, N> from, TimerHeapInternalTimer<K, N> reuse) {
- return copy(from);
- }
-
- @Override
- public int getLength() {
- // we do not have fixed length
- return -1;
- }
-
- @Override
- public void serialize(TimerHeapInternalTimer<K, N> record, DataOutputView target) throws IOException {
- keySerializer.serialize(record.getKey(), target);
- namespaceSerializer.serialize(record.getNamespace(), target);
- LongSerializer.INSTANCE.serialize(record.getTimestamp(), target);
- }
-
- @Override
- public TimerHeapInternalTimer<K, N> deserialize(DataInputView source) throws IOException {
- K key = keySerializer.deserialize(source);
- N namespace = namespaceSerializer.deserialize(source);
- Long timestamp = LongSerializer.INSTANCE.deserialize(source);
- return new TimerHeapInternalTimer<>(timestamp, key, namespace);
- }
-
- @Override
- public TimerHeapInternalTimer<K, N> deserialize(TimerHeapInternalTimer<K, N> reuse, DataInputView source) throws IOException {
- return deserialize(source);
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- keySerializer.copy(source, target);
- namespaceSerializer.copy(source, target);
- LongSerializer.INSTANCE.copy(source, target);
- }
-
- @Override
- public boolean equals(Object obj) {
- return obj == this ||
- (obj != null && obj.getClass() == getClass() &&
- keySerializer.equals(((TimerSerializer) obj).keySerializer) &&
- namespaceSerializer.equals(((TimerSerializer) obj).namespaceSerializer));
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return true;
- }
-
- @Override
- public int hashCode() {
- return getClass().hashCode();
- }
-
- @Override
- public TypeSerializerConfigSnapshot snapshotConfiguration() {
- throw new UnsupportedOperationException("This serializer is not registered for managed state.");
- }
-
- @Override
- public CompatibilityResult<TimerHeapInternalTimer<K, N>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
- throw new UnsupportedOperationException("This serializer is not registered for managed state.");
- }
+ @Override
+ public int comparePriorityTo(@Nonnull InternalTimer<?, ?> other) {
+ return Long.compare(timestamp, other.getTimestamp());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
index 87a3159..73f42ef 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.operators;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputView;
@@ -31,8 +32,8 @@ import java.io.IOException;
import java.util.Objects;
/**
- * A serializer for {@link TimerHeapInternalTimer} objects that produces a serialization format that is aligned with
- * {@link InternalTimer#getTimerComparator()}.
+ * A serializer for {@link TimerHeapInternalTimer} objects that produces a serialization format that is
+ * lexicographically aligned the priority of the timers.
*
* @param <K> type of the timer key.
* @param <N> type of the timer namespace.
@@ -201,13 +202,14 @@ public class TimerSerializer<K, N> extends TypeSerializer<TimerHeapInternalTimer
@Override
public TypeSerializerConfigSnapshot snapshotConfiguration() {
- throw new UnsupportedOperationException("This serializer is currently not used to write state.");
+ return new TimerSerializerConfigSnapshot<>(keySerializer, namespaceSerializer);
}
@Override
public CompatibilityResult<TimerHeapInternalTimer<K, N>> ensureCompatibility(
TypeSerializerConfigSnapshot configSnapshot) {
- throw new UnsupportedOperationException("This serializer is currently not used to write state.");
+ //TODO this is just a mock (assuming no serializer updates) for now and needs a proper implementation! change this before release.
+ return CompatibilityResult.compatible();
}
@Nonnull
@@ -219,4 +221,25 @@ public class TimerSerializer<K, N> extends TypeSerializer<TimerHeapInternalTimer
public TypeSerializer<N> getNamespaceSerializer() {
return namespaceSerializer;
}
+
+ /**
+ * Snaphot of a {@link TimerSerializer}.
+ *
+ * @param <K> type of key.
+ * @param <N> type of namespace.
+ */
+ public static class TimerSerializerConfigSnapshot<K, N> extends CompositeTypeSerializerConfigSnapshot {
+
+ public TimerSerializerConfigSnapshot() {
+ }
+
+ public TimerSerializerConfigSnapshot(TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer) {
+ super(keySerializer, namespaceSerializer);
+ }
+
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
index 519f10e..957a535 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
@@ -910,8 +910,6 @@ public class HeapInternalTimerServiceTest {
PriorityQueueSetFactory priorityQueueSetFactory) {
return priorityQueueSetFactory.create(
name,
- timerSerializer,
- InternalTimer.getTimerComparator(),
- InternalTimer.getKeyExtractorFunction());
+ timerSerializer);
}
}