You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/03/16 17:34:26 UTC
[1/4] flink git commit: [FLINK-5715] Asynchronous snapshots for
heap-based keyed state backend
Repository: flink
Updated Branches:
refs/heads/master 30bb958a7 -> ab014ef94
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
new file mode 100644
index 0000000..08896da
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.state.ArrayListSerializer;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+public class CopyOnWriteStateTableTest {
+
+ /**
+ * Testing the basic map operations.
+ */
+ @Test
+ public void testPutGetRemoveContainsTransform() throws Exception {
+ RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+ new RegisteredBackendStateMetaInfo<>(
+ StateDescriptor.Type.UNKNOWN,
+ "test",
+ IntSerializer.INSTANCE,
+ new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
+
+ final MockInternalKeyContext<Integer> keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE);
+
+ final CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> stateTable =
+ new CopyOnWriteStateTable<>(keyContext, metaInfo);
+
+ ArrayList<Integer> state_1_1 = new ArrayList<>();
+ state_1_1.add(41);
+ ArrayList<Integer> state_2_1 = new ArrayList<>();
+ state_2_1.add(42);
+ ArrayList<Integer> state_1_2 = new ArrayList<>();
+ state_1_2.add(43);
+
+ Assert.assertNull(stateTable.putAndGetOld(1, 1, state_1_1));
+ Assert.assertEquals(state_1_1, stateTable.get(1, 1));
+ Assert.assertEquals(1, stateTable.size());
+
+ Assert.assertNull(stateTable.putAndGetOld(2, 1, state_2_1));
+ Assert.assertEquals(state_2_1, stateTable.get(2, 1));
+ Assert.assertEquals(2, stateTable.size());
+
+ Assert.assertNull(stateTable.putAndGetOld(1, 2, state_1_2));
+ Assert.assertEquals(state_1_2, stateTable.get(1, 2));
+ Assert.assertEquals(3, stateTable.size());
+
+ Assert.assertTrue(stateTable.containsKey(2, 1));
+ Assert.assertFalse(stateTable.containsKey(3, 1));
+ Assert.assertFalse(stateTable.containsKey(2, 3));
+ stateTable.put(2, 1, null);
+ Assert.assertTrue(stateTable.containsKey(2, 1));
+ Assert.assertEquals(3, stateTable.size());
+ Assert.assertNull(stateTable.get(2, 1));
+ stateTable.put(2, 1, state_2_1);
+ Assert.assertEquals(3, stateTable.size());
+
+ Assert.assertEquals(state_2_1, stateTable.removeAndGetOld(2, 1));
+ Assert.assertFalse(stateTable.containsKey(2, 1));
+ Assert.assertEquals(2, stateTable.size());
+
+ stateTable.remove(1, 2);
+ Assert.assertFalse(stateTable.containsKey(1, 2));
+ Assert.assertEquals(1, stateTable.size());
+
+ Assert.assertNull(stateTable.removeAndGetOld(4, 2));
+ Assert.assertEquals(1, stateTable.size());
+
+ StateTransformationFunction<ArrayList<Integer>, Integer> function =
+ new StateTransformationFunction<ArrayList<Integer>, Integer>() {
+ @Override
+ public ArrayList<Integer> apply(ArrayList<Integer> previousState, Integer value) throws Exception {
+ previousState.add(value);
+ return previousState;
+ }
+ };
+
+ final int value = 4711;
+ stateTable.transform(1, 1, value, function);
+ state_1_1 = function.apply(state_1_1, value);
+ Assert.assertEquals(state_1_1, stateTable.get(1, 1));
+ }
+
+ /**
+ * This test triggers incremental rehash and tests for corruptions.
+ */
+ @Test
+ public void testIncrementalRehash() {
+ RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+ new RegisteredBackendStateMetaInfo<>(
+ StateDescriptor.Type.UNKNOWN,
+ "test",
+ IntSerializer.INSTANCE,
+ new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
+
+ final MockInternalKeyContext<Integer> keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE);
+
+ final CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> stateTable =
+ new CopyOnWriteStateTable<>(keyContext, metaInfo);
+
+ int insert = 0;
+ int remove = 0;
+ while (!stateTable.isRehashing()) {
+ stateTable.put(insert++, 0, new ArrayList<Integer>());
+ if (insert % 8 == 0) {
+ stateTable.remove(remove++, 0);
+ }
+ }
+ Assert.assertEquals(insert - remove, stateTable.size());
+ while (stateTable.isRehashing()) {
+ stateTable.put(insert++, 0, new ArrayList<Integer>());
+ if (insert % 8 == 0) {
+ stateTable.remove(remove++, 0);
+ }
+ }
+ Assert.assertEquals(insert - remove, stateTable.size());
+
+ for (int i = 0; i < insert; ++i) {
+ if (i < remove) {
+ Assert.assertFalse(stateTable.containsKey(i, 0));
+ } else {
+ Assert.assertTrue(stateTable.containsKey(i, 0));
+ }
+ }
+ }
+
+ /**
+ * This test applies random modifications to a state table and to a reference (hash map). It then draws snapshots,
+ * performs more modifications, and checks snapshot integrity.
+ */
+ @Test
+ public void testRandomModificationsAndCopyOnWriteIsolation() throws Exception {
+
+ final RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+ new RegisteredBackendStateMetaInfo<>(
+ StateDescriptor.Type.UNKNOWN,
+ "test",
+ IntSerializer.INSTANCE,
+ new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
+
+ final MockInternalKeyContext<Integer> keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE);
+
+ final CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> stateTable =
+ new CopyOnWriteStateTable<>(keyContext, metaInfo);
+
+ final HashMap<Tuple2<Integer, Integer>, ArrayList<Integer>> referenceMap = new HashMap<>();
+
+ final Random random = new Random(42);
+
+ // holds snapshots from the map under test
+ CopyOnWriteStateTable.StateTableEntry<Integer, Integer, ArrayList<Integer>>[] snapshot = null;
+ int snapshotSize = 0;
+
+ // holds a reference snapshot from our reference map that we compare against
+ Tuple3<Integer, Integer, ArrayList<Integer>>[] reference = null;
+
+ int val = 0;
+
+
+ int snapshotCounter = 0;
+ int referencedSnapshotId = 0;
+
+ final StateTransformationFunction<ArrayList<Integer>, Integer> transformationFunction =
+ new StateTransformationFunction<ArrayList<Integer>, Integer>() {
+ @Override
+ public ArrayList<Integer> apply(ArrayList<Integer> previousState, Integer value) throws Exception {
+ if (previousState == null) {
+ previousState = new ArrayList<>();
+ }
+ previousState.add(value);
+ // we give back the original, attempting to spot errors in the copy-on-write
+ return previousState;
+ }
+ };
+
+ // the main loop for modifications
+ for (int i = 0; i < 10_000_000; ++i) {
+
+ int key = random.nextInt(20);
+ int namespace = random.nextInt(4);
+ Tuple2<Integer, Integer> compositeKey = new Tuple2<>(key, namespace);
+
+ int op = random.nextInt(7);
+
+ ArrayList<Integer> state = null;
+ ArrayList<Integer> referenceState = null;
+
+ switch (op) {
+ case 0:
+ case 1: {
+ state = stateTable.get(key, namespace);
+ referenceState = referenceMap.get(compositeKey);
+ if (null == state) {
+ state = new ArrayList<>();
+ stateTable.put(key, namespace, state);
+ referenceState = new ArrayList<>();
+ referenceMap.put(compositeKey, referenceState);
+ }
+ break;
+ }
+ case 2: {
+ stateTable.put(key, namespace, new ArrayList<Integer>());
+ referenceMap.put(compositeKey, new ArrayList<Integer>());
+ break;
+ }
+ case 3: {
+ state = stateTable.putAndGetOld(key, namespace, new ArrayList<Integer>());
+ referenceState = referenceMap.put(compositeKey, new ArrayList<Integer>());
+ break;
+ }
+ case 4: {
+ stateTable.remove(key, namespace);
+ referenceMap.remove(compositeKey);
+ break;
+ }
+ case 5: {
+ state = stateTable.removeAndGetOld(key, namespace);
+ referenceState = referenceMap.remove(compositeKey);
+ break;
+ }
+ case 6: {
+ final int updateValue = random.nextInt(1000);
+ stateTable.transform(key, namespace, updateValue, transformationFunction);
+ referenceMap.put(compositeKey, transformationFunction.apply(
+ referenceMap.remove(compositeKey), updateValue));
+ break;
+ }
+ default: {
+ Assert.fail("Unknown op-code " + op);
+ }
+ }
+
+ Assert.assertEquals(referenceMap.size(), stateTable.size());
+
+ if (state != null) {
+ // mutate the states a bit...
+ if (random.nextBoolean() && !state.isEmpty()) {
+ state.remove(state.size() - 1);
+ referenceState.remove(referenceState.size() - 1);
+ } else {
+ state.add(val);
+ referenceState.add(val);
+ ++val;
+ }
+ }
+
+ Assert.assertEquals(referenceState, state);
+
+ // snapshot triggering / comparison / release
+ if (i > 0 && i % 500 == 0) {
+
+ if (snapshot != null) {
+ // check our referenced snapshot
+ deepCheck(reference, convert(snapshot, snapshotSize));
+
+ if (i % 1_000 == 0) {
+ // draw and release some other snapshot while holding on to the old snapshot
+ ++snapshotCounter;
+ stateTable.snapshotTableArrays();
+ stateTable.releaseSnapshot(snapshotCounter);
+ }
+
+ //release the snapshot after some time
+ if (i % 5_000 == 0) {
+ snapshot = null;
+ reference = null;
+ snapshotSize = 0;
+ stateTable.releaseSnapshot(referencedSnapshotId);
+ }
+
+ } else {
+ // if there is no more referenced snapshot, we create one
+ ++snapshotCounter;
+ referencedSnapshotId = snapshotCounter;
+ snapshot = stateTable.snapshotTableArrays();
+ snapshotSize = stateTable.size();
+ reference = manualDeepDump(referenceMap);
+ }
+ }
+ }
+ }
+
+ /**
+ * This tests for the copy-on-write contracts, e.g. ensures that no copy-on-write is active after all snapshots are
+ * released.
+ */
+ @Test
+ public void testCopyOnWriteContracts() {
+ RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+ new RegisteredBackendStateMetaInfo<>(
+ StateDescriptor.Type.UNKNOWN,
+ "test",
+ IntSerializer.INSTANCE,
+ new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
+
+ final MockInternalKeyContext<Integer> keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE);
+
+ final CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> stateTable =
+ new CopyOnWriteStateTable<>(keyContext, metaInfo);
+
+ ArrayList<Integer> originalState1 = new ArrayList<>(1);
+ ArrayList<Integer> originalState2 = new ArrayList<>(1);
+ ArrayList<Integer> originalState3 = new ArrayList<>(1);
+ ArrayList<Integer> originalState4 = new ArrayList<>(1);
+ ArrayList<Integer> originalState5 = new ArrayList<>(1);
+
+ originalState1.add(1);
+ originalState2.add(2);
+ originalState3.add(3);
+ originalState4.add(4);
+ originalState5.add(5);
+
+ stateTable.put(1, 1, originalState1);
+ stateTable.put(2, 1, originalState2);
+ stateTable.put(4, 1, originalState4);
+ stateTable.put(5, 1, originalState5);
+
+ // no snapshot taken, we get the original back
+ Assert.assertTrue(stateTable.get(1, 1) == originalState1);
+ CopyOnWriteStateTableSnapshot<Integer, Integer, ArrayList<Integer>> snapshot1 = stateTable.createSnapshot();
+ // after snapshot1 is taken, we get a copy...
+ final ArrayList<Integer> copyState = stateTable.get(1, 1);
+ Assert.assertFalse(copyState == originalState1);
+ // ...and the copy is equal
+ Assert.assertEquals(originalState1, copyState);
+
+ // we make an insert AFTER snapshot1
+ stateTable.put(3, 1, originalState3);
+
+ // on repeated lookups, we get the same copy because no further snapshot was taken
+ Assert.assertTrue(copyState == stateTable.get(1, 1));
+
+ // we take snapshot2
+ CopyOnWriteStateTableSnapshot<Integer, Integer, ArrayList<Integer>> snapshot2 = stateTable.createSnapshot();
+ // after the second snapshot, copy-on-write is active again for old entries
+ Assert.assertFalse(copyState == stateTable.get(1, 1));
+ // and equality still holds
+ Assert.assertEquals(copyState, stateTable.get(1, 1));
+
+ // after releasing snapshot2
+ stateTable.releaseSnapshot(snapshot2);
+ // we still get the original of the untouched late insert (after snapshot1)
+ Assert.assertTrue(originalState3 == stateTable.get(3, 1));
+ // but copy-on-write is still active for older inserts (before snapshot1)
+ Assert.assertFalse(originalState4 == stateTable.get(4, 1));
+
+ // after releasing snapshot1
+ stateTable.releaseSnapshot(snapshot1);
+ // no copy-on-write is active
+ Assert.assertTrue(originalState5 == stateTable.get(5, 1));
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <K, N, S> Tuple3<K, N, S>[] convert(CopyOnWriteStateTable.StateTableEntry<K, N, S>[] snapshot, int mapSize) {
+
+ Tuple3<K, N, S>[] result = new Tuple3[mapSize];
+ int pos = 0;
+ for (CopyOnWriteStateTable.StateTableEntry<K, N, S> entry : snapshot) {
+ while (null != entry) {
+ result[pos++] = new Tuple3<>(entry.getKey(), entry.getNamespace(), entry.getState());
+ entry = entry.next;
+ }
+ }
+ Assert.assertEquals(mapSize, pos);
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ private Tuple3<Integer, Integer, ArrayList<Integer>>[] manualDeepDump(
+ HashMap<Tuple2<Integer, Integer>,
+ ArrayList<Integer>> map) {
+
+ Tuple3<Integer, Integer, ArrayList<Integer>>[] result = new Tuple3[map.size()];
+ int pos = 0;
+ for (Map.Entry<Tuple2<Integer, Integer>, ArrayList<Integer>> entry : map.entrySet()) {
+ Integer key = entry.getKey().f0;
+ Integer namespace = entry.getKey().f1;
+ result[pos++] = new Tuple3<>(key, namespace, new ArrayList<>(entry.getValue()));
+ }
+ return result;
+ }
+
+ private void deepCheck(
+ Tuple3<Integer, Integer, ArrayList<Integer>>[] a,
+ Tuple3<Integer, Integer, ArrayList<Integer>>[] b) {
+
+ if (a == b) {
+ return;
+ }
+
+ Assert.assertEquals(a.length, b.length);
+
+ Comparator<Tuple3<Integer, Integer, ArrayList<Integer>>> comparator =
+ new Comparator<Tuple3<Integer, Integer, ArrayList<Integer>>>() {
+
+ @Override
+ public int compare(Tuple3<Integer, Integer, ArrayList<Integer>> o1, Tuple3<Integer, Integer, ArrayList<Integer>> o2) {
+ int namespaceDiff = o1.f1 - o2.f1;
+ return namespaceDiff != 0 ? namespaceDiff : o1.f0 - o2.f0;
+ }
+ };
+
+ Arrays.sort(a, comparator);
+ Arrays.sort(b, comparator);
+
+ for (int i = 0; i < a.length; ++i) {
+ Tuple3<Integer, Integer, ArrayList<Integer>> av = a[i];
+ Tuple3<Integer, Integer, ArrayList<Integer>> bv = b[i];
+
+ Assert.assertEquals(av.f0, bv.f0);
+ Assert.assertEquals(av.f1, bv.f1);
+ Assert.assertEquals(av.f2, bv.f2);
+ }
+ }
+
+ static class MockInternalKeyContext<T> implements InternalKeyContext<T> {
+
+ private T key;
+ private final TypeSerializer<T> serializer;
+ private final KeyGroupRange keyGroupRange;
+
+ public MockInternalKeyContext(TypeSerializer<T> serializer) {
+ this.serializer = serializer;
+ this.keyGroupRange = new KeyGroupRange(0, 0);
+ }
+
+ public void setKey(T key) {
+ this.key = key;
+ }
+
+ @Override
+ public T getCurrentKey() {
+ return key;
+ }
+
+ @Override
+ public int getCurrentKeyGroupIndex() {
+ return 0;
+ }
+
+ @Override
+ public int getNumberOfKeyGroups() {
+ return 1;
+ }
+
+ @Override
+ public KeyGroupRange getKeyGroupRange() {
+ return keyGroupRange;
+ }
+
+ @Override
+ public TypeSerializer<T> getKeySerializer() {
+ return serializer;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
index 735b5f5..cb4e403 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
@@ -23,25 +23,20 @@ import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
-
import org.junit.Test;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
/**
* Tests for the simple Java heap objects implementation of the {@link AggregatingState}.
*/
-public class HeapAggregatingStateTest {
+public class HeapAggregatingStateTest extends HeapStateBackendTestBase {
@Test
public void testAddAndGet() throws Exception {
@@ -227,20 +222,6 @@ public class HeapAggregatingStateTest {
}
// ------------------------------------------------------------------------
- // utilities
- // ------------------------------------------------------------------------
-
- private static HeapKeyedStateBackend<String> createKeyedBackend() throws Exception {
- return new HeapKeyedStateBackend<>(
- mock(TaskKvStateRegistry.class),
- StringSerializer.INSTANCE,
- HeapAggregatingStateTest.class.getClassLoader(),
- 16,
- new KeyGroupRange(0, 15),
- new ExecutionConfig());
- }
-
- // ------------------------------------------------------------------------
// test functions
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
new file mode 100644
index 0000000..da0666a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.junit.Test;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.net.URL;
+import java.util.Collections;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests backwards compatibility in the serialization format of heap-based KeyedStateBackends.
+ */
+public class HeapKeyedStateBackendSnapshotMigrationTest extends HeapStateBackendTestBase {
+
+ /**
+ * [FLINK-5979]
+ *
+ * This test takes a snapshot that was created with Flink 1.2 and tries to restore it in master to check
+ * the backwards compatibility of the serialization format of {@link StateTable}s.
+ */
+ @Test
+ public void testRestore1_2ToMaster() throws Exception {
+
+ ClassLoader cl = getClass().getClassLoader();
+ URL resource = cl.getResource("heap_keyed_statebackend_1_2.snapshot");
+
+ Preconditions.checkNotNull(resource, "Binary snapshot resource not found!");
+
+ final Integer namespace1 = 1;
+ final Integer namespace2 = 2;
+ final Integer namespace3 = 3;
+
+ try (final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend()) {
+ final KeyGroupsStateHandle stateHandle;
+ try (BufferedInputStream bis = new BufferedInputStream((new FileInputStream(resource.getFile())))) {
+ stateHandle = InstantiationUtil.deserializeObject(bis, Thread.currentThread().getContextClassLoader());
+ }
+ keyedBackend.restore(Collections.singleton(stateHandle));
+ final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
+ stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+ InternalListState<Integer, Long> state = keyedBackend.createListState(IntSerializer.INSTANCE, stateDescr);
+
+ assertEquals(7, keyedBackend.numStateEntries());
+
+ keyedBackend.setCurrentKey("abc");
+ state.setCurrentNamespace(namespace1);
+ assertEquals(asList(33L, 55L), state.get());
+ state.setCurrentNamespace(namespace2);
+ assertEquals(asList(22L, 11L), state.get());
+ state.setCurrentNamespace(namespace3);
+ assertEquals(Collections.singletonList(44L), state.get());
+
+ keyedBackend.setCurrentKey("def");
+ state.setCurrentNamespace(namespace1);
+ assertEquals(asList(11L, 44L), state.get());
+
+ state.setCurrentNamespace(namespace3);
+ assertEquals(asList(22L, 55L, 33L), state.get());
+
+ keyedBackend.setCurrentKey("jkl");
+ state.setCurrentNamespace(namespace1);
+ assertEquals(asList(11L, 22L, 33L, 44L, 55L), state.get());
+
+ keyedBackend.setCurrentKey("mno");
+ state.setCurrentNamespace(namespace3);
+ assertEquals(asList(11L, 22L, 33L, 44L, 55L), state.get());
+ }
+ }
+
+// /**
+// * This code was used to create the binary file of the old version's snapshot used by this test. If you need to
+// * recreate the binary, you can uncomment this and run it.
+// */
+// private void createBinarySnapshot() throws Exception {
+//
+// final String pathToWrite = "/PATH/TO/WRITE";
+//
+// final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
+// stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+//
+// final Integer namespace1 = 1;
+// final Integer namespace2 = 2;
+// final Integer namespace3 = 3;
+//
+// final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
+//
+// try {
+// InternalListState<Integer, Long> state = keyedBackend.createListState(IntSerializer.INSTANCE, stateDescr);
+//
+// keyedBackend.setCurrentKey("abc");
+// state.setCurrentNamespace(namespace1);
+// state.add(33L);
+// state.add(55L);
+//
+// state.setCurrentNamespace(namespace2);
+// state.add(22L);
+// state.add(11L);
+//
+// state.setCurrentNamespace(namespace3);
+// state.add(44L);
+//
+// keyedBackend.setCurrentKey("def");
+// state.setCurrentNamespace(namespace1);
+// state.add(11L);
+// state.add(44L);
+//
+// state.setCurrentNamespace(namespace3);
+// state.add(22L);
+// state.add(55L);
+// state.add(33L);
+//
+// keyedBackend.setCurrentKey("jkl");
+// state.setCurrentNamespace(namespace1);
+// state.add(11L);
+// state.add(22L);
+// state.add(33L);
+// state.add(44L);
+// state.add(55L);
+//
+// keyedBackend.setCurrentKey("mno");
+// state.setCurrentNamespace(namespace3);
+// state.add(11L);
+// state.add(22L);
+// state.add(33L);
+// state.add(44L);
+// state.add(55L);
+// RunnableFuture<KeyGroupsStateHandle> snapshot = keyedBackend.snapshot(
+// 0L,
+// 0L,
+// new MemCheckpointStreamFactory(4 * 1024 * 1024),
+// CheckpointOptions.forFullCheckpoint());
+//
+// snapshot.run();
+//
+// try (BufferedOutputStream bis = new BufferedOutputStream(new FileOutputStream(pathToWrite))) {
+// InstantiationUtil.serializeObject(bis, snapshot.get());
+// }
+//
+// } finally {
+// keyedBackend.close();
+// keyedBackend.dispose();
+// }
+// }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
index c36a48b..7705c19 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
@@ -22,13 +22,9 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.internal.InternalListState;
-
import org.junit.Test;
import java.util.ArrayList;
@@ -39,12 +35,11 @@ import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
/**
* Tests for the simple Java heap objects implementation of the {@link ListState}.
*/
-public class HeapListStateTest {
+public class HeapListStateTest extends HeapStateBackendTestBase {
@Test
public void testAddAndGet() throws Exception {
@@ -225,16 +220,6 @@ public class HeapListStateTest {
keyedBackend.dispose();
}
}
-
- private static HeapKeyedStateBackend<String> createKeyedBackend() throws Exception {
- return new HeapKeyedStateBackend<>(
- mock(TaskKvStateRegistry.class),
- StringSerializer.INSTANCE,
- HeapListStateTest.class.getClassLoader(),
- 16,
- new KeyGroupRange(0, 15),
- new ExecutionConfig());
- }
private static <T> void validateResult(Iterable<T> values, Set<T> expected) {
int num = 0;
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
index 63eec04..928eaec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
@@ -23,25 +23,20 @@ import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.internal.InternalReducingState;
-
import org.junit.Test;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
/**
* Tests for the simple Java heap objects implementation of the {@link ReducingState}.
*/
-public class HeapReducingStateTest {
+public class HeapReducingStateTest extends HeapStateBackendTestBase {
@Test
public void testAddAndGet() throws Exception {
@@ -214,7 +209,7 @@ public class HeapReducingStateTest {
keyedBackend.setCurrentKey("mno");
state.setCurrentNamespace(namespace1);
state.clear();
-
+
StateTable<String, Integer, Long> stateTable =
((HeapReducingState<String, Integer, Long>) state).stateTable;
@@ -227,20 +222,6 @@ public class HeapReducingStateTest {
}
// ------------------------------------------------------------------------
- // utilities
- // ------------------------------------------------------------------------
-
- private static HeapKeyedStateBackend<String> createKeyedBackend() throws Exception {
- return new HeapKeyedStateBackend<>(
- mock(TaskKvStateRegistry.class),
- StringSerializer.INSTANCE,
- HeapReducingStateTest.class.getClassLoader(),
- 16,
- new KeyGroupRange(0, 15),
- new ExecutionConfig());
- }
-
- // ------------------------------------------------------------------------
// test functions
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
new file mode 100644
index 0000000..e6adef8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+@RunWith(Parameterized.class) // JUnit runs every test of a subclass once per value returned by parameters()
+public abstract class HeapStateBackendTestBase { // shared base class for heap state tests; provides the keyed backend factory
+
+ @Parameterized.Parameters
+ public static Collection<Boolean> parameters() {
+ return Arrays.asList(false, true); // one run with async == false, one run with async == true
+ }
+
+ @Parameterized.Parameter
+ public boolean async; // injected by the Parameterized runner; forwarded to the backend constructor below
+
+ public HeapKeyedStateBackend<String> createKeyedBackend() throws Exception { // builds a fresh String-keyed heap backend for a test
+ return new HeapKeyedStateBackend<>(
+ mock(TaskKvStateRegistry.class), // Mockito mock, no real registry is involved
+ StringSerializer.INSTANCE,
+ HeapReducingStateTest.class.getClassLoader(), // NOTE(review): the base class names one specific subclass here; HeapStateBackendTestBase.class would presumably yield the same loader - confirm
+ 16, // NOTE(review): presumably the number of key groups, matching KeyGroupRange(0, 15) - confirm against the constructor
+ new KeyGroupRange(0, 15),
+ async, // NOTE(review): presumably switches the backend to asynchronous snapshots - confirm against the constructor
+ new ExecutionConfig());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
new file mode 100644
index 0000000..6fd94f7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.ArrayListSerializer;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
+
+public class StateTableSnapshotCompatibilityTest { // round-trips snapshots between CopyOnWriteStateTable and NestedMapsStateTable
+
+ /**
+ * This test ensures that different implementations of {@link StateTable} are compatible in their serialization
+ * format.
+ *
+ * <p>Steps: fill a {@link CopyOnWriteStateTable} with pseudo-random lists (fixed seed 42), write its snapshot
+ * and read it back into a {@link NestedMapsStateTable}, compare the sizes and every entry, then repeat the
+ * same procedure in the opposite direction into a fresh {@link CopyOnWriteStateTable}.
+ */
+ @Test
+ public void checkCompatibleSerializationFormats() throws IOException {
+ final Random r = new Random(42); // fixed seed keeps the generated data reproducible
+ RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+ new RegisteredBackendStateMetaInfo<>(
+ StateDescriptor.Type.UNKNOWN,
+ "test",
+ IntSerializer.INSTANCE,
+ new ArrayListSerializer<>(IntSerializer.INSTANCE));
+
+ final CopyOnWriteStateTableTest.MockInternalKeyContext<Integer> keyContext =
+ new CopyOnWriteStateTableTest.MockInternalKeyContext<>(IntSerializer.INSTANCE); // the same context is shared by all tables in this test
+
+ CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> cowStateTable =
+ new CopyOnWriteStateTable<>(keyContext, metaInfo);
+
+ for (int i = 0; i < 100; ++i) {
+ ArrayList<Integer> list = new ArrayList<>(5);
+ int end = r.nextInt(5); // list length in [0, 4]
+ for (int j = 0; j < end; ++j) {
+ list.add(r.nextInt(100));
+ }
+
+ cowStateTable.put(r.nextInt(10), r.nextInt(2), list); // at most 10 * 2 = 20 distinct argument pairs over 100 calls; presumably (key, namespace, state) - confirm
+ }
+
+ StateTableSnapshot snapshot = cowStateTable.createSnapshot();
+
+ final NestedMapsStateTable<Integer, Integer, ArrayList<Integer>> nestedMapsStateTable =
+ new NestedMapsStateTable<>(keyContext, metaInfo);
+
+ restoreStateTableFromSnapshot(nestedMapsStateTable, snapshot, keyContext.getKeyGroupRange()); // direction 1: copy-on-write -> nested maps
+ snapshot.release();
+
+
+ Assert.assertEquals(cowStateTable.size(), nestedMapsStateTable.size());
+ for (StateEntry<Integer, Integer, ArrayList<Integer>> entry : cowStateTable) {
+ Assert.assertEquals(entry.getState(), nestedMapsStateTable.get(entry.getKey(), entry.getNamespace()));
+ }
+
+ snapshot = nestedMapsStateTable.createSnapshot();
+ cowStateTable = new CopyOnWriteStateTable<>(keyContext, metaInfo); // fresh, empty table as the restore target
+
+ restoreStateTableFromSnapshot(cowStateTable, snapshot, keyContext.getKeyGroupRange()); // direction 2: nested maps -> copy-on-write
+ snapshot.release();
+
+ Assert.assertEquals(nestedMapsStateTable.size(), cowStateTable.size());
+ for (StateEntry<Integer, Integer, ArrayList<Integer>> entry : cowStateTable) {
+ Assert.assertEquals(nestedMapsStateTable.get(entry.getKey(), entry.getNamespace()), entry.getState());
+ }
+ }
+
+ private static <K, N, S> void restoreStateTableFromSnapshot( // writes every key group of the snapshot into memory, then reads them back into stateTable
+ StateTable<K, N, S> stateTable,
+ StateTableSnapshot snapshot,
+ KeyGroupRange keyGroupRange) throws IOException {
+
+ final ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos(1024 * 1024); // 1 MiB constructor argument (presumably the initial capacity)
+ final DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out);
+
+ for (Integer keyGroup : keyGroupRange) {
+ snapshot.writeMappingsInKeyGroup(dov, keyGroup);
+ }
+
+ final ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(out.getBuf()); // NOTE(review): getBuf() presumably exposes the whole backing array, not only the written bytes - confirm
+ final DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(in);
+
+ final StateTableByKeyGroupReader keyGroupReader =
+ StateTableByKeyGroupReaders.readerForVersion(stateTable, KeyedBackendSerializationProxy.VERSION); // reader selected by the VERSION constant of the serialization proxy
+
+ for (Integer keyGroup : keyGroupRange) { // iterates the same range as the write loop, so key groups are read in the order written
+ keyGroupReader.readMappingsInKeyGroup(div, keyGroup);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
new file mode 100644
index 0000000..291f3ed
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+
+import java.io.IOException;
+
+/**
+ * {@link CheckpointStreamFactory} for tests that allows for testing cancellation in async IO.
+ *
+ * <p>Every stream created by this factory copies the configured invocation count and the two latches at
+ * creation time. In {@code write(int)} the stream counts the copy down by one per call while it is positive;
+ * whenever the count is zero and a blocker latch was set, the write first waits on that latch. The waiter
+ * latch is triggered to signal progress: on every write (via the factory's current latch), after a write
+ * that ran with a zero count, when the underlying write throws an {@link IOException}, and on close.
+ */
+@VisibleForTesting
+@Internal
+public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
+
+ private final int maxSize; // passed to every created MemoryCheckpointOutputStream
+ private volatile int afterNumberInvocations; // initial countdown value copied into each newly created stream
+ private volatile OneShotLatch blocker; // latch a stream waits on once its countdown is zero; may be null
+ private volatile OneShotLatch waiter; // latch triggered by streams to signal progress; may be null
+
+ MemCheckpointStreamFactory.MemoryCheckpointOutputStream lastCreatedStream; // package-private and non-volatile; overwritten by every createCheckpointStateOutputStream call
+
+ public MemCheckpointStreamFactory.MemoryCheckpointOutputStream getLastCreatedStream() {
+ return lastCreatedStream; // null until the first stream has been created
+ }
+
+ public BlockerCheckpointStreamFactory(int maxSize) {
+ this.maxSize = maxSize;
+ }
+
+ public void setAfterNumberInvocations(int afterNumberInvocations) { // affects only streams created after this call
+ this.afterNumberInvocations = afterNumberInvocations;
+ }
+
+ public void setBlockerLatch(OneShotLatch latch) { // captured by streams created after this call
+ this.blocker = latch;
+ }
+
+ public void setWaiterLatch(OneShotLatch latch) { // captured by new streams; the first check in write(int) reads the current value instead
+ this.waiter = latch;
+ }
+
+ @Override
+ public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception { // checkpointID and timestamp are not used
+ this.lastCreatedStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) {
+
+ private int afterNInvocations = afterNumberInvocations; // per-stream countdown
+ private final OneShotLatch streamBlocker = blocker; // snapshot of the factory field at creation time
+ private final OneShotLatch streamWaiter = waiter; // snapshot of the factory field at creation time
+
+ @Override
+ public void write(int b) throws IOException {
+
+ if (null != waiter) { // NOTE(review): reads the factory's current field, unlike the captured streamWaiter used below - confirm this is intended
+ waiter.trigger();
+ }
+
+ if (afterNInvocations > 0) {
+ --afterNInvocations;
+ }
+
+ if (0 == afterNInvocations && null != streamBlocker) { // countdown exhausted: wait until the test releases the blocker
+ try {
+ streamBlocker.await();
+ } catch (InterruptedException ignored) { // interruption only ends the wait; the write proceeds and the interrupt flag is not restored
+ }
+ }
+ try {
+ super.write(b);
+ } catch (IOException ex) {
+ if (null != streamWaiter) { // signal the waiter before propagating the failure
+ streamWaiter.trigger();
+ }
+ throw ex;
+ }
+
+ if (0 == afterNInvocations && null != streamWaiter) {
+ streamWaiter.trigger();
+ }
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ if (null != streamWaiter) { // closing also signals the waiter
+ streamWaiter.trigger();
+ }
+ }
+ };
+
+ return lastCreatedStream;
+ }
+
+ @Override
+ public void close() throws Exception { // intentionally empty: the factory itself holds nothing to release
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/resources/heap_keyed_statebackend_1_2.snapshot
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/heap_keyed_statebackend_1_2.snapshot b/flink-runtime/src/test/resources/heap_keyed_statebackend_1_2.snapshot
new file mode 100644
index 0000000..b9171bc
Binary files /dev/null and b/flink-runtime/src/test/resources/heap_keyed_statebackend_1_2.snapshot differ
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
index 05c89dd..781c320 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.util.MathUtils;
import java.io.IOException;
import java.util.ArrayList;
@@ -98,9 +99,7 @@ public class TimeWindow extends Window {
@Override
public int hashCode() {
- int result = (int) (start ^ (start >>> 32));
- result = 31 * result + (int) (end ^ (end >>> 32));
- return result;
+ return MathUtils.longToIntWithBitMixing(start + end);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 1911f44..5e966d1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -91,7 +91,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
}
enum StateBackendEnum {
- MEM, FILE, ROCKSDB_FULLY_ASYNC
+ MEM, FILE, ROCKSDB_FULLY_ASYNC, MEM_ASYNC, FILE_ASYNC
}
@BeforeClass
@@ -116,11 +116,19 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
public void initStateBackend() throws IOException {
switch (stateBackendEnum) {
case MEM:
- this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE);
+ this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
break;
case FILE: {
String backups = tempFolder.newFolder().getAbsolutePath();
- this.stateBackend = new FsStateBackend("file://" + backups);
+ this.stateBackend = new FsStateBackend("file://" + backups, false);
+ break;
+ }
+ case MEM_ASYNC:
+ this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, true);
+ break;
+ case FILE_ASYNC: {
+ String backups = tempFolder.newFolder().getAbsolutePath();
+ this.stateBackend = new FsStateBackend("file://" + backups, true);
break;
}
case ROCKSDB_FULLY_ASYNC: {
@@ -138,9 +146,9 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
@Test
public void testTumblingTimeWindow() {
- final int NUM_ELEMENTS_PER_KEY = 3000;
- final int WINDOW_SIZE = 100;
- final int NUM_KEYS = 100;
+ final int NUM_ELEMENTS_PER_KEY = numElementsPerKey();
+ final int WINDOW_SIZE = windowSize();
+ final int NUM_KEYS = numKeys();
FailingSource.reset();
try {
@@ -211,9 +219,9 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
}
public void doTestTumblingTimeWindowWithKVState(int maxParallelism) {
- final int NUM_ELEMENTS_PER_KEY = 3000;
- final int WINDOW_SIZE = 100;
- final int NUM_KEYS = 100;
+ final int NUM_ELEMENTS_PER_KEY = numElementsPerKey();
+ final int WINDOW_SIZE = windowSize();
+ final int NUM_KEYS = numKeys();
FailingSource.reset();
try {
@@ -280,10 +288,10 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
@Test
public void testSlidingTimeWindow() {
- final int NUM_ELEMENTS_PER_KEY = 3000;
- final int WINDOW_SIZE = 1000;
- final int WINDOW_SLIDE = 100;
- final int NUM_KEYS = 100;
+ final int NUM_ELEMENTS_PER_KEY = numElementsPerKey();
+ final int WINDOW_SIZE = windowSize();
+ final int WINDOW_SLIDE = windowSlide();
+ final int NUM_KEYS = numKeys();
FailingSource.reset();
try {
@@ -346,9 +354,9 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
@Test
public void testPreAggregatedTumblingTimeWindow() {
- final int NUM_ELEMENTS_PER_KEY = 3000;
- final int WINDOW_SIZE = 100;
- final int NUM_KEYS = 100;
+ final int NUM_ELEMENTS_PER_KEY = numElementsPerKey();
+ final int WINDOW_SIZE = windowSize();
+ final int NUM_KEYS = numKeys();
FailingSource.reset();
try {
@@ -418,10 +426,10 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
@Test
public void testPreAggregatedSlidingTimeWindow() {
- final int NUM_ELEMENTS_PER_KEY = 3000;
- final int WINDOW_SIZE = 1000;
- final int WINDOW_SLIDE = 100;
- final int NUM_KEYS = 100;
+ final int NUM_ELEMENTS_PER_KEY = numElementsPerKey();
+ final int WINDOW_SIZE = windowSize();
+ final int WINDOW_SLIDE = windowSlide();
+ final int NUM_KEYS = numKeys();
FailingSource.reset();
try {
@@ -790,4 +798,20 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
public IntType(int value) { this.value = value; }
}
+
+ protected int numElementsPerKey() { // overridable sizing hook read by the window tests of this class
+ return 300; // default; the test bodies hard-coded 3000 before they called this hook
+ }
+
+ protected int windowSize() { // overridable sizing hook read by the tumbling and the sliding window tests
+ return 100; // default; the sliding-window tests hard-coded 1000 before, the tumbling ones 100
+ }
+
+ protected int windowSlide() { // overridable sizing hook read by the sliding window tests
+ return 100; // NOTE(review): equal to the default windowSize(), so with the defaults a sliding window advances by its full size - confirm this is intended
+ }
+
+ protected int numKeys() { // overridable sizing hook read by the window tests of this class
+ return 20; // default; the test bodies hard-coded 100 before they called this hook
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
new file mode 100644
index 0000000..a5bf10c
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+public class AsyncFileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase { // runs the inherited window checkpointing tests with the FILE_ASYNC backend
+
+ public AsyncFileBackendEventTimeWindowCheckpointingITCase() {
+ super(StateBackendEnum.FILE_ASYNC); // initStateBackend() maps FILE_ASYNC to new FsStateBackend("file://" + tempFolder, true)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
new file mode 100644
index 0000000..ef9ad37
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+public class AsyncMemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase { // runs the inherited window checkpointing tests with the MEM_ASYNC backend
+
+ public AsyncMemBackendEventTimeWindowCheckpointingITCase() {
+ super(StateBackendEnum.MEM_ASYNC); // initStateBackend() maps MEM_ASYNC to new MemoryStateBackend(MAX_MEM_STATE_SIZE, true)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
index 14feb78..da2bbc7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
@@ -23,4 +23,24 @@ public class RocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEv
public RocksDbBackendEventTimeWindowCheckpointingITCase() {
super(StateBackendEnum.ROCKSDB_FULLY_ASYNC);
}
+
+ @Override
+ protected int numElementsPerKey() { // overrides the smaller default of the abstract base test
+ return 3000; // same value the shared test bodies hard-coded before the hook existed
+ }
+
+ @Override
+ protected int windowSize() { // overrides the smaller default of the abstract base test
+ return 1000; // NOTE(review): also applies to the tumbling-window tests, which hard-coded 100 before - confirm this is intended
+ }
+
+ @Override
+ protected int windowSlide() { // returns the same value as the base default, stated explicitly for this backend
+ return 100; // same slide the sliding-window tests hard-coded before the hook existed
+ }
+
+ @Override
+ protected int numKeys() { // overrides the smaller default of the abstract base test
+ return 100; // same value the shared test bodies hard-coded before the hook existed
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
index 456861a..cbb56d0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
@@ -33,6 +33,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import java.util.Random;
+
/**
* A collection of manual tests that serve to assess the performance of windowed operations. These
* run in local mode with parallelism 1 with a source that emits data as fast as possible. Thus,
@@ -241,11 +243,10 @@ public class ManualWindowSpeedITCase extends StreamingMultipleProgramsTestBase {
@Override
public void run(SourceContext<Tuple2<String, Integer>> out) throws Exception {
- long index = 0;
+ Random random = new Random(42);
while (running) {
- Tuple2<String, Integer> tuple = new Tuple2<String, Integer>("Tuple " + (index % numKeys), 1);
+ Tuple2<String, Integer> tuple = new Tuple2<String, Integer>("Tuple " + (random.nextInt(numKeys)), 1);
out.collect(tuple);
- index++;
}
}
[4/4] flink git commit: [FLINK-5715] Asynchronous snapshots for
heap-based keyed state backend
Posted by sr...@apache.org.
[FLINK-5715] Asynchronous snapshots for heap-based keyed state backend
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ab014ef9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ab014ef9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ab014ef9
Branch: refs/heads/master
Commit: ab014ef94e0e9137ac6f8f41dae385ff71e8ba5b
Parents: 30bb958
Author: Stefan Richter <s....@data-artisans.com>
Authored: Fri Mar 3 10:51:15 2017 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Mar 16 18:34:02 2017 +0100
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 5 +
.../state/RocksDBStateBackendTest.java | 86 +-
.../java/org/apache/flink/util/MathUtils.java | 35 +-
.../filesystem/AbstractFsStateSnapshot.java | 30 +-
.../state/memory/AbstractMemStateSnapshot.java | 40 +-
.../AbstractMigrationRestoreStrategy.java | 117 ++
.../state/memory/MigrationRestoreSnapshot.java | 32 +
.../state/AbstractKeyedStateBackend.java | 7 +
.../runtime/state/ArrayListSerializer.java | 14 +-
.../state/KeyedBackendSerializationProxy.java | 23 +-
.../flink/runtime/state/KeyedStateBackend.java | 28 +-
.../state/StateTransformationFunction.java | 23 +
.../state/filesystem/FsStateBackend.java | 109 +-
.../state/heap/AbstractHeapMergingState.java | 93 +-
.../runtime/state/heap/AbstractHeapState.java | 84 +-
.../state/heap/AbstractStateTableSnapshot.java | 51 +
.../state/heap/CopyOnWriteStateTable.java | 1066 ++++++++++++++++++
.../heap/CopyOnWriteStateTableSnapshot.java | 188 +++
.../state/heap/HeapAggregatingState.java | 92 +-
.../runtime/state/heap/HeapFoldingState.java | 70 +-
.../state/heap/HeapKeyedStateBackend.java | 426 +++----
.../flink/runtime/state/heap/HeapListState.java | 63 +-
.../flink/runtime/state/heap/HeapMapState.java | 167 +--
.../runtime/state/heap/HeapReducingState.java | 82 +-
.../runtime/state/heap/HeapValueState.java | 47 +-
.../runtime/state/heap/InternalKeyContext.java | 60 +
.../state/heap/NestedMapsStateTable.java | 363 ++++++
.../flink/runtime/state/heap/StateEntry.java | 44 +
.../flink/runtime/state/heap/StateTable.java | 222 ++--
.../state/heap/StateTableByKeyGroupReader.java | 38 +
.../state/heap/StateTableByKeyGroupReaders.java | 136 +++
.../runtime/state/heap/StateTableSnapshot.java | 45 +
.../state/memory/MemoryStateBackend.java | 26 +
.../runtime/query/QueryableStateClientTest.java | 7 +-
.../message/KvStateRequestSerializerTest.java | 38 +-
.../state/AsyncFileStateBackendTest.java | 27 +
.../state/AsyncMemoryStateBackendTest.java | 27 +
.../runtime/state/FileStateBackendTest.java | 8 +-
.../runtime/state/MemoryStateBackendTest.java | 8 +-
.../runtime/state/StateBackendTestBase.java | 216 +++-
.../state/heap/CopyOnWriteStateTableTest.java | 486 ++++++++
.../state/heap/HeapAggregatingStateTest.java | 21 +-
...pKeyedStateBackendSnapshotMigrationTest.java | 173 +++
.../runtime/state/heap/HeapListStateTest.java | 17 +-
.../state/heap/HeapReducingStateTest.java | 23 +-
.../state/heap/HeapStateBackendTestBase.java | 54 +
.../StateTableSnapshotCompatibilityTest.java | 118 ++
.../util/BlockerCheckpointStreamFactory.java | 118 ++
.../heap_keyed_statebackend_1_2.snapshot | Bin 0 -> 2068 bytes
.../api/windowing/windows/TimeWindow.java | 5 +-
...tractEventTimeWindowCheckpointingITCase.java | 64 +-
...ckendEventTimeWindowCheckpointingITCase.java | 26 +
...ckendEventTimeWindowCheckpointingITCase.java | 26 +
...ckendEventTimeWindowCheckpointingITCase.java | 20 +
.../test/state/ManualWindowSpeedITCase.java | 7 +-
55 files changed, 4239 insertions(+), 1162 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index aaccc2f..f585d21 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1219,4 +1219,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// expected
}
}
+
+ @Override
+ public boolean supportsAsynchronousSnapshots() { // capability query overridden for the RocksDB keyed backend
+ return true; // unconditional: no configuration or state is consulted
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index c7b5c20..708613b 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -31,14 +31,13 @@ import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.StateBackendTestBase;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -364,89 +363,6 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
assertEquals(null, keyedStateBackend.db);
}
- static class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
-
- private final int maxSize;
- private int afterNumberInvocations;
- private OneShotLatch blocker;
- private OneShotLatch waiter;
-
- MemCheckpointStreamFactory.MemoryCheckpointOutputStream lastCreatedStream;
-
- public MemCheckpointStreamFactory.MemoryCheckpointOutputStream getLastCreatedStream() {
- return lastCreatedStream;
- }
-
- public BlockerCheckpointStreamFactory(int maxSize) {
- this.maxSize = maxSize;
- }
-
- public void setAfterNumberInvocations(int afterNumberInvocations) {
- this.afterNumberInvocations = afterNumberInvocations;
- }
-
- public void setBlockerLatch(OneShotLatch latch) {
- this.blocker = latch;
- }
-
- public void setWaiterLatch(OneShotLatch latch) {
- this.waiter = latch;
- }
-
- @Override
- public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
- waiter.trigger();
- this.lastCreatedStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) {
-
- private int afterNInvocations = afterNumberInvocations;
- private final OneShotLatch streamBlocker = blocker;
- private final OneShotLatch streamWaiter = waiter;
-
- @Override
- public void write(int b) throws IOException {
-
- if (afterNInvocations > 0) {
- --afterNInvocations;
- }
-
- if (0 == afterNInvocations && null != streamBlocker) {
- try {
- streamBlocker.await();
- } catch (InterruptedException ignored) {
- }
- }
- try {
- super.write(b);
- } catch (IOException ex) {
- if (null != streamWaiter) {
- streamWaiter.trigger();
- }
- throw ex;
- }
-
- if (0 == afterNInvocations && null != streamWaiter) {
- streamWaiter.trigger();
- }
- }
-
- @Override
- public void close() {
- super.close();
- if (null != streamWaiter) {
- streamWaiter.trigger();
- }
- }
- };
-
- return lastCreatedStream;
- }
-
- @Override
- public void close() throws Exception {
-
- }
- }
-
private static class AcceptAllFilter implements IOFileFilter {
@Override
public boolean accept(File file) {
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
index 074e8ae..1d84a39 100644
--- a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
@@ -140,11 +140,7 @@ public final class MathUtils {
code = code * 5 + 0xe6546b64;
code ^= 4;
- code ^= code >>> 16;
- code *= 0x85ebca6b;
- code ^= code >>> 13;
- code *= 0xc2b2ae35;
- code ^= code >>> 16;
+ code = bitMix(code);
if (code >= 0) {
return code;
@@ -172,6 +168,35 @@ public final class MathUtils {
return x + 1;
}
+ /**
+ * Pseudo-randomly maps a long (64-bit) to an integer (32-bit) using some bit-mixing for better distribution.
+ *
+ * <p>The input goes through two xor-shift/multiply rounds followed by one more xor-shift; the returned value
+ * is the low 32 bits of that scrambled long. The result depends only on the argument.
+ *
+ * <p>NOTE(review): the shift widths and multipliers look like the SplitMix64 finalizer - confirm before
+ * citing it as the origin.
+ *
+ * @param in the long (64-bit) input.
+ * @return the bit-mixed int (32-bit) output
+ */
+ public static int longToIntWithBitMixing(long in) {
+ in = (in ^ (in >>> 30)) * 0xbf58476d1ce4e5b9L; // round 1: fold the high bits down, then multiply (wraps on overflow)
+ in = (in ^ (in >>> 27)) * 0x94d049bb133111ebL; // round 2: same shape with a different shift and multiplier
+ in = in ^ (in >>> 31); // final fold so the high bits also influence the low half
+ return (int) in; // narrowing cast keeps only the low 32 bits
+ }
+
+ /**
+ * Bit-mixing for pseudo-randomization of integers (e.g., to guard against bad hash functions). Implementation is
+ * from Murmur's 32 bit finalizer.
+ *
+ * <p>The result depends only on the argument; all arithmetic is plain {@code int} arithmetic, so the
+ * multiplications wrap around on overflow.
+ *
+ * @param in the input value
+ * @return the bit-mixed output value
+ */
+ public static int bitMix(int in) {
+ in ^= in >>> 16; // unsigned shift: xor the upper 16 bits into the lower 16 bits
+ in *= 0x85ebca6b; // first multiplier
+ in ^= in >>> 13; // xor the bits above position 13 into the lower bits
+ in *= 0xc2b2ae35; // second multiplier
+ in ^= in >>> 16; // final fold of the upper half into the lower half
+ return in;
+ }
+
// ============================================================================================
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java
index 103c214..a15e49d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java
@@ -21,8 +21,16 @@ package org.apache.flink.migration.runtime.state.filesystem;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+import org.apache.flink.migration.runtime.state.memory.AbstractMigrationRestoreStrategy;
+import org.apache.flink.migration.runtime.state.memory.MigrationRestoreSnapshot;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.StateTable;
import java.io.IOException;
@@ -36,7 +44,7 @@ import java.io.IOException;
@Deprecated
@SuppressWarnings("deprecation")
public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
- extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD> {
+ extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD>, MigrationRestoreSnapshot<K, N, SV> {
private static final long serialVersionUID = 1L;
@@ -85,4 +93,24 @@ public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD exte
public SD getStateDesc() {
return stateDesc;
}
+
+ /**
+ * Reads the file behind this snapshot and restores its content into a state table of the given backend.
+ *
+ * <p>The file at {@code getFilePath()} is opened through that path's file system and wrapped in a
+ * {@link DataInputViewStreamWrapper}. The decoding itself is delegated to an
+ * {@link AbstractMigrationRestoreStrategy} built from this snapshot's key, namespace and state serializers.
+ *
+ * @param stateName name of the state, passed through to the restore strategy
+ * @param stateBackend heap backend, passed through to the restore strategy
+ * @return the state table returned by the restore strategy
+ * @throws IOException declared for the file-system, stream and restore-strategy calls made here
+ */
+ @Override
+ @SuppressWarnings("unchecked")
+ public StateTable<K, N, SV> deserialize(
+ String stateName,
+ HeapKeyedStateBackend<K> stateBackend) throws IOException {
+
+ final FileSystem fs = getFilePath().getFileSystem();
+ try (FSDataInputStream inStream = fs.open(getFilePath())) { // try-with-resources closes the stream on exit
+ final DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inStream);
+ AbstractMigrationRestoreStrategy<K, N, SV> restoreStrategy =
+ new AbstractMigrationRestoreStrategy<K, N, SV>(keySerializer, namespaceSerializer, stateSerializer) {
+ @Override
+ protected DataInputView openDataInputView() throws IOException {
+ return inView; // hand out the view over the stream opened above
+ }
+ };
+ return restoreStrategy.deserialize(stateName, stateBackend); // runs while the stream is still open
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java
index 6056578..ff86f7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java
@@ -21,17 +21,18 @@ package org.apache.flink.migration.runtime.state.memory;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.util.DataInputDeserializer;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
@Deprecated
@SuppressWarnings("deprecation")
public abstract class AbstractMemStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
- implements KvStateSnapshot<K, N, S, SD> {
+ implements KvStateSnapshot<K, N, S, SD>, MigrationRestoreSnapshot<K, N, SV> {
private static final long serialVersionUID = 1L;
@@ -73,24 +74,21 @@ public abstract class AbstractMemStateSnapshot<K, N, SV, S extends State, SD ext
this.data = data;
}
- public HashMap<N, Map<K, SV>> deserialize() throws IOException {
- DataInputDeserializer inView = new DataInputDeserializer(data, 0, data.length);
-
- final int numKeys = inView.readInt();
- HashMap<N, Map<K, SV>> stateMap = new HashMap<>(numKeys);
-
- for (int i = 0; i < numKeys && !closed; i++) {
- N namespace = namespaceSerializer.deserialize(inView);
- final int numValues = inView.readInt();
- Map<K, SV> namespaceMap = new HashMap<>(numValues);
- stateMap.put(namespace, namespaceMap);
- for (int j = 0; j < numValues; j++) {
- K key = keySerializer.deserialize(inView);
- SV value = stateSerializer.deserialize(inView);
- namespaceMap.put(key, value);
- }
- }
- return stateMap;
+ /**
+ * Restores the serialized bytes held by this snapshot into a state table of the given backend.
+ *
+ * <p>The whole {@code data} array is exposed through a {@link DataInputDeserializer}. The decoding itself is
+ * delegated to an {@link AbstractMigrationRestoreStrategy} built from this snapshot's key, namespace and
+ * state serializers.
+ *
+ * @param stateName name of the state, passed through to the restore strategy
+ * @param stateBackend heap backend, passed through to the restore strategy
+ * @return the state table returned by the restore strategy
+ * @throws IOException declared for the restore-strategy call made here
+ */
+ @Override
+ @SuppressWarnings("unchecked")
+ public StateTable<K, N, SV> deserialize(
+ String stateName,
+ HeapKeyedStateBackend<K> stateBackend) throws IOException {
+
+ final DataInputDeserializer inView = new DataInputDeserializer(data, 0, data.length); // view over the full array
+ AbstractMigrationRestoreStrategy<K, N, SV> restoreStrategy =
+ new AbstractMigrationRestoreStrategy<K, N, SV>(keySerializer, namespaceSerializer, stateSerializer) {
+ @Override
+ protected DataInputView openDataInputView() throws IOException {
+ return inView; // hand out the in-memory view created above
+ }
+ };
+ return restoreStrategy.deserialize(stateName, stateBackend);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java
new file mode 100644
index 0000000..e572619
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.memory;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.StateTable;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * This class outlines the general strategy to restore from migration states.
+ *
+ * <p>Subclasses only supply the {@link DataInputView} to read from (see {@link #openDataInputView()});
+ * {@link #deserialize(String, HeapKeyedStateBackend)} reads the entries from that view and inserts them into a
+ * newly created {@link StateTable}.
+ *
+ * @param <K> type of key.
+ * @param <N> type of namespace.
+ * @param <S> type of state.
+ */
+@Deprecated
+public abstract class AbstractMigrationRestoreStrategy<K, N, S> implements MigrationRestoreSnapshot<K, N, S> {
+
+ /**
+ * Key Serializer
+ */
+ protected final TypeSerializer<K> keySerializer;
+
+ /**
+ * Namespace Serializer
+ */
+ protected final TypeSerializer<N> namespaceSerializer;
+
+ /**
+ * Serializer for the state value
+ */
+ protected final TypeSerializer<S> stateSerializer;
+
+ /**
+ * Stores the three serializers used for decoding; each one is passed through
+ * {@code Preconditions.checkNotNull}.
+ *
+ * @param keySerializer serializer used to read keys
+ * @param namespaceSerializer serializer used to read namespaces
+ * @param stateSerializer serializer used to read state values
+ */
+ public AbstractMigrationRestoreStrategy(
+ TypeSerializer<K> keySerializer,
+ TypeSerializer<N> namespaceSerializer,
+ TypeSerializer<S> stateSerializer) {
+
+ this.keySerializer = Preconditions.checkNotNull(keySerializer);
+ this.namespaceSerializer = Preconditions.checkNotNull(namespaceSerializer);
+ this.stateSerializer = Preconditions.checkNotNull(stateSerializer);
+ }
+
+ /**
+ * Reads all entries from the view returned by {@link #openDataInputView()} into a new state table obtained
+ * from {@code stateBackend}.
+ *
+ * <p>Layout as consumed by this method: an int with the number of namespaces; then, per namespace, the
+ * namespace itself, an int with the number of entries, and that many key/value pairs. Every entry is put
+ * under the start key group of the backend's key-group range, which must contain exactly one key group.
+ *
+ * @param stateName name under which the restored state is registered; checked for null
+ * @param stateBackend backend that supplies the key-group range and creates the table; checked for null
+ * @return the populated state table
+ * @throws IOException declared for the reads on the input view
+ */
+ @Override
+ public StateTable<K, N, S> deserialize(String stateName, HeapKeyedStateBackend<K> stateBackend) throws IOException {
+
+ Preconditions.checkNotNull(stateName, "State name is null. Cannot deserialize snapshot.");
+ Preconditions.checkNotNull(stateBackend, "State backend is null. Cannot deserialize snapshot.");
+
+ final KeyGroupRange keyGroupRange = stateBackend.getKeyGroupRange();
+ Preconditions.checkState(1 == keyGroupRange.getNumberOfKeyGroups(),
+ "Unexpected number of key-groups for restoring from Flink 1.1");
+
+ // The table is registered with a "patched" namespace serializer: a VoidSerializer is swapped for the
+ // VoidNamespaceSerializer singleton. The stream below is still decoded with the original serializer.
+ TypeSerializer<N> patchedNamespaceSerializer = this.namespaceSerializer;
+
+ if (patchedNamespaceSerializer instanceof VoidSerializer) {
+ patchedNamespaceSerializer = (TypeSerializer<N>) VoidNamespaceSerializer.INSTANCE; // unchecked cast
+ }
+
+ RegisteredBackendStateMetaInfo<N, S> registeredBackendStateMetaInfo =
+ new RegisteredBackendStateMetaInfo<>(
+ StateDescriptor.Type.UNKNOWN,
+ stateName,
+ patchedNamespaceSerializer,
+ stateSerializer);
+
+ final StateTable<K, N, S> stateTable = stateBackend.newStateTable(registeredBackendStateMetaInfo);
+ final DataInputView inView = openDataInputView();
+ final int keyGroup = keyGroupRange.getStartKeyGroup(); // the only key group, per the check above
+ final int numNamespaces = inView.readInt();
+
+ for (int i = 0; i < numNamespaces; i++) {
+ N namespace = namespaceSerializer.deserialize(inView); // original (unpatched) serializer
+ if (null == namespace) {
+ // NOTE(review): presumably the VoidSerializer case, which yields null - confirm
+ namespace = (N) VoidNamespace.INSTANCE;
+ }
+ final int numKV = inView.readInt();
+ for (int j = 0; j < numKV; j++) {
+ K key = keySerializer.deserialize(inView);
+ S value = stateSerializer.deserialize(inView);
+ stateTable.put(key, keyGroup, namespace, value);
+ }
+ }
+ return stateTable;
+ }
+
+ /**
+ * Different state handles require different code to end up with a {@link DataInputView}.
+ */
+ protected abstract DataInputView openDataInputView() throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MigrationRestoreSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MigrationRestoreSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MigrationRestoreSnapshot.java
new file mode 100644
index 0000000..ea529db
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MigrationRestoreSnapshot.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.memory;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.StateTable;
+import org.apache.flink.util.Migration;
+
+import java.io.IOException;
+
+/**
+ * Snapshot that can be deserialized into a {@link StateTable} for a {@link HeapKeyedStateBackend}.
+ *
+ * @param <K> key type of the backend and of the produced table
+ * @param <N> namespace type of the produced table
+ * @param <S> state type of the produced table
+ */
+@Deprecated
+@Internal
+public interface MigrationRestoreSnapshot<K, N, S> extends Migration {
+ /**
+ * Deserializes this snapshot into a state table.
+ *
+ * @param stateName name of the state being restored
+ * @param stateBackend heap backend the table is restored for
+ * @return the restored state table
+ * @throws IOException declared for failures while reading the snapshot
+ */
+ StateTable<K, N, S> deserialize(String stateName, HeapKeyedStateBackend<K> stateBackend) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index aba00f3..1f2f4a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
@@ -254,6 +255,7 @@ public abstract class AbstractKeyedStateBackend<K>
/**
* @see KeyedStateBackend
*/
+ @Override
public KeyGroupRange getKeyGroupRange() {
return keyGroupRange;
}
@@ -382,4 +384,9 @@ public abstract class AbstractKeyedStateBackend<K>
public void close() throws IOException {
cancelStreamRegistry.close();
}
+
+ /**
+ * Reports whether this backend supports asynchronous snapshots. This implementation always answers
+ * {@code false}. NOTE(review): subclasses are presumably meant to override it - confirm.
+ *
+ * @return {@code false}
+ */
+ @VisibleForTesting
+ public boolean supportsAsynchronousSnapshots() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
index f5a6405..0badb41 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
@@ -57,11 +57,17 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
@Override
public ArrayList<T> copy(ArrayList<T> from) {
- ArrayList<T> newList = new ArrayList<>(from.size());
- for (int i = 0; i < from.size(); i++) {
- newList.add(elementSerializer.copy(from.get(i)));
+ if (elementSerializer.isImmutableType()) {
+ // fast track using memcopy for immutable types
+ return new ArrayList<>(from);
+ } else {
+ // element-wise deep copy for mutable types
+ ArrayList<T> newList = new ArrayList<>(from.size());
+ for (int i = 0; i < from.size(); i++) {
+ newList.add(elementSerializer.copy(from.get(i)));
+ }
+ return newList;
}
- return newList;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index dbee6cb..5661c38 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.io.VersionMismatchException;
import org.apache.flink.core.io.VersionedIOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -37,11 +38,12 @@ import java.util.List;
*/
public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable {
- private static final int VERSION = 1;
+ public static final int VERSION = 2;
private TypeSerializerSerializationProxy<?> keySerializerProxy;
private List<StateMetaInfo<?, ?>> namedStateSerializationProxies;
+ private int restoredVersion;
private ClassLoader userCodeClassLoader;
public KeyedBackendSerializationProxy(ClassLoader userCodeClassLoader) {
@@ -51,6 +53,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
public KeyedBackendSerializationProxy(TypeSerializer<?> keySerializer, List<StateMetaInfo<?, ?>> namedStateSerializationProxies) {
this.keySerializerProxy = new TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(keySerializer));
this.namedStateSerializationProxies = Preconditions.checkNotNull(namedStateSerializationProxies);
+ this.restoredVersion = VERSION;
Preconditions.checkArgument(namedStateSerializationProxies.size() <= Short.MAX_VALUE);
}
@@ -67,6 +70,22 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
return VERSION;
}
+ /**
+ * Returns the version stored in {@code restoredVersion}. The field is set to {@link #VERSION} by the
+ * constructor that takes a key serializer, and overwritten by {@link #resolveVersionRead(int)}.
+ *
+ * @return the current value of {@code restoredVersion}
+ */
+ public int getRestoredVersion() {
+ return restoredVersion;
+ }
+
+ /**
+ * Runs the superclass version resolution first and, if that returns normally, remembers the given version
+ * so it can be queried through {@link #getRestoredVersion()}.
+ *
+ * @param foundVersion the version handed in by the caller
+ * @throws VersionMismatchException declared; propagated from the superclass call
+ */
+ @Override
+ protected void resolveVersionRead(int foundVersion) throws VersionMismatchException {
+ super.resolveVersionRead(foundVersion);
+ this.restoredVersion = foundVersion; // only reached when the superclass call did not throw
+ }
+
+ /**
+ * Accepts every version the superclass accepts and, in addition, version 1.
+ *
+ * @param version the version to test
+ * @return {@code true} if the superclass accepts {@code version} or {@code version == 1}
+ */
+ @Override
+ public boolean isCompatibleVersion(int version) {
+ // we are compatible with version 2 (Flink 1.3.x) and version 1 (Flink 1.2.x)
+ return super.isCompatibleVersion(version) || version == 1;
+ }
+
@Override
public void write(DataOutputView out) throws IOException {
super.write(out);
@@ -96,7 +115,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
}
}
-//----------------------------------------------------------------------------------------------------------------------
+ //----------------------------------------------------------------------------------------------------------------------
/**
* This is the serialization proxy for {@link RegisteredBackendStateMetaInfo} for a single registered state in a
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
index 15e0491..09e27e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
@@ -21,13 +21,14 @@ package org.apache.flink.runtime.state;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
/**
* A keyed state backend provides methods for managing keyed state.
*
* @param <K> The key by which state is keyed.
*/
-public interface KeyedStateBackend<K> {
+public interface KeyedStateBackend<K> extends InternalKeyContext<K> {
/**
* Sets the current key that is used for partitioned state.
@@ -36,31 +37,6 @@ public interface KeyedStateBackend<K> {
void setCurrentKey(K newKey);
/**
- * Used by states to access the current key.
- */
- K getCurrentKey();
-
- /**
- * Returns the key-group to which the current key belongs.
- */
- int getCurrentKeyGroupIndex();
-
- /**
- * Returns the number of key-groups aka max parallelism.
- */
- int getNumberOfKeyGroups();
-
- /**
- * Returns the key groups for this backend.
- */
- KeyGroupsList getKeyGroupRange();
-
- /**
- * {@link TypeSerializer} for the state backend key type.
- */
- TypeSerializer<K> getKeySerializer();
-
- /**
* Creates or retrieves a keyed state backed by this state backend.
*
* @param namespaceSerializer The serializer used for the namespace type of the state
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
new file mode 100644
index 0000000..9e12ee5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+/**
+ * Function that takes a previous state and a value and returns a state of the same type as the previous one.
+ *
+ * @param <S> type of the previous state and of the returned state
+ * @param <T> type of the value passed in alongside the previous state
+ */
+public interface StateTransformationFunction<S, T> {
+ /**
+ * Computes the resulting state from the previous state and the given value.
+ *
+ * @param previousState the state before the transformation
+ * @param value the value to apply to the previous state
+ * @return the resulting state
+ * @throws Exception declared so that implementations may fail with any exception
+ */
+ S apply(S previousState, T value) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 2e9198f..e27712c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -66,7 +66,10 @@ public class FsStateBackend extends AbstractStateBackend {
/** State below this size will be stored as part of the metadata, rather than in files */
private final int fileStateThreshold;
-
+
+ /** Switch to choose between synchronous and asynchronous snapshots */
+ private final boolean asynchronousSnapshots;
+
/**
* Creates a new state backend that stores its checkpoint data in the file system and location
* defined by the given URI.
@@ -99,6 +102,27 @@ public class FsStateBackend extends AbstractStateBackend {
*
* @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
* and the path to the checkpoint data directory.
+ * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+ *
+ * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+ */
+ public FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots) throws IOException {
+ this(new Path(checkpointDataUri), asynchronousSnapshots);
+ }
+
+ /**
+ * Creates a new state backend that stores its checkpoint data in the file system and location
+ * defined by the given URI.
+ *
+ * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+ * must be accessible via {@link FileSystem#get(URI)}.
+ *
+ * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+ * (host and port), or that the Hadoop configuration that describes that information must be in the
+ * classpath.
+ *
+ * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+ * and the path to the checkpoint data directory.
* @throws IOException Thrown, if no file system can be found for the scheme in the URI.
*/
public FsStateBackend(Path checkpointDataUri) throws IOException {
@@ -118,10 +142,52 @@ public class FsStateBackend extends AbstractStateBackend {
*
* @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
* and the path to the checkpoint data directory.
+ * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+ *
+ * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+ */
+ public FsStateBackend(Path checkpointDataUri, boolean asynchronousSnapshots) throws IOException {
+ this(checkpointDataUri.toUri(), asynchronousSnapshots);
+ }
+
+ /**
+ * Creates a new state backend that stores its checkpoint data in the file system and location
+ * defined by the given URI.
+ *
+ * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+ * must be accessible via {@link FileSystem#get(URI)}.
+ *
+ * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+ * (host and port), or that the Hadoop configuration that describes that information must be in the
+ * classpath.
+ *
+ * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+ * and the path to the checkpoint data directory.
* @throws IOException Thrown, if no file system can be found for the scheme in the URI.
*/
public FsStateBackend(URI checkpointDataUri) throws IOException {
- this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD);
+ this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD, false);
+ }
+
+ /**
+ * Creates a new state backend that stores its checkpoint data in the file system and location
+ * defined by the given URI.
+ *
+ * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+ * must be accessible via {@link FileSystem#get(URI)}.
+ *
+ * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+ * (host and port), or that the Hadoop configuration that describes that information must be in the
+ * classpath.
+ *
+ * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+ * and the path to the checkpoint data directory.
+ * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+ *
+ * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+ */
+ public FsStateBackend(URI checkpointDataUri, boolean asynchronousSnapshots) throws IOException {
+ this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD, asynchronousSnapshots);
}
/**
@@ -139,17 +205,47 @@ public class FsStateBackend extends AbstractStateBackend {
* and the path to the checkpoint data directory.
* @param fileStateSizeThreshold State up to this size will be stored as part of the metadata,
* rather than in files
- *
+ *
* @throws IOException Thrown, if no file system can be found for the scheme in the URI.
* @throws IllegalArgumentException Thrown, if the {@code fileStateSizeThreshold} is out of bounds.
*/
public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) throws IOException {
+
+ this(checkpointDataUri, fileStateSizeThreshold, false);
+ }
+
+ /**
+ * Creates a new state backend that stores its checkpoint data in the file system and location
+ * defined by the given URI.
+ *
+ * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+ * must be accessible via {@link FileSystem#get(URI)}.
+ *
+ * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+ * (host and port), or that the Hadoop configuration that describes that information must be in the
+ * classpath.
+ *
+ * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+ * and the path to the checkpoint data directory.
+ * @param fileStateSizeThreshold State up to this size will be stored as part of the metadata,
+ * rather than in files
+ * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+ *
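+ * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+ * @throws IllegalArgumentException Thrown, if the {@code fileStateSizeThreshold} is out of bounds.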
+ */
+ public FsStateBackend(
+ URI checkpointDataUri,
+ int fileStateSizeThreshold,
+ boolean asynchronousSnapshots) throws IOException {
+
checkArgument(fileStateSizeThreshold >= 0, "The threshold for file state size must be zero or larger.");
- checkArgument(fileStateSizeThreshold <= MAX_FILE_STATE_THRESHOLD,
+ checkArgument(fileStateSizeThreshold <= MAX_FILE_STATE_THRESHOLD,
"The threshold for file state size cannot be larger than %s", MAX_FILE_STATE_THRESHOLD);
this.fileStateThreshold = fileStateSizeThreshold;
this.basePath = validateAndNormalizeUri(checkpointDataUri);
+
+ this.asynchronousSnapshots = asynchronousSnapshots;
}
/**
@@ -166,9 +262,9 @@ public class FsStateBackend extends AbstractStateBackend {
* Gets the threshold below which state is stored as part of the metadata, rather than in files.
* This threshold ensures that the backend does not create a large amount of very small files,
* where potentially the file pointers are larger than the state itself.
- *
+ *
* <p>By default, this threshold is {@value #DEFAULT_FILE_STATE_THRESHOLD}.
- *
+ *
* @return The file size threshold, in bytes.
*/
public int getMinFileSizeThreshold() {
@@ -209,6 +305,7 @@ public class FsStateBackend extends AbstractStateBackend {
env.getUserClassLoader(),
numberOfKeyGroups,
keyGroupRange,
+ asynchronousSnapshots,
env.getExecutionConfig());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
index 4ac7125..3e76423 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
@@ -22,18 +22,15 @@ import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.internal.InternalMergingState;
import java.util.Collection;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkState;
/**
* Base class for {@link MergingState} ({@link org.apache.flink.runtime.state.internal.InternalMergingState})
* that is stored on the heap.
- *
+ *
* @param <K> The type of the key.
* @param <N> The type of the namespace.
* @param <SV> The type of the values in the state.
@@ -45,21 +42,25 @@ public abstract class AbstractHeapMergingState<K, N, IN, OUT, SV, S extends Stat
implements InternalMergingState<N, IN, OUT> {
/**
+ * The merge transformation function that implements the merge logic.
+ */
+ private final MergeTransformation mergeTransformation;
+
+ /**
* Creates a new key/value state for the given hash map of key/value pairs.
*
- * @param backend The state backend backing that created this state.
* @param stateDesc The state identifier for the state. This contains name
* and can create a default state value.
* @param stateTable The state table to use in this key/value state. May contain initial state.
*/
protected AbstractHeapMergingState(
- KeyedStateBackend<K> backend,
SD stateDesc,
StateTable<K, N, SV> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer) {
- super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
+ super(stateDesc, stateTable, keySerializer, namespaceSerializer);
+ this.mergeTransformation = new MergeTransformation();
}
@Override
@@ -68,56 +69,40 @@ public abstract class AbstractHeapMergingState<K, N, IN, OUT, SV, S extends Stat
return; // nothing to do
}
- final K key = backend.getCurrentKey();
- checkState(key != null, "No key set.");
-
- final Map<N, Map<K, SV>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());
-
- if (namespaceMap != null) {
- SV merged = null;
-
- // merge the sources
- for (N source : sources) {
- Map<K, SV> keysForNamespace = namespaceMap.get(source);
- if (keysForNamespace != null) {
- // get and remove the next source per namespace/key
- SV sourceState = keysForNamespace.remove(key);
-
- // if the namespace map became empty, remove
- if (keysForNamespace.isEmpty()) {
- namespaceMap.remove(source);
- }
-
- if (merged != null && sourceState != null) {
- merged = mergeState(merged, sourceState);
- }
- else if (merged == null) {
- merged = sourceState;
- }
- }
- }
+ final StateTable<K, N, SV> map = stateTable;
+
+ SV merged = null;
+
+ // merge the sources
+ for (N source : sources) {
- // merge into the target, if needed
- if (merged != null) {
- Map<K, SV> keysForTarget = namespaceMap.get(target);
- if (keysForTarget == null) {
- keysForTarget = createNewMap();
- namespaceMap.put(target, keysForTarget);
- }
- SV targetState = keysForTarget.get(key);
-
- if (targetState != null) {
- targetState = mergeState(targetState, merged);
- }
- else {
- targetState = merged;
- }
- keysForTarget.put(key, targetState);
+ // get and remove the next source per namespace/key
+ SV sourceState = map.removeAndGetOld(source);
+
+ if (merged != null && sourceState != null) {
+ merged = mergeState(merged, sourceState);
+ } else if (merged == null) {
+ merged = sourceState;
}
}
- // else no entries for that key at all, nothing to do skip
+ // merge into the target, if needed
+ if (merged != null) {
+ map.transform(target, merged, mergeTransformation);
+ }
}
protected abstract SV mergeState(SV a, SV b) throws Exception;
-}
+
+ final class MergeTransformation implements StateTransformationFunction<SV, SV> {
+
+ @Override
+ public SV apply(SV targetState, SV merged) throws Exception {
+ if (targetState != null) {
+ return mergeState(targetState, merged);
+ } else {
+ return merged;
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
index 18b71de..7e1123d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.heap;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
@@ -25,18 +26,12 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.util.Preconditions;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
* Base class for partitioned {@link ListState} implementations that are backed by a regular
* heap hash map. The concrete implementations define how the state is checkpointed.
- *
+ *
* @param <K> The type of the key.
* @param <N> The type of the namespace.
* @param <SV> The type of the values in the state.
@@ -53,9 +48,7 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St
protected final SD stateDesc;
/** The current namespace, which the access methods will refer to. */
- protected N currentNamespace = null;
-
- protected final KeyedStateBackend<K> backend;
+ protected N currentNamespace;
protected final TypeSerializer<K> keySerializer;
@@ -64,58 +57,28 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St
/**
* Creates a new key/value state for the given hash map of key/value pairs.
*
- * @param backend The state backend backing that created this state.
* @param stateDesc The state identifier for the state. This contains name
* and can create a default state value.
* @param stateTable The state table to use in this key/value state. May contain initial state.
*/
protected AbstractHeapState(
- KeyedStateBackend<K> backend,
SD stateDesc,
StateTable<K, N, SV> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer) {
- Preconditions.checkNotNull(stateTable, "State table must not be null.");
-
- this.backend = backend;
this.stateDesc = stateDesc;
- this.stateTable = stateTable;
+ this.stateTable = Preconditions.checkNotNull(stateTable, "State table must not be null.");
this.keySerializer = keySerializer;
this.namespaceSerializer = namespaceSerializer;
+ this.currentNamespace = null;
}
// ------------------------------------------------------------------------
@Override
public final void clear() {
- Preconditions.checkState(currentNamespace != null, "No namespace set.");
- Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
- Map<N, Map<K, SV>> namespaceMap =
- stateTable.get(backend.getCurrentKeyGroupIndex());
-
- if (namespaceMap == null) {
- return;
- }
-
- Map<K, SV> keyedMap = namespaceMap.get(currentNamespace);
-
- if (keyedMap == null) {
- return;
- }
-
- SV removed = keyedMap.remove(backend.getCurrentKey());
-
- if (removed == null) {
- return;
- }
-
- if (!keyedMap.isEmpty()) {
- return;
- }
-
- namespaceMap.remove(currentNamespace);
+ stateTable.remove(currentNamespace);
}
@Override
@@ -137,20 +100,7 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St
Preconditions.checkState(namespace != null, "No namespace given.");
Preconditions.checkState(key != null, "No key given.");
- Map<N, Map<K, SV>> namespaceMap =
- stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, backend.getNumberOfKeyGroups()));
-
- if (namespaceMap == null) {
- return null;
- }
-
- Map<K, SV> keyedMap = namespaceMap.get(currentNamespace);
-
- if (keyedMap == null) {
- return null;
- }
-
- SV result = keyedMap.get(key);
+ SV result = stateTable.get(key, namespace);
if (result == null) {
return null;
@@ -158,30 +108,14 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St
@SuppressWarnings("unchecked,rawtypes")
TypeSerializer serializer = stateDesc.getSerializer();
-
return KvStateRequestSerializer.serializeValue(result, serializer);
}
/**
- * Creates a new map for use in Heap based state.
- *
- * <p>If the state queryable ({@link StateDescriptor#isQueryable()}, this
- * will create a concurrent hash map instead of a regular one.
- *
- * @return A new namespace map.
- */
- protected <MK, MV> Map<MK, MV> createNewMap() {
- if (stateDesc.isQueryable()) {
- return new ConcurrentHashMap<>();
- } else {
- return new HashMap<>();
- }
- }
-
- /**
* This should only be used for testing.
*/
+ @VisibleForTesting
public StateTable<K, N, SV> getStateTable() {
return stateTable;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java
new file mode 100644
index 0000000..b0d7727
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Abstract base class that encapsulates the logic to take snapshots of {@link StateTable} implementations and also defines how
+ * the snapshot is written during the serialization phase of checkpointing.
+ */
+@Internal
+abstract class AbstractStateTableSnapshot<K, N, S, T extends StateTable<K, N, S>> implements StateTableSnapshot {
+
+ /**
+ * The {@link StateTable} from which this snapshot was created.
+ */
+ final T owningStateTable;
+
+ /**
+ * Creates a new {@link AbstractStateTableSnapshot} for and owned by the given table.
+ *
+ * @param owningStateTable the {@link StateTable} for which this object represents a snapshot.
+ */
+ AbstractStateTableSnapshot(T owningStateTable) {
+ this.owningStateTable = Preconditions.checkNotNull(owningStateTable);
+ }
+
+ /**
+ * Optional hook to release resources for this snapshot at the end of its lifecycle.
+ */
+ @Override
+ public void release() {
+ }
+}
\ No newline at end of file
[2/4] flink git commit: [FLINK-5715] Asynchronous snapshots for
heap-based keyed state backend
Posted by sr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
index 0360161..f393237 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
@@ -22,8 +22,6 @@ import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.util.Preconditions;
@@ -47,35 +45,23 @@ public class HeapMapState<K, N, UK, UV>
/**
* Creates a new key/value state for the given hash map of key/value pairs.
*
- * @param backend The state backend backing that created this state.
* @param stateDesc The state identifier for the state. This contains name
* and can create a default state value.
* @param stateTable The state table to use in this key/value state. May contain initial state.
*/
- public HeapMapState(KeyedStateBackend<K> backend,
+ public HeapMapState(
MapStateDescriptor<UK, UV> stateDesc,
StateTable<K, N, HashMap<UK, UV>> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer) {
- super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
+ super(stateDesc, stateTable, keySerializer, namespaceSerializer);
}
@Override
public UV get(UK userKey) {
- Preconditions.checkState(currentNamespace != null, "No namespace set.");
- Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
- Map<N, Map<K, HashMap<UK, UV>>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());
- if (namespaceMap == null) {
- return null;
- }
-
- Map<K, HashMap<UK, UV>> keyedMap = namespaceMap.get(currentNamespace);
- if (keyedMap == null) {
- return null;
- }
+ HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
- HashMap<UK, UV> userMap = keyedMap.get(backend.<K>getCurrentKey());
if (userMap == null) {
return null;
}
@@ -85,25 +71,11 @@ public class HeapMapState<K, N, UK, UV>
@Override
public void put(UK userKey, UV userValue) {
- Preconditions.checkState(currentNamespace != null, "No namespace set.");
- Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
- Map<N, Map<K, HashMap<UK, UV>>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());
- if (namespaceMap == null) {
- namespaceMap = createNewMap();
- stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
- }
-
- Map<K, HashMap<UK, UV>> keyedMap = namespaceMap.get(currentNamespace);
- if (keyedMap == null) {
- keyedMap = createNewMap();
- namespaceMap.put(currentNamespace, keyedMap);
- }
- HashMap<UK, UV> userMap = keyedMap.get(backend.getCurrentKey());
+ HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
if (userMap == null) {
userMap = new HashMap<>();
- keyedMap.put(backend.getCurrentKey(), userMap);
+ stateTable.put(currentNamespace, userMap);
}
userMap.put(userKey, userValue);
@@ -111,52 +83,27 @@ public class HeapMapState<K, N, UK, UV>
@Override
public void putAll(Map<UK, UV> value) {
- Preconditions.checkState(currentNamespace != null, "No namespace set.");
- Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
- Map<N, Map<K, HashMap<UK, UV>>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());
- if (namespaceMap == null) {
- namespaceMap = createNewMap();
- stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
- }
-
- Map<K, HashMap<UK, UV>> keyedMap = namespaceMap.get(currentNamespace);
- if (keyedMap == null) {
- keyedMap = createNewMap();
- namespaceMap.put(currentNamespace, keyedMap);
- }
+ HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
- HashMap<UK, UV> userMap = keyedMap.get(backend.getCurrentKey());
if (userMap == null) {
userMap = new HashMap<>();
- keyedMap.put(backend.getCurrentKey(), userMap);
+ stateTable.put(currentNamespace, userMap);
}
userMap.putAll(value);
}
-
+
@Override
public void remove(UK userKey) {
- Preconditions.checkState(currentNamespace != null, "No namespace set.");
- Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
- Map<N, Map<K, HashMap<UK, UV>>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());
- if (namespaceMap == null) {
- return;
- }
- Map<K, HashMap<UK, UV>> keyedMap = namespaceMap.get(currentNamespace);
- if (keyedMap == null) {
- return;
- }
-
- HashMap<UK, UV> userMap = keyedMap.get(backend.getCurrentKey());
+ HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
if (userMap == null) {
return;
}
userMap.remove(userKey);
-
+
if (userMap.isEmpty()) {
clear();
}
@@ -164,101 +111,31 @@ public class HeapMapState<K, N, UK, UV>
@Override
public boolean contains(UK userKey) {
- Preconditions.checkState(currentNamespace != null, "No namespace set.");
- Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
- Map<N, Map<K, HashMap<UK, UV>>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());
- if (namespaceMap == null) {
- return false;
- }
-
- Map<K, HashMap<UK, UV>> keyedMap = namespaceMap.get(currentNamespace);
- if (keyedMap == null) {
- return false;
- }
-
- HashMap<UK, UV> userMap = keyedMap.get(backend.<K>getCurrentKey());
-
+ HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
return userMap != null && userMap.containsKey(userKey);
}
@Override
public Iterable<Map.Entry<UK, UV>> entries() {
- Preconditions.checkState(currentNamespace != null, "No namespace set.");
- Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
- Map<N, Map<K, HashMap<UK, UV>>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());
- if (namespaceMap == null) {
- return null;
- }
-
- Map<K, HashMap<UK, UV>> keyedMap = namespaceMap.get(currentNamespace);
- if (keyedMap == null) {
- return null;
- }
-
- HashMap<UK, UV> userMap = keyedMap.get(backend.<K>getCurrentKey());
-
+ HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
return userMap == null ? null : userMap.entrySet();
}
@Override
public Iterable<UK> keys() {
- Preconditions.checkState(currentNamespace != null, "No namespace set.");
- Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
- Map<N, Map<K, HashMap<UK, UV>>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());
- if (namespaceMap == null) {
- return null;
- }
-
- Map<K, HashMap<UK, UV>> keyedMap = namespaceMap.get(currentNamespace);
- if (keyedMap == null) {
- return null;
- }
-
- HashMap<UK, UV> userMap = keyedMap.get(backend.<K>getCurrentKey());
-
+ HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
return userMap == null ? null : userMap.keySet();
}
@Override
public Iterable<UV> values() {
- Preconditions.checkState(currentNamespace != null, "No namespace set.");
- Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
- Map<N, Map<K, HashMap<UK, UV>>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());
- if (namespaceMap == null) {
- return null;
- }
-
- Map<K, HashMap<UK, UV>> keyedMap = namespaceMap.get(currentNamespace);
- if (keyedMap == null) {
- return null;
- }
-
- HashMap<UK, UV> userMap = keyedMap.get(backend.<K>getCurrentKey());
-
+ HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
return userMap == null ? null : userMap.values();
}
@Override
public Iterator<Map.Entry<UK, UV>> iterator() {
- Preconditions.checkState(currentNamespace != null, "No namespace set.");
- Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
- Map<N, Map<K, HashMap<UK, UV>>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());
- if (namespaceMap == null) {
- return null;
- }
-
- Map<K, HashMap<UK, UV>> keyedMap = namespaceMap.get(currentNamespace);
- if (keyedMap == null) {
- return null;
- }
-
- HashMap<UK, UV> userMap = keyedMap.get(backend.<K>getCurrentKey());
-
+ HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
return userMap == null ? null : userMap.entrySet().iterator();
}
@@ -267,22 +144,12 @@ public class HeapMapState<K, N, UK, UV>
Preconditions.checkState(namespace != null, "No namespace given.");
Preconditions.checkState(key != null, "No key given.");
- Map<N, Map<K, HashMap<UK, UV>>> namespaceMap = stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, backend.getNumberOfKeyGroups()));
-
- if (namespaceMap == null) {
- return null;
- }
+ HashMap<UK, UV> result = stateTable.get(key, namespace);
- Map<K, HashMap<UK, UV>> keyedMap = namespaceMap.get(namespace);
- if (keyedMap == null) {
+ if (null == result) {
return null;
}
- HashMap<UK, UV> result = keyedMap.get(key);
- if (result == null) {
- return null;
- }
-
TypeSerializer<UK> userKeySerializer = stateDesc.getKeySerializer();
TypeSerializer<UV> userValueSerializer = stateDesc.getValueSerializer();
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
index 090a660..6e11327 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
@@ -22,17 +22,16 @@ import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
-import java.util.Map;
/**
* Heap-backed partitioned {@link org.apache.flink.api.common.state.ReducingState} that is
* snapshotted into files.
- *
+ *
* @param <K> The type of the key.
* @param <N> The type of the namespace.
* @param <V> The type of the value.
@@ -41,25 +40,23 @@ public class HeapReducingState<K, N, V>
extends AbstractHeapMergingState<K, N, V, V, V, ReducingState<V>, ReducingStateDescriptor<V>>
implements InternalReducingState<N, V> {
- private final ReduceFunction<V> reduceFunction;
+ private final ReduceTransformation<V> reduceTransformation;
/**
* Creates a new key/value state for the given hash map of key/value pairs.
*
- * @param backend The state backend backing that created this state.
* @param stateDesc The state identifier for the state. This contains name
* and can create a default state value.
* @param stateTable The state table to use in this key/value state. May contain initial state.
*/
public HeapReducingState(
- KeyedStateBackend<K> backend,
ReducingStateDescriptor<V> stateDesc,
StateTable<K, N, V> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer) {
- super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
- this.reduceFunction = stateDesc.getReduceFunction();
+ super(stateDesc, stateTable, keySerializer, namespaceSerializer);
+ this.reduceTransformation = new ReduceTransformation<>(stateDesc.getReduceFunction());
}
// ------------------------------------------------------------------------
@@ -68,62 +65,21 @@ public class HeapReducingState<K, N, V>
@Override
public V get() {
- Preconditions.checkState(currentNamespace != null, "No namespace set.");
- Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
- Map<N, Map<K, V>> namespaceMap =
- stateTable.get(backend.getCurrentKeyGroupIndex());
-
- if (namespaceMap == null) {
- return null;
- }
-
- Map<K, V> keyedMap = namespaceMap.get(currentNamespace);
-
- if (keyedMap == null) {
- return null;
- }
-
- return keyedMap.get(backend.<K>getCurrentKey());
+ return stateTable.get(currentNamespace);
}
@Override
public void add(V value) throws IOException {
- Preconditions.checkState(currentNamespace != null, "No namespace set.");
- Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
if (value == null) {
clear();
return;
}
- Map<N, Map<K, V>> namespaceMap =
- stateTable.get(backend.getCurrentKeyGroupIndex());
-
- if (namespaceMap == null) {
- namespaceMap = createNewMap();
- stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
- }
-
- Map<K, V> keyedMap = namespaceMap.get(currentNamespace);
-
- if (keyedMap == null) {
- keyedMap = createNewMap();
- namespaceMap.put(currentNamespace, keyedMap);
- }
-
- V currentValue = keyedMap.put(backend.<K>getCurrentKey(), value);
-
- if (currentValue == null) {
- // we're good, just added the new value
- } else {
- V reducedValue;
- try {
- reducedValue = reduceFunction.reduce(currentValue, value);
- } catch (Exception e) {
- throw new IOException("Exception while applying ReduceFunction in reducing state", e);
- }
- keyedMap.put(backend.<K>getCurrentKey(), reducedValue);
+ try {
+ stateTable.transform(currentNamespace, value, reduceTransformation);
+ } catch (Exception e) {
+ throw new IOException("Exception while applying ReduceFunction in reducing state", e);
}
}
@@ -133,6 +89,20 @@ public class HeapReducingState<K, N, V>
@Override
protected V mergeState(V a, V b) throws Exception {
- return reduceFunction.reduce(a, b);
+ return reduceTransformation.apply(a, b);
+ }
+
+ static final class ReduceTransformation<V> implements StateTransformationFunction<V, V> {
+
+ private final ReduceFunction<V> reduceFunction;
+
+ ReduceTransformation(ReduceFunction<V> reduceFunction) {
+ this.reduceFunction = Preconditions.checkNotNull(reduceFunction);
+ }
+
+ @Override
+ public V apply(V previousState, V value) throws Exception {
+ return previousState != null ? reduceFunction.reduce(previousState, value) : value;
+ }
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
index 9e042fe..6de62a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
@@ -21,16 +21,12 @@ package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.internal.InternalValueState;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Map;
/**
* Heap-backed partitioned {@link org.apache.flink.api.common.state.ValueState} that is snapshotted
* into files.
- *
+ *
* @param <K> The type of the key.
* @param <N> The type of the namespace.
* @param <V> The type of the value.
@@ -42,39 +38,21 @@ public class HeapValueState<K, N, V>
/**
* Creates a new key/value state for the given hash map of key/value pairs.
*
- * @param backend The state backend backing that created this state.
* @param stateDesc The state identifier for the state. This contains name
* and can create a default state value.
* @param stateTable The state table to use in this key/value state. May contain initial state.
*/
public HeapValueState(
- KeyedStateBackend<K> backend,
ValueStateDescriptor<V> stateDesc,
StateTable<K, N, V> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer) {
- super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
+ super(stateDesc, stateTable, keySerializer, namespaceSerializer);
}
@Override
public V value() {
- Preconditions.checkState(currentNamespace != null, "No namespace set.");
- Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
- Map<N, Map<K, V>> namespaceMap =
- stateTable.get(backend.getCurrentKeyGroupIndex());
-
- if (namespaceMap == null) {
- return stateDesc.getDefaultValue();
- }
-
- Map<K, V> keyedMap = namespaceMap.get(currentNamespace);
-
- if (keyedMap == null) {
- return stateDesc.getDefaultValue();
- }
-
- V result = keyedMap.get(backend.<K>getCurrentKey());
+ final V result = stateTable.get(currentNamespace);
if (result == null) {
return stateDesc.getDefaultValue();
@@ -85,29 +63,12 @@ public class HeapValueState<K, N, V>
@Override
public void update(V value) {
- Preconditions.checkState(currentNamespace != null, "No namespace set.");
- Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
if (value == null) {
clear();
return;
}
- Map<N, Map<K, V>> namespaceMap =
- stateTable.get(backend.getCurrentKeyGroupIndex());
-
- if (namespaceMap == null) {
- namespaceMap = createNewMap();
- stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
- }
-
- Map<K, V> keyedMap = namespaceMap.get(currentNamespace);
-
- if (keyedMap == null) {
- keyedMap = createNewMap();
- namespaceMap.put(currentNamespace, keyedMap);
- }
-
- keyedMap.put(backend.<K>getCurrentKey(), value);
+ stateTable.put(currentNamespace, value);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContext.java
new file mode 100644
index 0000000..cb0582b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContext.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyGroupRange;
+
+/**
+ * This interface is the current context of a keyed state. It provides information about the currently selected key in
+ * the context, the corresponding key-group, and other key and key-grouping related information.
+ * <p>
+ * The typical use case for this interface is providing a view on the current-key selection aspects of
+ * {@link org.apache.flink.runtime.state.KeyedStateBackend}.
+ */
+@Internal
+public interface InternalKeyContext<K> {
+
+ /**
+ * Used by states to access the current key.
+ */
+ K getCurrentKey();
+
+ /**
+ * Returns the key-group to which the current key belongs.
+ */
+ int getCurrentKeyGroupIndex();
+
+ /**
+ * Returns the number of key-groups aka max parallelism.
+ */
+ int getNumberOfKeyGroups();
+
+ /**
+ * Returns the key groups for this backend.
+ */
+ KeyGroupRange getKeyGroupRange();
+
+ /**
+ * {@link TypeSerializer} for the state backend key type.
+ */
+ TypeSerializer<K> getKeySerializer();
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
new file mode 100644
index 0000000..22f344d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This implementation of {@link StateTable} uses nested {@link HashMap} objects. It is also maintaining a partitioning
+ * by key-group.
+ * <p>
+ * In contrast to {@link CopyOnWriteStateTable}, this implementation does not support asynchronous snapshots. However,
+ * it might have a better memory footprint for some use-cases, e.g. it is naturally de-duplicating namespace objects.
+ *
+ * @param <K> type of key.
+ * @param <N> type of namespace.
+ * @param <S> type of state.
+ */
+@Internal
+public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> {
+
+ /**
+ * Map for holding the actual state objects. The outer array represents the key-groups. The nested maps provide
+ * an outer scope by namespace and an inner scope by key.
+ */
+ private final Map<N, Map<K, S>>[] state;
+
+ /**
+ * The offset to the contiguous key groups
+ */
+ private final int keyGroupOffset;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a new {@link NestedMapsStateTable} with one (initially empty) slot per key-group of the key context's range.
+ *
+ * @param keyContext the key context; its key-group range determines offset and length of the backing array.
+ * @param metaInfo the meta information for this state table.
+ */
+ public NestedMapsStateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+ super(keyContext, metaInfo);
+ this.keyGroupOffset = keyContext.getKeyGroupRange().getStartKeyGroup();
+
+ // Size by the local key-group range (not the global number of key-groups), so that offsets computed relative
+ // to keyGroupOffset are bounds-checked against the range this table is actually responsible for.
+ @SuppressWarnings("unchecked")
+ Map<N, Map<K, S>>[] state = (Map<N, Map<K, S>>[]) new Map[keyContext.getKeyGroupRange().getNumberOfKeyGroups()];
+ this.state = state;
+ }
+
+ // ------------------------------------------------------------------------
+ // access to maps
+ // ------------------------------------------------------------------------
+
+ /**
+ * Returns the internal data structure.
+ */
+ @VisibleForTesting
+ public Map<N, Map<K, S>>[] getState() {
+ return state;
+ }
+
+ /**
+ * Returns the namespace map stored for the given key-group.
+ *
+ * @param keyGroupIndex the key-group id; it is translated to an offset into the internal array.
+ * @return the map stored at that offset (may be {@code null} if nothing was set), or {@code null} if the offset
+ * lies outside of the internal array.
+ */
+ @VisibleForTesting
+ Map<N, Map<K, S>> getMapForKeyGroup(int keyGroupIndex) {
+ final int pos = indexToOffset(keyGroupIndex);
+ if (pos >= 0 && pos < state.length) {
+ return state[pos];
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Sets the given map for the given key-group.
+ */
+ private void setMapForKeyGroup(int keyGroupId, Map<N, Map<K, S>> map) {
+ try {
+ state[indexToOffset(keyGroupId)] = map;
+ } catch (ArrayIndexOutOfBoundsException e) {
+ throw new IllegalArgumentException("Key group index out of range of key group range [" +
+ keyGroupOffset + ", " + (keyGroupOffset + state.length) + ").");
+ }
+ }
+
+ /**
+ * Translates a key-group id to the internal array offset.
+ */
+ private int indexToOffset(int index) {
+ return index - keyGroupOffset;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public int size() {
+ int count = 0;
+ for (Map<N, Map<K, S>> namespaceMap : state) {
+ if (null != namespaceMap) {
+ for (Map<K, S> keyMap : namespaceMap.values()) {
+ if (null != keyMap) {
+ count += keyMap.size();
+ }
+ }
+ }
+ }
+ return count;
+ }
+
+ @Override
+ public S get(N namespace) {
+ return get(keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace);
+ }
+
+ @Override
+ public boolean containsKey(N namespace) {
+ return containsKey(keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace);
+ }
+
+ @Override
+ public void put(N namespace, S state) {
+ put(keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace, state);
+ }
+
+ @Override
+ public S putAndGetOld(N namespace, S state) {
+ return putAndGetOld(keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace, state);
+ }
+
+ @Override
+ public void remove(N namespace) {
+ remove(keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace);
+ }
+
+ @Override
+ public S removeAndGetOld(N namespace) {
+ return removeAndGetOld(keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace);
+ }
+
+ @Override
+ public S get(K key, N namespace) {
+ int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, keyContext.getNumberOfKeyGroups());
+ return get(key, keyGroup, namespace);
+ }
+
+ // ------------------------------------------------------------------------
+
+ private boolean containsKey(K key, int keyGroupIndex, N namespace) {
+
+ checkKeyNamespacePreconditions(key, namespace);
+
+ Map<N, Map<K, S>> namespaceMap = getMapForKeyGroup(keyGroupIndex);
+
+ if (namespaceMap == null) {
+ return false;
+ }
+
+ Map<K, S> keyedMap = namespaceMap.get(namespace);
+
+ return keyedMap != null && keyedMap.containsKey(key);
+ }
+
+ S get(K key, int keyGroupIndex, N namespace) {
+
+ checkKeyNamespacePreconditions(key, namespace);
+
+ Map<N, Map<K, S>> namespaceMap = getMapForKeyGroup(keyGroupIndex);
+
+ if (namespaceMap == null) {
+ return null;
+ }
+
+ Map<K, S> keyedMap = namespaceMap.get(namespace);
+
+ if (keyedMap == null) {
+ return null;
+ }
+
+ return keyedMap.get(key);
+ }
+
+ @Override
+ public void put(K key, int keyGroupIndex, N namespace, S value) {
+ putAndGetOld(key, keyGroupIndex, namespace, value);
+ }
+
+ private S putAndGetOld(K key, int keyGroupIndex, N namespace, S value) {
+
+ checkKeyNamespacePreconditions(key, namespace);
+
+ Map<N, Map<K, S>> namespaceMap = getMapForKeyGroup(keyGroupIndex);
+
+ if (namespaceMap == null) {
+ namespaceMap = new HashMap<>();
+ setMapForKeyGroup(keyGroupIndex, namespaceMap);
+ }
+
+ Map<K, S> keyedMap = namespaceMap.get(namespace);
+
+ if (keyedMap == null) {
+ keyedMap = new HashMap<>();
+ namespaceMap.put(namespace, keyedMap);
+ }
+
+ return keyedMap.put(key, value);
+ }
+
+ private void remove(K key, int keyGroupIndex, N namespace) {
+ removeAndGetOld(key, keyGroupIndex, namespace);
+ }
+
+ private S removeAndGetOld(K key, int keyGroupIndex, N namespace) {
+
+ checkKeyNamespacePreconditions(key, namespace);
+
+ Map<N, Map<K, S>> namespaceMap = getMapForKeyGroup(keyGroupIndex);
+
+ if (namespaceMap == null) {
+ return null;
+ }
+
+ Map<K, S> keyedMap = namespaceMap.get(namespace);
+
+ if (keyedMap == null) {
+ return null;
+ }
+
+ S removed = keyedMap.remove(key);
+
+ if (keyedMap.isEmpty()) {
+ namespaceMap.remove(namespace);
+ }
+
+ return removed;
+ }
+
+ private void checkKeyNamespacePreconditions(K key, N namespace) {
+ Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context.");
+ Preconditions.checkNotNull(namespace, "Provided namespace is null.");
+ }
+
+ @Override
+ public int sizeOfNamespace(Object namespace) {
+ int count = 0;
+ for (Map<N, Map<K, S>> namespaceMap : state) {
+ if (null != namespaceMap) {
+ Map<K, S> keyMap = namespaceMap.get(namespace);
+ count += keyMap != null ? keyMap.size() : 0;
+ }
+ }
+
+ return count;
+ }
+
+ @Override
+ public <T> void transform(N namespace, T value, StateTransformationFunction<S, T> transformation) throws Exception {
+ final K key = keyContext.getCurrentKey();
+ checkKeyNamespacePreconditions(key, namespace);
+ final int keyGroupIndex = keyContext.getCurrentKeyGroupIndex();
+
+ Map<N, Map<K, S>> namespaceMap = getMapForKeyGroup(keyGroupIndex);
+
+ if (namespaceMap == null) {
+ namespaceMap = new HashMap<>();
+ setMapForKeyGroup(keyGroupIndex, namespaceMap);
+ }
+
+ Map<K, S> keyedMap = namespaceMap.get(namespace);
+
+ if (keyedMap == null) {
+ keyedMap = new HashMap<>();
+ namespaceMap.put(namespace, keyedMap);
+ }
+
+ keyedMap.put(key, transformation.apply(keyedMap.get(key), value));
+ }
+
+ // snapshots ---------------------------------------------------------------------------------------------------
+
+ private static <K, N, S> int countMappingsInKeyGroup(final Map<N, Map<K, S>> keyGroupMap) {
+ int count = 0;
+ for (Map<K, S> namespaceMap : keyGroupMap.values()) {
+ count += namespaceMap.size();
+ }
+
+ return count;
+ }
+
+ @Override
+ public NestedMapsStateTableSnapshot<K, N, S> createSnapshot() {
+ return new NestedMapsStateTableSnapshot<>(this);
+ }
+
+ /**
+ * This class encapsulates the snapshot logic.
+ *
+ * @param <K> type of key.
+ * @param <N> type of namespace.
+ * @param <S> type of state.
+ */
+ static class NestedMapsStateTableSnapshot<K, N, S>
+ extends AbstractStateTableSnapshot<K, N, S, NestedMapsStateTable<K, N, S>> {
+
+ NestedMapsStateTableSnapshot(NestedMapsStateTable<K, N, S> owningTable) {
+ super(owningTable);
+ }
+
+ /**
+ * Implementation note: we currently chose the same format between {@link NestedMapsStateTable} and
+ * {@link CopyOnWriteStateTable}.
+ * <p>
+ * {@link NestedMapsStateTable} could naturally support a kind of
+ * prefix-compressed format (grouping by namespace, writing the namespace only once per group instead for each
+ * mapping). We might implement support for different formats later (tailored towards different state table
+ * implementations).
+ */
+ @Override
+ public void writeMappingsInKeyGroup(DataOutputView dov, int keyGroupId) throws IOException {
+ final Map<N, Map<K, S>> keyGroupMap = owningStateTable.getMapForKeyGroup(keyGroupId);
+ if (null != keyGroupMap) {
+ TypeSerializer<K> keySerializer = owningStateTable.keyContext.getKeySerializer();
+ TypeSerializer<N> namespaceSerializer = owningStateTable.metaInfo.getNamespaceSerializer();
+ TypeSerializer<S> stateSerializer = owningStateTable.metaInfo.getStateSerializer();
+ dov.writeInt(countMappingsInKeyGroup(keyGroupMap));
+ for (Map.Entry<N, Map<K, S>> namespaceEntry : keyGroupMap.entrySet()) {
+ final N namespace = namespaceEntry.getKey();
+ final Map<K, S> namespaceMap = namespaceEntry.getValue();
+
+ for (Map.Entry<K, S> keyEntry : namespaceMap.entrySet()) {
+ namespaceSerializer.serialize(namespace, dov);
+ keySerializer.serialize(keyEntry.getKey(), dov);
+ stateSerializer.serialize(keyEntry.getValue(), dov);
+ }
+ }
+ } else {
+ dov.writeInt(0);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateEntry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateEntry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateEntry.java
new file mode 100644
index 0000000..8e29cb2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateEntry.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+/**
+ * Interface of entries in a state table. Entries are triples of key, namespace, and state.
+ *
+ * @param <K> type of key.
+ * @param <N> type of namespace.
+ * @param <S> type of state.
+ */
+public interface StateEntry<K, N, S> {
+
+ /**
+ * Returns the key of this entry.
+ */
+ K getKey();
+
+ /**
+ * Returns the namespace of this entry.
+ */
+ N getNamespace();
+
+ /**
+ * Returns the state of this entry.
+ */
+ S getState();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
index 21265f4..62fc869 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,72 +15,152 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.state.heap;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
-import org.apache.flink.runtime.state.KeyGroupRange;
-
-import java.util.Map;
-
-public class StateTable<K, N, ST> {
-
- /** Map for holding the actual state objects. */
- private final Map<N, Map<K, ST>>[] state;
-
- /** The offset to the contiguous key groups */
- private final int keyGroupOffset;
-
- /** Combined meta information such as name and serializers for this state */
- private RegisteredBackendStateMetaInfo<N, ST> metaInfo;
-
- // ------------------------------------------------------------------------
- public StateTable(RegisteredBackendStateMetaInfo<N, ST> metaInfo, KeyGroupRange keyGroupRange) {
- this.metaInfo = metaInfo;
- this.keyGroupOffset = keyGroupRange.getStartKeyGroup();
-
- @SuppressWarnings("unchecked")
- Map<N, Map<K, ST>>[] state = (Map<N, Map<K, ST>>[]) new Map[keyGroupRange.getNumberOfKeyGroups()];
- this.state = state;
- }
-
- // ------------------------------------------------------------------------
- // access to maps
- // ------------------------------------------------------------------------
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.util.Preconditions;
- public Map<N, Map<K, ST>>[] getState() {
- return state;
- }
-
- public Map<N, Map<K, ST>> get(int index) {
- final int pos = indexToOffset(index);
- if (pos >= 0 && pos < state.length) {
- return state[pos];
- } else {
- return null;
- }
- }
-
- public void set(int index, Map<N, Map<K, ST>> map) {
- try {
- state[indexToOffset(index)] = map;
- }
- catch (ArrayIndexOutOfBoundsException e) {
- throw new IllegalArgumentException("Key group index out of range of key group range [" +
- keyGroupOffset + ", " + (keyGroupOffset + state.length) + ").");
- }
+/**
+ * Base class for state tables. Accesses to state are typically scoped by the currently active key, as provided
+ * through the {@link InternalKeyContext}.
+ *
+ * @param <K> type of key
+ * @param <N> type of namespace
+ * @param <S> type of state
+ */
+public abstract class StateTable<K, N, S> {
+
+ /**
+ * The key context view on the backend. This provides information, such as the currently active key.
+ */
+ protected final InternalKeyContext<K> keyContext;
+
+ /**
+ * Combined meta information such as name and serializers for this state
+ */
+ protected RegisteredBackendStateMetaInfo<N, S> metaInfo;
+
+ /**
+ * Creates a new state table for the given key context and meta info.
+ *
+ * @param keyContext the key context provides the key scope for all put/get/delete operations.
+ * @param metaInfo the meta information, including the type serializer for state copy-on-write.
+ */
+ public StateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+ this.keyContext = Preconditions.checkNotNull(keyContext);
+ this.metaInfo = Preconditions.checkNotNull(metaInfo);
}
- private int indexToOffset(int index) {
- return index - keyGroupOffset;
+ // Main interface methods of StateTable -------------------------------------------------------
+
+ /**
+ * Returns whether this {@link StateTable} is empty.
+ *
+ * @return {@code true} if this {@link StateTable} has no elements, {@code false}
+ * otherwise.
+ * @see #size()
+ */
+ public boolean isEmpty() {
+ return size() == 0;
}
- // ------------------------------------------------------------------------
- // metadata
- // ------------------------------------------------------------------------
-
- public TypeSerializer<ST> getStateSerializer() {
+ /**
+ * Returns the total number of entries in this {@link StateTable}, i.e. the number of (key, namespace) to state
+ * mappings.
+ *
+ * @return the number of entries in this {@link StateTable}.
+ */
+ public abstract int size();
+
+ /**
+ * Returns the state of the mapping for the composite of active key and given namespace.
+ *
+ * @param namespace the namespace. Not null.
+ * @return the state of the mapping with the specified key/namespace composite key, or {@code null}
+ * if no mapping for the specified key is found.
+ */
+ public abstract S get(N namespace);
+
+ /**
+ * Returns whether this table contains a mapping for the composite of active key and given namespace.
+ *
+ * @param namespace the namespace in the composite key to search for. Not null.
+ * @return {@code true} if this map contains the specified key/namespace composite key,
+ * {@code false} otherwise.
+ */
+ public abstract boolean containsKey(N namespace);
+
+ /**
+ * Maps the composite of active key and given namespace to the specified state. This method should be preferred
+ * over {@link #putAndGetOld(Object, Object)} when the caller is not interested in the old state.
+ *
+ * @param namespace the namespace. Not null.
+ * @param state the state. Can be null.
+ */
+ public abstract void put(N namespace, S state);
+
+ /**
+ * Maps the composite of active key and given namespace to the specified state. Returns the previous state that
+ * was registered under the composite key.
+ *
+ * @param namespace the namespace. Not null.
+ * @param state the state. Can be null.
+ * @return the state of any previous mapping with the specified key or
+ * {@code null} if there was no such mapping.
+ */
+ public abstract S putAndGetOld(N namespace, S state);
+
+ /**
+ * Removes the mapping for the composite of active key and given namespace. This method should be preferred
+ * over {@link #removeAndGetOld(Object)} when the caller is not interested in the old state.
+ *
+ * @param namespace the namespace of the mapping to remove. Not null.
+ */
+ public abstract void remove(N namespace);
+
+ /**
+ * Removes the mapping for the composite of active key and given namespace, returning the state that was
+ * found under the entry.
+ *
+ * @param namespace the namespace of the mapping to remove. Not null.
+ * @return the state of the removed mapping or {@code null} if no mapping
+ * for the specified key was found.
+ */
+ public abstract S removeAndGetOld(N namespace);
+
+ /**
+ * Applies the given {@link StateTransformationFunction} to the state (1st input argument), using the given value as
+ * second input argument. The result of {@link StateTransformationFunction#apply(Object, Object)} is then stored as
+ * the new state. This function is basically an optimization for get-update-put pattern.
+ *
+ * @param namespace the namespace. Not null.
+ * @param value the value to use in transforming the state. Can be null.
+ * @param transformation the transformation function.
+ * @throws Exception if some exception happens in the transformation function.
+ */
+ public abstract <T> void transform(
+ N namespace,
+ T value,
+ StateTransformationFunction<S, T> transformation) throws Exception;
+
+ // For queryable state ------------------------------------------------------------------------
+
+ /**
+ * Returns the state for the composite of given key and given namespace. This is typically used by
+ * queryable state.
+ *
+ * @param key the key. Not null.
+ * @param namespace the namespace. Not null.
+ * @return the state of the mapping with the specified key/namespace composite key, or {@code null}
+ * if no mapping for the specified key is found.
+ */
+ public abstract S get(K key, N namespace);
+
+ // Meta data setter / getter and toString -----------------------------------------------------
+
+ public TypeSerializer<S> getStateSerializer() {
return metaInfo.getStateSerializer();
}
@@ -88,28 +168,22 @@ public class StateTable<K, N, ST> {
return metaInfo.getNamespaceSerializer();
}
- public RegisteredBackendStateMetaInfo<N, ST> getMetaInfo() {
+ public RegisteredBackendStateMetaInfo<N, S> getMetaInfo() {
return metaInfo;
}
- public void setMetaInfo(RegisteredBackendStateMetaInfo<N, ST> metaInfo) {
+ public void setMetaInfo(RegisteredBackendStateMetaInfo<N, S> metaInfo) {
this.metaInfo = metaInfo;
}
- // ------------------------------------------------------------------------
- // for testing
- // ------------------------------------------------------------------------
+ // Snapshot / Restore -------------------------------------------------------------------------
+
+ abstract StateTableSnapshot createSnapshot();
+
+ public abstract void put(K key, int keyGroup, N namespace, S state);
+
+ // For testing --------------------------------------------------------------------------------
@VisibleForTesting
- boolean isEmpty() {
- for (Map<N, Map<K, ST>> map : state) {
- if (map != null) {
- if (!map.isEmpty()) {
- return false;
- }
- }
- }
-
- return true;
- }
-}
+ public abstract int sizeOfNamespace(Object namespace);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReader.java
new file mode 100644
index 0000000..659c174
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReader.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.core.memory.DataInputView;
+
+import java.io.IOException;
+
+/**
+ * Interface for state de-serialization into {@link StateTable}s by key-group.
+ */
+interface StateTableByKeyGroupReader {
+
+ /**
+ * Read the data for the specified key-group from the input.
+ *
+ * @param div the input
+ * @param keyGroupId the key-group to read
+ * @throws IOException on read related problems
+ */
+ void readMappingsInKeyGroup(DataInputView div, int keyGroupId) throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
new file mode 100644
index 0000000..53ec349
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+
+import java.io.IOException;
+
+/**
+ * This class provides a static factory method to create different implementations of {@link StateTableByKeyGroupReader}
+ * depending on the provided serialization format version.
+ * <p>
+ * The implementations are also located here as inner classes.
+ */
+class StateTableByKeyGroupReaders {
+
+ /**
+ * Creates a new StateTableByKeyGroupReader that inserts de-serialized mappings into the given table, using the
+ * de-serialization algorithm that matches the given version.
+ *
+ * @param table the {@link StateTable} into which de-serialized mappings are inserted.
+ * @param version version for the de-serialization algorithm.
+ * @param <K> type of key.
+ * @param <N> type of namespace.
+ * @param <S> type of state.
+ * @return the appropriate reader.
+ */
+ static <K, N, S> StateTableByKeyGroupReader readerForVersion(StateTable<K, N, S> table, int version) {
+ switch (version) {
+ case 1:
+ return new StateTableByKeyGroupReaderV1<>(table);
+ case 2:
+ return new StateTableByKeyGroupReaderV2<>(table);
+ default:
+ throw new IllegalArgumentException("Unknown version: " + version);
+ }
+ }
+
+ /**
+ * Base class for the readers in this file. It holds the {@link StateTable} that receives the de-serialized
+ * mappings and exposes the key, namespace, and state serializers obtained from that table.
+ *
+ * @param <K> type of key.
+ * @param <N> type of namespace.
+ * @param <S> type of state.
+ */
+ static abstract class AbstractStateTableByKeyGroupReader<K, N, S>
+ implements StateTableByKeyGroupReader {
+
+ /** The table into which de-serialized mappings are inserted. */
+ protected final StateTable<K, N, S> stateTable;
+
+ AbstractStateTableByKeyGroupReader(StateTable<K, N, S> stateTable) {
+ this.stateTable = stateTable;
+ }
+
+ @Override
+ public abstract void readMappingsInKeyGroup(DataInputView div, int keyGroupId) throws IOException;
+
+ /** Returns the key serializer, taken from the key context of the state table. */
+ protected TypeSerializer<K> getKeySerializer() {
+ return stateTable.keyContext.getKeySerializer();
+ }
+
+ /** Returns the namespace serializer of the state table. */
+ protected TypeSerializer<N> getNamespaceSerializer() {
+ return stateTable.getNamespaceSerializer();
+ }
+
+ /** Returns the state serializer of the state table. */
+ protected TypeSerializer<S> getStateSerializer() {
+ return stateTable.getStateSerializer();
+ }
+ }
+
+ /**
+ * Reader for serialization format version 1.
+ * <p>
+ * Layout as consumed by this reader, per key-group: one leading byte (a value of 0 means there is nothing more to
+ * read for the key-group), then an int with the number of namespaces. For each namespace: the serialized
+ * namespace, an int with the number of entries, and that many (key, state) pairs.
+ *
+ * @param <K> type of key.
+ * @param <N> type of namespace.
+ * @param <S> type of state.
+ */
+ static final class StateTableByKeyGroupReaderV1<K, N, S>
+ extends AbstractStateTableByKeyGroupReader<K, N, S> {
+
+ StateTableByKeyGroupReaderV1(StateTable<K, N, S> stateTable) {
+ super(stateTable);
+ }
+
+ @Override
+ public void readMappingsInKeyGroup(DataInputView inView, int keyGroupId) throws IOException {
+
+ // leading byte of 0: no mappings follow for this key-group
+ if (inView.readByte() == 0) {
+ return;
+ }
+
+ final TypeSerializer<K> keySerializer = getKeySerializer();
+ final TypeSerializer<N> namespaceSerializer = getNamespaceSerializer();
+ final TypeSerializer<S> stateSerializer = getStateSerializer();
+
+ // V1 uses kind of namespace compressing format
+ int numNamespaces = inView.readInt();
+ for (int k = 0; k < numNamespaces; k++) {
+ // the namespace is stored once and applies to all entries that follow it
+ N namespace = namespaceSerializer.deserialize(inView);
+ int numEntries = inView.readInt();
+ for (int l = 0; l < numEntries; l++) {
+ K key = keySerializer.deserialize(inView);
+ S state = stateSerializer.deserialize(inView);
+ stateTable.put(key, keyGroupId, namespace, state);
+ }
+ }
+ }
+ }
+
+ /**
+ * Reader for serialization format version 2.
+ * <p>
+ * Layout as consumed by this reader, per key-group: an int with the number of mappings, followed by that many
+ * (namespace, key, state) triples.
+ *
+ * @param <K> type of key.
+ * @param <N> type of namespace.
+ * @param <S> type of state.
+ */
+ private static final class StateTableByKeyGroupReaderV2<K, N, S>
+ extends AbstractStateTableByKeyGroupReader<K, N, S> {
+
+ StateTableByKeyGroupReaderV2(StateTable<K, N, S> stateTable) {
+ super(stateTable);
+ }
+
+ @Override
+ public void readMappingsInKeyGroup(DataInputView inView, int keyGroupId) throws IOException {
+
+ final TypeSerializer<K> keySerializer = getKeySerializer();
+ final TypeSerializer<N> namespaceSerializer = getNamespaceSerializer();
+ final TypeSerializer<S> stateSerializer = getStateSerializer();
+
+ // each mapping carries its own namespace, i.e. there is no grouping by namespace in this format
+ int numKeys = inView.readInt();
+ for (int i = 0; i < numKeys; ++i) {
+ N namespace = namespaceSerializer.deserialize(inView);
+ K key = keySerializer.deserialize(inView);
+ S state = stateSerializer.deserialize(inView);
+ stateTable.put(key, keyGroupId, namespace, state);
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableSnapshot.java
new file mode 100644
index 0000000..d4244d7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableSnapshot.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Interface for the snapshots of a {@link StateTable}. Offers a way to serialize the snapshot (by key-group). All
+ * snapshots should be released after usage.
+ */
+interface StateTableSnapshot {
+
+ /**
+ * Writes the data for the specified key-group to the output.
+ *
+ * @param dov the output
+ * @param keyGroupId the key-group to write
+ * @throws IOException on write related problems
+ */
+ void writeMappingsInKeyGroup(DataOutputView dov, int keyGroupId) throws IOException;
+
+ /**
+ * Release the snapshot. All snapshots should be released when they are no longer used because some implementations
+ * can only release resources after a release.
+ */
+ void release();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index da01c09..f0bac1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -45,6 +45,9 @@ public class MemoryStateBackend extends AbstractStateBackend {
/** The maximal size that the snapshotted memory state may have */
private final int maxStateSize;
+ /** Switch to choose between synchronous and asynchronous snapshots */
+ private final boolean asynchronousSnapshots;
+
/**
* Creates a new memory state backend that accepts states whose serialized forms are
* up to the default state size (5 MB).
@@ -60,7 +63,29 @@ public class MemoryStateBackend extends AbstractStateBackend {
* @param maxStateSize The maximal size of the serialized state
*/
public MemoryStateBackend(int maxStateSize) {
+ this(maxStateSize, false);
+ }
+
+ /**
+ * Creates a new memory state backend that accepts states whose serialized forms are
+ * up to the default state size (5 MB).
+ *
+ * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+ */
+ public MemoryStateBackend(boolean asynchronousSnapshots) {
+ this(DEFAULT_MAX_STATE_SIZE, asynchronousSnapshots);
+ }
+
+ /**
+ * Creates a new memory state backend that accepts states whose serialized forms are
+ * up to the given number of bytes.
+ *
+ * @param maxStateSize The maximal size of the serialized state
+ * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+ */
+ public MemoryStateBackend(int maxStateSize, boolean asynchronousSnapshots) {
this.maxStateSize = maxStateSize;
+ this.asynchronousSnapshots = asynchronousSnapshots;
}
@Override
@@ -98,6 +123,7 @@ public class MemoryStateBackend extends AbstractStateBackend {
env.getUserClassLoader(),
numberOfKeyGroups,
keyGroupRange,
+ asynchronousSnapshots,
env.getExecutionConfig());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
index 2c385c1..b4d6eb7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
@@ -33,12 +33,12 @@ import org.apache.flink.runtime.query.netty.KvStateServer;
import org.apache.flink.runtime.query.netty.UnknownKvStateID;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.heap.HeapValueState;
-import org.apache.flink.runtime.state.heap.StateTable;
+import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.util.MathUtils;
import org.junit.AfterClass;
@@ -278,9 +278,8 @@ public class QueryableStateClientTest {
// Register state
HeapValueState<Integer, VoidNamespace, Integer> kvState = new HeapValueState<>(
- keyedStateBackend,
descriptor,
- new StateTable<Integer, VoidNamespace, Integer>(registeredBackendStateMetaInfo, new KeyGroupRange(0, 1)),
+ new NestedMapsStateTable<Integer, VoidNamespace, Integer>(keyedStateBackend, registeredBackendStateMetaInfo),
IntSerializer.INSTANCE,
VoidNamespaceSerializer.INSTANCE);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
index 93094a4..4ed63a2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
@@ -21,12 +21,10 @@ package org.apache.flink.runtime.query.netty.message;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
-
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.ByteSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -38,13 +36,16 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalListState;
-
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -54,10 +55,19 @@ import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
+@RunWith(Parameterized.class)
public class KvStateRequestSerializerTest {
private final ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
+ @Parameterized.Parameters
+ public static Collection<Boolean> parameters() {
+ return Arrays.asList(false, true);
+ }
+
+ @Parameterized.Parameter
+ public boolean async;
+
/**
* Tests KvState request serialization.
*/
@@ -332,7 +342,9 @@ public class KvStateRequestSerializerTest {
mock(TaskKvStateRegistry.class),
LongSerializer.INSTANCE,
ClassLoader.getSystemClassLoader(),
- 1, new KeyGroupRange(0, 0),
+ 1,
+ new KeyGroupRange(0, 0),
+ async,
new ExecutionConfig()
);
longHeapKeyedStateBackend.setCurrentKey(key);
@@ -418,7 +430,7 @@ public class KvStateRequestSerializerTest {
KvStateRequestSerializer.deserializeList(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 3},
LongSerializer.INSTANCE);
}
-
+
/**
* Tests map serialization utils.
*/
@@ -429,11 +441,13 @@ public class KvStateRequestSerializerTest {
// objects for heap state list serialisation
final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
new HeapKeyedStateBackend<>(
- mock(TaskKvStateRegistry.class),
- LongSerializer.INSTANCE,
- ClassLoader.getSystemClassLoader(),
- 1, new KeyGroupRange(0, 0),
- new ExecutionConfig()
+ mock(TaskKvStateRegistry.class),
+ LongSerializer.INSTANCE,
+ ClassLoader.getSystemClassLoader(),
+ 1,
+ new KeyGroupRange(0, 0),
+ async,
+ new ExecutionConfig()
);
longHeapKeyedStateBackend.setCurrentKey(key);
@@ -481,7 +495,7 @@ public class KvStateRequestSerializerTest {
KvStateRequestSerializer.serializeKeyAndNamespace(
key, LongSerializer.INSTANCE,
VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
-
+
final byte[] serializedValues = mapState.getSerializedValue(serializedKey);
Map<Long, String> actualValues = KvStateRequestSerializer.deserializeMap(serializedValues, userKeySerializer, userValueSerializer);
@@ -534,7 +548,7 @@ public class KvStateRequestSerializerTest {
KvStateRequestSerializer.deserializeMap(new byte[]{1, 1, 1, 1, 1, 1, 1, 1, 0},
LongSerializer.INSTANCE, LongSerializer.INSTANCE);
}
-
+
/**
* Tests map deserialization with too few bytes.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncFileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncFileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncFileStateBackendTest.java
new file mode 100644
index 0000000..dd73e42
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncFileStateBackendTest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+public class AsyncFileStateBackendTest extends FileStateBackendTest {
+
+ @Override
+ protected boolean useAsyncMode() {
+ return true;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncMemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncMemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncMemoryStateBackendTest.java
new file mode 100644
index 0000000..ba4a89d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncMemoryStateBackendTest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+public class AsyncMemoryStateBackendTest extends MemoryStateBackendTest {
+
+ @Override
+ protected boolean useAsyncMode() {
+ return true;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
index 75014e7..6be2343 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -50,7 +51,11 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
@Override
protected FsStateBackend getStateBackend() throws Exception {
File checkpointPath = tempFolder.newFolder();
- return new FsStateBackend(localFileUri(checkpointPath));
+ return new FsStateBackend(localFileUri(checkpointPath), useAsyncMode());
+ }
+
+ protected boolean useAsyncMode() {
+ return false;
}
// disable these because the verification does not work for this state backend
@@ -208,6 +213,7 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
}
}
+ @Ignore
@Test
public void testConcurrentMapIfQueryable() throws Exception {
super.testConcurrentMapIfQueryable();
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index 362fcd6..48d56e2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
@@ -44,7 +45,11 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack
@Override
protected MemoryStateBackend getStateBackend() throws Exception {
- return new MemoryStateBackend();
+ return new MemoryStateBackend(useAsyncMode());
+ }
+
+ protected boolean useAsyncMode() {
+ return false;
}
// disable these because the verification does not work for this state backend
@@ -193,6 +198,7 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack
}
}
+ @Ignore
@Test
public void testConcurrentMapIfQueryable() throws Exception {
super.testConcurrentMapIfQueryable();
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 40ac72c..331c6bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -42,6 +42,7 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -50,10 +51,15 @@ import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateRegistryListener;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.heap.AbstractHeapState;
+import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.types.IntValue;
+import org.apache.flink.util.IOUtils;
import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
@@ -67,6 +73,7 @@ import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -791,7 +798,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
fail(e.getMessage());
}
}
-
+
@Test
@SuppressWarnings("unchecked,rawtypes")
public void testMapState() {
@@ -823,7 +830,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
backend.setCurrentKey(1);
assertTrue(state.contains(1));
assertEquals("1", state.get(1));
- assertEquals(new HashMap<Integer, String>() {{ put (1, "1"); }},
+ assertEquals(new HashMap<Integer, String>() {{ put (1, "1"); }},
getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
// draw a snapshot
@@ -848,12 +855,12 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
backend.setCurrentKey(2);
assertEquals("102", state.get(102));
- assertEquals(new HashMap<Integer, String>() {{ put(2, "2"); put(102, "102"); }},
+ assertEquals(new HashMap<Integer, String>() {{ put(2, "2"); put(102, "102"); }},
getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
backend.setCurrentKey(3);
assertTrue(state.contains(103));
assertEquals("103", state.get(103));
- assertEquals(new HashMap<Integer, String>() {{ put(103, "103"); put(1031, "1031"); put(1032, "1032"); }},
+ assertEquals(new HashMap<Integer, String>() {{ put(103, "103"); put(1031, "1031"); put(1032, "1032"); }},
getSerializedMap(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
List<Integer> keys = new ArrayList<>();
@@ -912,11 +919,11 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
backend.setCurrentKey(1);
assertEquals("1", restored1.get(1));
- assertEquals(new HashMap<Integer, String>() {{ put (1, "1"); }},
+ assertEquals(new HashMap<Integer, String>() {{ put (1, "1"); }},
getSerializedMap(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
backend.setCurrentKey(2);
assertEquals("2", restored1.get(2));
- assertEquals(new HashMap<Integer, String>() {{ put (2, "2"); }},
+ assertEquals(new HashMap<Integer, String>() {{ put (2, "2"); }},
getSerializedMap(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
backend.dispose();
@@ -931,15 +938,15 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
backend.setCurrentKey(1);
assertEquals("101", restored2.get(1));
- assertEquals(new HashMap<Integer, String>() {{ put (1, "101"); }},
+ assertEquals(new HashMap<Integer, String>() {{ put (1, "101"); }},
getSerializedMap(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
backend.setCurrentKey(2);
assertEquals("102", restored2.get(102));
- assertEquals(new HashMap<Integer, String>() {{ put(2, "2"); put (102, "102"); }},
+ assertEquals(new HashMap<Integer, String>() {{ put(2, "2"); put (102, "102"); }},
getSerializedMap(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
backend.setCurrentKey(3);
assertEquals("103", restored2.get(103));
- assertEquals(new HashMap<Integer, String>() {{ put(103, "103"); put(1031, "1031"); put(1032, "1032"); }},
+ assertEquals(new HashMap<Integer, String>() {{ put(103, "103"); put(1031, "1031"); put(1032, "1032"); }},
getSerializedMap(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
backend.dispose();
@@ -1111,7 +1118,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
backend.dispose();
}
-
+
/**
* This test verifies that state is correctly assigned to key groups and that restore
* restores the relevant key groups in the backend.
@@ -1364,7 +1371,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
fail(e.getMessage());
}
}
-
+
@Test
@SuppressWarnings("unchecked")
public void testMapStateRestoreWithWrongSerializers() {
@@ -1507,11 +1514,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
backend.setCurrentKey(1);
state.update(121818273);
- int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups);
- StateTable stateTable = ((AbstractHeapState) kvState).getStateTable();
- assertNotNull("State not set", stateTable.get(keyGroupIndex));
- assertTrue(stateTable.get(keyGroupIndex) instanceof ConcurrentHashMap);
- assertTrue(stateTable.get(keyGroupIndex).get(VoidNamespace.INSTANCE) instanceof ConcurrentHashMap);
+ StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?,? ,?, ?>) kvState).getStateTable();
+ checkConcurrentStateTable(stateTable, numberOfKeyGroups);
}
@@ -1533,11 +1537,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
backend.setCurrentKey(1);
state.add(121818273);
- int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups);
- StateTable stateTable = ((AbstractHeapState) kvState).getStateTable();
- assertNotNull("State not set", stateTable.get(keyGroupIndex));
- assertTrue(stateTable.get(keyGroupIndex) instanceof ConcurrentHashMap);
- assertTrue(stateTable.get(keyGroupIndex).get(VoidNamespace.INSTANCE) instanceof ConcurrentHashMap);
+ StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?,? ,?, ?>) kvState).getStateTable();
+ checkConcurrentStateTable(stateTable, numberOfKeyGroups);
}
{
@@ -1564,11 +1565,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
backend.setCurrentKey(1);
state.add(121818273);
- int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups);
- StateTable stateTable = ((AbstractHeapState) kvState).getStateTable();
- assertNotNull("State not set", stateTable.get(keyGroupIndex));
- assertTrue(stateTable.get(keyGroupIndex) instanceof ConcurrentHashMap);
- assertTrue(stateTable.get(keyGroupIndex).get(VoidNamespace.INSTANCE) instanceof ConcurrentHashMap);
+ StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?,? ,?, ?>) kvState).getStateTable();
+ checkConcurrentStateTable(stateTable, numberOfKeyGroups);
}
{
@@ -1595,13 +1593,10 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
backend.setCurrentKey(1);
state.add(121818273);
- int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups);
- StateTable stateTable = ((AbstractHeapState) kvState).getStateTable();
- assertNotNull("State not set", stateTable.get(keyGroupIndex));
- assertTrue(stateTable.get(keyGroupIndex) instanceof ConcurrentHashMap);
- assertTrue(stateTable.get(keyGroupIndex).get(VoidNamespace.INSTANCE) instanceof ConcurrentHashMap);
+ StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?,? ,?, ?>) kvState).getStateTable();
+ checkConcurrentStateTable(stateTable, numberOfKeyGroups);
}
-
+
{
// MapState
MapStateDescriptor<Integer, String> desc = new MapStateDescriptor<>("map-state", Integer.class, String.class);
@@ -1623,13 +1618,22 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups);
StateTable stateTable = ((AbstractHeapState) kvState).getStateTable();
assertNotNull("State not set", stateTable.get(keyGroupIndex));
- assertTrue(stateTable.get(keyGroupIndex) instanceof ConcurrentHashMap);
- assertTrue(stateTable.get(keyGroupIndex).get(VoidNamespace.INSTANCE) instanceof ConcurrentHashMap);
+ checkConcurrentStateTable(stateTable, numberOfKeyGroups);
}
backend.dispose();
}
+ private void checkConcurrentStateTable(StateTable<?, ?, ?> stateTable, int numberOfKeyGroups) {
+ assertNotNull("State not set", stateTable);
+ if (stateTable instanceof NestedMapsStateTable) {
+ int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups);
+ NestedMapsStateTable<?, ?, ?> nestedMapsStateTable = (NestedMapsStateTable<?, ?, ?>) stateTable;
+ assertTrue(nestedMapsStateTable.getState()[keyGroupIndex] instanceof ConcurrentHashMap);
+ assertTrue(nestedMapsStateTable.getState()[keyGroupIndex].get(VoidNamespace.INSTANCE) instanceof ConcurrentHashMap);
+ }
+ }
+
/**
* Tests registration with the KvStateRegistry.
*/
@@ -1688,7 +1692,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
// draw a snapshot
- KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 1, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ KeyGroupsStateHandle snapshot =
+ runSnapshot(backend.snapshot(682375462379L, 1, streamFactory, CheckpointOptions.forFullCheckpoint()));
assertNull(snapshot);
backend.dispose();
@@ -1708,6 +1713,145 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
}
}
+ @Test
+ public void testAsyncSnapshot() throws Exception {
+ OneShotLatch waiter = new OneShotLatch();
+ BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1024 * 1024);
+ streamFactory.setWaiterLatch(waiter);
+
+ AbstractKeyedStateBackend<Integer> backend = null;
+ KeyGroupsStateHandle stateHandle = null;
+
+ try {
+ backend = createKeyedBackend(IntSerializer.INSTANCE);
+ InternalValueState<VoidNamespace, Integer> valueState = backend.createValueState(
+ VoidNamespaceSerializer.INSTANCE,
+ new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
+
+ valueState.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+ for (int i = 0; i < 10; ++i) {
+ backend.setCurrentKey(i);
+ valueState.update(i);
+ }
+
+ RunnableFuture<KeyGroupsStateHandle> snapshot =
+ backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint());
+ Thread runner = new Thread(snapshot);
+ runner.start();
+ for (int i = 0; i < 20; ++i) {
+ backend.setCurrentKey(i);
+ valueState.update(i + 1);
+ if (10 == i) {
+ waiter.await();
+ }
+ }
+
+ runner.join();
+ stateHandle = snapshot.get();
+
+ // test isolation
+ for (int i = 0; i < 20; ++i) {
+ backend.setCurrentKey(i);
+ Assert.assertEquals(i + 1, (int) valueState.value());
+ }
+
+ } finally {
+ if (null != backend) {
+ IOUtils.closeQuietly(backend);
+ backend.dispose();
+ }
+ }
+
+ Assert.assertNotNull(stateHandle);
+
+ backend = createKeyedBackend(IntSerializer.INSTANCE);
+ try {
+ backend.restore(Collections.singleton(stateHandle));
+ InternalValueState<VoidNamespace, Integer> valueState = backend.createValueState(
+ VoidNamespaceSerializer.INSTANCE,
+ new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
+
+ valueState.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+ for (int i = 0; i < 10; ++i) {
+ backend.setCurrentKey(i);
+ Assert.assertEquals(i, (int) valueState.value());
+ }
+
+ backend.setCurrentKey(11);
+ Assert.assertEquals(null, valueState.value());
+ } finally {
+ if (null != backend) {
+ IOUtils.closeQuietly(backend);
+ backend.dispose();
+ }
+ }
+ }
+
+ @Test
+ public void testAsyncSnapshotCancellation() throws Exception {
+ OneShotLatch blocker = new OneShotLatch();
+ OneShotLatch waiter = new OneShotLatch();
+ BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1024 * 1024);
+ streamFactory.setWaiterLatch(waiter);
+ streamFactory.setBlockerLatch(blocker);
+ streamFactory.setAfterNumberInvocations(100);
+
+ AbstractKeyedStateBackend<Integer> backend = null;
+ try {
+ backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+ if (!backend.supportsAsynchronousSnapshots()) {
+ return;
+ }
+
+ InternalValueState<VoidNamespace, Integer> valueState = backend.createValueState(
+ VoidNamespaceSerializer.INSTANCE,
+ new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
+
+ valueState.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+ for (int i = 0; i < 10; ++i) {
+ backend.setCurrentKey(i);
+ valueState.update(i);
+ }
+
+ RunnableFuture<KeyGroupsStateHandle> snapshot =
+ backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint());
+
+ Thread runner = new Thread(snapshot);
+ runner.start();
+
+ // wait until the code reached some stream read
+ waiter.await();
+
+ // close the backend to see if the close is propagated to the stream
+ backend.close();
+
+ //unblock the stream so that it can run into the IOException
+ blocker.trigger();
+
+ //dispose the backend
+ backend.dispose();
+
+ runner.join();
+
+ try {
+ snapshot.get();
+ fail("Close was not propagated.");
+ } catch (ExecutionException ex) {
+ //ignore
+ }
+
+ } finally {
+ if (null != backend) {
+ IOUtils.closeQuietly(backend);
+ backend.dispose();
+ }
+ }
+ }
+
private static class AppendingFold implements FoldFunction<Integer, String> {
private static final long serialVersionUID = 1L;
@@ -1764,7 +1908,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
return KvStateRequestSerializer.deserializeList(serializedValue, valueSerializer);
}
}
-
+
/**
* Returns the value by getting the serialized value and deserializing it
* if it is not null.
[3/4] flink git commit: [FLINK-5715] Asynchronous snapshots for
heap-based keyed state backend
Posted by sr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
new file mode 100644
index 0000000..d63b6d3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
@@ -0,0 +1,1066 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.TreeSet;
+
+/**
+ * Implementation of Flink's in-memory state tables with copy-on-write support. This map does not support null values
+ * for key or namespace.
+ * <p>
+ * {@link CopyOnWriteStateTable} sacrifices some peak performance and memory efficiency for features like incremental
+ * rehashing and asynchronous snapshots through copy-on-write. Copy-on-write tries to minimize the amount of copying by
+ * maintaining version meta data for both the map structure and the state objects. However, we must often proactively
+ * copy state objects when we hand them to the user.
+ * <p>
+ * As with any state backend, users should not keep references to state objects that they obtained from the state
+ * backend outside the scope of the user function calls.
+ * <p>
+ * Some brief maintenance notes:
+ * <p>
+ * 1) Flattening the underlying data structure from nested maps (namespace) -> (key) -> (state) to one flat map
+ * (key, namespace) -> (state) brings certain performance trade-offs. In theory, the flat map has one less level of
+ * indirection compared to the nested map. However, the nested map naturally de-duplicates namespace objects for which
+ * #equals() is true. This leads to potentially a lot of redundant namespace objects for the flattened version. Those,
+ * in turn, can again introduce more cache misses because we need to follow the namespace object on all operations to
+ * ensure entry identities. Obviously, copy-on-write can also add memory overhead. So does the meta data to track
+ * copy-on-write requirement (state and entry versions on {@link StateTableEntry}).
+ * <p>
+ * 2) A flat map structure is a lot easier when it comes to tracking copy-on-write of the map structure.
+ * <p>
+ * 3) Nested structure had the (never used) advantage that we can easily drop and iterate whole namespaces. This could
+ * give locality advantages for certain access pattern, e.g. iterating a namespace.
+ * <p>
+ * 4) Serialization format is changed from namespace-prefix compressed (as naturally provided from the old nested
+ * structure) to making all entries self contained as (key, namespace, state).
+ * <p>
+ * 5) We got rid of having multiple nested tables, one for each key-group. Instead, we partition state into key-groups
+ * on-the-fly, during the asynchronous part of a snapshot.
+ * <p>
+ * 6) Currently, a state table can only grow, but never shrinks on low load. We could easily add this if required.
+ * <p>
+ * 7) Heap based state backends like this can easily cause a lot of GC activity. Besides using G1 as garbage collector,
+ * we should provide an additional state backend that operates on off-heap memory. This would sacrifice peak performance
+ * (due to de/serialization of objects) for a lower, but more constant throughput and potentially huge simplifications
+ * w.r.t. copy-on-write.
+ * <p>
+ * 8) We could try a hybrid of a serialized and object based backends, where key and namespace of the entries are both
+ * serialized in one byte-array.
+ * <p>
+ * 9) We could consider smaller types (e.g. short) for the version counting and think about some reset strategy before
+ * overflows, when there is no snapshot running. However, this would have to touch all entries in the map.
+ * <p>
+ * This class was initially based on the {@link java.util.HashMap} implementation of the Android JDK, but is now heavily
+ * customized towards the use case of table for state entries.
+ *
+ * IMPORTANT: the contracts for this class rely on the user not holding any references to objects returned by this map
+ * beyond the life cycle of per-element operations. Or phrased differently, all get-update-put operations on a mapping
+ * should be within one call of processElement. Otherwise, the user must take care of taking deep copies, e.g. for
+ * caching purposes.
+ *
+ * @param <K> type of key.
+ * @param <N> type of namespace.
+ * @param <S> type of value.
+ */
+public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implements Iterable<StateEntry<K, N, S>> {
+
+ /**
+ * The logger, named after this class so that its messages are attributed to the state table.
+ */
+ private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteStateTable.class);
+
+ /**
+ * Min capacity (other than zero) for a {@link CopyOnWriteStateTable}. Must be a power of two
+ * greater than 1 (and less than 1 << 30).
+ */
+ private static final int MINIMUM_CAPACITY = 4;
+
+ /**
+ * Max capacity for a {@link CopyOnWriteStateTable}. Must be a power of two >= MINIMUM_CAPACITY.
+ */
+ private static final int MAXIMUM_CAPACITY = 1 << 30;
+
+ /**
+ * Minimum number of entries that one step of incremental rehashing migrates from the old to the new sub-table.
+ */
+ private static final int MIN_TRANSFERRED_PER_INCREMENTAL_REHASH = 4;
+
+ /**
+ * An empty table shared by all zero-capacity maps (typically from default
+ * constructor). It is never written to, and replaced on first put. Its size
+ * is set to half the minimum, so that the first resize will create a
+ * minimum-sized table.
+ */
+ private static final StateTableEntry<?, ?, ?>[] EMPTY_TABLE = new StateTableEntry[MINIMUM_CAPACITY >>> 1];
+
+ /**
+ * Empty entry that we use to bootstrap our {@link CopyOnWriteStateTable.StateEntryIterator}.
+ */
+ private static final StateTableEntry<?, ?, ?> ITERATOR_BOOTSTRAP_ENTRY = new StateTableEntry<>();
+
+ /**
+ * Maintains an ordered set of version ids that are still in use by unreleased snapshots.
+ */
+ private final TreeSet<Integer> snapshotVersions;
+
+ /**
+ * This is the primary entry array (hash directory) of the state table. If no incremental rehash is ongoing, this
+ * is the only used table.
+ **/
+ private StateTableEntry<K, N, S>[] primaryTable;
+
+ /**
+ * We maintain a secondary entry array while performing an incremental rehash. The purpose is to slowly migrate
+ * entries from the primary table to this resized table array. When all entries are migrated, this becomes the new
+ * primary table.
+ */
+ private StateTableEntry<K, N, S>[] incrementalRehashTable;
+
+ /**
+ * The current number of mappings in the primary table.
+ */
+ private int primaryTableSize;
+
+ /**
+ * The current number of mappings in the rehash table.
+ */
+ private int incrementalRehashTableSize;
+
+ /**
+ * The next index for a step of incremental rehashing in the primary table.
+ */
+ private int rehashIndex;
+
+ /**
+ * The current version of this map. Used for copy-on-write mechanics.
+ */
+ private int stateTableVersion;
+
+ /**
+ * The highest version of this map that is still required by any unreleased snapshot.
+ */
+ private int highestRequiredSnapshotVersion;
+
+ /**
+ * The last namespace that was actually inserted. This is a small optimization to reduce duplicate namespace objects.
+ */
+ private N lastNamespace;
+
+ /**
+ * The {@link CopyOnWriteStateTable} is rehashed when its size exceeds this threshold.
+ * The value of this field is generally .75 * capacity, except when
+ * the capacity is zero, as described in the EMPTY_TABLE declaration
+ * above.
+ */
+ private int threshold;
+
+ /**
+ * Incremented by "structural modifications" to allow (best effort)
+ * detection of concurrent modification.
+ */
+ private int modCount;
+
+ /**
+ * Constructs a new {@code StateTable} with default capacity of 1024. Delegates to the capacity-taking
+ * constructor.
+ *
+ * @param keyContext the key context.
+ * @param metaInfo the meta information, including the type serializer for state copy-on-write.
+ */
+ CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+ this(keyContext, metaInfo, 1024);
+ }
+
+ /**
+ * Constructs a new {@code StateTable} instance with the specified capacity. A positive capacity is clamped to
+ * the range [MINIMUM_CAPACITY, MAXIMUM_CAPACITY]; values inside that range are rounded up to a power of two.
+ * <p>
+ * NOTE(review): for {@code capacity == 0} the primary table stays the shared static {@code EMPTY_TABLE}, and
+ * {@code putEntry} would then insert into that shared array. The only visible caller passes 1024, so this path
+ * looks unreachable today - confirm before exposing other capacities.
+ *
+ * @param keyContext the key context.
+ * @param metaInfo the meta information, including the type serializer for state copy-on-write.
+ * @param capacity the initial capacity of this hash map.
+ * @throws IllegalArgumentException when the capacity is less than zero.
+ */
+ @SuppressWarnings("unchecked")
+ private CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo, int capacity) {
+ super(keyContext, metaInfo);
+
+ // initialize both tables to EMPTY_TABLE; an EMPTY_TABLE rehash table means "no rehash in progress".
+ this.primaryTable = (StateTableEntry<K, N, S>[]) EMPTY_TABLE;
+ this.incrementalRehashTable = (StateTableEntry<K, N, S>[]) EMPTY_TABLE;
+
+ // initialize sizes to 0.
+ this.primaryTableSize = 0;
+ this.incrementalRehashTableSize = 0;
+
+ this.rehashIndex = 0;
+ this.stateTableVersion = 0;
+ this.highestRequiredSnapshotVersion = 0;
+ this.snapshotVersions = new TreeSet<>();
+
+ if (capacity < 0) {
+ throw new IllegalArgumentException("Capacity: " + capacity);
+ }
+
+ // zero capacity: keep EMPTY_TABLE and use a negative threshold so that the first insert exceeds it
+ if (capacity == 0) {
+ threshold = -1;
+ return;
+ }
+
+ if (capacity < MINIMUM_CAPACITY) {
+ capacity = MINIMUM_CAPACITY;
+ } else if (capacity > MAXIMUM_CAPACITY) {
+ capacity = MAXIMUM_CAPACITY;
+ } else {
+ capacity = MathUtils.roundUpToPowerOfTwo(capacity);
+ }
+ // makeTable also sets the resize threshold for the chosen capacity
+ primaryTable = makeTable(capacity);
+ }
+
+ // Public API from AbstractStateTable ------------------------------------------------------------------------------
+
+ /**
+ * Returns the total number of entries in this {@link CopyOnWriteStateTable}. This is the sum of the entry counts
+ * of the primary table and the incremental rehash table.
+ *
+ * @return the number of entries in this {@link CopyOnWriteStateTable}.
+ */
+ @Override
+ public int size() {
+ return primaryTableSize + incrementalRehashTableSize;
+ }
+
+ @Override
+ public S get(K key, N namespace) {
+
+ final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
+ final int requiredVersion = highestRequiredSnapshotVersion;
+ final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
+ int index = hash & (tab.length - 1);
+
+ for (StateTableEntry<K, N, S> e = tab[index]; e != null; e = e.next) {
+ final K eKey = e.key;
+ final N eNamespace = e.namespace;
+ if ((e.hash == hash && key.equals(eKey) && namespace.equals(eNamespace))) {
+
+ // copy-on-write check for state
+ if (e.stateVersion < requiredVersion) {
+ // copy-on-write check for entry
+ if (e.entryVersion < requiredVersion) {
+ e = handleChainedEntryCopyOnWrite(tab, hash & (tab.length - 1), e);
+ }
+ e.stateVersion = stateTableVersion;
+ e.state = getStateSerializer().copy(e.state);
+ }
+
+ return e.state;
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Maps the composite key (key, namespace) to the given state. The {@code keyGroup} argument is not used by this
+ * implementation.
+ */
+ @Override
+ public void put(K key, int keyGroup, N namespace, S state) {
+ put(key, namespace, state);
+ }
+
+ /**
+ * Returns the state for the given namespace and the current key of the key context.
+ */
+ @Override
+ public S get(N namespace) {
+ return get(keyContext.getCurrentKey(), namespace);
+ }
+
+ /**
+ * Returns whether a mapping exists for the given namespace and the current key of the key context.
+ */
+ @Override
+ public boolean containsKey(N namespace) {
+ return containsKey(keyContext.getCurrentKey(), namespace);
+ }
+
+ /**
+ * Maps the given namespace and the current key of the key context to the given state.
+ */
+ @Override
+ public void put(N namespace, S state) {
+ put(keyContext.getCurrentKey(), namespace, state);
+ }
+
+ /**
+ * Maps the given namespace and the current key of the key context to the given state and returns the state
+ * that was previously mapped, if any.
+ */
+ @Override
+ public S putAndGetOld(N namespace, S state) {
+ return putAndGetOld(keyContext.getCurrentKey(), namespace, state);
+ }
+
+ /**
+ * Removes the mapping for the given namespace and the current key of the key context.
+ */
+ @Override
+ public void remove(N namespace) {
+ remove(keyContext.getCurrentKey(), namespace);
+ }
+
+ /**
+ * Removes the mapping for the given namespace and the current key of the key context and returns the state
+ * that was mapped, if any.
+ */
+ @Override
+ public S removeAndGetOld(N namespace) {
+ return removeAndGetOld(keyContext.getCurrentKey(), namespace);
+ }
+
+ /**
+ * Applies the transformation to the state mapped under the given namespace and the current key of the key
+ * context, using {@code value} as the second input, and stores the result.
+ */
+ @Override
+ public <T> void transform(N namespace, T value, StateTransformationFunction<S, T> transformation) throws Exception {
+ transform(keyContext.getCurrentKey(), namespace, value, transformation);
+ }
+
+ // Private implementation details of the API methods ---------------------------------------------------------------
+
+ /**
+ * Checks whether a mapping for the composite key (key, namespace) exists in this table. No copy-on-write is
+ * triggered because no entry or state is handed out.
+ *
+ * @param key the key in the composite key to search for. Not null.
+ * @param namespace the namespace in the composite key to search for. Not null.
+ * @return {@code true} if this map contains the specified key/namespace composite key,
+ * {@code false} otherwise.
+ */
+ boolean containsKey(K key, N namespace) {
+
+ final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
+ final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
+
+ // walk the collision chain of the responsible bucket
+ StateTableEntry<K, N, S> entry = tab[hash & (tab.length - 1)];
+ while (entry != null) {
+ if (entry.hash == hash && key.equals(entry.key) && namespace.equals(entry.namespace)) {
+ return true;
+ }
+ entry = entry.next;
+ }
+ return false;
+ }
+
+ /**
+ * Maps the specified key/namespace composite key to the specified value. This method should be preferred
+ * over {@link #putAndGetOld(Object, Object, Object)} when the caller is not interested
+ * in the old value, because this can potentially reduce copy-on-write activity.
+ *
+ * @param key the key. Not null.
+ * @param namespace the namespace. Not null.
+ * @param value the value. Can be null.
+ */
+ void put(K key, N namespace, S value) {
+ final StateTableEntry<K, N, S> e = putEntry(key, namespace);
+
+ // the old state object is simply replaced, so it never needs to be copied here
+ e.state = value;
+ e.stateVersion = stateTableVersion;
+ }
+
+ /**
+ * Maps the specified key/namespace composite key to the specified value and hands back the state that was
+ * registered under the composite key before. If the previous state object is older than the highest snapshot
+ * version still required, a copy of it is returned instead of the object itself.
+ *
+ * @param key the key. Not null.
+ * @param namespace the namespace. Not null.
+ * @param value the value. Can be null.
+ * @return the value of any previous mapping with the specified key or
+ * {@code null} if there was no such mapping.
+ */
+ S putAndGetOld(K key, N namespace, S value) {
+
+ final StateTableEntry<K, N, S> entry = putEntry(key, namespace);
+
+ // copy-on-write check for state
+ final S previous;
+ if (entry.stateVersion < highestRequiredSnapshotVersion) {
+ previous = getStateSerializer().copy(entry.state);
+ } else {
+ previous = entry.state;
+ }
+
+ entry.state = value;
+ entry.stateVersion = stateTableVersion;
+
+ return previous;
+ }
+
+ /**
+ * Removes the mapping with the specified key/namespace composite key from this map. This method should be preferred
+ * over {@link #removeAndGetOld(Object, Object)} when the caller is not interested in the old value, because this
+ * can potentially reduce copy-on-write activity. Does nothing if no such mapping exists.
+ *
+ * @param key the key of the mapping to remove. Not null.
+ * @param namespace the namespace of the mapping to remove. Not null.
+ */
+ void remove(K key, N namespace) {
+ removeEntry(key, namespace);
+ }
+
+ /**
+ * Removes the mapping with the specified key/namespace composite key from this map and hands back the state of
+ * the removed entry. If that state object is older than the highest snapshot version still required, a copy of
+ * it is returned instead of the object itself.
+ *
+ * @param key the key of the mapping to remove. Not null.
+ * @param namespace the namespace of the mapping to remove. Not null.
+ * @return the value of the removed mapping or {@code null} if no mapping
+ * for the specified key was found.
+ */
+ S removeAndGetOld(K key, N namespace) {
+
+ final StateTableEntry<K, N, S> removed = removeEntry(key, namespace);
+
+ if (removed == null) {
+ return null;
+ }
+
+ // copy-on-write check for state
+ if (removed.stateVersion < highestRequiredSnapshotVersion) {
+ return getStateSerializer().copy(removed.state);
+ }
+
+ return removed.state;
+ }
+
+ /**
+ * Applies the given transformation to the state currently mapped under the key/namespace composite key (or to
+ * {@code null} if a new entry had to be created) and stores the result as the new state of the entry. If the
+ * current state object is older than the highest snapshot version still required, the transformation receives
+ * a copy of it.
+ *
+ * @param key the key of the mapping to transform. Not null.
+ * @param namespace the namespace of the mapping to transform. Not null.
+ * @param value the value that is the second input for the transformation.
+ * @param transformation the transformation function to apply on the old state and the given value.
+ * @param <T> type of the value that is the second input to the {@link StateTransformationFunction}.
+ * @throws Exception exception that happen on applying the function.
+ * @see #transform(Object, Object, StateTransformationFunction)
+ */
+ <T> void transform(
+ K key,
+ N namespace,
+ T value,
+ StateTransformationFunction<S, T> transformation) throws Exception {
+
+ final StateTableEntry<K, N, S> entry = putEntry(key, namespace);
+
+ // copy-on-write check for state
+ final S previousState;
+ if (entry.stateVersion < highestRequiredSnapshotVersion) {
+ previousState = getStateSerializer().copy(entry.state);
+ } else {
+ previousState = entry.state;
+ }
+
+ entry.state = transformation.apply(previousState, value);
+ entry.stateVersion = stateTableVersion;
+ }
+
+ /**
+ * Helper method that is the basis for operations that add mappings. Returns the entry for the composite key
+ * (key, namespace) that the caller may modify: either the existing entry (replaced by a copy if it is older than
+ * the highest required snapshot version) or a newly inserted entry with {@code null} state.
+ *
+ * @param key the key. Not null.
+ * @param namespace the namespace. Not null.
+ * @return the entry for the composite key, safe to modify with respect to unreleased snapshots.
+ */
+ private StateTableEntry<K, N, S> putEntry(K key, N namespace) {
+
+ final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
+ final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
+ int index = hash & (tab.length - 1);
+
+ for (StateTableEntry<K, N, S> e = tab[index]; e != null; e = e.next) {
+ if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {
+
+ // copy-on-write check for entry
+ if (e.entryVersion < highestRequiredSnapshotVersion) {
+ e = handleChainedEntryCopyOnWrite(tab, index, e);
+ }
+
+ return e;
+ }
+ }
+
+ // no existing mapping: this is a structural modification
+ ++modCount;
+ if (size() > threshold) {
+ // only installs the (still empty) rehash table; 'tab' selected above remains the insertion target
+ doubleCapacity();
+ }
+
+ return addNewStateTableEntry(tab, key, namespace, hash);
+ }
+
+ /**
+ * Helper method that is the basis for operations that remove mappings. Unlinks the entry for the composite key
+ * (key, namespace) from its collision chain and returns it, without copying the removed entry or its state.
+ *
+ * @param key the key of the mapping to remove. Not null.
+ * @param namespace the namespace of the mapping to remove. Not null.
+ * @return the removed entry, or {@code null} if no mapping was found.
+ */
+ private StateTableEntry<K, N, S> removeEntry(K key, N namespace) {
+
+ final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
+ final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
+ int index = hash & (tab.length - 1);
+
+ for (StateTableEntry<K, N, S> e = tab[index], prev = null; e != null; prev = e, e = e.next) {
+ if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {
+ if (prev == null) {
+ // the entry is the head of the chain: only the bucket slot of the live table changes
+ tab[index] = e.next;
+ } else {
+ // copy-on-write check for entry
+ // (the predecessor's 'next' link is about to change, so the predecessor must be safe to modify)
+ if (prev.entryVersion < highestRequiredSnapshotVersion) {
+ prev = handleChainedEntryCopyOnWrite(tab, index, prev);
+ }
+ prev.next = e.next;
+ }
+ ++modCount;
+ // keep the entry count of the table that held the entry in sync
+ if (tab == primaryTable) {
+ --primaryTableSize;
+ } else {
+ --incrementalRehashTableSize;
+ }
+ return e;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Ensures that neither key nor namespace is {@code null}. The key is checked first.
+ *
+ * @param key the key to check.
+ * @param namespace the namespace to check.
+ */
+ private void checkKeyNamespacePreconditions(K key, N namespace) {
+ Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context.");
+ Preconditions.checkNotNull(namespace, "Provided namespace is null.");
+ }
+
+ // Meta data setter / getter and toString --------------------------------------------------------------------------
+
+ /**
+ * Returns the state serializer from the meta info. It is used in this class to copy state objects for
+ * copy-on-write.
+ */
+ @Override
+ public TypeSerializer<S> getStateSerializer() {
+ return metaInfo.getStateSerializer();
+ }
+
+ /**
+ * Returns the namespace serializer from the meta info.
+ */
+ @Override
+ public TypeSerializer<N> getNamespaceSerializer() {
+ return metaInfo.getNamespaceSerializer();
+ }
+
+ /**
+ * Returns the meta info currently registered for this state table.
+ */
+ @Override
+ public RegisteredBackendStateMetaInfo<N, S> getMetaInfo() {
+ return metaInfo;
+ }
+
+ /**
+ * Replaces the meta info of this state table. Subsequent calls to the serializer getters read from the new
+ * meta info.
+ */
+ @Override
+ public void setMetaInfo(RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+ this.metaInfo = metaInfo;
+ }
+
+ // Iteration ------------------------------------------------------------------------------------------------------
+
+ /**
+ * Returns a new {@link StateEntryIterator} over the entries of this state table.
+ */
+ @Override
+ public Iterator<StateEntry<K, N, S>> iterator() {
+ return new StateEntryIterator();
+ }
+
+ // Private utility functions for StateTable management -------------------------------------------------------------
+
+ /**
+ * Unregisters the given snapshot version and lowers {@code highestRequiredSnapshotVersion} to the highest
+ * version that is still registered, or to 0 if none is left.
+ *
+ * @param snapshotVersion the version of the snapshot to release; must be currently registered.
+ * @see #releaseSnapshot(CopyOnWriteStateTableSnapshot)
+ */
+ @VisibleForTesting
+ void releaseSnapshot(int snapshotVersion) {
+ // we guard against concurrent modifications of highestRequiredSnapshotVersion between snapshot and release.
+ // Only stale reads of the result of #releaseSnapshot calls are ok.
+ synchronized (snapshotVersions) {
+ Preconditions.checkState(snapshotVersions.remove(snapshotVersion), "Attempt to release unknown snapshot version");
+ highestRequiredSnapshotVersion = snapshotVersions.isEmpty() ? 0 : snapshotVersions.last();
+ }
+ }
+
+ /**
+ * Creates (combined) copy of the table arrays for a snapshot. This method must be called by the same Thread that
+ * does modifications to the {@link CopyOnWriteStateTable}.
+ * <p>
+ * As a side effect, the table version is incremented and registered as a version required by a snapshot. Only
+ * the bucket arrays are copied; the entries themselves are shared with the live table.
+ *
+ * @return a copy of the bucket array(s) that covers all entries of this table.
+ * @throws IllegalStateException if the version counter overflows.
+ */
+ @VisibleForTesting
+ @SuppressWarnings("unchecked")
+ StateTableEntry<K, N, S>[] snapshotTableArrays() {
+
+ // we guard against concurrent modifications of highestRequiredSnapshotVersion between snapshot and release.
+ // Only stale reads of the result of #releaseSnapshot calls are ok. This is why we must call this method
+ // from the same thread that does all the modifications to the table.
+ synchronized (snapshotVersions) {
+
+ // increase the table version for copy-on-write and register the snapshot
+ if (++stateTableVersion < 0) {
+ // this is just a safety net against overflows, but should never happen in practice (i.e., only after 2^31 snapshots)
+ throw new IllegalStateException("Version count overflow in CopyOnWriteStateTable. Enforcing restart.");
+ }
+
+ highestRequiredSnapshotVersion = stateTableVersion;
+ snapshotVersions.add(highestRequiredSnapshotVersion);
+ }
+
+ StateTableEntry<K, N, S>[] table = primaryTable;
+ if (isRehashing()) {
+ // consider both tables for the snapshot, the rehash index tells us which part of the two tables we need
+ final int localRehashIndex = rehashIndex;
+ final int localCopyLength = table.length - localRehashIndex;
+ // total length: (primary.length - rhIdx) not yet migrated buckets + 2 * rhIdx buckets of the new table
+ StateTableEntry<K, N, S>[] copy = new StateTableEntry[localRehashIndex + table.length];
+ // for the primary table, take every index >= rhIdx.
+ System.arraycopy(table, localRehashIndex, copy, 0, localCopyLength);
+
+ // for the new table, we are sure that two regions contain all the entries:
+ // [0, rhIdx[ AND [table.length / 2, table.length / 2 + rhIdx[
+ table = incrementalRehashTable;
+ System.arraycopy(table, 0, copy, localCopyLength, localRehashIndex);
+ System.arraycopy(table, table.length >>> 1, copy, localCopyLength + localRehashIndex, localRehashIndex);
+
+ return copy;
+ } else {
+ // we only need to copy the primary table
+ return Arrays.copyOf(table, table.length);
+ }
+ }
+
+ /**
+ * Allocate a table of the given capacity and set the threshold accordingly.
+ *
+ * @param newCapacity must be a power of two
+ */
+ private StateTableEntry<K, N, S>[] makeTable(int newCapacity) {
+
+ if (MAXIMUM_CAPACITY == newCapacity) {
+ LOG.warn("Maximum capacity of 2^30 in StateTable reached. Cannot increase hash table size. This can lead " +
+ "to more collisions and lower performance. Please consider scaling-out your job or using a " +
+ "different keyed state backend implementation!");
+ }
+
+ threshold = (newCapacity >> 1) + (newCapacity >> 2); // 3/4 capacity
+ @SuppressWarnings("unchecked") StateTableEntry<K, N, S>[] newTable
+ = (StateTableEntry<K, N, S>[]) new StateTableEntry[newCapacity];
+ return newTable;
+ }
+
+ /**
+ * Creates and inserts a new {@link StateTableEntry}.
+ */
+ private StateTableEntry<K, N, S> addNewStateTableEntry(
+ StateTableEntry<K, N, S>[] table,
+ K key,
+ N namespace,
+ int hash) {
+
+ // small optimization that aims to avoid holding references on duplicate namespace objects
+ if (namespace.equals(lastNamespace)) {
+ namespace = lastNamespace;
+ } else {
+ lastNamespace = namespace;
+ }
+
+ int index = hash & (table.length - 1);
+ StateTableEntry<K, N, S> newEntry = new StateTableEntry<>(
+ key,
+ namespace,
+ null,
+ hash,
+ table[index],
+ stateTableVersion,
+ stateTableVersion);
+ table[index] = newEntry;
+
+ if (table == primaryTable) {
+ ++primaryTableSize;
+ } else {
+ ++incrementalRehashTableSize;
+ }
+ return newEntry;
+ }
+
+ /**
+ * Select the sub-table which is responsible for entries with the given hash code.
+ *
+ * @param hashCode the hash code which we use to decide about the table that is responsible.
+ * @return the index of the sub-table that is responsible for the entry with the given hash code.
+ */
+ private StateTableEntry<K, N, S>[] selectActiveTable(int hashCode) {
+ return (hashCode & (primaryTable.length - 1)) >= rehashIndex ? primaryTable : incrementalRehashTable;
+ }
+
+ /**
+ * Doubles the capacity of the hash table. Existing entries are placed in
+ * the correct bucket on the enlarged table. If the current capacity is,
+ * MAXIMUM_CAPACITY, this method is a no-op. Returns the table, which
+ * will be new unless we were already at MAXIMUM_CAPACITY.
+ */
+ private void doubleCapacity() {
+
+ // There can only be one rehash in flight. From the amount of incremental rehash steps we take, this should always hold.
+ Preconditions.checkState(!isRehashing(), "There is already a rehash in progress.");
+
+ StateTableEntry<K, N, S>[] oldTable = primaryTable;
+
+ int oldCapacity = oldTable.length;
+
+ if (oldCapacity == MAXIMUM_CAPACITY) {
+ return;
+ }
+
+ incrementalRehashTable = makeTable(oldCapacity * 2);
+ }
+
+ /**
+ * Tells whether an incremental rehash is currently in progress.
+ *
+ * @return {@code true} if the incremental rehash table is not the shared empty table.
+ */
+ @VisibleForTesting
+ boolean isRehashing() {
+ // while we rehash, the secondary table is a real table and not the EMPTY_TABLE placeholder
+ return incrementalRehashTable != EMPTY_TABLE;
+ }
+
+ /**
+ * Computes the hash for the composite of key and namespace and performs some steps of incremental rehash if
+ * incremental rehashing is in progress. Key and namespace are checked for {@code null} first.
+ *
+ * @param key the key. Not null.
+ * @param namespace the namespace. Not null.
+ * @return the composite hash of key and namespace.
+ */
+ private int computeHashForOperationAndDoIncrementalRehash(K key, N namespace) {
+
+ checkKeyNamespacePreconditions(key, namespace);
+
+ // the rehash step runs before the caller selects the active table, so callers see the updated rehash index
+ if (isRehashing()) {
+ incrementalRehash();
+ }
+
+ return compositeHash(key, namespace);
+ }
+
+ /**
+ * Runs a number of steps for incremental rehashing. Whole buckets are migrated from the primary table to the
+ * incremental rehash table, starting at the rehash index, until at least
+ * MIN_TRANSFERRED_PER_INCREMENTAL_REHASH entries were moved or the primary table is exhausted. In the latter
+ * case, the rehash table becomes the new primary table.
+ */
+ @SuppressWarnings("unchecked")
+ private void incrementalRehash() {
+
+ StateTableEntry<K, N, S>[] oldTable = primaryTable;
+ StateTableEntry<K, N, S>[] newTable = incrementalRehashTable;
+
+ int oldCapacity = oldTable.length;
+ int newMask = newTable.length - 1;
+ int requiredVersion = highestRequiredSnapshotVersion;
+ int rhIdx = rehashIndex;
+ int transferred = 0;
+
+ // we migrate a certain minimum amount of entries from the old to the new table
+ while (transferred < MIN_TRANSFERRED_PER_INCREMENTAL_REHASH) {
+
+ StateTableEntry<K, N, S> e = oldTable[rhIdx];
+
+ while (e != null) {
+ // copy-on-write check for entry
+ // (relinking changes 'next', so entries older than the required version are replaced by a copy)
+ if (e.entryVersion < requiredVersion) {
+ e = new StateTableEntry<>(e, stateTableVersion);
+ }
+ // remember the successor in the old chain before 'next' is overwritten
+ StateTableEntry<K, N, S> n = e.next;
+ int pos = e.hash & newMask;
+ // insert at the head of the target bucket in the new table
+ e.next = newTable[pos];
+ newTable[pos] = e;
+ e = n;
+ ++transferred;
+ }
+
+ oldTable[rhIdx] = null;
+ if (++rhIdx == oldCapacity) {
+ //here, the rehash is complete and we release resources and reset fields
+ // (the sum of both size fields is the total entry count, so the local 'transferred' count is not needed)
+ primaryTable = newTable;
+ incrementalRehashTable = (StateTableEntry<K, N, S>[]) EMPTY_TABLE;
+ primaryTableSize += incrementalRehashTableSize;
+ incrementalRehashTableSize = 0;
+ rehashIndex = 0;
+ return;
+ }
+ }
+
+ // sync our local bookkeeping with the official bookkeeping fields
+ primaryTableSize -= transferred;
+ incrementalRehashTableSize += transferred;
+ rehashIndex = rhIdx;
+ }
+
+ /**
+ * Perform copy-on-write for entry chains. We iterate the (hopefully and probably) still cached chain, replace
+ * all links up to the 'untilEntry', which we actually wanted to modify.
+ * <p>
+ * Every entry from the head of the bucket up to and including 'untilEntry' whose entry version is lower than the
+ * highest required snapshot version is replaced by a copy with the current table version. Entries that are
+ * already recent enough are reused.
+ *
+ * @param tab the table that contains the chain.
+ * @param tableIdx the index of the bucket in the table.
+ * @param untilEntry the entry of the chain that the caller wants to modify; expected to be part of the chain.
+ * @return the entry that takes the place of 'untilEntry' in the chain and is safe to modify.
+ */
+ private StateTableEntry<K, N, S> handleChainedEntryCopyOnWrite(
+ StateTableEntry<K, N, S>[] tab,
+ int tableIdx,
+ StateTableEntry<K, N, S> untilEntry) {
+
+ final int required = highestRequiredSnapshotVersion;
+
+ StateTableEntry<K, N, S> current = tab[tableIdx];
+ StateTableEntry<K, N, S> copy;
+
+ // handle the head of the chain, which is referenced from the table array instead of a 'next' link
+ if (current.entryVersion < required) {
+ copy = new StateTableEntry<>(current, stateTableVersion);
+ tab[tableIdx] = copy;
+ } else {
+ // nothing to do, just advance copy to current
+ copy = current;
+ }
+
+ // we iterate the chain up to 'until entry'
+ while (current != untilEntry) {
+
+ //advance current
+ current = current.next;
+
+ if (current.entryVersion < required) {
+ // copy and advance the current's copy
+ copy.next = new StateTableEntry<>(current, stateTableVersion);
+ copy = copy.next;
+ } else {
+ // nothing to do, just advance copy to current
+ copy = current;
+ }
+ }
+
+ return copy;
+ }
+
+ /**
+ * Returns the shared ITERATOR_BOOTSTRAP_ENTRY, cast to the requested type parameters.
+ */
+ @SuppressWarnings("unchecked")
+ private static <K, N, S> StateTableEntry<K, N, S> getBootstrapEntry() {
+ return (StateTableEntry<K, N, S>) ITERATOR_BOOTSTRAP_ENTRY;
+ }
+
+ /**
+ * Helper function that creates and scrambles a composite hash for key and namespace.
+ */
+ private static int compositeHash(Object key, Object namespace) {
+ // create composite key through XOR, then apply some bit-mixing for better distribution of skewed keys.
+ return MathUtils.bitMix(key.hashCode() ^ namespace.hashCode());
+ }
+
+ // Snapshotting ----------------------------------------------------------------------------------------------------
+
+ /**
+ * Returns the current version of this state table, which is incremented for every snapshot of the table arrays.
+ */
+ int getStateTableVersion() {
+ return stateTableVersion;
+ }
+
+ /**
+ * Creates a snapshot of this {@link CopyOnWriteStateTable}, to be written in checkpointing. The snapshot integrity
+ * is protected through copy-on-write from the {@link CopyOnWriteStateTable}. Users should call
+ * {@link #releaseSnapshot(CopyOnWriteStateTableSnapshot)} after using the returned object, so that this table
+ * can stop considering the snapshot for copy-on-write.
+ *
+ * @return a snapshot from this {@link CopyOnWriteStateTable}, for checkpointing.
+ */
+ @Override
+ public CopyOnWriteStateTableSnapshot<K, N, S> createSnapshot() {
+ return new CopyOnWriteStateTableSnapshot<>(this);
+ }
+
+ /**
+ * Releases a snapshot for this {@link CopyOnWriteStateTable}. This method should be called once a snapshot is no
+ * longer needed, so that the {@link CopyOnWriteStateTable} can stop considering this snapshot for copy-on-write,
+ * thus avoiding unnecessary object creation.
+ *
+ * @param snapshotToRelease the snapshot to release, which was previously created by this state table.
+ * @throws IllegalArgumentException if the snapshot is owned by a different state table.
+ */
+ void releaseSnapshot(CopyOnWriteStateTableSnapshot<K, N, S> snapshotToRelease) {
+
+ Preconditions.checkArgument(snapshotToRelease.isOwner(this),
+ "Cannot release snapshot which is owned by a different state table.");
+
+ // delegate to the version-based release, which updates the set of required versions
+ releaseSnapshot(snapshotToRelease.getSnapshotVersion());
+ }
+
+ // StateTableEntry -------------------------------------------------------------------------------------------------
+
+ /**
+ * One entry in the {@link CopyOnWriteStateTable}. This is a triplet of key, namespace, and state. Thereby, key and
+ * namespace together serve as a composite key for the state. This class also contains some management meta data for
+ * copy-on-write, a pointer to link other {@link StateTableEntry}s to a list, and cached hash code.
+ *
+ * @param <K> type of key.
+ * @param <N> type of namespace.
+ * @param <S> type of state.
+ */
+ static class StateTableEntry<K, N, S> implements StateEntry<K, N, S> {
+
+ /**
+ * The key. Assumed to be immutable and not null.
+ */
+ final K key;
+
+ /**
+ * The namespace. Assumed to be immutable and not null.
+ */
+ final N namespace;
+
+ /**
+ * The state. This is not final to allow exchanging the object for copy-on-write. Can be null.
+ */
+ S state;
+
+ /**
+ * Link to another {@link StateTableEntry}. This is used to resolve collisions in the
+ * {@link CopyOnWriteStateTable} through chaining.
+ */
+ StateTableEntry<K, N, S> next;
+
+ /**
+ * The version of this {@link StateTableEntry}. This is meta data for copy-on-write of the table structure.
+ */
+ int entryVersion;
+
+ /**
+ * The version of the state object in this entry. This is meta data for copy-on-write of the state object itself.
+ */
+ int stateVersion;
+
+ /**
+ * The computed secondary hash for the composite of key and namespace.
+ */
+ final int hash;
+
+ StateTableEntry() {
+ this(null, null, null, 0, null, 0, 0);
+ }
+
+ StateTableEntry(StateTableEntry<K, N, S> other, int entryVersion) {
+ this(other.key, other.namespace, other.state, other.hash, other.next, entryVersion, other.stateVersion);
+ }
+
+ StateTableEntry(
+ K key,
+ N namespace,
+ S state,
+ int hash,
+ StateTableEntry<K, N, S> next,
+ int entryVersion,
+ int stateVersion) {
+ this.key = key;
+ this.namespace = namespace;
+ this.hash = hash;
+ this.next = next;
+ this.entryVersion = entryVersion;
+ this.state = state;
+ this.stateVersion = stateVersion;
+ }
+
+ public final void setState(S value, int mapVersion) {
+ // naturally, we can update the state version every time we replace the old state with a different object
+ if (value != state) {
+ this.state = value;
+ this.stateVersion = mapVersion;
+ }
+ }
+
+ @Override
+ public K getKey() {
+ return key;
+ }
+
+ @Override
+ public N getNamespace() {
+ return namespace;
+ }
+
+ @Override
+ public S getState() {
+ return state;
+ }
+
+ @Override
+ public final boolean equals(Object o) {
+ if (!(o instanceof CopyOnWriteStateTable.StateTableEntry)) {
+ return false;
+ }
+
+ StateEntry<?, ?, ?> e = (StateEntry<?, ?, ?>) o;
+ return e.getKey().equals(key)
+ && e.getNamespace().equals(namespace)
+ && Objects.equals(e.getState(), state);
+ }
+
+ @Override
+ public final int hashCode() {
+ return (key.hashCode() ^ namespace.hashCode()) ^ Objects.hashCode(state);
+ }
+
+ @Override
+ public final String toString() {
+ return "(" + key + "|" + namespace + ")=" + state;
+ }
+ }
+
+ // For testing ----------------------------------------------------------------------------------------------------
+
+ @Override
+ public int sizeOfNamespace(Object namespace) {
+ int count = 0;
+ for (StateEntry<K, N, S> entry : this) {
+ if (null != entry && namespace.equals(entry.getNamespace())) {
+ ++count;
+ }
+ }
+ return count;
+ }
+
+
+ // StateEntryIterator ---------------------------------------------------------------------------------------------
+
+ /**
+ * Iterator over the entries in a {@link CopyOnWriteStateTable}.
+ */
+ class StateEntryIterator implements Iterator<StateEntry<K, N, S>> {
+ private StateTableEntry<K, N, S>[] activeTable;
+ private int nextTablePosition;
+ private StateTableEntry<K, N, S> nextEntry;
+ private int expectedModCount = modCount;
+
+ StateEntryIterator() {
+ this.activeTable = primaryTable;
+ this.nextTablePosition = 0;
+ this.expectedModCount = modCount;
+ this.nextEntry = getBootstrapEntry();
+ advanceIterator();
+ }
+
+ private StateTableEntry<K, N, S> advanceIterator() {
+
+ StateTableEntry<K, N, S> entryToReturn = nextEntry;
+ StateTableEntry<K, N, S> next = entryToReturn.next;
+
+ // consider both sub-tables to cover the case of rehash
+ while (next == null) {
+
+ StateTableEntry<K, N, S>[] tab = activeTable;
+
+ while (nextTablePosition < tab.length) {
+ next = tab[nextTablePosition++];
+
+ if (next != null) {
+ nextEntry = next;
+ return entryToReturn;
+ }
+ }
+
+ if (activeTable == incrementalRehashTable) {
+ break;
+ }
+
+ activeTable = incrementalRehashTable;
+ nextTablePosition = 0;
+ }
+
+ nextEntry = next;
+ return entryToReturn;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return nextEntry != null;
+ }
+
+ @Override
+ public StateTableEntry<K, N, S> next() {
+ if (modCount != expectedModCount) {
+ throw new ConcurrentModificationException();
+ }
+
+ if (nextEntry == null) {
+ throw new NoSuchElementException();
+ }
+
+ return advanceIterator();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Read-only iterator");
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
new file mode 100644
index 0000000..c83fce0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import java.io.IOException;
+
+/**
+ * This class represents the snapshot of a {@link CopyOnWriteStateTable} and has a role in operator state checkpointing. Besides
+ * holding the {@link CopyOnWriteStateTable}s internal entries at the time of the snapshot, this class is also responsible for
+ * preparing and writing the state in the process of checkpointing.
+ * <p>
+ * IMPORTANT: Please notice that snapshot integrity of entries in this class relies on proper copy-on-write semantics
+ * through the {@link CopyOnWriteStateTable} that created the snapshot object, but all objects in this snapshot must be considered
+ * as READ-ONLY! The reason is that the objects held by this class may or may not be deep copies of original objects
+ * that may still be used in the {@link CopyOnWriteStateTable}. This depends for each entry on whether or not it was subject to
+ * copy-on-write operations by the {@link CopyOnWriteStateTable}. Phrased differently: the {@link CopyOnWriteStateTable} provides
+ * copy-on-write isolation for this snapshot, but this snapshot does not isolate modifications from the
+ * {@link CopyOnWriteStateTable}!
+ *
+ * @param <K> type of key
+ * @param <N> type of namespace
+ * @param <S> type of state
+ */
+@Internal
+public class CopyOnWriteStateTableSnapshot<K, N, S>
+ extends AbstractStateTableSnapshot<K, N, S, CopyOnWriteStateTable<K, N, S>> {
+
+ /**
+ * Version of the {@link CopyOnWriteStateTable} when this snapshot was created. This can be used to release the snapshot.
+ */
+ private final int snapshotVersion;
+
+ /**
+ * The number of entries in the {@link CopyOnWriteStateTable} at the time of creating this snapshot.
+ */
+ private final int stateTableSize;
+
+ /**
+ * The state table entries, as of the time this snapshot was created. Objects in this array may or may not be deep
+ * copies of the current entries in the {@link CopyOnWriteStateTable} that created this snapshot. This depends for each entry
+ * on whether or not it was subject to copy-on-write operations by the {@link CopyOnWriteStateTable}.
+ */
+ private final CopyOnWriteStateTable.StateTableEntry<K, N, S>[] snapshotData;
+
+ /**
+ * Offsets for the individual key-groups. This is lazily created when the snapshot is grouped by key-group during
+ * the process of writing this snapshot to an output as part of checkpointing.
+ */
+ private int[] keyGroupOffsets;
+
+ /**
+ * Creates a new {@link CopyOnWriteStateTableSnapshot}.
+ *
+ * @param owningStateTable the {@link CopyOnWriteStateTable} for which this object represents a snapshot.
+ */
+ CopyOnWriteStateTableSnapshot(CopyOnWriteStateTable<K, N, S> owningStateTable) {
+
+ super(owningStateTable);
+ this.snapshotData = owningStateTable.snapshotTableArrays();
+ this.snapshotVersion = owningStateTable.getStateTableVersion();
+ this.stateTableSize = owningStateTable.size();
+ this.keyGroupOffsets = null;
+ }
+
+ /**
+ * Returns the internal version of the {@link CopyOnWriteStateTable} when this snapshot was created. This value must be used to
+ * tell the {@link CopyOnWriteStateTable} when to release this snapshot.
+ */
+ int getSnapshotVersion() {
+ return snapshotVersion;
+ }
+
+ /**
+ * Partitions the snapshot data by key-group. The algorithm first builds a histogram for the distribution of keys
+ * into key-groups. Then, the histogram is accumulated to obtain the boundaries of each key-group in an array.
+ * Last, we use the accumulated counts as write position pointers for the key-group's bins when reordering the
+ * entries by key-group. This operation is lazily performed before the first writing of a key-group.
+ * <p>
+ * As a possible future optimization, we could perform the repartitioning in-place, using a scheme similar to the
+ * cuckoo cycles in cuckoo hashing. This can trade some performance for a smaller memory footprint.
+ */
+ @SuppressWarnings("unchecked")
+ private void partitionEntriesByKeyGroup() {
+
+ // We only have to perform this step once before the first key-group is written
+ if (null != keyGroupOffsets) {
+ return;
+ }
+
+ final KeyGroupRange keyGroupRange = owningStateTable.keyContext.getKeyGroupRange();
+ final int totalKeyGroups = owningStateTable.keyContext.getNumberOfKeyGroups();
+ final int baseKgIdx = keyGroupRange.getStartKeyGroup();
+ final int[] histogram = new int[keyGroupRange.getNumberOfKeyGroups() + 1];
+
+ CopyOnWriteStateTable.StateTableEntry<K, N, S>[] unfold = new CopyOnWriteStateTable.StateTableEntry[stateTableSize];
+
+ // 1) In this step we i) 'unfold' the linked list of entries to a flat array and ii) build a histogram for key-groups
+ int unfoldIndex = 0;
+ for (CopyOnWriteStateTable.StateTableEntry<K, N, S> entry : snapshotData) {
+ while (null != entry) {
+ int effectiveKgIdx =
+ KeyGroupRangeAssignment.computeKeyGroupForKeyHash(entry.key.hashCode(), totalKeyGroups) - baseKgIdx + 1;
+ ++histogram[effectiveKgIdx];
+ unfold[unfoldIndex++] = entry;
+ entry = entry.next;
+ }
+ }
+
+ // 2) We accumulate the histogram bins to obtain key-group ranges in the final array
+ for (int i = 1; i < histogram.length; ++i) {
+ histogram[i] += histogram[i - 1];
+ }
+
+ // 3) We repartition the entries by key-group, using the histogram values as write indexes
+ for (CopyOnWriteStateTable.StateTableEntry<K, N, S> t : unfold) {
+ int effectiveKgIdx =
+ KeyGroupRangeAssignment.computeKeyGroupForKeyHash(t.key.hashCode(), totalKeyGroups) - baseKgIdx;
+ snapshotData[histogram[effectiveKgIdx]++] = t;
+ }
+
+ // 4) As a byproduct, we also created the key-group offsets
+ this.keyGroupOffsets = histogram;
+ }
+
+ @Override
+ public void release() {
+ owningStateTable.releaseSnapshot(this);
+ }
+
+ @Override
+ public void writeMappingsInKeyGroup(DataOutputView dov, int keyGroupId) throws IOException {
+
+ if (null == keyGroupOffsets) {
+ partitionEntriesByKeyGroup();
+ }
+
+ final CopyOnWriteStateTable.StateTableEntry<K, N, S>[] groupedOut = snapshotData;
+ KeyGroupRange keyGroupRange = owningStateTable.keyContext.getKeyGroupRange();
+ int keyGroupOffsetIdx = keyGroupId - keyGroupRange.getStartKeyGroup() - 1;
+ int startOffset = keyGroupOffsetIdx < 0 ? 0 : keyGroupOffsets[keyGroupOffsetIdx];
+ int endOffset = keyGroupOffsets[keyGroupOffsetIdx + 1];
+
+ TypeSerializer<K> keySerializer = owningStateTable.keyContext.getKeySerializer();
+ TypeSerializer<N> namespaceSerializer = owningStateTable.metaInfo.getNamespaceSerializer();
+ TypeSerializer<S> stateSerializer = owningStateTable.metaInfo.getStateSerializer();
+
+ // write number of mappings in key-group
+ dov.writeInt(endOffset - startOffset);
+
+ // write mappings
+ for (int i = startOffset; i < endOffset; ++i) {
+ CopyOnWriteStateTable.StateTableEntry<K, N, S> toWrite = groupedOut[i];
+ groupedOut[i] = null; // free asap for GC
+ namespaceSerializer.serialize(toWrite.namespace, dov);
+ keySerializer.serialize(toWrite.key, dov);
+ stateSerializer.serialize(toWrite.state, dov);
+ }
+ }
+
+ /**
+ * Returns true iff the given state table is the owner of this snapshot object.
+ */
+ boolean isOwner(CopyOnWriteStateTable<K, N, S> stateTable) {
+ return stateTable == owningStateTable;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
index 624b83e..64fc1db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
@@ -23,18 +23,16 @@ import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+import org.apache.flink.util.Preconditions;
import java.io.IOException;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkState;
/**
* Heap-backed partitioned {@link ReducingState} that is
* snapshotted into files.
- *
+ *
* @param <K> The type of the key.
* @param <N> The type of the namespace.
* @param <IN> The type of the value added to the state.
@@ -45,13 +43,11 @@ public class HeapAggregatingState<K, N, IN, ACC, OUT>
extends AbstractHeapMergingState<K, N, IN, OUT, ACC, AggregatingState<IN, OUT>, AggregatingStateDescriptor<IN, ACC, OUT>>
implements InternalAggregatingState<N, IN, OUT> {
- private final AggregateFunction<IN, ACC, OUT> aggFunction;
+ private final AggregateTransformation<IN, ACC, OUT> aggregateTransformation;
/**
* Creates a new key/value state for the given hash map of key/value pairs.
*
- * @param backend
- * The state backend backing that created this state.
* @param stateDesc
* The state identifier for the state. This contains name and can create a default state value.
* @param stateTable
@@ -60,14 +56,13 @@ public class HeapAggregatingState<K, N, IN, ACC, OUT>
* The serializer for the type that indicates the namespace
*/
public HeapAggregatingState(
- KeyedStateBackend<K> backend,
AggregatingStateDescriptor<IN, ACC, OUT> stateDesc,
StateTable<K, N, ACC> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer) {
- super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
- this.aggFunction = stateDesc.getAggregateFunction();
+ super(stateDesc, stateTable, keySerializer, namespaceSerializer);
+ this.aggregateTransformation = new AggregateTransformation<>(stateDesc.getAggregateFunction());
}
// ------------------------------------------------------------------------
@@ -76,64 +71,25 @@ public class HeapAggregatingState<K, N, IN, ACC, OUT>
@Override
public OUT get() {
- final K key = backend.getCurrentKey();
-
- checkState(currentNamespace != null, "No namespace set.");
- checkState(key != null, "No key set.");
-
- Map<N, Map<K, ACC>> namespaceMap =
- stateTable.get(backend.getCurrentKeyGroupIndex());
-
- if (namespaceMap == null) {
- return null;
- }
- Map<K, ACC> keyedMap = namespaceMap.get(currentNamespace);
-
- if (keyedMap == null) {
- return null;
- }
-
- ACC accumulator = keyedMap.get(key);
- return aggFunction.getResult(accumulator);
+ ACC accumulator = stateTable.get(currentNamespace);
+ return accumulator != null ? aggregateTransformation.aggFunction.getResult(accumulator) : null;
}
@Override
public void add(IN value) throws IOException {
- final K key = backend.getCurrentKey();
-
- checkState(currentNamespace != null, "No namespace set.");
- checkState(key != null, "No key set.");
+ final N namespace = currentNamespace;
if (value == null) {
clear();
return;
}
- Map<N, Map<K, ACC>> namespaceMap =
- stateTable.get(backend.getCurrentKeyGroupIndex());
-
- if (namespaceMap == null) {
- namespaceMap = createNewMap();
- stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
- }
-
- Map<K, ACC> keyedMap = namespaceMap.get(currentNamespace);
-
- if (keyedMap == null) {
- keyedMap = createNewMap();
- namespaceMap.put(currentNamespace, keyedMap);
- }
-
- // if this is the first value for the key, create a new accumulator
- ACC accumulator = keyedMap.get(key);
- if (accumulator == null) {
- accumulator = aggFunction.createAccumulator();
- keyedMap.put(key, accumulator);
+ try {
+ stateTable.transform(namespace, value, aggregateTransformation);
+ } catch (Exception e) {
+ throw new IOException("Exception while applying AggregateFunction in aggregating state", e);
}
-
- //
- aggFunction.add(value, accumulator);
}
// ------------------------------------------------------------------------
@@ -142,6 +98,24 @@ public class HeapAggregatingState<K, N, IN, ACC, OUT>
@Override
protected ACC mergeState(ACC a, ACC b) throws Exception {
- return aggFunction.merge(a, b);
+ return aggregateTransformation.aggFunction.merge(a, b);
+ }
+
+ static final class AggregateTransformation<IN, ACC, OUT> implements StateTransformationFunction<ACC, IN> {
+
+ private final AggregateFunction<IN, ACC, OUT> aggFunction;
+
+ AggregateTransformation(AggregateFunction<IN, ACC, OUT> aggFunction) {
+ this.aggFunction = Preconditions.checkNotNull(aggFunction);
+ }
+
+ @Override
+ public ACC apply(ACC accumulator, IN value) throws Exception {
+ if (accumulator == null) {
+ accumulator = aggFunction.createAccumulator();
+ }
+ aggFunction.add(value, accumulator);
+ return accumulator;
+ }
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
index 6df3f5d..dad6d0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
@@ -22,12 +22,11 @@ import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
-import java.util.Map;
/**
* Heap-backed partitioned {@link FoldingState} that is
@@ -43,24 +42,22 @@ public class HeapFoldingState<K, N, T, ACC>
implements InternalFoldingState<N, T, ACC> {
/** The function used to fold the state */
- private final FoldFunction<T, ACC> foldFunction;
+ private final FoldTransformation<T, ACC> foldTransformation;
/**
* Creates a new key/value state for the given hash map of key/value pairs.
*
- * @param backend The state backend backing that created this state.
* @param stateDesc The state identifier for the state. This contains name
* and can create a default state value.
* @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
*/
public HeapFoldingState(
- KeyedStateBackend<K> backend,
FoldingStateDescriptor<T, ACC> stateDesc,
StateTable<K, N, ACC> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer) {
- super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
- this.foldFunction = stateDesc.getFoldFunction();
+ super(stateDesc, stateTable, keySerializer, namespaceSerializer);
+ this.foldTransformation = new FoldTransformation<>(stateDesc);
}
// ------------------------------------------------------------------------
@@ -69,62 +66,37 @@ public class HeapFoldingState<K, N, T, ACC>
@Override
public ACC get() {
- Preconditions.checkState(currentNamespace != null, "No namespace set.");
- Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
- Map<N, Map<K, ACC>> namespaceMap =
- stateTable.get(backend.getCurrentKeyGroupIndex());
-
- if (namespaceMap == null) {
- return null;
- }
-
- Map<K, ACC> keyedMap = namespaceMap.get(currentNamespace);
-
- if (keyedMap == null) {
- return null;
- }
-
- return keyedMap.get(backend.<K>getCurrentKey());
+ return stateTable.get(currentNamespace);
}
@Override
public void add(T value) throws IOException {
- Preconditions.checkState(currentNamespace != null, "No namespace set.");
- Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
if (value == null) {
clear();
return;
}
- Map<N, Map<K, ACC>> namespaceMap =
- stateTable.get(backend.getCurrentKeyGroupIndex());
-
- if (namespaceMap == null) {
- namespaceMap = createNewMap();
- stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
+ try {
+ stateTable.transform(currentNamespace, value, foldTransformation);
+ } catch (Exception e) {
+ throw new IOException("Could not add value to folding state.", e);
}
+ }
- Map<K, ACC> keyedMap = namespaceMap.get(currentNamespace);
-
- if (keyedMap == null) {
- keyedMap = createNewMap();
- namespaceMap.put(currentNamespace, keyedMap);
- }
+ static final class FoldTransformation<T, ACC> implements StateTransformationFunction<ACC, T> {
- ACC currentValue = keyedMap.get(backend.<K>getCurrentKey());
+ private final FoldingStateDescriptor<T, ACC> stateDescriptor;
+ private final FoldFunction<T, ACC> foldFunction;
- try {
+ FoldTransformation(FoldingStateDescriptor<T, ACC> stateDesc) {
+ this.stateDescriptor = Preconditions.checkNotNull(stateDesc);
+ this.foldFunction = Preconditions.checkNotNull(stateDesc.getFoldFunction());
+ }
- if (currentValue == null) {
- keyedMap.put(backend.<K>getCurrentKey(),
- foldFunction.fold(stateDesc.getDefaultValue(), value));
- } else {
- keyedMap.put(backend.<K>getCurrentKey(), foldFunction.fold(currentValue, value));
- }
- } catch (Exception e) {
- throw new RuntimeException("Could not add value to folding state.", e);
+ @Override
+ public ACC apply(ACC previousState, T value) throws Exception {
+ return foldFunction.fold((previousState != null) ? previousState : stateDescriptor.getDefaultValue(), value);
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index a4a08c1..0335933 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.heap;
+import org.apache.commons.collections.map.HashedMap;
import org.apache.commons.io.IOUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
@@ -29,18 +30,15 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.migration.MigrationUtil;
import org.apache.flink.migration.runtime.state.KvStateSnapshot;
-import org.apache.flink.migration.runtime.state.filesystem.AbstractFsStateSnapshot;
-import org.apache.flink.migration.runtime.state.memory.AbstractMemStateSnapshot;
+import org.apache.flink.migration.runtime.state.memory.MigrationRestoreSnapshot;
+import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
+import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
@@ -54,8 +52,6 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
import org.apache.flink.runtime.state.internal.InternalListState;
@@ -74,6 +70,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* A {@link AbstractKeyedStateBackend} that keeps state on the Java Heap and will serialize state to
@@ -94,7 +91,13 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* but we can't put them here because different key/value states with different types and
* namespace types share this central list of tables.
*/
- private final Map<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();
+ private final HashMap<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();
+
+ /**
+ * Determines whether or not we run snapshots asynchronously. This impacts the choice of the underlying
+ * {@link StateTable} implementation.
+ */
+ private final boolean asynchronousSnapshots;
public HeapKeyedStateBackend(
TaskKvStateRegistry kvStateRegistry,
@@ -102,10 +105,11 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
ClassLoader userCodeClassLoader,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
+ boolean asynchronousSnapshots,
ExecutionConfig executionConfig) {
super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
-
+ this.asynchronousSnapshots = asynchronousSnapshots;
LOG.info("Initializing heap keyed state backend with stream factory.");
}
@@ -124,7 +128,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private <N, V> StateTable<K, N, V> tryRegisterStateTable(
String stateName,
StateDescriptor.Type stateType,
- TypeSerializer<N> namespaceSerializer,
+ TypeSerializer<N> namespaceSerializer,
TypeSerializer<V> valueSerializer) {
final RegisteredBackendStateMetaInfo<N, V> newMetaInfo =
@@ -134,7 +138,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(stateName);
if (stateTable == null) {
- stateTable = new StateTable<>(newMetaInfo, keyGroupRange);
+ stateTable = newStateTable(newMetaInfo);
stateTables.put(stateName, stateTable);
} else {
if (!newMetaInfo.isCompatibleWith(stateTable.getMetaInfo())) {
@@ -152,7 +156,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
ValueStateDescriptor<V> stateDesc) throws Exception {
StateTable<K, N, V> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapValueState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
+ return new HeapValueState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
}
@Override
@@ -170,7 +174,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
namespaceSerializer,
new ArrayListSerializer<T>(stateDesc.getElementSerializer()));
- return new HeapListState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
+ return new HeapListState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
}
@Override
@@ -179,7 +183,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
ReducingStateDescriptor<T> stateDesc) throws Exception {
StateTable<K, N, T> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapReducingState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
+ return new HeapReducingState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
}
@Override
@@ -188,7 +192,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapAggregatingState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
+ return new HeapAggregatingState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
}
@Override
@@ -197,83 +201,151 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapFoldingState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
+ return new HeapFoldingState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
}
@Override
public <N, UK, UV> InternalMapState<N, UK, UV> createMapState(TypeSerializer<N> namespaceSerializer,
MapStateDescriptor<UK, UV> stateDesc) throws Exception {
-
+
StateTable<K, N, HashMap<UK, UV>> stateTable = tryRegisterStateTable(
stateDesc.getName(),
stateDesc.getType(),
namespaceSerializer,
new HashMapSerializer<>(stateDesc.getKeySerializer(), stateDesc.getValueSerializer()));
-
- return new HeapMapState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
+
+ return new HeapMapState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
}
@Override
@SuppressWarnings("unchecked")
- public RunnableFuture<KeyGroupsStateHandle> snapshot(
- long checkpointId,
- long timestamp,
- CheckpointStreamFactory streamFactory,
+ public RunnableFuture<KeyGroupsStateHandle> snapshot(
+ final long checkpointId,
+ final long timestamp,
+ final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
if (stateTables.isEmpty()) {
return new DoneFuture<>(null);
}
- try (CheckpointStreamFactory.CheckpointStateOutputStream stream = streamFactory.
- createCheckpointStateOutputStream(checkpointId, timestamp)) {
-
- DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(stream);
+ long syncStartTime = System.currentTimeMillis();
- Preconditions.checkState(stateTables.size() <= Short.MAX_VALUE,
- "Too many KV-States: " + stateTables.size() +
- ". Currently at most " + Short.MAX_VALUE + " states are supported");
+ Preconditions.checkState(stateTables.size() <= Short.MAX_VALUE,
+ "Too many KV-States: " + stateTables.size() +
+ ". Currently at most " + Short.MAX_VALUE + " states are supported");
- List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoProxyList = new ArrayList<>(stateTables.size());
+ List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoProxyList = new ArrayList<>(stateTables.size());
- Map<String, Integer> kVStateToId = new HashMap<>(stateTables.size());
+ final Map<String, Integer> kVStateToId = new HashMap<>(stateTables.size());
- for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
+ final Map<StateTable<K, ?, ?>, StateTableSnapshot> cowStateStableSnapshots = new HashedMap(stateTables.size());
- RegisteredBackendStateMetaInfo<?, ?> metaInfo = kvState.getValue().getMetaInfo();
- KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy = new KeyedBackendSerializationProxy.StateMetaInfo(
- metaInfo.getStateType(),
- metaInfo.getName(),
- metaInfo.getNamespaceSerializer(),
- metaInfo.getStateSerializer());
+ for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
+ RegisteredBackendStateMetaInfo<?, ?> metaInfo = kvState.getValue().getMetaInfo();
+ KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy = new KeyedBackendSerializationProxy.StateMetaInfo(
+ metaInfo.getStateType(),
+ metaInfo.getName(),
+ metaInfo.getNamespaceSerializer(),
+ metaInfo.getStateSerializer());
- metaInfoProxyList.add(metaInfoProxy);
- kVStateToId.put(kvState.getKey(), kVStateToId.size());
+ metaInfoProxyList.add(metaInfoProxy);
+ kVStateToId.put(kvState.getKey(), kVStateToId.size());
+ StateTable<K, ?, ?> stateTable = kvState.getValue();
+ if (null != stateTable) {
+ cowStateStableSnapshots.put(stateTable, stateTable.createSnapshot());
}
+ }
- KeyedBackendSerializationProxy serializationProxy =
- new KeyedBackendSerializationProxy(keySerializer, metaInfoProxyList);
+ final KeyedBackendSerializationProxy serializationProxy =
+ new KeyedBackendSerializationProxy(keySerializer, metaInfoProxyList);
+
+ //--------------------------------------------------- this becomes the end of sync part
+
+ // implementation of the async IO operation, based on FutureTask
+ final AbstractAsyncIOCallable<KeyGroupsStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream> ioCallable =
+ new AbstractAsyncIOCallable<KeyGroupsStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream>() {
+
+ AtomicBoolean open = new AtomicBoolean(false);
+
+ @Override
+ public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception {
+ if (open.compareAndSet(false, true)) {
+ CheckpointStreamFactory.CheckpointStateOutputStream stream =
+ streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+ try {
+ cancelStreamRegistry.registerClosable(stream);
+ return stream;
+ } catch (Exception ex) {
+ open.set(false);
+ throw ex;
+ }
+ } else {
+ throw new IOException("Operation already opened.");
+ }
+ }
- serializationProxy.write(outView);
+ @Override
+ public KeyGroupsStateHandle performOperation() throws Exception {
+ long asyncStartTime = System.currentTimeMillis();
+ CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
+ DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(stream);
+ serializationProxy.write(outView);
- int offsetCounter = 0;
- long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()];
+ long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()];
- for (int keyGroupIndex = keyGroupRange.getStartKeyGroup(); keyGroupIndex <= keyGroupRange.getEndKeyGroup(); keyGroupIndex++) {
- keyGroupRangeOffsets[offsetCounter++] = stream.getPos();
- outView.writeInt(keyGroupIndex);
- for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
- outView.writeShort(kVStateToId.get(kvState.getKey()));
- writeStateTableForKeyGroup(outView, kvState.getValue(), keyGroupIndex);
- }
- }
+ for (int keyGroupPos = 0; keyGroupPos < keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
+ int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos);
+ keyGroupRangeOffsets[keyGroupPos] = stream.getPos();
+ outView.writeInt(keyGroupId);
+
+ for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
+ outView.writeShort(kVStateToId.get(kvState.getKey()));
+ cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(outView, keyGroupId);
+ }
+ }
+
+ if (open.compareAndSet(true, false)) {
+ StreamStateHandle streamStateHandle = stream.closeAndGetHandle();
+ KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
+ final KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(offsets, streamStateHandle);
+
+ if (asynchronousSnapshots) {
+ LOG.info("Heap backend snapshot ({}, asynchronous part) in thread {} took {} ms.",
+ streamFactory, Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime));
+ }
+
+ return keyGroupsStateHandle;
+ } else {
+ throw new IOException("Checkpoint stream already closed.");
+ }
+ }
+
+ @Override
+ public void done(boolean canceled) {
+ if (open.compareAndSet(true, false)) {
+ CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
+ if (null != stream) {
+ cancelStreamRegistry.unregisterClosable(stream);
+ IOUtils.closeQuietly(stream);
+ }
+ }
+ for (StateTableSnapshot snapshot : cowStateStableSnapshots.values()) {
+ snapshot.release();
+ }
+ }
+ };
- StreamStateHandle streamStateHandle = stream.closeAndGetHandle();
+ AsyncStoppableTaskWithCallback<KeyGroupsStateHandle> task = AsyncStoppableTaskWithCallback.from(ioCallable);
- KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
- final KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(offsets, streamStateHandle);
- return new DoneFuture<>(keyGroupsStateHandle);
+ if (!asynchronousSnapshots) {
+ task.run();
}
+
+ LOG.info("Heap backend snapshot (" + streamFactory + ", synchronous part) in thread " +
+ Thread.currentThread() + " took " + (System.currentTimeMillis() - syncStartTime) + " ms.");
+
+ return task;
}
@SuppressWarnings("deprecation")
@@ -292,42 +364,12 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
- private <N, S> void writeStateTableForKeyGroup(
- DataOutputView outView,
- StateTable<K, N, S> stateTable,
- int keyGroupIndex) throws IOException {
-
- TypeSerializer<N> namespaceSerializer = stateTable.getNamespaceSerializer();
- TypeSerializer<S> stateSerializer = stateTable.getStateSerializer();
-
- Map<N, Map<K, S>> namespaceMap = stateTable.get(keyGroupIndex);
- if (namespaceMap == null) {
- outView.writeByte(0);
- } else {
- outView.writeByte(1);
-
- // number of namespaces
- outView.writeInt(namespaceMap.size());
- for (Map.Entry<N, Map<K, S>> namespace : namespaceMap.entrySet()) {
- namespaceSerializer.serialize(namespace.getKey(), outView);
-
- Map<K, S> entryMap = namespace.getValue();
-
- // number of entries
- outView.writeInt(entryMap.size());
- for (Map.Entry<K, S> entry : entryMap.entrySet()) {
- keySerializer.serialize(entry.getKey(), outView);
- stateSerializer.serialize(entry.getValue(), outView);
- }
- }
- }
- }
-
@SuppressWarnings({"unchecked"})
private void restorePartitionedState(Collection<KeyGroupsStateHandle> state) throws Exception {
+ final Map<Integer, String> kvStatesById = new HashMap<>();
int numRegisteredKvStates = 0;
- Map<Integer, String> kvStatesById = new HashMap<>();
+ stateTables.clear();
for (KeyGroupsStateHandle keyGroupsHandle : state) {
@@ -359,7 +401,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
RegisteredBackendStateMetaInfo<?, ?> registeredBackendStateMetaInfo =
new RegisteredBackendStateMetaInfo<>(metaInfoSerializationProxy);
- stateTable = new StateTable<>(registeredBackendStateMetaInfo, keyGroupRange);
+ stateTable = newStateTable(registeredBackendStateMetaInfo);
stateTables.put(metaInfoSerializationProxy.getStateName(), stateTable);
kvStatesById.put(numRegisteredKvStates, metaInfoSerializationProxy.getStateName());
++numRegisteredKvStates;
@@ -372,20 +414,20 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
fsDataInputStream.seek(offset);
int writtenKeyGroupIndex = inView.readInt();
- assert writtenKeyGroupIndex == keyGroupIndex;
+
+ Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex,
+ "Unexpected key-group in restore.");
for (int i = 0; i < metaInfoList.size(); i++) {
int kvStateId = inView.readShort();
-
- byte isPresent = inView.readByte();
- if (isPresent == 0) {
- continue;
- }
-
StateTable<K, ?, ?> stateTable = stateTables.get(kvStatesById.get(kvStateId));
- Preconditions.checkNotNull(stateTable);
- readStateTableForKeyGroup(inView, stateTable, keyGroupIndex);
+ StateTableByKeyGroupReader keyGroupReader =
+ StateTableByKeyGroupReaders.readerForVersion(
+ stateTable,
+ serializationProxy.getRestoredVersion());
+
+ keyGroupReader.readMappingsInKeyGroup(inView, keyGroupIndex);
}
}
} finally {
@@ -395,38 +437,12 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
- private <N, S> void readStateTableForKeyGroup(
- DataInputView inView,
- StateTable<K, N, S> stateTable,
- int keyGroupIndex) throws IOException {
-
- TypeSerializer<N> namespaceSerializer = stateTable.getNamespaceSerializer();
- TypeSerializer<S> stateSerializer = stateTable.getStateSerializer();
-
- Map<N, Map<K, S>> namespaceMap = new HashMap<>();
- stateTable.set(keyGroupIndex, namespaceMap);
-
- int numNamespaces = inView.readInt();
- for (int k = 0; k < numNamespaces; k++) {
- N namespace = namespaceSerializer.deserialize(inView);
- Map<K, S> entryMap = new HashMap<>();
- namespaceMap.put(namespace, entryMap);
-
- int numEntries = inView.readInt();
- for (int l = 0; l < numEntries; l++) {
- K key = keySerializer.deserialize(inView);
- S state = stateSerializer.deserialize(inView);
- entryMap.put(key, state);
- }
- }
- }
-
@Override
public String toString() {
return "HeapKeyedStateBackend";
}
- @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
+ @SuppressWarnings({"unchecked", "rawtypes", "DeprecatedIsStillUsed"})
@Deprecated
private void restoreOldSavepointKeyedState(
Collection<KeyGroupsStateHandle> stateHandles) throws IOException, ClassNotFoundException {
@@ -444,118 +460,18 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
for (Map.Entry<String, KvStateSnapshot<K, ?, ?, ?>> nameToState : namedStates.entrySet()) {
- KvStateSnapshot<K, ?, ?, ?> genericSnapshot = nameToState.getValue();
-
- final RestoredState restoredState;
-
- if (genericSnapshot instanceof AbstractMemStateSnapshot) {
-
- AbstractMemStateSnapshot<K, ?, ?, ?, ?> stateSnapshot =
- (AbstractMemStateSnapshot<K, ?, ?, ?, ?>) nameToState.getValue();
+ final String stateName = nameToState.getKey();
+ final KvStateSnapshot<K, ?, ?, ?> genericSnapshot = nameToState.getValue();
- restoredState = restoreHeapState(stateSnapshot);
-
- } else if (genericSnapshot instanceof AbstractFsStateSnapshot) {
-
- AbstractFsStateSnapshot<K, ?, ?, ?, ?> stateSnapshot =
- (AbstractFsStateSnapshot<K, ?, ?, ?, ?>) nameToState.getValue();
- restoredState = restoreFsState(stateSnapshot);
+ if (genericSnapshot instanceof MigrationRestoreSnapshot) {
+ MigrationRestoreSnapshot<K, ?, ?> stateSnapshot = (MigrationRestoreSnapshot<K, ?, ?>) genericSnapshot;
+ final StateTable rawResultMap =
+ stateSnapshot.deserialize(stateName, this);
+ // add named state to the backend
+ stateTables.put(stateName, rawResultMap);
} else {
throw new IllegalStateException("Unknown state: " + genericSnapshot);
}
-
- Map rawResultMap = restoredState.getRawResultMap();
- TypeSerializer<?> namespaceSerializer = restoredState.getNamespaceSerializer();
- TypeSerializer<?> stateSerializer = restoredState.getStateSerializer();
-
- if (namespaceSerializer instanceof VoidSerializer) {
- namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
- }
-
- Map nullNameSpaceFix = (Map) rawResultMap.remove(null);
-
- if (null != nullNameSpaceFix) {
- rawResultMap.put(VoidNamespace.INSTANCE, nullNameSpaceFix);
- }
-
- RegisteredBackendStateMetaInfo<?, ?> registeredBackendStateMetaInfo =
- new RegisteredBackendStateMetaInfo<>(
- StateDescriptor.Type.UNKNOWN,
- nameToState.getKey(),
- namespaceSerializer,
- stateSerializer);
-
- StateTable<K, ?, ?> stateTable = new StateTable<>(registeredBackendStateMetaInfo, keyGroupRange);
- stateTable.getState()[0] = rawResultMap;
-
- // add named state to the backend
- stateTables.put(registeredBackendStateMetaInfo.getName(), stateTable);
- }
- }
-
- @SuppressWarnings("deprecation")
- private RestoredState restoreHeapState(AbstractMemStateSnapshot<K, ?, ?, ?, ?> stateSnapshot) throws IOException {
- return new RestoredState(
- stateSnapshot.deserialize(),
- stateSnapshot.getNamespaceSerializer(),
- stateSnapshot.getStateSerializer());
- }
-
- @SuppressWarnings({"rawtypes", "unchecked", "deprecation"})
- private RestoredState restoreFsState(AbstractFsStateSnapshot<K, ?, ?, ?, ?> stateSnapshot) throws IOException {
- FileSystem fs = stateSnapshot.getFilePath().getFileSystem();
- //TODO register closeable to support fast cancelation?
- try (FSDataInputStream inStream = fs.open(stateSnapshot.getFilePath())) {
-
- DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inStream);
-
- final int numNamespaces = inView.readInt();
- HashMap rawResultMap = new HashMap<>(numNamespaces);
-
- TypeSerializer<K> keySerializer = stateSnapshot.getKeySerializer();
- TypeSerializer<?> namespaceSerializer = stateSnapshot.getNamespaceSerializer();
- TypeSerializer<?> stateSerializer = stateSnapshot.getStateSerializer();
-
- for (int i = 0; i < numNamespaces; i++) {
- Object namespace = namespaceSerializer.deserialize(inView);
- final int numKV = inView.readInt();
- Map<K, Object> namespaceMap = new HashMap<>(numKV);
- rawResultMap.put(namespace, namespaceMap);
- for (int j = 0; j < numKV; j++) {
- K key = keySerializer.deserialize(inView);
- Object value = stateSerializer.deserialize(inView);
- namespaceMap.put(key, value);
- }
- }
- return new RestoredState(rawResultMap, namespaceSerializer, stateSerializer);
- } catch (Exception e) {
- throw new IOException("Failed to restore state from file system", e);
- }
- }
-
- @SuppressWarnings("rawtypes")
- static final class RestoredState {
-
- private final Map rawResultMap;
- private final TypeSerializer<?> namespaceSerializer;
- private final TypeSerializer<?> stateSerializer ;
-
- public RestoredState(Map rawResultMap, TypeSerializer<?> namespaceSerializer, TypeSerializer<?> stateSerializer) {
- this.rawResultMap = rawResultMap;
- this.namespaceSerializer = namespaceSerializer;
- this.stateSerializer = stateSerializer;
- }
-
- public Map getRawResultMap() {
- return rawResultMap;
- }
-
- public TypeSerializer<?> getNamespaceSerializer() {
- return namespaceSerializer;
- }
-
- public TypeSerializer<?> getStateSerializer() {
- return stateSerializer;
}
}
@@ -567,15 +483,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
public int numStateEntries() {
int sum = 0;
for (StateTable<K, ?, ?> stateTable : stateTables.values()) {
- for (Map namespaceMap : stateTable.getState()) {
- if (namespaceMap == null) {
- continue;
- }
- Map<?, Map> typedMap = (Map<?, Map>) namespaceMap;
- for (Map entriesMap : typedMap.values()) {
- sum += entriesMap.size();
- }
- }
+ sum += stateTable.size();
}
return sum;
}
@@ -584,22 +492,22 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* Returns the total number of state entries across all keys for the given namespace.
*/
@VisibleForTesting
- @SuppressWarnings("unchecked")
- public <N> int numStateEntries(N namespace) {
+ public int numStateEntries(Object namespace) {
int sum = 0;
for (StateTable<K, ?, ?> stateTable : stateTables.values()) {
- for (Map namespaceMap : stateTable.getState()) {
- if (namespaceMap == null) {
- continue;
- }
- Map<?, Map> typedMap = (Map<?, Map>) namespaceMap;
- Map singleNamespace = typedMap.get(namespace);
- if (singleNamespace != null) {
- sum += singleNamespace.size();
- }
- }
+ sum += stateTable.sizeOfNamespace(namespace);
}
return sum;
}
-}
+ public <N, V> StateTable<K, N, V> newStateTable(RegisteredBackendStateMetaInfo<N, V> newMetaInfo) {
+ return asynchronousSnapshots ?
+ new CopyOnWriteStateTable<>(this, newMetaInfo) :
+ new NestedMapsStateTable<>(this, newMetaInfo);
+ }
+
+ @Override
+ public boolean supportsAsynchronousSnapshots() {
+ return asynchronousSnapshots;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index 02c3067..d3f67f0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -22,14 +22,11 @@ import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.util.Preconditions;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
-import java.util.Map;
/**
* Heap-backed partitioned {@link org.apache.flink.api.common.state.ListState} that is snapshotted
@@ -46,18 +43,16 @@ public class HeapListState<K, N, V>
/**
* Creates a new key/value state for the given hash map of key/value pairs.
*
- * @param backend The state backend backing that created this state.
* @param stateDesc The state identifier for the state. This contains name
* and can create a default state value.
* @param stateTable The state table to use in this key/value state. May contain initial state.
*/
public HeapListState(
- KeyedStateBackend<K> backend,
ListStateDescriptor<V> stateDesc,
StateTable<K, N, ArrayList<V>> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer) {
- super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
+ super(stateDesc, stateTable, keySerializer, namespaceSerializer);
}
// ------------------------------------------------------------------------
@@ -66,55 +61,24 @@ public class HeapListState<K, N, V>
@Override
public Iterable<V> get() {
- Preconditions.checkState(currentNamespace != null, "No namespace set.");
- Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
- Map<N, Map<K, ArrayList<V>>> namespaceMap =
- stateTable.get(backend.getCurrentKeyGroupIndex());
-
- if (namespaceMap == null) {
- return null;
- }
-
- Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
-
- if (keyedMap == null) {
- return null;
- }
-
- return keyedMap.get(backend.<K>getCurrentKey());
+ return stateTable.get(currentNamespace);
}
@Override
public void add(V value) {
- Preconditions.checkState(currentNamespace != null, "No namespace set.");
- Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
+ final N namespace = currentNamespace;
if (value == null) {
clear();
return;
}
- Map<N, Map<K, ArrayList<V>>> namespaceMap =
- stateTable.get(backend.getCurrentKeyGroupIndex());
-
- if (namespaceMap == null) {
- namespaceMap = createNewMap();
- stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
- }
-
- Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
-
- if (keyedMap == null) {
- keyedMap = createNewMap();
- namespaceMap.put(currentNamespace, keyedMap);
- }
-
- ArrayList<V> list = keyedMap.get(backend.<K>getCurrentKey());
+ final StateTable<K, N, ArrayList<V>> map = stateTable;
+ ArrayList<V> list = map.get(namespace);
if (list == null) {
list = new ArrayList<>();
- keyedMap.put(backend.<K>getCurrentKey(), list);
+ map.put(namespace, list);
}
list.add(value);
}
@@ -124,20 +88,7 @@ public class HeapListState<K, N, V>
Preconditions.checkState(namespace != null, "No namespace given.");
Preconditions.checkState(key != null, "No key given.");
- Map<N, Map<K, ArrayList<V>>> namespaceMap =
- stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, backend.getNumberOfKeyGroups()));
-
- if (namespaceMap == null) {
- return null;
- }
-
- Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
-
- if (keyedMap == null) {
- return null;
- }
-
- ArrayList<V> result = keyedMap.get(key);
+ ArrayList<V> result = stateTable.get(key, namespace);
if (result == null) {
return null;