You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/16 21:06:34 UTC
[5/8] flink git commit: [FLINK-9489] Checkpoint timers as part of
managed keyed state instead of raw keyed state
[FLINK-9489] Checkpoint timers as part of managed keyed state instead of raw keyed state
Optimization for relaxed bulk polls
Deactivate optimization for now because it still contains a bug
This closes #6333.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dbddf00b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dbddf00b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dbddf00b
Branch: refs/heads/master
Commit: dbddf00b75032c20df6e7aef26814da392347194
Parents: 0bbc91e
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Jun 13 11:56:16 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jul 16 22:11:57 2018 +0200
----------------------------------------------------------------------
.../state/AbstractKeyedStateBackend.java | 5 +
.../state/BackendWritableBroadcastState.java | 4 +-
.../state/DefaultOperatorStateBackend.java | 32 +--
.../flink/runtime/state/HeapBroadcastState.java | 10 +-
.../runtime/state/KeyExtractorFunction.java | 13 ++
.../runtime/state/KeyGroupPartitioner.java | 72 +++++-
.../org/apache/flink/runtime/state/Keyed.java | 32 +++
.../flink/runtime/state/PriorityComparable.java | 33 +++
.../flink/runtime/state/PriorityComparator.java | 7 +
.../runtime/state/PriorityQueueSetFactory.java | 6 +-
...RegisteredBroadcastBackendStateMetaInfo.java | 155 -------------
...RegisteredBroadcastStateBackendMetaInfo.java | 169 ++++++++++++++
.../RegisteredKeyValueStateBackendMetaInfo.java | 224 +++++++++++++++++++
.../RegisteredKeyedBackendStateMetaInfo.java | 212 ------------------
.../RegisteredOperatorBackendStateMetaInfo.java | 142 ------------
.../RegisteredOperatorStateBackendMetaInfo.java | 153 +++++++++++++
...steredPriorityQueueStateBackendMetaInfo.java | 87 +++++++
.../state/RegisteredStateMetaInfoBase.java | 17 ++
.../flink/runtime/state/StateSnapshot.java | 27 ++-
.../state/StateSnapshotKeyGroupReader.java | 44 ++++
.../runtime/state/StateSnapshotRestore.java | 47 ++++
.../state/TieBreakingPriorityComparator.java | 5 -
.../heap/CachingInternalPriorityQueueSet.java | 46 ++++
.../state/heap/CopyOnWriteStateTable.java | 13 +-
.../heap/CopyOnWriteStateTableSnapshot.java | 12 +-
.../state/heap/HeapKeyedStateBackend.java | 163 +++++++++-----
.../runtime/state/heap/HeapPriorityQueue.java | 3 +-
.../state/heap/HeapPriorityQueueSetFactory.java | 18 +-
...HeapPriorityQueueSnapshotRestoreWrapper.java | 102 +++++++++
.../heap/HeapPriorityQueueStateSnapshot.java | 118 ++++++++++
.../heap/KeyGroupPartitionedPriorityQueue.java | 18 ++
.../state/heap/NestedMapsStateTable.java | 38 ++--
.../flink/runtime/state/heap/StateTable.java | 25 ++-
.../state/heap/StateTableByKeyGroupReader.java | 38 ----
.../state/heap/StateTableByKeyGroupReaders.java | 89 +++-----
.../state/metainfo/StateMetaInfoSnapshot.java | 5 +-
.../StateMetaInfoSnapshotReadersWriters.java | 38 +++-
.../state/KeyGroupPartitionerTestBase.java | 4 +-
.../runtime/state/SerializationProxiesTest.java | 54 ++---
.../state/StateSnapshotCompressionTest.java | 9 +-
.../state/heap/CopyOnWriteStateTableTest.java | 32 +--
.../StateTableSnapshotCompatibilityTest.java | 17 +-
.../StateMetaInfoSnapshotEnumConstantsTest.java | 4 +-
.../state/ttl/mock/MockKeyedStateBackend.java | 19 +-
.../state/RocksDBAggregatingState.java | 4 +-
.../streaming/state/RocksDBFoldingState.java | 4 +-
.../state/RocksDBKeyedStateBackend.java | 188 +++++++---------
.../streaming/state/RocksDBListState.java | 4 +-
.../streaming/state/RocksDBMapState.java | 4 +-
.../streaming/state/RocksDBOrderedSetStore.java | 1 -
.../streaming/state/RocksDBReducingState.java | 4 +-
.../streaming/state/RocksDBValueState.java | 4 +-
.../state/RocksDBWriteBatchWrapper.java | 4 +
.../api/operators/AbstractStreamOperator.java | 6 +-
.../HeapPriorityQueueStateSnapshot.java | 112 ----------
.../operators/InternalTimeServiceManager.java | 31 ++-
.../streaming/api/operators/InternalTimer.java | 15 +-
.../InternalTimersSnapshotReaderWriters.java | 126 ++++++++++-
.../StreamTaskStateInitializerImpl.java | 3 +-
.../api/operators/TimerHeapInternalTimer.java | 125 +----------
.../api/operators/TimerSerializer.java | 31 ++-
.../operators/HeapInternalTimerServiceTest.java | 4 +-
62 files changed, 1807 insertions(+), 1224 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index c7f1bd9..17d24f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -315,4 +315,9 @@ public abstract class AbstractKeyedStateBackend<K> implements
@VisibleForTesting
public abstract int numStateEntries();
+ // TODO remove this once heap-based timers are working with RocksDB incremental snapshots!
+ public boolean requiresLegacySynchronousTimerSnapshots() {
+ return false;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java
index 8daf07c..ba3985b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java
@@ -36,7 +36,7 @@ public interface BackendWritableBroadcastState<K, V> extends BroadcastState<K, V
long write(FSDataOutputStream out) throws IOException;
- void setStateMetaInfo(RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo);
+ void setStateMetaInfo(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo);
- RegisteredBroadcastBackendStateMetaInfo<K, V> getStateMetaInfo();
+ RegisteredBroadcastStateBackendMetaInfo<K, V> getStateMetaInfo();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index dc9b75f..f1d0b57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -209,7 +209,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
if (broadcastState == null) {
broadcastState = new HeapBroadcastState<>(
- new RegisteredBroadcastBackendStateMetaInfo<>(
+ new RegisteredBroadcastStateBackendMetaInfo<>(
name,
OperatorStateHandle.Mode.BROADCAST,
broadcastStateKeySerializer,
@@ -227,7 +227,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
final StateMetaInfoSnapshot metaInfoSnapshot = restoredBroadcastStateMetaInfos.get(name);
@SuppressWarnings("unchecked")
- RegisteredBroadcastBackendStateMetaInfo<K, V> restoredMetaInfo = new RegisteredBroadcastBackendStateMetaInfo<K, V>(metaInfoSnapshot);
+ RegisteredBroadcastStateBackendMetaInfo<K, V> restoredMetaInfo = new RegisteredBroadcastStateBackendMetaInfo<K, V>(metaInfoSnapshot);
// check compatibility to determine if state migration is required
CompatibilityResult<K> keyCompatibility = CompatibilityUtil.resolveCompatibilityResult(
@@ -247,7 +247,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
if (!keyCompatibility.isRequiresMigration() && !valueCompatibility.isRequiresMigration()) {
// new serializer is compatible; use it to replace the old serializer
broadcastState.setStateMetaInfo(
- new RegisteredBroadcastBackendStateMetaInfo<>(
+ new RegisteredBroadcastStateBackendMetaInfo<>(
name,
OperatorStateHandle.Mode.BROADCAST,
broadcastStateKeySerializer,
@@ -510,8 +510,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
// Recreate all PartitionableListStates from the meta info
for (StateMetaInfoSnapshot restoredSnapshot : restoredOperatorMetaInfoSnapshots) {
- final RegisteredOperatorBackendStateMetaInfo<?> restoredMetaInfo =
- new RegisteredOperatorBackendStateMetaInfo<>(restoredSnapshot);
+ final RegisteredOperatorStateBackendMetaInfo<?> restoredMetaInfo =
+ new RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot);
if (restoredMetaInfo.getPartitionStateSerializer() == null ||
restoredMetaInfo.getPartitionStateSerializer() instanceof UnloadableDummyTypeSerializer) {
@@ -546,8 +546,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
for (StateMetaInfoSnapshot restoredSnapshot : restoredBroadcastMetaInfoSnapshots) {
- final RegisteredBroadcastBackendStateMetaInfo<?, ?> restoredMetaInfo =
- new RegisteredBroadcastBackendStateMetaInfo<>(restoredSnapshot);
+ final RegisteredBroadcastStateBackendMetaInfo<?, ?> restoredMetaInfo =
+ new RegisteredBroadcastStateBackendMetaInfo<>(restoredSnapshot);
if (restoredMetaInfo.getKeySerializer() == null || restoredMetaInfo.getValueSerializer() == null ||
restoredMetaInfo.getKeySerializer() instanceof UnloadableDummyTypeSerializer ||
@@ -613,7 +613,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
/**
* Meta information of the state, including state name, assignment mode, and serializer
*/
- private RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo;
+ private RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo;
/**
* The internal list that holds the elements of the state
@@ -625,12 +625,12 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
*/
private final ArrayListSerializer<S> internalListCopySerializer;
- PartitionableListState(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) {
+ PartitionableListState(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) {
this(stateMetaInfo, new ArrayList<S>());
}
private PartitionableListState(
- RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo,
+ RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo,
ArrayList<S> internalList) {
this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
@@ -643,11 +643,11 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
this(toCopy.stateMetaInfo.deepCopy(), toCopy.internalListCopySerializer.copy(toCopy.internalList));
}
- public void setStateMetaInfo(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) {
+ public void setStateMetaInfo(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) {
this.stateMetaInfo = stateMetaInfo;
}
- public RegisteredOperatorBackendStateMetaInfo<S> getStateMetaInfo() {
+ public RegisteredOperatorStateBackendMetaInfo<S> getStateMetaInfo() {
return stateMetaInfo;
}
@@ -741,7 +741,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
// no restored state for the state name; simply create new state holder
partitionableListState = new PartitionableListState<>(
- new RegisteredOperatorBackendStateMetaInfo<>(
+ new RegisteredOperatorStateBackendMetaInfo<>(
name,
partitionStateSerializer,
mode));
@@ -757,8 +757,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
mode);
StateMetaInfoSnapshot restoredSnapshot = restoredOperatorStateMetaInfos.get(name);
- RegisteredOperatorBackendStateMetaInfo<S> metaInfo =
- new RegisteredOperatorBackendStateMetaInfo<>(restoredSnapshot);
+ RegisteredOperatorStateBackendMetaInfo<S> metaInfo =
+ new RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot);
// check compatibility to determine if state migration is required
TypeSerializer<S> newPartitionStateSerializer = partitionStateSerializer.duplicate();
@@ -772,7 +772,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
if (!stateCompatibility.isRequiresMigration()) {
// new serializer is compatible; use it to replace the old serializer
partitionableListState.setStateMetaInfo(
- new RegisteredOperatorBackendStateMetaInfo<>(name, newPartitionStateSerializer, mode));
+ new RegisteredOperatorStateBackendMetaInfo<>(name, newPartitionStateSerializer, mode));
} else {
// TODO state migration currently isn't possible.
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
index 7ebf1ce..a262103 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
@@ -42,7 +42,7 @@ public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K
/**
* Meta information of the state, including state name, assignment mode, and serializer.
*/
- private RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo;
+ private RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo;
/**
* The internal map that holds the elements of the state.
@@ -54,11 +54,11 @@ public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K
*/
private final MapSerializer<K, V> internalMapCopySerializer;
- HeapBroadcastState(RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo) {
+ HeapBroadcastState(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo) {
this(stateMetaInfo, new HashMap<>());
}
- private HeapBroadcastState(final RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo, final Map<K, V> internalMap) {
+ private HeapBroadcastState(final RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo, final Map<K, V> internalMap) {
this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
this.backingMap = Preconditions.checkNotNull(internalMap);
@@ -70,12 +70,12 @@ public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K
}
@Override
- public void setStateMetaInfo(RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo) {
+ public void setStateMetaInfo(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo) {
this.stateMetaInfo = stateMetaInfo;
}
@Override
- public RegisteredBroadcastBackendStateMetaInfo<K, V> getStateMetaInfo() {
+ public RegisteredBroadcastStateBackendMetaInfo<K, V> getStateMetaInfo() {
return stateMetaInfo;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java
index a3ce11c..79fafc5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java
@@ -28,9 +28,22 @@ import javax.annotation.Nonnull;
@FunctionalInterface
public interface KeyExtractorFunction<T> {
+ KeyExtractorFunction<? extends Keyed<?>> FOR_KEYED_OBJECTS = new KeyExtractorFunction<Keyed<?>>() {
+ @Nonnull
+ @Override
+ public Object extractKeyFromElement(@Nonnull Keyed<?> element) {
+ return element.getKey();
+ }
+ };
+
/**
* Returns the key for the given element by which the key-group can be computed.
*/
@Nonnull
Object extractKeyFromElement(@Nonnull T element);
+
+ @SuppressWarnings("unchecked")
+ static <T extends Keyed<?>> KeyExtractorFunction<T> forKeyedObjects() {
+ return (KeyExtractorFunction<T>) FOR_KEYED_OBJECTS;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
index 6a9dfb5..27d411c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state;
+import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.Preconditions;
@@ -28,7 +29,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
/**
- * Abstract class that contains the base algorithm for partitioning data into key-groups. This algorithm currently works
+ * Class that contains the base algorithm for partitioning data into key-groups. This algorithm currently works
* with two arrays (input, output) for optimal algorithmic complexity. Notice that this could also be implemented over a
* single array, using some cuckoo-hashing-style element replacement. This would have worse algorithmic complexity but
* better space efficiency. We currently prefer the trade-off in favor of better algorithmic complexity.
@@ -89,7 +90,7 @@ public class KeyGroupPartitioner<T> {
/** Cached result. */
@Nullable
- protected StateSnapshot.KeyGroupPartitionedSnapshot computedResult;
+ protected StateSnapshot.StateKeyGroupWriter computedResult;
/**
* Creates a new {@link KeyGroupPartitioner}.
@@ -131,7 +132,7 @@ public class KeyGroupPartitioner<T> {
/**
* Partitions the data into key-groups and returns the result via {@link PartitioningResult}.
*/
- public StateSnapshot.KeyGroupPartitionedSnapshot partitionByKeyGroup() {
+ public StateSnapshot.StateKeyGroupWriter partitionByKeyGroup() {
if (computedResult == null) {
reportAllElementKeyGroups();
buildHistogramByAccumulatingCounts();
@@ -198,7 +199,7 @@ public class KeyGroupPartitioner<T> {
* This represents the result of key-group partitioning. The data in {@link #partitionedElements} is partitioned
* w.r.t. {@link KeyGroupPartitioner#keyGroupRange}.
*/
- public static class PartitioningResult<T> implements StateSnapshot.KeyGroupPartitionedSnapshot {
+ private static class PartitioningResult<T> implements StateSnapshot.StateKeyGroupWriter {
/**
* Function to write one element to a {@link DataOutputView}.
@@ -249,7 +250,7 @@ public class KeyGroupPartitioner<T> {
}
@Override
- public void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) throws IOException {
+ public void writeStateInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) throws IOException {
int startOffset = getKeyGroupStartOffsetInclusive(keyGroupId);
int endOffset = getKeyGroupEndOffsetExclusive(keyGroupId);
@@ -264,6 +265,43 @@ public class KeyGroupPartitioner<T> {
}
}
+ public static <T> StateSnapshotKeyGroupReader createKeyGroupPartitionReader(
+ @Nonnull ElementReaderFunction<T> readerFunction,
+ @Nonnull KeyGroupElementsConsumer<T> elementConsumer) {
+ return new PartitioningResultKeyGroupReader<>(readerFunction, elementConsumer);
+ }
+
+ /**
+ * General algorithm to read key-grouped state that was written from a {@link PartitioningResult}.
+ *
+ * @param <T> type of the elements to read.
+ */
+ private static class PartitioningResultKeyGroupReader<T> implements StateSnapshotKeyGroupReader {
+
+ @Nonnull
+ private final ElementReaderFunction<T> readerFunction;
+
+ @Nonnull
+ private final KeyGroupElementsConsumer<T> elementConsumer;
+
+ public PartitioningResultKeyGroupReader(
+ @Nonnull ElementReaderFunction<T> readerFunction,
+ @Nonnull KeyGroupElementsConsumer<T> elementConsumer) {
+
+ this.readerFunction = readerFunction;
+ this.elementConsumer = elementConsumer;
+ }
+
+ @Override
+ public void readMappingsInKeyGroup(@Nonnull DataInputView in, @Nonnegative int keyGroupId) throws IOException {
+ int numElements = in.readInt();
+ for (int i = 0; i < numElements; i++) {
+ T element = readerFunction.readElement(in);
+ elementConsumer.consume(element, keyGroupId);
+ }
+ }
+ }
+
/**
* This functional interface defines how one element is written to a {@link DataOutputView}.
*
@@ -281,4 +319,28 @@ public class KeyGroupPartitioner<T> {
*/
void writeElement(@Nonnull T element, @Nonnull DataOutputView dov) throws IOException;
}
+
+ /**
+ * This functional interface defines how one element is read from a {@link DataInputView}.
+ *
+ * @param <T> type of the read elements.
+ */
+ @FunctionalInterface
+ public interface ElementReaderFunction<T> {
+
+ @Nonnull
+ T readElement(@Nonnull DataInputView div) throws IOException;
+ }
+
+ /**
+ * Functional interface to consume elements from a key group.
+ *
+ * @param <T> type of the consumed elements.
+ */
+ @FunctionalInterface
+ public interface KeyGroupElementsConsumer<T> {
+
+
+ void consume(@Nonnull T element, @Nonnegative int keyGroupId) throws IOException;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Keyed.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Keyed.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Keyed.java
new file mode 100644
index 0000000..4320b0b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Keyed.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+/**
+ * Interface for objects that have a key attribute.
+ *
+ * @param <K> type of the key.
+ */
+public interface Keyed<K> {
+
+ /**
+ * Returns the key attribute.
+ */
+ K getKey();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparable.java
new file mode 100644
index 0000000..4d6cce0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparable.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Interface for objects that can be compared by priority.
+ * @param <T> type of the compared objects.
+ */
+public interface PriorityComparable<T> {
+
+ /**
+ * @see PriorityComparator#comparePriority(Object, Object)
+ */
+ int comparePriorityTo(@Nonnull T other);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java
index 2f6f5a7..ec36924 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java
@@ -30,6 +30,8 @@ package org.apache.flink.runtime.state;
@FunctionalInterface
public interface PriorityComparator<T> {
+ PriorityComparator<? extends PriorityComparable<Object>> FOR_PRIORITY_COMPARABLE_OBJECTS = PriorityComparable::comparePriorityTo;
+
/**
* Compares two objects for priority. Returns a negative integer, zero, or a positive integer as the first
* argument has lower, equal to, or higher priority than the second.
@@ -39,4 +41,9 @@ public interface PriorityComparator<T> {
* priority than the second.
*/
int comparePriority(T left, T right);
+
+ @SuppressWarnings("unchecked")
+ static <T extends PriorityComparable<?>> PriorityComparator<T> forPriorityComparableObjects() {
+ return (PriorityComparator<T>) FOR_PRIORITY_COMPARABLE_OBJECTS;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
index 6f509c0..2245e72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
@@ -38,9 +38,7 @@ public interface PriorityQueueSetFactory {
* @return the queue with the specified unique name.
*/
@Nonnull
- <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
+ <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T> create(
@Nonnull String stateName,
- @Nonnull TypeSerializer<T> byteOrderedElementSerializer,
- @Nonnull PriorityComparator<T> elementPriorityComparator,
- @Nonnull KeyExtractorFunction<T> keyExtractor);
+ @Nonnull TypeSerializer<T> byteOrderedElementSerializer);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
deleted file mode 100644
index 98a8195..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nonnull;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-
-public class RegisteredBroadcastBackendStateMetaInfo<K, V> extends RegisteredStateMetaInfoBase {
-
- /** The mode how elements in this state are assigned to tasks during restore. */
- private final OperatorStateHandle.Mode assignmentMode;
-
- /** The type serializer for the keys in the map state. */
- private final TypeSerializer<K> keySerializer;
-
- /** The type serializer for the values in the map state. */
- private final TypeSerializer<V> valueSerializer;
-
- public RegisteredBroadcastBackendStateMetaInfo(
- final String name,
- final OperatorStateHandle.Mode assignmentMode,
- final TypeSerializer<K> keySerializer,
- final TypeSerializer<V> valueSerializer) {
-
- super(name);
- Preconditions.checkArgument(assignmentMode != null && assignmentMode == OperatorStateHandle.Mode.BROADCAST);
- this.assignmentMode = assignmentMode;
- this.keySerializer = Preconditions.checkNotNull(keySerializer);
- this.valueSerializer = Preconditions.checkNotNull(valueSerializer);
- }
-
- public RegisteredBroadcastBackendStateMetaInfo(RegisteredBroadcastBackendStateMetaInfo<K, V> copy) {
- this(
- Preconditions.checkNotNull(copy).name,
- copy.assignmentMode,
- copy.keySerializer.duplicate(),
- copy.valueSerializer.duplicate());
- }
-
- @SuppressWarnings("unchecked")
- public RegisteredBroadcastBackendStateMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
- this(
- snapshot.getName(),
- OperatorStateHandle.Mode.valueOf(
- snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)),
- (TypeSerializer<K>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER),
- (TypeSerializer<V>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
- Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.BROADCAST == snapshot.getBackendStateType());
- }
-
- /**
- * Creates a deep copy of the itself.
- */
- public RegisteredBroadcastBackendStateMetaInfo<K, V> deepCopy() {
- return new RegisteredBroadcastBackendStateMetaInfo<>(this);
- }
-
- @Nonnull
- @Override
- public StateMetaInfoSnapshot snapshot() {
- Map<String, String> optionsMap = Collections.singletonMap(
- StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
- assignmentMode.toString());
- Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2);
- Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap = new HashMap<>(2);
- String keySerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString();
- String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
- serializerMap.put(keySerializerKey, keySerializer.duplicate());
- serializerConfigSnapshotsMap.put(keySerializerKey, keySerializer.snapshotConfiguration());
- serializerMap.put(valueSerializerKey, valueSerializer.duplicate());
- serializerConfigSnapshotsMap.put(valueSerializerKey, valueSerializer.snapshotConfiguration());
-
- return new StateMetaInfoSnapshot(
- name,
- StateMetaInfoSnapshot.BackendStateType.BROADCAST,
- optionsMap,
- serializerConfigSnapshotsMap,
- serializerMap);
- }
-
- public TypeSerializer<K> getKeySerializer() {
- return keySerializer;
- }
-
- public TypeSerializer<V> getValueSerializer() {
- return valueSerializer;
- }
-
- public OperatorStateHandle.Mode getAssignmentMode() {
- return assignmentMode;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
-
- if (!(obj instanceof RegisteredBroadcastBackendStateMetaInfo)) {
- return false;
- }
-
- final RegisteredBroadcastBackendStateMetaInfo other =
- (RegisteredBroadcastBackendStateMetaInfo) obj;
-
- return Objects.equals(name, other.getName())
- && Objects.equals(assignmentMode, other.getAssignmentMode())
- && Objects.equals(keySerializer, other.getKeySerializer())
- && Objects.equals(valueSerializer, other.getValueSerializer());
- }
-
- @Override
- public int hashCode() {
- int result = name.hashCode();
- result = 31 * result + assignmentMode.hashCode();
- result = 31 * result + keySerializer.hashCode();
- result = 31 * result + valueSerializer.hashCode();
- return result;
- }
-
- @Override
- public String toString() {
- return "RegisteredBroadcastBackendStateMetaInfo{" +
- "name='" + name + '\'' +
- ", keySerializer=" + keySerializer +
- ", valueSerializer=" + valueSerializer +
- ", assignmentMode=" + assignmentMode +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
new file mode 100644
index 0000000..02ab8ef
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Compound meta information for a registered broadcast state. This contains the state name, the assignment mode,
+ * and the serializers for the keys and the values of the state.
+ *
+ * @param <K> Type of the keys in the broadcast state.
+ * @param <V> Type of the values in the broadcast state.
+ */
+public class RegisteredBroadcastStateBackendMetaInfo<K, V> extends RegisteredStateMetaInfoBase {
+
+	/** The mode how elements in this state are assigned to tasks during restore. */
+	@Nonnull
+	private final OperatorStateHandle.Mode assignmentMode;
+
+	/** The type serializer for the keys in the map state. */
+	@Nonnull
+	private final TypeSerializer<K> keySerializer;
+
+	/** The type serializer for the values in the map state. */
+	@Nonnull
+	private final TypeSerializer<V> valueSerializer;
+
+	/**
+	 * Creates the meta information from its individual parts.
+	 *
+	 * @param name the name of the state.
+	 * @param assignmentMode the assignment mode; anything other than {@code BROADCAST} is rejected.
+	 * @param keySerializer the serializer for the keys of the state.
+	 * @param valueSerializer the serializer for the values of the state.
+	 */
+	public RegisteredBroadcastStateBackendMetaInfo(
+			@Nonnull final String name,
+			@Nonnull final OperatorStateHandle.Mode assignmentMode,
+			@Nonnull final TypeSerializer<K> keySerializer,
+			@Nonnull final TypeSerializer<V> valueSerializer) {
+
+		super(name);
+		Preconditions.checkArgument(assignmentMode == OperatorStateHandle.Mode.BROADCAST);
+		this.assignmentMode = assignmentMode;
+		this.keySerializer = keySerializer;
+		this.valueSerializer = valueSerializer;
+	}
+
+	/**
+	 * Copy constructor. Both serializers are duplicated so that the copy does not share them with the original.
+	 *
+	 * @param copy the meta information to copy, must not be null.
+	 */
+	public RegisteredBroadcastStateBackendMetaInfo(@Nonnull RegisteredBroadcastStateBackendMetaInfo<K, V> copy) {
+		this(
+			Preconditions.checkNotNull(copy).name,
+			copy.assignmentMode,
+			copy.keySerializer.duplicate(),
+			copy.valueSerializer.duplicate());
+	}
+
+	/**
+	 * Restores the meta information from a snapshot. The snapshot must carry a key serializer and a value
+	 * serializer and must be of backend state type {@code BROADCAST}.
+	 *
+	 * @param snapshot the snapshot to read the name, the assignment mode and the serializers from.
+	 */
+	@SuppressWarnings("unchecked")
+	public RegisteredBroadcastStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
+		this(
+			snapshot.getName(),
+			OperatorStateHandle.Mode.valueOf(
+				snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)),
+			(TypeSerializer<K>) Preconditions.checkNotNull(
+				snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER)),
+			(TypeSerializer<V>) Preconditions.checkNotNull(
+				snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)));
+		// The delegating constructor call has to come first, so the backend state type is verified afterwards.
+		Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.BROADCAST == snapshot.getBackendStateType());
+	}
+
+	/**
+	 * Creates a deep copy of this meta information.
+	 */
+	@Nonnull
+	public RegisteredBroadcastStateBackendMetaInfo<K, V> deepCopy() {
+		return new RegisteredBroadcastStateBackendMetaInfo<>(this);
+	}
+
+	@Nonnull
+	@Override
+	public StateMetaInfoSnapshot snapshot() {
+		return computeSnapshot();
+	}
+
+	/** Returns the serializer for the keys of the state. */
+	@Nonnull
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	/** Returns the serializer for the values of the state. */
+	@Nonnull
+	public TypeSerializer<V> getValueSerializer() {
+		return valueSerializer;
+	}
+
+	/** Returns the mode how elements in this state are assigned to tasks during restore. */
+	@Nonnull
+	public OperatorStateHandle.Mode getAssignmentMode() {
+		return assignmentMode;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		}
+
+		if (!(obj instanceof RegisteredBroadcastStateBackendMetaInfo)) {
+			return false;
+		}
+
+		// Wildcard type instead of a raw type; the type arguments are irrelevant for the comparison.
+		final RegisteredBroadcastStateBackendMetaInfo<?, ?> other =
+			(RegisteredBroadcastStateBackendMetaInfo<?, ?>) obj;
+
+		return Objects.equals(name, other.getName())
+			&& Objects.equals(assignmentMode, other.getAssignmentMode())
+			&& Objects.equals(keySerializer, other.getKeySerializer())
+			&& Objects.equals(valueSerializer, other.getValueSerializer());
+	}
+
+	@Override
+	public int hashCode() {
+		int result = name.hashCode();
+		result = 31 * result + assignmentMode.hashCode();
+		result = 31 * result + keySerializer.hashCode();
+		result = 31 * result + valueSerializer.hashCode();
+		return result;
+	}
+
+	@Override
+	public String toString() {
+		// Uses the current class name; the label previously still carried the name from before the rename.
+		return "RegisteredBroadcastStateBackendMetaInfo{" +
+			"name='" + name + '\'' +
+			", keySerializer=" + keySerializer +
+			", valueSerializer=" + valueSerializer +
+			", assignmentMode=" + assignmentMode +
+			'}';
+	}
+
+	/**
+	 * Builds a snapshot of type {@code BROADCAST} that holds the assignment mode as its only option, plus a
+	 * duplicate and a configuration snapshot of both the key and the value serializer.
+	 */
+	@Nonnull
+	private StateMetaInfoSnapshot computeSnapshot() {
+		Map<String, String> optionsMap = Collections.singletonMap(
+			StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
+			assignmentMode.toString());
+		Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2);
+		Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap = new HashMap<>(2);
+		String keySerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString();
+		String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
+		serializerMap.put(keySerializerKey, keySerializer.duplicate());
+		serializerConfigSnapshotsMap.put(keySerializerKey, keySerializer.snapshotConfiguration());
+		serializerMap.put(valueSerializerKey, valueSerializer.duplicate());
+		serializerConfigSnapshotsMap.put(valueSerializerKey, valueSerializer.snapshotConfiguration());
+
+		return new StateMetaInfoSnapshot(
+			name,
+			StateMetaInfoSnapshot.BackendStateType.BROADCAST,
+			optionsMap,
+			serializerConfigSnapshotsMap,
+			serializerMap);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
new file mode 100644
index 0000000..d49a05c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StateMigrationException;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Compound meta information for a registered state in a keyed state backend. This combines all serializers and the
+ * state name.
+ *
+ * @param <N> Type of namespace
+ * @param <S> Type of state value
+ */
+public class RegisteredKeyValueStateBackendMetaInfo<N, S> extends RegisteredStateMetaInfoBase {
+
+	/** The type of the state, as given by the state descriptor or restored from a snapshot. */
+	@Nonnull
+	private final StateDescriptor.Type stateType;
+
+	/** The type serializer for the namespace of the state. */
+	@Nonnull
+	private final TypeSerializer<N> namespaceSerializer;
+
+	/** The type serializer for the values of the state. */
+	@Nonnull
+	private final TypeSerializer<S> stateSerializer;
+
+	/**
+	 * Creates the meta information from its individual parts.
+	 *
+	 * @param stateType the type of the state.
+	 * @param name the name of the state.
+	 * @param namespaceSerializer the serializer for the namespace.
+	 * @param stateSerializer the serializer for the state values.
+	 */
+	public RegisteredKeyValueStateBackendMetaInfo(
+			@Nonnull StateDescriptor.Type stateType,
+			@Nonnull String name,
+			@Nonnull TypeSerializer<N> namespaceSerializer,
+			@Nonnull TypeSerializer<S> stateSerializer) {
+
+		super(name);
+		this.stateType = stateType;
+		this.namespaceSerializer = namespaceSerializer;
+		this.stateSerializer = stateSerializer;
+	}
+
+	/**
+	 * Restores the meta information from a snapshot. The snapshot must carry a namespace serializer and a value
+	 * serializer and must be of backend state type {@code KEY_VALUE}.
+	 *
+	 * @param snapshot the snapshot to read the state type, the name and the serializers from.
+	 */
+	@SuppressWarnings("unchecked")
+	public RegisteredKeyValueStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
+		this(
+			StateDescriptor.Type.valueOf(snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)),
+			snapshot.getName(),
+			(TypeSerializer<N>) Preconditions.checkNotNull(
+				snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER)),
+			(TypeSerializer<S>) Preconditions.checkNotNull(
+				snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)));
+		// The delegating constructor call has to come first, so the backend state type is verified afterwards.
+		Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.KEY_VALUE == snapshot.getBackendStateType());
+	}
+
+	/** Returns the type of the state. */
+	@Nonnull
+	public StateDescriptor.Type getStateType() {
+		return stateType;
+	}
+
+	/** Returns the serializer for the namespace of the state. */
+	@Nonnull
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	/** Returns the serializer for the values of the state. */
+	@Nonnull
+	public TypeSerializer<S> getStateSerializer() {
+		return stateSerializer;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		RegisteredKeyValueStateBackendMetaInfo<?, ?> that = (RegisteredKeyValueStateBackendMetaInfo<?, ?>) o;
+
+		if (!stateType.equals(that.stateType)) {
+			return false;
+		}
+
+		if (!getName().equals(that.getName())) {
+			return false;
+		}
+
+		return getStateSerializer().equals(that.getStateSerializer())
+			&& getNamespaceSerializer().equals(that.getNamespaceSerializer());
+	}
+
+	@Override
+	public String toString() {
+		// Uses the current class name; the label previously still carried the name from before the rename.
+		return "RegisteredKeyValueStateBackendMetaInfo{" +
+			"stateType=" + stateType +
+			", name='" + name + '\'' +
+			", namespaceSerializer=" + namespaceSerializer +
+			", stateSerializer=" + stateSerializer +
+			'}';
+	}
+
+	@Override
+	public int hashCode() {
+		int result = getName().hashCode();
+		result = 31 * result + getStateType().hashCode();
+		result = 31 * result + getNamespaceSerializer().hashCode();
+		result = 31 * result + getStateSerializer().hashCode();
+		return result;
+	}
+
+	/**
+	 * Checks compatibility of a restored k/v state, with the new {@link StateDescriptor} provided to it.
+	 * This checks that the descriptor specifies identical names and state types, as well as
+	 * serializers that are compatible for the restored k/v state bytes.
+	 *
+	 * <p>The state types are only compared if neither of them is {@code UNKNOWN}.
+	 *
+	 * @param restoredStateMetaInfoSnapshot the snapshot of the meta information of the restored state.
+	 * @param newNamespaceSerializer the namespace serializer the state is registered with now.
+	 * @param newStateDescriptor the state descriptor the state is registered with now.
+	 * @return meta information built from the new descriptor and the new namespace serializer.
+	 * @throws StateMigrationException if the namespace serializer or the state serializer requires migration.
+	 */
+	@Nonnull
+	public static <N, S> RegisteredKeyValueStateBackendMetaInfo<N, S> resolveKvStateCompatibility(
+		StateMetaInfoSnapshot restoredStateMetaInfoSnapshot,
+		TypeSerializer<N> newNamespaceSerializer,
+		StateDescriptor<?, S> newStateDescriptor) throws StateMigrationException {
+
+		Preconditions.checkState(
+			Objects.equals(newStateDescriptor.getName(), restoredStateMetaInfoSnapshot.getName()),
+			"Incompatible state names. " +
+				"Was [" + restoredStateMetaInfoSnapshot.getName() + "], " +
+				"registered with [" + newStateDescriptor.getName() + "].");
+
+		final StateDescriptor.Type restoredType =
+			StateDescriptor.Type.valueOf(
+				restoredStateMetaInfoSnapshot.getOption(
+					StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
+
+		if (!Objects.equals(newStateDescriptor.getType(), StateDescriptor.Type.UNKNOWN)
+			&& !Objects.equals(restoredType, StateDescriptor.Type.UNKNOWN)) {
+
+			Preconditions.checkState(
+				newStateDescriptor.getType() == restoredType,
+				"Incompatible state types. " +
+					"Was [" + restoredType + "], " +
+					"registered with [" + newStateDescriptor.getType() + "].");
+		}
+
+		// check compatibility results to determine if state migration is required
+		// NOTE(review): the namespace check passes null where the value check below passes
+		// UnloadableDummyTypeSerializer.class - confirm that this asymmetry is intended.
+		CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
+			restoredStateMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
+			null,
+			restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
+				StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
+			newNamespaceSerializer);
+
+		TypeSerializer<S> newStateSerializer = newStateDescriptor.getSerializer();
+		CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
+			restoredStateMetaInfoSnapshot.getTypeSerializer(
+				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
+			UnloadableDummyTypeSerializer.class,
+			restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
+				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
+			newStateSerializer);
+
+		if (namespaceCompatibility.isRequiresMigration() || stateCompatibility.isRequiresMigration()) {
+			// TODO state migration currently isn't possible.
+			throw new StateMigrationException("State migration isn't supported, yet.");
+		} else {
+			return new RegisteredKeyValueStateBackendMetaInfo<>(
+				newStateDescriptor.getType(),
+				newStateDescriptor.getName(),
+				newNamespaceSerializer,
+				newStateSerializer);
+		}
+	}
+
+	@Nonnull
+	@Override
+	public StateMetaInfoSnapshot snapshot() {
+		return computeSnapshot();
+	}
+
+	/**
+	 * Builds a snapshot of type {@code KEY_VALUE} that holds the state type as its only option, plus a
+	 * duplicate and a configuration snapshot of both the namespace and the state serializer.
+	 */
+	@Nonnull
+	private StateMetaInfoSnapshot computeSnapshot() {
+		Map<String, String> optionsMap = Collections.singletonMap(
+			StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE.toString(),
+			stateType.toString());
+		Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2);
+		Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap = new HashMap<>(2);
+		String namespaceSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString();
+		String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
+		serializerMap.put(namespaceSerializerKey, namespaceSerializer.duplicate());
+		serializerConfigSnapshotsMap.put(namespaceSerializerKey, namespaceSerializer.snapshotConfiguration());
+		serializerMap.put(valueSerializerKey, stateSerializer.duplicate());
+		serializerConfigSnapshotsMap.put(valueSerializerKey, stateSerializer.snapshotConfiguration());
+
+		return new StateMetaInfoSnapshot(
+			name,
+			StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
+			optionsMap,
+			serializerConfigSnapshotsMap,
+			serializerMap);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
deleted file mode 100644
index e9b230a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.StateMigrationException;
-
-import javax.annotation.Nonnull;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Compound meta information for a registered state in a keyed state backend. This combines all serializers and the
- * state name.
- *
- * @param <N> Type of namespace
- * @param <S> Type of state value
- */
-public class RegisteredKeyedBackendStateMetaInfo<N, S> extends RegisteredStateMetaInfoBase {
-
- private final StateDescriptor.Type stateType;
- private final TypeSerializer<N> namespaceSerializer;
- private final TypeSerializer<S> stateSerializer;
-
- public RegisteredKeyedBackendStateMetaInfo(
- StateDescriptor.Type stateType,
- String name,
- TypeSerializer<N> namespaceSerializer,
- TypeSerializer<S> stateSerializer) {
-
- super(name);
- this.stateType = checkNotNull(stateType);
- this.namespaceSerializer = checkNotNull(namespaceSerializer);
- this.stateSerializer = checkNotNull(stateSerializer);
- }
-
- @SuppressWarnings("unchecked")
- public RegisteredKeyedBackendStateMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
- this(
- StateDescriptor.Type.valueOf(snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)),
- snapshot.getName(),
- (TypeSerializer<N>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
- (TypeSerializer<S>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
- Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.KEY_VALUE == snapshot.getBackendStateType());
- }
-
- public StateDescriptor.Type getStateType() {
- return stateType;
- }
-
- public TypeSerializer<N> getNamespaceSerializer() {
- return namespaceSerializer;
- }
-
- public TypeSerializer<S> getStateSerializer() {
- return stateSerializer;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
-
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- RegisteredKeyedBackendStateMetaInfo<?, ?> that = (RegisteredKeyedBackendStateMetaInfo<?, ?>) o;
-
- if (!stateType.equals(that.stateType)) {
- return false;
- }
-
- if (!getName().equals(that.getName())) {
- return false;
- }
-
- return getStateSerializer().equals(that.getStateSerializer())
- && getNamespaceSerializer().equals(that.getNamespaceSerializer());
- }
-
- @Override
- public String toString() {
- return "RegisteredKeyedBackendStateMetaInfo{" +
- "stateType=" + stateType +
- ", name='" + name + '\'' +
- ", namespaceSerializer=" + namespaceSerializer +
- ", stateSerializer=" + stateSerializer +
- '}';
- }
-
- @Override
- public int hashCode() {
- int result = getName().hashCode();
- result = 31 * result + getStateType().hashCode();
- result = 31 * result + getNamespaceSerializer().hashCode();
- result = 31 * result + getStateSerializer().hashCode();
- return result;
- }
-
- /**
- * Checks compatibility of a restored k/v state, with the new {@link StateDescriptor} provided to it.
- * This checks that the descriptor specifies identical names and state types, as well as
- * serializers that are compatible for the restored k/v state bytes.
- */
- public static <N, S> RegisteredKeyedBackendStateMetaInfo<N, S> resolveKvStateCompatibility(
- StateMetaInfoSnapshot restoredStateMetaInfoSnapshot,
- TypeSerializer<N> newNamespaceSerializer,
- StateDescriptor<?, S> newStateDescriptor) throws StateMigrationException {
-
- Preconditions.checkState(
- Objects.equals(newStateDescriptor.getName(), restoredStateMetaInfoSnapshot.getName()),
- "Incompatible state names. " +
- "Was [" + restoredStateMetaInfoSnapshot.getName() + "], " +
- "registered with [" + newStateDescriptor.getName() + "].");
-
- final StateDescriptor.Type restoredType =
- StateDescriptor.Type.valueOf(
- restoredStateMetaInfoSnapshot.getOption(
- StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
-
- if (!Objects.equals(newStateDescriptor.getType(), StateDescriptor.Type.UNKNOWN)
- && !Objects.equals(restoredType, StateDescriptor.Type.UNKNOWN)) {
-
- Preconditions.checkState(
- newStateDescriptor.getType() == restoredType,
- "Incompatible state types. " +
- "Was [" + restoredType + "], " +
- "registered with [" + newStateDescriptor.getType() + "].");
- }
-
- // check compatibility results to determine if state migration is required
- CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
- restoredStateMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
- null,
- restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
- StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
- newNamespaceSerializer);
-
- TypeSerializer<S> newStateSerializer = newStateDescriptor.getSerializer();
- CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
- restoredStateMetaInfoSnapshot.getTypeSerializer(
- StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
- UnloadableDummyTypeSerializer.class,
- restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
- StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
- newStateSerializer);
-
- if (namespaceCompatibility.isRequiresMigration() || stateCompatibility.isRequiresMigration()) {
- // TODO state migration currently isn't possible.
- throw new StateMigrationException("State migration isn't supported, yet.");
- } else {
- return new RegisteredKeyedBackendStateMetaInfo<>(
- newStateDescriptor.getType(),
- newStateDescriptor.getName(),
- newNamespaceSerializer,
- newStateSerializer);
- }
- }
-
- @Nonnull
- @Override
- public StateMetaInfoSnapshot snapshot() {
- Map<String, String> optionsMap = Collections.singletonMap(
- StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE.toString(),
- stateType.toString());
- Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2);
- Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap = new HashMap<>(2);
- String namespaceSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString();
- String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
- serializerMap.put(namespaceSerializerKey, namespaceSerializer.duplicate());
- serializerConfigSnapshotsMap.put(namespaceSerializerKey, namespaceSerializer.snapshotConfiguration());
- serializerMap.put(valueSerializerKey, stateSerializer.duplicate());
- serializerConfigSnapshotsMap.put(valueSerializerKey, stateSerializer.snapshotConfiguration());
-
- return new StateMetaInfoSnapshot(
- name,
- StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
- optionsMap,
- serializerConfigSnapshotsMap,
- serializerMap);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
deleted file mode 100644
index f314add..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nonnull;
-
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * Compound meta information for a registered state in an operator state backend.
- * This contains the state name, assignment mode, and state partition serializer.
- *
- * @param <S> Type of the state.
- */
-public class RegisteredOperatorBackendStateMetaInfo<S> extends RegisteredStateMetaInfoBase {
-
- /**
- * The mode how elements in this state are assigned to tasks during restore
- */
- private final OperatorStateHandle.Mode assignmentMode;
-
- /**
- * The type serializer for the elements in the state list
- */
- private final TypeSerializer<S> partitionStateSerializer;
-
- public RegisteredOperatorBackendStateMetaInfo(
- String name,
- TypeSerializer<S> partitionStateSerializer,
- OperatorStateHandle.Mode assignmentMode) {
- super(Preconditions.checkNotNull(name));
- this.partitionStateSerializer = Preconditions.checkNotNull(partitionStateSerializer);
- this.assignmentMode = Preconditions.checkNotNull(assignmentMode);
- }
-
- private RegisteredOperatorBackendStateMetaInfo(RegisteredOperatorBackendStateMetaInfo<S> copy) {
- this(
- Preconditions.checkNotNull(copy).name,
- copy.partitionStateSerializer.duplicate(),
- copy.assignmentMode);
- }
-
- @SuppressWarnings("unchecked")
- public RegisteredOperatorBackendStateMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
- this(
- snapshot.getName(),
- (TypeSerializer<S>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
- OperatorStateHandle.Mode.valueOf(
- snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)));
- Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.OPERATOR == snapshot.getBackendStateType());
- }
-
- /**
- * Creates a deep copy of the itself.
- */
- public RegisteredOperatorBackendStateMetaInfo<S> deepCopy() {
- return new RegisteredOperatorBackendStateMetaInfo<>(this);
- }
-
- @Nonnull
- @Override
- public StateMetaInfoSnapshot snapshot() {
- Map<String, String> optionsMap = Collections.singletonMap(
- StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
- assignmentMode.toString());
- String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
- Map<String, TypeSerializer<?>> serializerMap =
- Collections.singletonMap(valueSerializerKey, partitionStateSerializer.duplicate());
- Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap =
- Collections.singletonMap(valueSerializerKey, partitionStateSerializer.snapshotConfiguration());
-
- return new StateMetaInfoSnapshot(
- name,
- StateMetaInfoSnapshot.BackendStateType.OPERATOR,
- optionsMap,
- serializerConfigSnapshotsMap,
- serializerMap);
- }
-
- public OperatorStateHandle.Mode getAssignmentMode() {
- return assignmentMode;
- }
-
- public TypeSerializer<S> getPartitionStateSerializer() {
- return partitionStateSerializer;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
-
- if (obj == null) {
- return false;
- }
-
- return (obj instanceof RegisteredOperatorBackendStateMetaInfo)
- && name.equals(((RegisteredOperatorBackendStateMetaInfo) obj).getName())
- && assignmentMode.equals(((RegisteredOperatorBackendStateMetaInfo) obj).getAssignmentMode())
- && partitionStateSerializer.equals(((RegisteredOperatorBackendStateMetaInfo) obj).getPartitionStateSerializer());
- }
-
- @Override
- public int hashCode() {
- int result = getName().hashCode();
- result = 31 * result + getAssignmentMode().hashCode();
- result = 31 * result + getPartitionStateSerializer().hashCode();
- return result;
- }
-
- @Override
- public String toString() {
- return "RegisteredOperatorBackendStateMetaInfo{" +
- "name='" + name + "\'" +
- ", assignmentMode=" + assignmentMode +
- ", partitionStateSerializer=" + partitionStateSerializer +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
new file mode 100644
index 0000000..b65671e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Compound meta information for a registered state in an operator state backend.
+ * This contains the state name, assignment mode, and state partition serializer.
+ *
+ * @param <S> Type of the state.
+ */
+public class RegisteredOperatorStateBackendMetaInfo<S> extends RegisteredStateMetaInfoBase {
+
+ /**
+ * The mode that determines how elements in this state are assigned to tasks during restore
+ */
+ @Nonnull
+ private final OperatorStateHandle.Mode assignmentMode;
+
+ /**
+ * The type serializer for the elements in the state list
+ */
+ @Nonnull
+ private final TypeSerializer<S> partitionStateSerializer;
+
+ public RegisteredOperatorStateBackendMetaInfo(
+ @Nonnull String name,
+ @Nonnull TypeSerializer<S> partitionStateSerializer,
+ @Nonnull OperatorStateHandle.Mode assignmentMode) {
+ super(name);
+ this.partitionStateSerializer = partitionStateSerializer;
+ this.assignmentMode = assignmentMode;
+ }
+
+ private RegisteredOperatorStateBackendMetaInfo(@Nonnull RegisteredOperatorStateBackendMetaInfo<S> copy) {
+ this(
+ Preconditions.checkNotNull(copy).name,
+ copy.partitionStateSerializer.duplicate(),
+ copy.assignmentMode);
+ }
+
+ @SuppressWarnings("unchecked")
+ public RegisteredOperatorStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
+ this(
+ snapshot.getName(),
+ (TypeSerializer<S>) Preconditions.checkNotNull(
+ snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)),
+ OperatorStateHandle.Mode.valueOf(
+ snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)));
+ Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.OPERATOR == snapshot.getBackendStateType());
+ }
+
+ /**
+ * Creates a deep copy of itself.
+ */
+ @Nonnull
+ public RegisteredOperatorStateBackendMetaInfo<S> deepCopy() {
+ return new RegisteredOperatorStateBackendMetaInfo<>(this);
+ }
+
+ @Nonnull
+ @Override
+ public StateMetaInfoSnapshot snapshot() {
+ return computeSnapshot();
+ }
+
+ @Nonnull
+ public OperatorStateHandle.Mode getAssignmentMode() {
+ return assignmentMode;
+ }
+
+ @Nonnull
+ public TypeSerializer<S> getPartitionStateSerializer() {
+ return partitionStateSerializer;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+
+ if (obj == null) {
+ return false;
+ }
+
+ return (obj instanceof RegisteredOperatorStateBackendMetaInfo)
+ && name.equals(((RegisteredOperatorStateBackendMetaInfo) obj).getName())
+ && assignmentMode.equals(((RegisteredOperatorStateBackendMetaInfo) obj).getAssignmentMode())
+ && partitionStateSerializer.equals(((RegisteredOperatorStateBackendMetaInfo) obj).getPartitionStateSerializer());
+ }
+
+ @Override
+ public int hashCode() {
+ int result = getName().hashCode();
+ result = 31 * result + getAssignmentMode().hashCode();
+ result = 31 * result + getPartitionStateSerializer().hashCode();
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "RegisteredOperatorBackendStateMetaInfo{" +
+ "name='" + name + "\'" +
+ ", assignmentMode=" + assignmentMode +
+ ", partitionStateSerializer=" + partitionStateSerializer +
+ '}';
+ }
+
+ @Nonnull
+ private StateMetaInfoSnapshot computeSnapshot() {
+ Map<String, String> optionsMap = Collections.singletonMap(
+ StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
+ assignmentMode.toString());
+ String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
+ Map<String, TypeSerializer<?>> serializerMap =
+ Collections.singletonMap(valueSerializerKey, partitionStateSerializer.duplicate());
+ Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap =
+ Collections.singletonMap(valueSerializerKey, partitionStateSerializer.snapshotConfiguration());
+
+ return new StateMetaInfoSnapshot(
+ name,
+ StateMetaInfoSnapshot.BackendStateType.OPERATOR,
+ optionsMap,
+ serializerConfigSnapshotsMap,
+ serializerMap);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
new file mode 100644
index 0000000..9ef23ed
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Meta information about a priority queue state in a backend.
+ */
+public class RegisteredPriorityQueueStateBackendMetaInfo<T> extends RegisteredStateMetaInfoBase {
+
+ @Nonnull
+ private final TypeSerializer<T> elementSerializer;
+
+ public RegisteredPriorityQueueStateBackendMetaInfo(
+ @Nonnull String name,
+ @Nonnull TypeSerializer<T> elementSerializer) {
+
+ super(name);
+ this.elementSerializer = elementSerializer;
+ }
+
+ @SuppressWarnings("unchecked")
+ public RegisteredPriorityQueueStateBackendMetaInfo(StateMetaInfoSnapshot snapshot) {
+ this(snapshot.getName(),
+ (TypeSerializer<T>) Preconditions.checkNotNull(
+ snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)));
+ Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE == snapshot.getBackendStateType());
+ }
+
+ @Nonnull
+ @Override
+ public StateMetaInfoSnapshot snapshot() {
+ return computeSnapshot();
+ }
+
+ @Nonnull
+ public TypeSerializer<T> getElementSerializer() {
+ return elementSerializer;
+ }
+
+ private StateMetaInfoSnapshot computeSnapshot() {
+ Map<String, TypeSerializer<?>> serializerMap =
+ Collections.singletonMap(
+ StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
+ elementSerializer.duplicate());
+ Map<String, TypeSerializerConfigSnapshot> serializerSnapshotMap =
+ Collections.singletonMap(
+ StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
+ elementSerializer.snapshotConfiguration());
+
+ return new StateMetaInfoSnapshot(
+ name,
+ StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE,
+ Collections.emptyMap(),
+ serializerSnapshotMap,
+ serializerMap);
+ }
+
+ public RegisteredPriorityQueueStateBackendMetaInfo deepCopy() {
+ return new RegisteredPriorityQueueStateBackendMetaInfo<>(name, elementSerializer.duplicate());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
index 4132d14..b7dff59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
@@ -42,4 +42,21 @@ public abstract class RegisteredStateMetaInfoBase {
@Nonnull
public abstract StateMetaInfoSnapshot snapshot();
+
+ public static RegisteredStateMetaInfoBase fromMetaInfoSnapshot(@Nonnull StateMetaInfoSnapshot snapshot) {
+
+ final StateMetaInfoSnapshot.BackendStateType backendStateType = snapshot.getBackendStateType();
+ switch (backendStateType) {
+ case KEY_VALUE:
+ return new RegisteredKeyValueStateBackendMetaInfo<>(snapshot);
+ case OPERATOR:
+ return new RegisteredOperatorStateBackendMetaInfo<>(snapshot);
+ case BROADCAST:
+ return new RegisteredBroadcastStateBackendMetaInfo<>(snapshot);
+ case PRIORITY_QUEUE:
+ return new RegisteredPriorityQueueStateBackendMetaInfo<>(snapshot);
+ default:
+ throw new IllegalArgumentException("Unknown backend state type: " + backendStateType);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshot.java
index 1fcac5c..54180f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshot.java
@@ -18,7 +18,9 @@
package org.apache.flink.runtime.state;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
@@ -30,38 +32,45 @@ import java.io.IOException;
* All snapshots should be released after usage. This interface outlines the asynchronous snapshot life-cycle, which
* typically looks as follows. In the synchronous part of a checkpoint, an instance of {@link StateSnapshot} is produced
* for a state and captures the state at this point in time. Then, in the asynchronous part of the checkpoint, the user
- * calls {@link #partitionByKeyGroup()} to ensure that the snapshot is partitioned into key-groups. For state that is
- * already partitioned, this can be a NOP. The returned {@link KeyGroupPartitionedSnapshot} can be used by the caller
+ * calls {@link #getKeyGroupWriter()} to ensure that the snapshot is partitioned into key-groups. For state that is
+ * already partitioned, this can be a NOP. The returned {@link StateKeyGroupWriter} can be used by the caller
* to write the state by key-group. As a last step, when the state is completely written, the user calls
* {@link #release()}.
*/
+@Internal
public interface StateSnapshot {
/**
- * This method partitions the snapshot by key-group and then returns a {@link KeyGroupPartitionedSnapshot}.
+ * This method returns a {@link StateKeyGroupWriter} and should be called in the asynchronous part of the snapshot.
*/
@Nonnull
- KeyGroupPartitionedSnapshot partitionByKeyGroup();
+ StateKeyGroupWriter getKeyGroupWriter();
+
+ /**
+ * Returns a snapshot of the state's meta data.
+ */
+ @Nonnull
+ StateMetaInfoSnapshot getMetaInfoSnapshot();
/**
* Release the snapshot. All snapshots should be released when they are no longer used because some implementation
- * can only release resources after a release. Produced {@link KeyGroupPartitionedSnapshot} should no longer be used
+ * can only release resources after a release. Produced {@link StateKeyGroupWriter} should no longer be used
* after calling this method.
*/
void release();
/**
- * Interface for writing a snapshot after it is partitioned into key-groups.
+ * Interface for writing a snapshot that is partitioned into key-groups.
*/
- interface KeyGroupPartitionedSnapshot {
+ interface StateKeyGroupWriter {
/**
- * Writes the data for the specified key-group to the output. You must call {@link #partitionByKeyGroup()} once
+ * Writes the data for the specified key-group to the output. You must call {@link #getKeyGroupWriter()} once
* before first calling this method.
*
* @param dov the output.
* @param keyGroupId the key-group to write.
* @throws IOException on write-related problems.
*/
- void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, @Nonnegative int keyGroupId) throws IOException;
+ void writeStateInKeyGroup(@Nonnull DataOutputView dov, @Nonnegative int keyGroupId) throws IOException;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotKeyGroupReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotKeyGroupReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotKeyGroupReader.java
new file mode 100644
index 0000000..b6b91ee
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotKeyGroupReader.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.heap.StateTable;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+/**
+ * Interface for state de-serialization into {@link StateTable}s by key-group.
+ */
+@Internal
+public interface StateSnapshotKeyGroupReader {
+
+ /**
+ * Read the data for the specified key-group from the input.
+ *
+ * @param div the input
+ * @param keyGroupId the key-group to read
+ * @throws IOException on read related problems
+ */
+ void readMappingsInKeyGroup(@Nonnull DataInputView div, @Nonnegative int keyGroupId) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotRestore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotRestore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotRestore.java
new file mode 100644
index 0000000..6fe50ca
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotRestore.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Interface to deal with state snapshot and restore of state.
+ * TODO find better name?
+ */
+@Internal
+public interface StateSnapshotRestore {
+
+ /**
+ * Returns a snapshot of the state.
+ */
+ @Nonnull
+ StateSnapshot stateSnapshot();
+
+ /**
+ * This method returns a {@link StateSnapshotKeyGroupReader} that can be used to restore the state on a
+ * per-key-group basis. This method tries to return a reader for the given version hint.
+ *
+ * @param readVersionHint the required version of the state to read.
+ * @return a reader that reads state by key-groups, for the given read version.
+ */
+ @Nonnull
+ StateSnapshotKeyGroupReader keyGroupReader(int readVersionHint);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
index 4384eb7..94aed45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
@@ -85,11 +85,6 @@ public class TieBreakingPriorityComparator<T> implements Comparator<T>, Priority
return ((Comparable<T>) o1).compareTo(o2);
}
- // we catch this case before moving to more expensive tie breaks.
- if (o1.equals(o2)) {
- return 0;
- }
-
// if objects are not equal, their serialized form should somehow differ as well. this can be costly, and...
// TODO we should have an alternative approach in the future, e.g. a cache that does not rely on compare to check equality.
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
index 6dc8cf3..b1ad0df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
@@ -30,6 +30,8 @@ import java.util.Collection;
import java.util.function.Consumer;
import java.util.function.Predicate;
+import static org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION;
+
/**
* This class is an implementation of a {@link InternalPriorityQueue} with set semantics that internally consists of
* two different storage types. The first storage is a (potentially slow) ordered set store manages the ground truth
@@ -80,6 +82,30 @@ public class CachingInternalPriorityQueueSet<E> implements InternalPriorityQueue
@Override
public void bulkPoll(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) {
+ if (ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION) {
+ bulkPollRelaxedOrder(canConsume, consumer);
+ } else {
+ bulkPollStrictOrder(canConsume, consumer);
+ }
+ }
+
+ private void bulkPollRelaxedOrder(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) {
+ if (orderedCache.isEmpty()) {
+ bulkPollStore(canConsume, consumer);
+ } else {
+ while (!orderedCache.isEmpty() && canConsume.test(orderedCache.peekFirst())) {
+ final E next = orderedCache.removeFirst();
+ orderedStore.remove(next);
+ consumer.accept(next);
+ }
+
+ if (orderedCache.isEmpty()) {
+ bulkPollStore(canConsume, consumer);
+ }
+ }
+ }
+
+ private void bulkPollStrictOrder(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) {
E element;
while ((element = peek()) != null && canConsume.test(element)) {
poll();
@@ -87,6 +113,26 @@ public class CachingInternalPriorityQueueSet<E> implements InternalPriorityQueue
}
}
+ private void bulkPollStore(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) {
+ try (CloseableIterator<E> iterator = orderedStore.orderedIterator()) {
+ while (iterator.hasNext()) {
+ final E next = iterator.next();
+ if (canConsume.test(next)) {
+ orderedStore.remove(next);
+ consumer.accept(next);
+ } else {
+ orderedCache.add(next);
+ while (iterator.hasNext() && !orderedCache.isFull()) {
+ orderedCache.add(iterator.next());
+ }
+ break;
+ }
+ }
+ } catch (Exception e) {
+ throw new FlinkRuntimeException("Exception while bulk polling store.", e);
+ }
+ }
+
@Nullable
@Override
public E poll() {