You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/03/16 17:34:29 UTC
[4/4] flink git commit: [FLINK-5715] Asynchronous snapshots for
heap-based keyed state backend
[FLINK-5715] Asynchronous snapshots for heap-based keyed state backend
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ab014ef9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ab014ef9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ab014ef9
Branch: refs/heads/master
Commit: ab014ef94e0e9137ac6f8f41dae385ff71e8ba5b
Parents: 30bb958
Author: Stefan Richter <s....@data-artisans.com>
Authored: Fri Mar 3 10:51:15 2017 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Mar 16 18:34:02 2017 +0100
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 5 +
.../state/RocksDBStateBackendTest.java | 86 +-
.../java/org/apache/flink/util/MathUtils.java | 35 +-
.../filesystem/AbstractFsStateSnapshot.java | 30 +-
.../state/memory/AbstractMemStateSnapshot.java | 40 +-
.../AbstractMigrationRestoreStrategy.java | 117 ++
.../state/memory/MigrationRestoreSnapshot.java | 32 +
.../state/AbstractKeyedStateBackend.java | 7 +
.../runtime/state/ArrayListSerializer.java | 14 +-
.../state/KeyedBackendSerializationProxy.java | 23 +-
.../flink/runtime/state/KeyedStateBackend.java | 28 +-
.../state/StateTransformationFunction.java | 23 +
.../state/filesystem/FsStateBackend.java | 109 +-
.../state/heap/AbstractHeapMergingState.java | 93 +-
.../runtime/state/heap/AbstractHeapState.java | 84 +-
.../state/heap/AbstractStateTableSnapshot.java | 51 +
.../state/heap/CopyOnWriteStateTable.java | 1066 ++++++++++++++++++
.../heap/CopyOnWriteStateTableSnapshot.java | 188 +++
.../state/heap/HeapAggregatingState.java | 92 +-
.../runtime/state/heap/HeapFoldingState.java | 70 +-
.../state/heap/HeapKeyedStateBackend.java | 426 +++----
.../flink/runtime/state/heap/HeapListState.java | 63 +-
.../flink/runtime/state/heap/HeapMapState.java | 167 +--
.../runtime/state/heap/HeapReducingState.java | 82 +-
.../runtime/state/heap/HeapValueState.java | 47 +-
.../runtime/state/heap/InternalKeyContext.java | 60 +
.../state/heap/NestedMapsStateTable.java | 363 ++++++
.../flink/runtime/state/heap/StateEntry.java | 44 +
.../flink/runtime/state/heap/StateTable.java | 222 ++--
.../state/heap/StateTableByKeyGroupReader.java | 38 +
.../state/heap/StateTableByKeyGroupReaders.java | 136 +++
.../runtime/state/heap/StateTableSnapshot.java | 45 +
.../state/memory/MemoryStateBackend.java | 26 +
.../runtime/query/QueryableStateClientTest.java | 7 +-
.../message/KvStateRequestSerializerTest.java | 38 +-
.../state/AsyncFileStateBackendTest.java | 27 +
.../state/AsyncMemoryStateBackendTest.java | 27 +
.../runtime/state/FileStateBackendTest.java | 8 +-
.../runtime/state/MemoryStateBackendTest.java | 8 +-
.../runtime/state/StateBackendTestBase.java | 216 +++-
.../state/heap/CopyOnWriteStateTableTest.java | 486 ++++++++
.../state/heap/HeapAggregatingStateTest.java | 21 +-
...pKeyedStateBackendSnapshotMigrationTest.java | 173 +++
.../runtime/state/heap/HeapListStateTest.java | 17 +-
.../state/heap/HeapReducingStateTest.java | 23 +-
.../state/heap/HeapStateBackendTestBase.java | 54 +
.../StateTableSnapshotCompatibilityTest.java | 118 ++
.../util/BlockerCheckpointStreamFactory.java | 118 ++
.../heap_keyed_statebackend_1_2.snapshot | Bin 0 -> 2068 bytes
.../api/windowing/windows/TimeWindow.java | 5 +-
...tractEventTimeWindowCheckpointingITCase.java | 64 +-
...ckendEventTimeWindowCheckpointingITCase.java | 26 +
...ckendEventTimeWindowCheckpointingITCase.java | 26 +
...ckendEventTimeWindowCheckpointingITCase.java | 20 +
.../test/state/ManualWindowSpeedITCase.java | 7 +-
55 files changed, 4239 insertions(+), 1162 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index aaccc2f..f585d21 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1219,4 +1219,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// expected
}
}
+
+ @Override
+ public boolean supportsAsynchronousSnapshots() {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index c7b5c20..708613b 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -31,14 +31,13 @@ import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.StateBackendTestBase;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -364,89 +363,6 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
assertEquals(null, keyedStateBackend.db);
}
- static class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
-
- private final int maxSize;
- private int afterNumberInvocations;
- private OneShotLatch blocker;
- private OneShotLatch waiter;
-
- MemCheckpointStreamFactory.MemoryCheckpointOutputStream lastCreatedStream;
-
- public MemCheckpointStreamFactory.MemoryCheckpointOutputStream getLastCreatedStream() {
- return lastCreatedStream;
- }
-
- public BlockerCheckpointStreamFactory(int maxSize) {
- this.maxSize = maxSize;
- }
-
- public void setAfterNumberInvocations(int afterNumberInvocations) {
- this.afterNumberInvocations = afterNumberInvocations;
- }
-
- public void setBlockerLatch(OneShotLatch latch) {
- this.blocker = latch;
- }
-
- public void setWaiterLatch(OneShotLatch latch) {
- this.waiter = latch;
- }
-
- @Override
- public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
- waiter.trigger();
- this.lastCreatedStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) {
-
- private int afterNInvocations = afterNumberInvocations;
- private final OneShotLatch streamBlocker = blocker;
- private final OneShotLatch streamWaiter = waiter;
-
- @Override
- public void write(int b) throws IOException {
-
- if (afterNInvocations > 0) {
- --afterNInvocations;
- }
-
- if (0 == afterNInvocations && null != streamBlocker) {
- try {
- streamBlocker.await();
- } catch (InterruptedException ignored) {
- }
- }
- try {
- super.write(b);
- } catch (IOException ex) {
- if (null != streamWaiter) {
- streamWaiter.trigger();
- }
- throw ex;
- }
-
- if (0 == afterNInvocations && null != streamWaiter) {
- streamWaiter.trigger();
- }
- }
-
- @Override
- public void close() {
- super.close();
- if (null != streamWaiter) {
- streamWaiter.trigger();
- }
- }
- };
-
- return lastCreatedStream;
- }
-
- @Override
- public void close() throws Exception {
-
- }
- }
-
private static class AcceptAllFilter implements IOFileFilter {
@Override
public boolean accept(File file) {
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
index 074e8ae..1d84a39 100644
--- a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
@@ -140,11 +140,7 @@ public final class MathUtils {
code = code * 5 + 0xe6546b64;
code ^= 4;
- code ^= code >>> 16;
- code *= 0x85ebca6b;
- code ^= code >>> 13;
- code *= 0xc2b2ae35;
- code ^= code >>> 16;
+ code = bitMix(code);
if (code >= 0) {
return code;
@@ -172,6 +168,35 @@ public final class MathUtils {
return x + 1;
}
+ /**
+ * Pseudo-randomly maps a long (64-bit) to an integer (32-bit) using some bit-mixing for better distribution.
+ *
+ * @param in the long (64-bit) input.
+ * @return the bit-mixed int (32-bit) output
+ */
+ public static int longToIntWithBitMixing(long in) {
+ in = (in ^ (in >>> 30)) * 0xbf58476d1ce4e5b9L;
+ in = (in ^ (in >>> 27)) * 0x94d049bb133111ebL;
+ in = in ^ (in >>> 31);
+ return (int) in;
+ }
+
+ /**
+ * Bit-mixing for pseudo-randomization of integers (e.g., to guard against bad hash functions). Implementation is
+ * from Murmur's 32 bit finalizer.
+ *
+ * @param in the input value
+ * @return the bit-mixed output value
+ */
+ public static int bitMix(int in) {
+ in ^= in >>> 16;
+ in *= 0x85ebca6b;
+ in ^= in >>> 13;
+ in *= 0xc2b2ae35;
+ in ^= in >>> 16;
+ return in;
+ }
+
// ============================================================================================
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java
index 103c214..a15e49d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java
@@ -21,8 +21,16 @@ package org.apache.flink.migration.runtime.state.filesystem;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+import org.apache.flink.migration.runtime.state.memory.AbstractMigrationRestoreStrategy;
+import org.apache.flink.migration.runtime.state.memory.MigrationRestoreSnapshot;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.StateTable;
import java.io.IOException;
@@ -36,7 +44,7 @@ import java.io.IOException;
@Deprecated
@SuppressWarnings("deprecation")
public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
- extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD> {
+ extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD>, MigrationRestoreSnapshot<K, N, SV> {
private static final long serialVersionUID = 1L;
@@ -85,4 +93,24 @@ public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD exte
public SD getStateDesc() {
return stateDesc;
}
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public StateTable<K, N, SV> deserialize(
+ String stateName,
+ HeapKeyedStateBackend<K> stateBackend) throws IOException {
+
+ final FileSystem fs = getFilePath().getFileSystem();
+ try (FSDataInputStream inStream = fs.open(getFilePath())) {
+ final DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inStream);
+ AbstractMigrationRestoreStrategy<K, N, SV> restoreStrategy =
+ new AbstractMigrationRestoreStrategy<K, N, SV>(keySerializer, namespaceSerializer, stateSerializer) {
+ @Override
+ protected DataInputView openDataInputView() throws IOException {
+ return inView;
+ }
+ };
+ return restoreStrategy.deserialize(stateName, stateBackend);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java
index 6056578..ff86f7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java
@@ -21,17 +21,18 @@ package org.apache.flink.migration.runtime.state.memory;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.util.DataInputDeserializer;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
@Deprecated
@SuppressWarnings("deprecation")
public abstract class AbstractMemStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
- implements KvStateSnapshot<K, N, S, SD> {
+ implements KvStateSnapshot<K, N, S, SD>, MigrationRestoreSnapshot<K, N, SV> {
private static final long serialVersionUID = 1L;
@@ -73,24 +74,21 @@ public abstract class AbstractMemStateSnapshot<K, N, SV, S extends State, SD ext
this.data = data;
}
- public HashMap<N, Map<K, SV>> deserialize() throws IOException {
- DataInputDeserializer inView = new DataInputDeserializer(data, 0, data.length);
-
- final int numKeys = inView.readInt();
- HashMap<N, Map<K, SV>> stateMap = new HashMap<>(numKeys);
-
- for (int i = 0; i < numKeys && !closed; i++) {
- N namespace = namespaceSerializer.deserialize(inView);
- final int numValues = inView.readInt();
- Map<K, SV> namespaceMap = new HashMap<>(numValues);
- stateMap.put(namespace, namespaceMap);
- for (int j = 0; j < numValues; j++) {
- K key = keySerializer.deserialize(inView);
- SV value = stateSerializer.deserialize(inView);
- namespaceMap.put(key, value);
- }
- }
- return stateMap;
+ @Override
+ @SuppressWarnings("unchecked")
+ public StateTable<K, N, SV> deserialize(
+ String stateName,
+ HeapKeyedStateBackend<K> stateBackend) throws IOException {
+
+ final DataInputDeserializer inView = new DataInputDeserializer(data, 0, data.length);
+ AbstractMigrationRestoreStrategy<K, N, SV> restoreStrategy =
+ new AbstractMigrationRestoreStrategy<K, N, SV>(keySerializer, namespaceSerializer, stateSerializer) {
+ @Override
+ protected DataInputView openDataInputView() throws IOException {
+ return inView;
+ }
+ };
+ return restoreStrategy.deserialize(stateName, stateBackend);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java
new file mode 100644
index 0000000..e572619
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.memory;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.StateTable;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * This class outlines the general strategy to restore from migration states.
+ *
+ * @param <K> type of key.
+ * @param <N> type of namespace.
+ * @param <S> type of state.
+ */
+@Deprecated
+public abstract class AbstractMigrationRestoreStrategy<K, N, S> implements MigrationRestoreSnapshot<K, N, S> {
+
+ /**
+ * Key Serializer
+ */
+ protected final TypeSerializer<K> keySerializer;
+
+ /**
+ * Namespace Serializer
+ */
+ protected final TypeSerializer<N> namespaceSerializer;
+
+ /**
+ * Serializer for the state value
+ */
+ protected final TypeSerializer<S> stateSerializer;
+
+ public AbstractMigrationRestoreStrategy(
+ TypeSerializer<K> keySerializer,
+ TypeSerializer<N> namespaceSerializer,
+ TypeSerializer<S> stateSerializer) {
+
+ this.keySerializer = Preconditions.checkNotNull(keySerializer);
+ this.namespaceSerializer = Preconditions.checkNotNull(namespaceSerializer);
+ this.stateSerializer = Preconditions.checkNotNull(stateSerializer);
+ }
+
+ @Override
+ public StateTable<K, N, S> deserialize(String stateName, HeapKeyedStateBackend<K> stateBackend) throws IOException {
+
+ Preconditions.checkNotNull(stateName, "State name is null. Cannot deserialize snapshot.");
+ Preconditions.checkNotNull(stateBackend, "State backend is null. Cannot deserialize snapshot.");
+
+ final KeyGroupRange keyGroupRange = stateBackend.getKeyGroupRange();
+ Preconditions.checkState(1 == keyGroupRange.getNumberOfKeyGroups(),
+ "Unexpected number of key-groups for restoring from Flink 1.1");
+
+ TypeSerializer<N> patchedNamespaceSerializer = this.namespaceSerializer;
+
+ if (patchedNamespaceSerializer instanceof VoidSerializer) {
+ patchedNamespaceSerializer = (TypeSerializer<N>) VoidNamespaceSerializer.INSTANCE;
+ }
+
+ RegisteredBackendStateMetaInfo<N, S> registeredBackendStateMetaInfo =
+ new RegisteredBackendStateMetaInfo<>(
+ StateDescriptor.Type.UNKNOWN,
+ stateName,
+ patchedNamespaceSerializer,
+ stateSerializer);
+
+ final StateTable<K, N, S> stateTable = stateBackend.newStateTable(registeredBackendStateMetaInfo);
+ final DataInputView inView = openDataInputView();
+ final int keyGroup = keyGroupRange.getStartKeyGroup();
+ final int numNamespaces = inView.readInt();
+
+ for (int i = 0; i < numNamespaces; i++) {
+ N namespace = namespaceSerializer.deserialize(inView);
+ if (null == namespace) {
+ namespace = (N) VoidNamespace.INSTANCE;
+ }
+ final int numKV = inView.readInt();
+ for (int j = 0; j < numKV; j++) {
+ K key = keySerializer.deserialize(inView);
+ S value = stateSerializer.deserialize(inView);
+ stateTable.put(key, keyGroup, namespace, value);
+ }
+ }
+ return stateTable;
+ }
+
+ /**
+ * Different state handles require different code to end up with a {@link DataInputView}.
+ */
+ protected abstract DataInputView openDataInputView() throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MigrationRestoreSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MigrationRestoreSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MigrationRestoreSnapshot.java
new file mode 100644
index 0000000..ea529db
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MigrationRestoreSnapshot.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.memory;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.StateTable;
+import org.apache.flink.util.Migration;
+
+import java.io.IOException;
+
+@Deprecated
+@Internal
+public interface MigrationRestoreSnapshot<K, N, S> extends Migration {
+ StateTable<K, N, S> deserialize(String stateName, HeapKeyedStateBackend<K> stateBackend) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index aba00f3..1f2f4a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
@@ -254,6 +255,7 @@ public abstract class AbstractKeyedStateBackend<K>
/**
* @see KeyedStateBackend
*/
+ @Override
public KeyGroupRange getKeyGroupRange() {
return keyGroupRange;
}
@@ -382,4 +384,9 @@ public abstract class AbstractKeyedStateBackend<K>
public void close() throws IOException {
cancelStreamRegistry.close();
}
+
+ @VisibleForTesting
+ public boolean supportsAsynchronousSnapshots() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
index f5a6405..0badb41 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
@@ -57,11 +57,17 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
@Override
public ArrayList<T> copy(ArrayList<T> from) {
- ArrayList<T> newList = new ArrayList<>(from.size());
- for (int i = 0; i < from.size(); i++) {
- newList.add(elementSerializer.copy(from.get(i)));
+ if (elementSerializer.isImmutableType()) {
+ // fast track using memcopy for immutable types
+ return new ArrayList<>(from);
+ } else {
+ // element-wise deep copy for mutable types
+ ArrayList<T> newList = new ArrayList<>(from.size());
+ for (int i = 0; i < from.size(); i++) {
+ newList.add(elementSerializer.copy(from.get(i)));
+ }
+ return newList;
}
- return newList;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index dbee6cb..5661c38 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.io.VersionMismatchException;
import org.apache.flink.core.io.VersionedIOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -37,11 +38,12 @@ import java.util.List;
*/
public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable {
- private static final int VERSION = 1;
+ public static final int VERSION = 2;
private TypeSerializerSerializationProxy<?> keySerializerProxy;
private List<StateMetaInfo<?, ?>> namedStateSerializationProxies;
+ private int restoredVersion;
private ClassLoader userCodeClassLoader;
public KeyedBackendSerializationProxy(ClassLoader userCodeClassLoader) {
@@ -51,6 +53,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
public KeyedBackendSerializationProxy(TypeSerializer<?> keySerializer, List<StateMetaInfo<?, ?>> namedStateSerializationProxies) {
this.keySerializerProxy = new TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(keySerializer));
this.namedStateSerializationProxies = Preconditions.checkNotNull(namedStateSerializationProxies);
+ this.restoredVersion = VERSION;
Preconditions.checkArgument(namedStateSerializationProxies.size() <= Short.MAX_VALUE);
}
@@ -67,6 +70,22 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
return VERSION;
}
+ public int getRestoredVersion() {
+ return restoredVersion;
+ }
+
+ @Override
+ protected void resolveVersionRead(int foundVersion) throws VersionMismatchException {
+ super.resolveVersionRead(foundVersion);
+ this.restoredVersion = foundVersion;
+ }
+
+ @Override
+ public boolean isCompatibleVersion(int version) {
+ // we are compatible with version 2 (Flink 1.3.x) and version 1 (Flink 1.2.x)
+ return super.isCompatibleVersion(version) || version == 1;
+ }
+
@Override
public void write(DataOutputView out) throws IOException {
super.write(out);
@@ -96,7 +115,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
}
}
-//----------------------------------------------------------------------------------------------------------------------
+ //----------------------------------------------------------------------------------------------------------------------
/**
* This is the serialization proxy for {@link RegisteredBackendStateMetaInfo} for a single registered state in a
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
index 15e0491..09e27e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
@@ -21,13 +21,14 @@ package org.apache.flink.runtime.state;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
/**
* A keyed state backend provides methods for managing keyed state.
*
* @param <K> The key by which state is keyed.
*/
-public interface KeyedStateBackend<K> {
+public interface KeyedStateBackend<K> extends InternalKeyContext<K> {
/**
* Sets the current key that is used for partitioned state.
@@ -36,31 +37,6 @@ public interface KeyedStateBackend<K> {
void setCurrentKey(K newKey);
/**
- * Used by states to access the current key.
- */
- K getCurrentKey();
-
- /**
- * Returns the key-group to which the current key belongs.
- */
- int getCurrentKeyGroupIndex();
-
- /**
- * Returns the number of key-groups aka max parallelism.
- */
- int getNumberOfKeyGroups();
-
- /**
- * Returns the key groups for this backend.
- */
- KeyGroupsList getKeyGroupRange();
-
- /**
- * {@link TypeSerializer} for the state backend key type.
- */
- TypeSerializer<K> getKeySerializer();
-
- /**
* Creates or retrieves a keyed state backed by this state backend.
*
* @param namespaceSerializer The serializer used for the namespace type of the state
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
new file mode 100644
index 0000000..9e12ee5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+public interface StateTransformationFunction<S, T> {
+ S apply(S previousState, T value) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 2e9198f..e27712c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -66,7 +66,10 @@ public class FsStateBackend extends AbstractStateBackend {
/** State below this size will be stored as part of the metadata, rather than in files */
private final int fileStateThreshold;
-
+
+ /** Switch to choose between synchronous and asynchronous snapshots */
+ private final boolean asynchronousSnapshots;
+
/**
* Creates a new state backend that stores its checkpoint data in the file system and location
* defined by the given URI.
@@ -99,6 +102,27 @@ public class FsStateBackend extends AbstractStateBackend {
*
* @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
* and the path to the checkpoint data directory.
+ * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+ *
+ * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+ */
+ public FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots) throws IOException {
+ this(new Path(checkpointDataUri), asynchronousSnapshots);
+ }
+
+ /**
+ * Creates a new state backend that stores its checkpoint data in the file system and location
+ * defined by the given URI.
+ *
+ * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+ * must be accessible via {@link FileSystem#get(URI)}.
+ *
+ * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+ * (host and port), or that the Hadoop configuration that describes that information must be in the
+ * classpath.
+ *
+ * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+ * and the path to the checkpoint data directory.
* @throws IOException Thrown, if no file system can be found for the scheme in the URI.
*/
public FsStateBackend(Path checkpointDataUri) throws IOException {
@@ -118,10 +142,52 @@ public class FsStateBackend extends AbstractStateBackend {
*
* @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
* and the path to the checkpoint data directory.
+ * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+ *
+ * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+ */
+ public FsStateBackend(Path checkpointDataUri, boolean asynchronousSnapshots) throws IOException {
+ this(checkpointDataUri.toUri(), asynchronousSnapshots);
+ }
+
+ /**
+ * Creates a new state backend that stores its checkpoint data in the file system and location
+ * defined by the given URI.
+ *
+ * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+ * must be accessible via {@link FileSystem#get(URI)}.
+ *
+ * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+ * (host and port), or that the Hadoop configuration that describes that information must be in the
+ * classpath.
+ *
+ * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+ * and the path to the checkpoint data directory.
* @throws IOException Thrown, if no file system can be found for the scheme in the URI.
*/
public FsStateBackend(URI checkpointDataUri) throws IOException {
- this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD);
+ this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD, false);
+ }
+
+ /**
+ * Creates a new state backend that stores its checkpoint data in the file system and location
+ * defined by the given URI.
+ *
+ * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+ * must be accessible via {@link FileSystem#get(URI)}.
+ *
+ * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+ * (host and port), or that the Hadoop configuration that describes that information must be in the
+ * classpath.
+ *
+ * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+ * and the path to the checkpoint data directory.
+ * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+ *
+ * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+ */
+ public FsStateBackend(URI checkpointDataUri, boolean asynchronousSnapshots) throws IOException {
+ this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD, asynchronousSnapshots);
}
/**
@@ -139,17 +205,47 @@ public class FsStateBackend extends AbstractStateBackend {
* and the path to the checkpoint data directory.
* @param fileStateSizeThreshold State up to this size will be stored as part of the metadata,
* rather than in files
- *
+ *
* @throws IOException Thrown, if no file system can be found for the scheme in the URI.
* @throws IllegalArgumentException Thrown, if the {@code fileStateSizeThreshold} is out of bounds.
*/
public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) throws IOException {
+
+ this(checkpointDataUri, fileStateSizeThreshold, false);
+ }
+
+ /**
+ * Creates a new state backend that stores its checkpoint data in the file system and location
+ * defined by the given URI.
+ *
+ * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+ * must be accessible via {@link FileSystem#get(URI)}.
+ *
+ * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+ * (host and port), or that the Hadoop configuration that describes that information must be in the
+ * classpath.
+ *
+ * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+ * and the path to the checkpoint data directory.
+ * @param fileStateSizeThreshold State up to this size will be stored as part of the metadata,
+ * rather than in files
+ * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+ *
+ * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+ */
+ public FsStateBackend(
+ URI checkpointDataUri,
+ int fileStateSizeThreshold,
+ boolean asynchronousSnapshots) throws IOException {
+
checkArgument(fileStateSizeThreshold >= 0, "The threshold for file state size must be zero or larger.");
- checkArgument(fileStateSizeThreshold <= MAX_FILE_STATE_THRESHOLD,
+ checkArgument(fileStateSizeThreshold <= MAX_FILE_STATE_THRESHOLD,
"The threshold for file state size cannot be larger than %s", MAX_FILE_STATE_THRESHOLD);
this.fileStateThreshold = fileStateSizeThreshold;
this.basePath = validateAndNormalizeUri(checkpointDataUri);
+
+ this.asynchronousSnapshots = asynchronousSnapshots;
}
/**
@@ -166,9 +262,9 @@ public class FsStateBackend extends AbstractStateBackend {
* Gets the threshold below which state is stored as part of the metadata, rather than in files.
* This threshold ensures that the backend does not create a large amount of very small files,
* where potentially the file pointers are larger than the state itself.
- *
+ *
* <p>By default, this threshold is {@value #DEFAULT_FILE_STATE_THRESHOLD}.
- *
+ *
* @return The file size threshold, in bytes.
*/
public int getMinFileSizeThreshold() {
@@ -209,6 +305,7 @@ public class FsStateBackend extends AbstractStateBackend {
env.getUserClassLoader(),
numberOfKeyGroups,
keyGroupRange,
+ asynchronousSnapshots,
env.getExecutionConfig());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
index 4ac7125..3e76423 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
@@ -22,18 +22,15 @@ import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.internal.InternalMergingState;
import java.util.Collection;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkState;
/**
* Base class for {@link MergingState} ({@link org.apache.flink.runtime.state.internal.InternalMergingState})
* that is stored on the heap.
- *
+ *
* @param <K> The type of the key.
* @param <N> The type of the namespace.
* @param <SV> The type of the values in the state.
@@ -45,21 +42,25 @@ public abstract class AbstractHeapMergingState<K, N, IN, OUT, SV, S extends Stat
implements InternalMergingState<N, IN, OUT> {
/**
+ * The merge transformation function that implements the merge logic.
+ */
+ private final MergeTransformation mergeTransformation;
+
+ /**
* Creates a new key/value state for the given hash map of key/value pairs.
*
- * @param backend The state backend backing that created this state.
* @param stateDesc The state identifier for the state. This contains name
* and can create a default state value.
* @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
*/
protected AbstractHeapMergingState(
- KeyedStateBackend<K> backend,
SD stateDesc,
StateTable<K, N, SV> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer) {
- super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
+ super(stateDesc, stateTable, keySerializer, namespaceSerializer);
+ this.mergeTransformation = new MergeTransformation();
}
@Override
@@ -68,56 +69,40 @@ public abstract class AbstractHeapMergingState<K, N, IN, OUT, SV, S extends Stat
return; // nothing to do
}
- final K key = backend.getCurrentKey();
- checkState(key != null, "No key set.");
-
- final Map<N, Map<K, SV>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());
-
- if (namespaceMap != null) {
- SV merged = null;
-
- // merge the sources
- for (N source : sources) {
- Map<K, SV> keysForNamespace = namespaceMap.get(source);
- if (keysForNamespace != null) {
- // get and remove the next source per namespace/key
- SV sourceState = keysForNamespace.remove(key);
-
- // if the namespace map became empty, remove
- if (keysForNamespace.isEmpty()) {
- namespaceMap.remove(source);
- }
-
- if (merged != null && sourceState != null) {
- merged = mergeState(merged, sourceState);
- }
- else if (merged == null) {
- merged = sourceState;
- }
- }
- }
+ final StateTable<K, N, SV> map = stateTable;
+
+ SV merged = null;
+
+ // merge the sources
+ for (N source : sources) {
- // merge into the target, if needed
- if (merged != null) {
- Map<K, SV> keysForTarget = namespaceMap.get(target);
- if (keysForTarget == null) {
- keysForTarget = createNewMap();
- namespaceMap.put(target, keysForTarget);
- }
- SV targetState = keysForTarget.get(key);
-
- if (targetState != null) {
- targetState = mergeState(targetState, merged);
- }
- else {
- targetState = merged;
- }
- keysForTarget.put(key, targetState);
+ // get and remove the next source per namespace/key
+ SV sourceState = map.removeAndGetOld(source);
+
+ if (merged != null && sourceState != null) {
+ merged = mergeState(merged, sourceState);
+ } else if (merged == null) {
+ merged = sourceState;
}
}
- // else no entries for that key at all, nothing to do skip
+ // merge into the target, if needed
+ if (merged != null) {
+ map.transform(target, merged, mergeTransformation);
+ }
}
protected abstract SV mergeState(SV a, SV b) throws Exception;
-}
+
+ final class MergeTransformation implements StateTransformationFunction<SV, SV> {
+
+ @Override
+ public SV apply(SV targetState, SV merged) throws Exception {
+ if (targetState != null) {
+ return mergeState(targetState, merged);
+ } else {
+ return merged;
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
index 18b71de..7e1123d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.heap;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
@@ -25,18 +26,12 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.util.Preconditions;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
* Base class for partitioned {@link ListState} implementations that are backed by a regular
* heap hash map. The concrete implementations define how the state is checkpointed.
- *
+ *
* @param <K> The type of the key.
* @param <N> The type of the namespace.
* @param <SV> The type of the values in the state.
@@ -53,9 +48,7 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St
protected final SD stateDesc;
/** The current namespace, which the access methods will refer to. */
- protected N currentNamespace = null;
-
- protected final KeyedStateBackend<K> backend;
+ protected N currentNamespace;
protected final TypeSerializer<K> keySerializer;
@@ -64,58 +57,28 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St
/**
* Creates a new key/value state for the given hash map of key/value pairs.
*
- * @param backend The state backend backing that created this state.
* @param stateDesc The state identifier for the state. This contains name
* and can create a default state value.
* @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
*/
protected AbstractHeapState(
- KeyedStateBackend<K> backend,
SD stateDesc,
StateTable<K, N, SV> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer) {
- Preconditions.checkNotNull(stateTable, "State table must not be null.");
-
- this.backend = backend;
this.stateDesc = stateDesc;
- this.stateTable = stateTable;
+ this.stateTable = Preconditions.checkNotNull(stateTable, "State table must not be null.");
this.keySerializer = keySerializer;
this.namespaceSerializer = namespaceSerializer;
+ this.currentNamespace = null;
}
// ------------------------------------------------------------------------
@Override
public final void clear() {
- Preconditions.checkState(currentNamespace != null, "No namespace set.");
- Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
- Map<N, Map<K, SV>> namespaceMap =
- stateTable.get(backend.getCurrentKeyGroupIndex());
-
- if (namespaceMap == null) {
- return;
- }
-
- Map<K, SV> keyedMap = namespaceMap.get(currentNamespace);
-
- if (keyedMap == null) {
- return;
- }
-
- SV removed = keyedMap.remove(backend.getCurrentKey());
-
- if (removed == null) {
- return;
- }
-
- if (!keyedMap.isEmpty()) {
- return;
- }
-
- namespaceMap.remove(currentNamespace);
+ stateTable.remove(currentNamespace);
}
@Override
@@ -137,20 +100,7 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St
Preconditions.checkState(namespace != null, "No namespace given.");
Preconditions.checkState(key != null, "No key given.");
- Map<N, Map<K, SV>> namespaceMap =
- stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, backend.getNumberOfKeyGroups()));
-
- if (namespaceMap == null) {
- return null;
- }
-
- Map<K, SV> keyedMap = namespaceMap.get(currentNamespace);
-
- if (keyedMap == null) {
- return null;
- }
-
- SV result = keyedMap.get(key);
+ SV result = stateTable.get(key, namespace);
if (result == null) {
return null;
@@ -158,30 +108,14 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St
@SuppressWarnings("unchecked,rawtypes")
TypeSerializer serializer = stateDesc.getSerializer();
-
return KvStateRequestSerializer.serializeValue(result, serializer);
}
/**
- * Creates a new map for use in Heap based state.
- *
- * <p>If the state queryable ({@link StateDescriptor#isQueryable()}, this
- * will create a concurrent hash map instead of a regular one.
- *
- * @return A new namespace map.
- */
- protected <MK, MV> Map<MK, MV> createNewMap() {
- if (stateDesc.isQueryable()) {
- return new ConcurrentHashMap<>();
- } else {
- return new HashMap<>();
- }
- }
-
- /**
* This should only be used for testing.
*/
+ @VisibleForTesting
public StateTable<K, N, SV> getStateTable() {
return stateTable;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java
new file mode 100644
index 0000000..b0d7727
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Abstract class that encapsulates the logic to take snapshots of {@link StateTable} implementations and also defines how
+ * the snapshot is written during the serialization phase of checkpointing.
+ */
+@Internal
+abstract class AbstractStateTableSnapshot<K, N, S, T extends StateTable<K, N, S>> implements StateTableSnapshot {
+
+ /**
+ * The {@link StateTable} from which this snapshot was created.
+ */
+ final T owningStateTable;
+
+ /**
+ * Creates a new {@link AbstractStateTableSnapshot} for and owned by the given table.
+ *
+ * @param owningStateTable the {@link StateTable} for which this object represents a snapshot.
+ */
+ AbstractStateTableSnapshot(T owningStateTable) {
+ this.owningStateTable = Preconditions.checkNotNull(owningStateTable);
+ }
+
+ /**
+ * Optional hook to release resources for this snapshot at the end of its lifecycle.
+ */
+ @Override
+ public void release() {
+ }
+}
\ No newline at end of file