You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/01/10 15:35:59 UTC
flink git commit: [FLINK-7475] [state] Introduce ListState#update()
Repository: flink
Updated Branches:
refs/heads/master b7f3497d3 -> 438e4e374
[FLINK-7475] [state] Introduce ListState#update()
This closes #4963.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/438e4e37
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/438e4e37
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/438e4e37
Branch: refs/heads/master
Commit: 438e4e37425688e2689fcb35488f819d729903cc
Parents: b7f3497
Author: Bowen Li <bo...@gmail.com>
Authored: Fri Oct 27 16:21:20 2017 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Wed Jan 10 16:35:28 2018 +0100
----------------------------------------------------------------------
docs/dev/stream/state/state.md | 6 +-
.../kafka/FlinkKafkaConsumerBaseTest.java | 9 ++
.../kinesis/FlinkKinesisConsumerTest.java | 18 ++-
.../streaming/state/RocksDBListState.java | 31 ++++-
.../streaming/state/util/MergeUtils.java | 63 +++++++++
.../RocksDBListStatePerformanceTest.java | 133 +++++++++++++++++++
.../streaming/state/util/MergeUtilsTest.java | 57 ++++++++
.../flink/api/common/state/ListState.java | 16 ++-
.../client/state/ImmutableListState.java | 5 +
.../state/DefaultOperatorStateBackend.java | 9 ++
.../runtime/state/UserFacingListState.java | 6 +
.../flink/runtime/state/heap/HeapListState.java | 13 ++
.../state/internal/InternalListState.java | 14 +-
.../runtime/state/StateBackendTestBase.java | 80 ++++++-----
14 files changed, 417 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/docs/dev/stream/state/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md
index 9956998..2002138 100644
--- a/docs/dev/stream/state/state.md
+++ b/docs/dev/stream/state/state.md
@@ -95,10 +95,10 @@ for each key that the operation sees). The value can be set using `update(T)` an
* `ListState<T>`: This keeps a list of elements. You can append elements and retrieve an `Iterable`
over all currently stored elements. Elements are added using `add(T)`, the Iterable can
-be retrieved using `Iterable<T> get()`.
+be retrieved using `Iterable<T> get()`. You can also override the existing list with `update(List<T>)`
* `ReducingState<T>`: This keeps a single value that represents the aggregation of all values
-added to the state. The interface is the same as for `ListState` but elements added using
+added to the state. The interface is similar to `ListState` but elements added using
`add(T)` are reduced to an aggregate using a specified `ReduceFunction`.
* `AggregatingState<IN, OUT>`: This keeps a single value that represents the aggregation of all values
@@ -108,7 +108,7 @@ added using `add(IN)` are aggregated using a specified `AggregateFunction`.
* `FoldingState<T, ACC>`: This keeps a single value that represents the aggregation of all values
added to the state. Contrary to `ReducingState`, the aggregate type may be different from the type
-of elements that are added to the state. The interface is the same as for `ListState` but elements
+of elements that are added to the state. The interface is similar to `ListState` but elements
added using `add(T)` are folded into an aggregate using a specified `FoldFunction`.
* `MapState<UK, UV>`: This keeps a list of mappings. You can put key-value pairs into the state and
http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 6ccfeb1..180b12a 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -751,5 +751,14 @@ public class FlinkKafkaConsumerBaseTest {
public boolean isClearCalled() {
return clearCalled;
}
+
+ @Override
+ public void update(List<T> values) throws Exception {
+ clear();
+
+ if (values != null && !values.isEmpty()) {
+ list.addAll(values);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index ea63476..3b0cb5c 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -48,7 +48,6 @@ import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
import com.amazonaws.services.kinesis.model.Shard;
-import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -138,12 +137,12 @@ public class FlinkKinesisConsumerTest {
// arbitrary checkpoint id and timestamp
consumer.snapshotState(new StateSnapshotContextSynchronousImpl(123, 123));
- Assert.assertTrue(listState.isClearCalled());
+ assertTrue(listState.isClearCalled());
// the checkpointed list state should contain only the shards that it should subscribe to
- Assert.assertEquals(globalUnionState.size() / 2, listState.getList().size());
- Assert.assertTrue(listState.getList().contains(globalUnionState.get(0)));
- Assert.assertTrue(listState.getList().contains(globalUnionState.get(2)));
+ assertEquals(globalUnionState.size() / 2, listState.getList().size());
+ assertTrue(listState.getList().contains(globalUnionState.get(0)));
+ assertTrue(listState.getList().contains(globalUnionState.get(2)));
}
@Test
@@ -544,6 +543,15 @@ public class FlinkKinesisConsumerTest {
public boolean isClearCalled() {
return clearCalled;
}
+
+ @Override
+ public void update(List<T> values) throws Exception {
+ list.clear();
+
+ if (values != null || !values.isEmpty()) {
+ list.addAll(values);
+ }
+ }
}
private HashMap<StreamShardHandle, SequenceNumber> getFakeRestoredStore(String streamName) {
http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index f8ed244..436fec2 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -21,6 +21,7 @@ package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.contrib.streaming.state.util.MergeUtils;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.internal.InternalListState;
@@ -114,7 +115,6 @@ public class RocksDBListState<K, N, V>
DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
valueSerializer.serialize(value, out);
backend.db.merge(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
-
} catch (Exception e) {
throw new RuntimeException("Error while adding data to RocksDB", e);
}
@@ -158,4 +158,33 @@ public class RocksDBListState<K, N, V>
throw new Exception("Error while merging state in RocksDB", e);
}
}
+
+ @Override
+ public void update(List<V> values) throws Exception {
+ clear();
+
+ if (values != null && !values.isEmpty()) {
+ try {
+ writeCurrentKeyWithGroupAndNamespace();
+ byte[] key = keySerializationStream.toByteArray();
+ DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
+
+ List<byte[]> bytes = new ArrayList<>(values.size());
+ for (V value : values) {
+ keySerializationStream.reset();
+ valueSerializer.serialize(value, out);
+ bytes.add(keySerializationStream.toByteArray());
+ }
+
+ byte[] premerge = MergeUtils.merge(bytes);
+ if (premerge != null) {
+ backend.db.put(columnFamily, writeOptions, key, premerge);
+ } else {
+ throw new IOException("Failed pre-merge values");
+ }
+ } catch (IOException | RocksDBException e) {
+ throw new RuntimeException("Error while updating data to RocksDB", e);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/util/MergeUtils.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/util/MergeUtils.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/util/MergeUtils.java
new file mode 100644
index 0000000..6cf2781
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/util/MergeUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.util;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import java.util.List;
+
+/**
+ * Utils to simulate StringAppendTestOperator's merge operations in RocksDB.
+ */
+public class MergeUtils {
+ @VisibleForTesting
+ protected static final byte DELIMITER = ',';
+
+ /**
+ * Merge operands into a single value that can be put directly into RocksDB.
+ */
+ public static byte[] merge(List<byte[]> operands) {
+ if (operands == null || operands.size() == 0) {
+ return null;
+ }
+
+ if (operands.size() == 1) {
+ return operands.get(0);
+ }
+
+ int numBytes = 0;
+ for (byte[] arr : operands) {
+ numBytes += arr.length + 1;
+ }
+ numBytes--;
+
+ byte[] result = new byte[numBytes];
+
+ System.arraycopy(operands.get(0), 0, result, 0, operands.get(0).length);
+
+ for (int i = 1, arrIndex = operands.get(0).length; i < operands.size(); i++) {
+ result[arrIndex] = DELIMITER;
+ arrIndex += 1;
+ System.arraycopy(operands.get(i), 0, result, arrIndex, operands.get(i).length);
+ arrIndex += operands.get(i).length;
+ }
+
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java
new file mode 100644
index 0000000..88c74fd
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.contrib.streaming.state.util.MergeUtils;
+import org.apache.flink.testutils.junit.RetryOnFailure;
+import org.apache.flink.testutils.junit.RetryRule;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test that validates that the performance of APIs of RocksDB's ListState is as expected.
+ *
+ * <p>Benchmarking:
+ *
+ * <p>Computer: MacbookPro (Mid 2015), Flash Storage, Processor 2.5GHz Intel Core i7, Memory 16GB 1600MHz DDR3
+ * Number of values added | time for add() | time for update() | perf improvement of update() over add()
+ * 500 978703 ns 55397 ns 17.66x
+ * 1000 3044179 ns 89474 ns 34.02x
+ * 5000 9247395 ns 305580 ns 30.26x
+ * 10000 16416442 ns 605963 ns 27.09x
+ * 50000 84311205 ns 5691288 ns 14.81x
+ * 100000 195103310 ns 12914182 ns 15.11x
+ * 500000 1223141510 ns 70595881 ns 17.33x
+ *
+ * <p>In summary, update() API which pre-merges all values gives users 15-35x performance improvements.
+ * For most frequent use cases where there are a few hundreds to a few thousands values per key,
+ * users can get 30x - 35x performance improvement!
+ *
+ */
+public class RocksDBListStatePerformanceTest extends TestLogger {
+
+ @Rule
+ public final TemporaryFolder tmp = new TemporaryFolder();
+
+ @Rule
+ public final RetryRule retry = new RetryRule();
+
+ @Test(timeout = 2000)
+ @RetryOnFailure(times = 3)
+ public void testRocksDbListStateAPIs() throws Exception {
+ final File rocksDir = tmp.newFolder();
+
+ // ensure the RocksDB library is loaded to a distinct location each retry
+ NativeLibraryLoader.getInstance().loadLibrary(rocksDir.getAbsolutePath());
+
+ final String key1 = "key1";
+ final String key2 = "key2";
+ final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+
+ final byte[] keyBytes1 = key1.getBytes(StandardCharsets.UTF_8);
+ final byte[] keyBytes2 = key2.getBytes(StandardCharsets.UTF_8);
+ final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
+
+ // The number of values added to ListState. Can be changed for benchmarking
+ final int num = 1000;
+
+ try (
+ final Options options = new Options()
+ .setCompactionStyle(CompactionStyle.LEVEL)
+ .setLevelCompactionDynamicLevelBytes(true)
+ .setIncreaseParallelism(4)
+ .setUseFsync(false)
+ .setMaxOpenFiles(-1)
+ .setCreateIfMissing(true)
+ .setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME);
+
+ final WriteOptions writeOptions = new WriteOptions()
+ .setSync(false)
+ .setDisableWAL(true);
+
+ final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath())) {
+
+ // ----- add() API -----
+ log.info("begin add");
+ System.out.println("begin add");
+
+ final long beginInsert1 = System.nanoTime();
+ for (int i = 0; i < num; i++) {
+ rocksDB.merge(writeOptions, keyBytes1, valueBytes);
+ }
+ final long endInsert1 = System.nanoTime();
+
+ log.info("end add - duration: {} ns", (endInsert1 - beginInsert1));
+
+ // ----- update() API -----
+
+ List<byte[]> list = new ArrayList<>(num);
+ for (int i = 0; i < num; i++) {
+ list.add(valueBytes);
+ }
+ byte[] premerged = MergeUtils.merge(list);
+
+ log.info("begin update");
+
+ final long beginInsert2 = System.nanoTime();
+ rocksDB.merge(writeOptions, keyBytes2, premerged);
+ final long endInsert2 = System.nanoTime();
+
+ log.info("end update - duration: {} ns", (endInsert2 - beginInsert2));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/util/MergeUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/util/MergeUtilsTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/util/MergeUtilsTest.java
new file mode 100644
index 0000000..3493f76
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/util/MergeUtilsTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.util;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for MergeUtils.
+ */
+public class MergeUtilsTest {
+
+ @Test
+ public void testMergeMulti() {
+ List<byte[]> list = Arrays.asList(
+ new byte[]{0, 1, 2, 3},
+ new byte[]{4},
+ new byte[]{5, 6});
+
+ byte[] expected = new byte[]{0, 1, 2, 3, MergeUtils.DELIMITER, 4, MergeUtils.DELIMITER, 5, 6};
+ assertTrue(Arrays.equals(expected, MergeUtils.merge(list)));
+ }
+
+ @Test
+ public void testMergeEmptyList() {
+ // Empty list
+ assertTrue(Arrays.equals(null, MergeUtils.merge(Collections.emptyList())));
+ }
+
+ @Test
+ public void testMergeSingleton() {
+ // Singleton list
+ byte[] singletonData = new byte[] {0x42};
+ assertTrue(Arrays.equals(singletonData, MergeUtils.merge(Arrays.asList(singletonData))));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
index d3fd61e..97aaa7f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
@@ -20,6 +20,8 @@ package org.apache.flink.api.common.state;
import org.apache.flink.annotation.PublicEvolving;
+import java.util.List;
+
/**
* {@link State} interface for partitioned list state in Operations.
* The state is accessed and modified by user functions, and checkpointed consistently
@@ -33,4 +35,16 @@ import org.apache.flink.annotation.PublicEvolving;
* @param <T> Type of values that this list state keeps.
*/
@PublicEvolving
-public interface ListState<T> extends MergingState<T, Iterable<T>> {}
+public interface ListState<T> extends MergingState<T, Iterable<T>> {
+ /**
+ * Updates the state of the current key for the given source namespaces into the state of
+ * the target namespace.
+ *
+ * If `null` or an empty list is passed in, the state value will be null
+ *
+ * @param values The target namespace where the merged state should be stored.
+ *
+ * @throws Exception The method may forward exception thrown internally (by I/O or functions).
+ */
+ void update(List<T> values) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
index 3dcd75d..dd780a8 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
@@ -67,4 +67,9 @@ public final class ImmutableListState<V> extends ImmutableState implements ListS
stateDescriptor.getElementSerializer());
return new ImmutableListState<>(state);
}
+
+ @Override
+ public void update(List<V> values) throws Exception {
+ throw MODIFICATION_ATTEMPT_ERROR;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index aa17efb..da05a8b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -497,6 +497,15 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
return partitionOffsets;
}
+
+ @Override
+ public void update(List<S> values) throws Exception {
+ internalList.clear();
+
+ if (values != null && !values.isEmpty()) {
+ internalList.addAll(values);
+ }
+ }
}
private <S> ListState<S> getListState(
http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingListState.java
index 71026c6..b93e4d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingListState.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state;
import org.apache.flink.api.common.state.ListState;
import java.util.Collections;
+import java.util.List;
/**
* Simple wrapper list state that exposes empty state properly as an empty list.
@@ -54,4 +55,9 @@ class UserFacingListState<T> implements ListState<T> {
public void clear() {
originalState.clear();
}
+
+ @Override
+ public void update(List<T> values) throws Exception {
+ originalState.update(values);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index d3f67f0..fbf2087 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -27,6 +27,7 @@ import org.apache.flink.util.Preconditions;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
+import java.util.List;
/**
* Heap-backed partitioned {@link org.apache.flink.api.common.state.ListState} that is snapshotted
@@ -120,4 +121,16 @@ public class HeapListState<K, N, V>
a.addAll(b);
return a;
}
+
+ @Override
+ public void update(List<V> values) throws Exception {
+ clear();
+
+ if (values != null && !values.isEmpty()) {
+ final N namespace = currentNamespace;
+ final StateTable<K, N, ArrayList<V>> map = stateTable;
+
+ map.put(namespace, new ArrayList<>(values));
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
index ae392ed..ac17f8e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.state.internal;
import org.apache.flink.api.common.state.ListState;
+import java.util.List;
+
/**
* The peer to the {@link ListState} in the internal state type hierarchy.
*
@@ -28,4 +30,14 @@ import org.apache.flink.api.common.state.ListState;
* @param <N> The type of the namespace
* @param <T> The type of elements in the list
*/
-public interface InternalListState<N, T> extends InternalMergingState<N, T, Iterable<T>>, ListState<T> {}
+public interface InternalListState<N, T> extends InternalMergingState<N, T, Iterable<T>>, ListState<T> {
+ /**
+ * Updates the state of the current key for the given source namespaces into the state of
+ * the target namespace.
+ *
+ * @param values The target namespace where the merged state should be stored.
+ *
+ * @throws Exception The method may forward exception thrown internally (by I/O or functions).
+ */
+ void update(List<T> values) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 7730aec..2b4de54 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -77,13 +77,13 @@ import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.commons.io.output.ByteArrayOutputStream;
-import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -1199,15 +1199,18 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
TypeSerializer<String> valueSerializer = kvId.getElementSerializer();
Joiner joiner = Joiner.on(",");
+
// some modifications to the state
backend.setCurrentKey(1);
- assertEquals(null, state.get());
- assertEquals(null, getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ assertNull(state.get());
+ assertNull(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
state.add("1");
+
backend.setCurrentKey(2);
- assertEquals(null, state.get());
- assertEquals(null, getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
- state.add("2");
+ assertNull(state.get());
+ assertNull(getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ state.update(Arrays.asList("2"));
+
backend.setCurrentKey(1);
assertEquals("1", joiner.join(state.get()));
assertEquals("1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
@@ -1218,8 +1221,10 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
// make some more modifications
backend.setCurrentKey(1);
state.add("u1");
+
backend.setCurrentKey(2);
state.add("u2");
+
backend.setCurrentKey(3);
state.add("u3");
@@ -1230,9 +1235,11 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
backend.setCurrentKey(1);
assertEquals("1,u1", joiner.join(state.get()));
assertEquals("1,u1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+
backend.setCurrentKey(2);
assertEquals("2,u2", joiner.join(state.get()));
assertEquals("2,u2", joiner.join(getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+
backend.setCurrentKey(3);
assertEquals("u3", joiner.join(state.get()));
assertEquals("u3", joiner.join(getSerializedList(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
@@ -1249,6 +1256,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
backend.setCurrentKey(1);
assertEquals("1", joiner.join(restored1.get()));
assertEquals("1", joiner.join(getSerializedList(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+
backend.setCurrentKey(2);
assertEquals("2", joiner.join(restored1.get()));
assertEquals("2", joiner.join(getSerializedList(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
@@ -1265,9 +1273,11 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
backend.setCurrentKey(1);
assertEquals("1,u1", joiner.join(restored2.get()));
assertEquals("1,u1", joiner.join(getSerializedList(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+
backend.setCurrentKey(2);
assertEquals("2,u2", joiner.join(restored2.get()));
assertEquals("2,u2", joiner.join(getSerializedList(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+
backend.setCurrentKey(3);
assertEquals("u3", joiner.join(restored2.get()));
assertEquals("u3", joiner.join(getSerializedList(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
@@ -1276,7 +1286,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
}
@Test
- public void testListStateAddAndGet() throws Exception {
+ public void testListStateAddUpdateAndGet() throws Exception {
AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
@@ -1294,18 +1304,24 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.add(17L);
state.add(11L);
assertThat(state.get(), containsInAnyOrder(17L, 11L));
+ // update(null) should remain the value null
+ state.update(null);
+ assertNull(state.get());
+ // update(emptyList) should remain the value null
+ state.update(Arrays.asList());
+ assertNull(state.get());
+ state.update(Arrays.asList(10L, 16L));
+ assertThat(state.get(), containsInAnyOrder(16L, 10L));
keyedBackend.setCurrentKey("abc");
assertNull(state.get());
keyedBackend.setCurrentKey("g");
assertNull(state.get());
- state.add(1L);
- state.add(2L);
+ state.update(Arrays.asList(1L, 2L));
keyedBackend.setCurrentKey("def");
- assertThat(state.get(), containsInAnyOrder(11L, 17L));
-
+ assertThat(state.get(), containsInAnyOrder(10L, 16L));
state.clear();
assertNull(state.get());
@@ -1319,7 +1335,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
keyedBackend.setCurrentKey("g");
assertThat(state.get(), containsInAnyOrder(1L, 2L, 3L, 2L, 1L));
-
+ state.update(Arrays.asList(5L, 6L));
+ assertThat(state.get(), containsInAnyOrder(5L, 6L));
state.clear();
// make sure all lists / maps are cleared
@@ -1465,11 +1482,11 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
// some modifications to the state
backend.setCurrentKey(1);
- assertEquals(null, state.get());
+ assertNull(state.get());
assertNull(getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
state.add("1");
backend.setCurrentKey(2);
- assertEquals(null, state.get());
+ assertNull(state.get());
assertNull(getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
state.add("2");
backend.setCurrentKey(1);
@@ -2081,12 +2098,12 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
// some modifications to the state
backend.setCurrentKey(1);
- assertEquals(null, state.get());
- assertEquals(null, getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ assertNull(state.get());
+ assertNull(getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
state.add(1);
backend.setCurrentKey(2);
- assertEquals(null, state.get());
- assertEquals(null, getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ assertNull(state.get());
+ assertNull(getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
state.add(2);
backend.setCurrentKey(1);
assertEquals("Fold-Initial:,1", state.get());
@@ -2178,12 +2195,12 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
// some modifications to the state
backend.setCurrentKey(1);
- assertEquals(null, state.get(1));
- assertEquals(null, getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+ assertNull(state.get(1));
+ assertNull(getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
state.put(1, "1");
backend.setCurrentKey(2);
- assertEquals(null, state.get(2));
- assertEquals(null, getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+ assertNull(state.get(2));
+ assertNull(getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
state.put(2, "2");
backend.setCurrentKey(1);
assertTrue(state.contains(1));
@@ -2225,7 +2242,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
for (Integer key : state.keys()) {
keys.add(key);
}
- List<Integer> expectedKeys = new ArrayList<Integer>() {{ add(103); add(1031); add(1032); }};
+ List<Integer> expectedKeys = Arrays.asList(103, 1031, 1032);
assertEquals(keys.size(), expectedKeys.size());
keys.removeAll(expectedKeys);
assertTrue(keys.isEmpty());
@@ -2234,7 +2251,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
for (String value : state.values()) {
values.add(value);
}
- List<String> expectedValues = new ArrayList<String>() {{ add("103"); add("1031"); add("1032"); }};
+ List<String> expectedValues = Arrays.asList("103", "1031", "1032");
assertEquals(values.size(), expectedValues.size());
values.removeAll(expectedValues);
assertTrue(values.isEmpty());
@@ -2322,13 +2339,13 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
backend.setCurrentKey(1);
- assertEquals(null, state.value());
+ assertNull(state.value());
state.update("Ciao");
assertEquals("Ciao", state.value());
state.clear();
- assertEquals(null, state.value());
+ assertNull(state.value());
backend.dispose();
}
@@ -2426,8 +2443,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
backend.setCurrentKey(1);
assertNull(state.get());
- state.add("Ciao");
- state.add("Bello");
+ state.update(Arrays.asList("Ciao", "Bello"));
assertThat(state.get(), containsInAnyOrder("Ciao", "Bello"));
state.clear();
@@ -3194,7 +3210,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
// test isolation
for (int i = 0; i < 20; ++i) {
backend.setCurrentKey(i);
- Assert.assertEquals(i + 1, (int) valueState.value());
+ assertEquals(i + 1, (int) valueState.value());
}
} finally {
@@ -3204,7 +3220,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
}
}
- Assert.assertNotNull(stateHandle);
+ assertNotNull(stateHandle);
backend = null;
@@ -3219,11 +3235,11 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
for (int i = 0; i < 10; ++i) {
backend.setCurrentKey(i);
- Assert.assertEquals(i, (int) valueState.value());
+ assertEquals(i, (int) valueState.value());
}
backend.setCurrentKey(11);
- Assert.assertEquals(null, valueState.value());
+ assertNull(valueState.value());
} finally {
if (null != backend) {
IOUtils.closeQuietly(backend);