You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/01/10 15:35:59 UTC

flink git commit: [FLINK-7475] [state] Introduce ListState#update()

Repository: flink
Updated Branches:
  refs/heads/master b7f3497d3 -> 438e4e374


[FLINK-7475] [state] Introduce ListState#update()

This closes #4963.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/438e4e37
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/438e4e37
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/438e4e37

Branch: refs/heads/master
Commit: 438e4e37425688e2689fcb35488f819d729903cc
Parents: b7f3497
Author: Bowen Li <bo...@gmail.com>
Authored: Fri Oct 27 16:21:20 2017 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Wed Jan 10 16:35:28 2018 +0100

----------------------------------------------------------------------
 docs/dev/stream/state/state.md                  |   6 +-
 .../kafka/FlinkKafkaConsumerBaseTest.java       |   9 ++
 .../kinesis/FlinkKinesisConsumerTest.java       |  18 ++-
 .../streaming/state/RocksDBListState.java       |  31 ++++-
 .../streaming/state/util/MergeUtils.java        |  63 +++++++++
 .../RocksDBListStatePerformanceTest.java        | 133 +++++++++++++++++++
 .../streaming/state/util/MergeUtilsTest.java    |  57 ++++++++
 .../flink/api/common/state/ListState.java       |  16 ++-
 .../client/state/ImmutableListState.java        |   5 +
 .../state/DefaultOperatorStateBackend.java      |   9 ++
 .../runtime/state/UserFacingListState.java      |   6 +
 .../flink/runtime/state/heap/HeapListState.java |  13 ++
 .../state/internal/InternalListState.java       |  14 +-
 .../runtime/state/StateBackendTestBase.java     |  80 ++++++-----
 14 files changed, 417 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/docs/dev/stream/state/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md
index 9956998..2002138 100644
--- a/docs/dev/stream/state/state.md
+++ b/docs/dev/stream/state/state.md
@@ -95,10 +95,10 @@ for each key that the operation sees). The value can be set using `update(T)` an
 
 * `ListState<T>`: This keeps a list of elements. You can append elements and retrieve an `Iterable`
 over all currently stored elements. Elements are added using `add(T)`, the Iterable can
-be retrieved using `Iterable<T> get()`.
+be retrieved using `Iterable<T> get()`. You can also override the existing list with `update(List<T>)`
 
 * `ReducingState<T>`: This keeps a single value that represents the aggregation of all values
-added to the state. The interface is the same as for `ListState` but elements added using
+added to the state. The interface is similar to `ListState` but elements added using
 `add(T)` are reduced to an aggregate using a specified `ReduceFunction`.
 
 * `AggregatingState<IN, OUT>`: This keeps a single value that represents the aggregation of all values
@@ -108,7 +108,7 @@ added using `add(IN)` are aggregated using a specified `AggregateFunction`.
 
 * `FoldingState<T, ACC>`: This keeps a single value that represents the aggregation of all values
 added to the state. Contrary to `ReducingState`, the aggregate type may be different from the type
-of elements that are added to the state. The interface is the same as for `ListState` but elements
+of elements that are added to the state. The interface is similar to `ListState` but elements
 added using `add(T)` are folded into an aggregate using a specified `FoldFunction`.
 
 * `MapState<UK, UV>`: This keeps a list of mappings. You can put key-value pairs into the state and

http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 6ccfeb1..180b12a 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -751,5 +751,14 @@ public class FlinkKafkaConsumerBaseTest {
 		public boolean isClearCalled() {
 			return clearCalled;
 		}
+
+		@Override
+		public void update(List<T> values) throws Exception {
+			clear();
+
+			if (values != null && !values.isEmpty()) {
+				list.addAll(values);
+			}
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index ea63476..3b0cb5c 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -48,7 +48,6 @@ import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import com.amazonaws.services.kinesis.model.HashKeyRange;
 import com.amazonaws.services.kinesis.model.SequenceNumberRange;
 import com.amazonaws.services.kinesis.model.Shard;
-import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -138,12 +137,12 @@ public class FlinkKinesisConsumerTest {
 		// arbitrary checkpoint id and timestamp
 		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(123, 123));
 
-		Assert.assertTrue(listState.isClearCalled());
+		assertTrue(listState.isClearCalled());
 
 		// the checkpointed list state should contain only the shards that it should subscribe to
-		Assert.assertEquals(globalUnionState.size() / 2, listState.getList().size());
-		Assert.assertTrue(listState.getList().contains(globalUnionState.get(0)));
-		Assert.assertTrue(listState.getList().contains(globalUnionState.get(2)));
+		assertEquals(globalUnionState.size() / 2, listState.getList().size());
+		assertTrue(listState.getList().contains(globalUnionState.get(0)));
+		assertTrue(listState.getList().contains(globalUnionState.get(2)));
 	}
 
 	@Test
@@ -544,6 +543,15 @@ public class FlinkKinesisConsumerTest {
 		public boolean isClearCalled() {
 			return clearCalled;
 		}
+
+		@Override
+		public void update(List<T> values) throws Exception {
+			list.clear();
+
+			if (values != null || !values.isEmpty()) {
+				list.addAll(values);
+			}
+		}
 	}
 
 	private HashMap<StreamShardHandle, SequenceNumber> getFakeRestoredStore(String streamName) {

http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index f8ed244..436fec2 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -21,6 +21,7 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.contrib.streaming.state.util.MergeUtils;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.internal.InternalListState;
@@ -114,7 +115,6 @@ public class RocksDBListState<K, N, V>
 			DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
 			valueSerializer.serialize(value, out);
 			backend.db.merge(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
-
 		} catch (Exception e) {
 			throw new RuntimeException("Error while adding data to RocksDB", e);
 		}
@@ -158,4 +158,33 @@ public class RocksDBListState<K, N, V>
 			throw new Exception("Error while merging state in RocksDB", e);
 		}
 	}
+
+	@Override
+	public void update(List<V> values) throws Exception {
+		clear();
+
+		if (values != null && !values.isEmpty()) {
+			try {
+				writeCurrentKeyWithGroupAndNamespace();
+				byte[] key = keySerializationStream.toByteArray();
+				DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
+
+				List<byte[]> bytes = new ArrayList<>(values.size());
+				for (V value : values) {
+					keySerializationStream.reset();
+					valueSerializer.serialize(value, out);
+					bytes.add(keySerializationStream.toByteArray());
+				}
+
+				byte[] premerge = MergeUtils.merge(bytes);
+				if (premerge != null) {
+					backend.db.put(columnFamily, writeOptions, key, premerge);
+				} else {
+					throw new IOException("Failed pre-merge values");
+				}
+			} catch (IOException | RocksDBException e) {
+				throw new RuntimeException("Error while updating data to RocksDB", e);
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/util/MergeUtils.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/util/MergeUtils.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/util/MergeUtils.java
new file mode 100644
index 0000000..6cf2781
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/util/MergeUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.util;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import java.util.List;
+
+/**
+ * Utils to simulate StringAppendTestOperator's merge operations in RocksDB.
+ */
+public class MergeUtils {
+	@VisibleForTesting
+	protected static final byte DELIMITER = ',';
+
+	/**
+	 * Merge operands into a single value that can be put directly into RocksDB.
+	 */
+	public static byte[] merge(List<byte[]> operands) {
+		if (operands == null || operands.size() == 0) {
+			return null;
+		}
+
+		if (operands.size() == 1) {
+			return operands.get(0);
+		}
+
+		int numBytes = 0;
+		for (byte[] arr : operands) {
+			numBytes += arr.length + 1;
+		}
+		numBytes--;
+
+		byte[] result = new byte[numBytes];
+
+		System.arraycopy(operands.get(0), 0, result, 0, operands.get(0).length);
+
+		for (int i = 1, arrIndex = operands.get(0).length; i < operands.size(); i++) {
+			result[arrIndex] = DELIMITER;
+			arrIndex += 1;
+			System.arraycopy(operands.get(i), 0, result, arrIndex, operands.get(i).length);
+			arrIndex += operands.get(i).length;
+		}
+
+		return result;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java
new file mode 100644
index 0000000..88c74fd
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.contrib.streaming.state.util.MergeUtils;
+import org.apache.flink.testutils.junit.RetryOnFailure;
+import org.apache.flink.testutils.junit.RetryRule;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test that validates that the performance of APIs of RocksDB's ListState is as expected.
+ *
+ * <p>Benchmarking:
+ *
+ * <p>Computer: MacbookPro (Mid 2015), Flash Storage, Processor 2.5GHz Intel Core i7, Memory 16GB 1600MHz DDR3
+ * Number of values added | time for add()   |  time for update() | perf improvement of update() over add()
+ * 500						978703 ns			55397 ns			17.66x
+ * 1000						3044179 ns			89474 ns			34.02x
+ * 5000						9247395 ns			305580 ns			30.26x
+ * 10000					16416442 ns			605963 ns			27.09x
+ * 50000					84311205 ns			5691288 ns			14.81x
+ * 100000					195103310 ns		12914182 ns			15.11x
+ * 500000					1223141510 ns		70595881 ns			17.33x
+ *
+ * <p>In summary, update() API which pre-merges all values gives users 15-35x performance improvements.
+ * For most frequent use cases where there are a few hundreds to a few thousands values per key,
+ * users can get 30x - 35x performance improvement!
+ *
+ */
+public class RocksDBListStatePerformanceTest extends TestLogger {
+
+	@Rule
+	public final TemporaryFolder tmp = new TemporaryFolder();
+
+	@Rule
+	public final RetryRule retry = new RetryRule();
+
+	@Test(timeout = 2000)
+	@RetryOnFailure(times = 3)
+	public void testRocksDbListStateAPIs() throws Exception {
+		final File rocksDir = tmp.newFolder();
+
+		// ensure the RocksDB library is loaded to a distinct location each retry
+		NativeLibraryLoader.getInstance().loadLibrary(rocksDir.getAbsolutePath());
+
+		final String key1 = "key1";
+		final String key2 = "key2";
+		final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+
+		final byte[] keyBytes1 = key1.getBytes(StandardCharsets.UTF_8);
+		final byte[] keyBytes2 = key2.getBytes(StandardCharsets.UTF_8);
+		final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
+
+		// The number of values added to ListState. Can be changed for benchmarking
+		final int num = 1000;
+
+		try (
+			final Options options = new Options()
+					.setCompactionStyle(CompactionStyle.LEVEL)
+					.setLevelCompactionDynamicLevelBytes(true)
+					.setIncreaseParallelism(4)
+					.setUseFsync(false)
+					.setMaxOpenFiles(-1)
+					.setCreateIfMissing(true)
+					.setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME);
+
+			final WriteOptions writeOptions = new WriteOptions()
+					.setSync(false)
+					.setDisableWAL(true);
+
+			final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath())) {
+
+			// ----- add() API -----
+			log.info("begin add");
+			System.out.println("begin add");
+
+			final long beginInsert1 = System.nanoTime();
+			for (int i = 0; i < num; i++) {
+				rocksDB.merge(writeOptions, keyBytes1, valueBytes);
+			}
+			final long endInsert1 = System.nanoTime();
+
+			log.info("end add - duration: {} ns", (endInsert1 - beginInsert1));
+
+			// ----- update() API -----
+
+			List<byte[]> list = new ArrayList<>(num);
+			for (int i = 0; i < num; i++) {
+				list.add(valueBytes);
+			}
+			byte[] premerged = MergeUtils.merge(list);
+
+			log.info("begin update");
+
+			final long beginInsert2 = System.nanoTime();
+			rocksDB.merge(writeOptions, keyBytes2, premerged);
+			final long endInsert2 = System.nanoTime();
+
+			log.info("end update - duration: {} ns", (endInsert2 - beginInsert2));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/util/MergeUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/util/MergeUtilsTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/util/MergeUtilsTest.java
new file mode 100644
index 0000000..3493f76
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/util/MergeUtilsTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.util;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for MergeUtils.
+ */
+public class MergeUtilsTest {
+
+	@Test
+	public void testMergeMulti() {
+		List<byte[]> list = Arrays.asList(
+			new byte[]{0, 1, 2, 3},
+			new byte[]{4},
+			new byte[]{5, 6});
+
+		byte[] expected = new byte[]{0, 1, 2, 3, MergeUtils.DELIMITER, 4, MergeUtils.DELIMITER, 5, 6};
+		assertTrue(Arrays.equals(expected, MergeUtils.merge(list)));
+	}
+
+	@Test
+	public void testMergeEmptyList() {
+		// Empty list
+		assertTrue(Arrays.equals(null, MergeUtils.merge(Collections.emptyList())));
+	}
+
+	@Test
+	public void testMergeSingleton() {
+		// Singleton list
+		byte[] singletonData = new byte[] {0x42};
+		assertTrue(Arrays.equals(singletonData, MergeUtils.merge(Arrays.asList(singletonData))));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
index d3fd61e..97aaa7f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
@@ -20,6 +20,8 @@ package org.apache.flink.api.common.state;
 
 import org.apache.flink.annotation.PublicEvolving;
 
+import java.util.List;
+
 /**
  * {@link State} interface for partitioned list state in Operations.
  * The state is accessed and modified by user functions, and checkpointed consistently
@@ -33,4 +35,16 @@ import org.apache.flink.annotation.PublicEvolving;
  * @param <T> Type of values that this list state keeps.
  */
 @PublicEvolving
-public interface ListState<T> extends MergingState<T, Iterable<T>> {}
+public interface ListState<T> extends MergingState<T, Iterable<T>> {
+	/**
+	 * Updates the state of the current key for the given source namespaces into the state of
+	 * the target namespace.
+	 *
+	 * If `null` or an empty list is passed in, the state value will be null
+	 *
+	 * @param values The target namespace where the merged state should be stored.
+	 *
+	 * @throws Exception The method may forward exception thrown internally (by I/O or functions).
+	 */
+	void update(List<T> values) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
index 3dcd75d..dd780a8 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
@@ -67,4 +67,9 @@ public final class ImmutableListState<V> extends ImmutableState implements ListS
 				stateDescriptor.getElementSerializer());
 		return new ImmutableListState<>(state);
 	}
+
+	@Override
+	public void update(List<V> values) throws Exception {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index aa17efb..da05a8b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -497,6 +497,15 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
 			return partitionOffsets;
 		}
+
+		@Override
+		public void update(List<S> values) throws Exception {
+			internalList.clear();
+
+			if (values != null && !values.isEmpty()) {
+				internalList.addAll(values);
+			}
+		}
 	}
 
 	private <S> ListState<S> getListState(

http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingListState.java
index 71026c6..b93e4d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingListState.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.state.ListState;
 
 import java.util.Collections;
+import java.util.List;
 
 /**
  * Simple wrapper list state that exposes empty state properly as an empty list.
@@ -54,4 +55,9 @@ class UserFacingListState<T> implements ListState<T> {
 	public void clear() {
 		originalState.clear();
 	}
+
+	@Override
+	public void update(List<T> values) throws Exception {
+		originalState.update(values);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index d3f67f0..fbf2087 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -27,6 +27,7 @@ import org.apache.flink.util.Preconditions;
 
 import java.io.ByteArrayOutputStream;
 import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Heap-backed partitioned {@link org.apache.flink.api.common.state.ListState} that is snapshotted
@@ -120,4 +121,16 @@ public class HeapListState<K, N, V>
 		a.addAll(b);
 		return a;
 	}
+
+	@Override
+	public void update(List<V> values) throws Exception {
+		clear();
+
+		if (values != null && !values.isEmpty()) {
+			final N namespace = currentNamespace;
+			final StateTable<K, N, ArrayList<V>> map = stateTable;
+
+			map.put(namespace, new ArrayList<>(values));
+		}
+	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
index ae392ed..ac17f8e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.state.internal;
 
 import org.apache.flink.api.common.state.ListState;
 
+import java.util.List;
+
 /**
  * The peer to the {@link ListState} in the internal state type hierarchy.
  * 
@@ -28,4 +30,14 @@ import org.apache.flink.api.common.state.ListState;
  * @param <N> The type of the namespace
  * @param <T> The type of elements in the list
  */
-public interface InternalListState<N, T> extends InternalMergingState<N, T, Iterable<T>>, ListState<T> {}
+public interface InternalListState<N, T> extends InternalMergingState<N, T, Iterable<T>>, ListState<T> {
+	/**
+	 * Updates the state of the current key for the given source namespaces into the state of
+	 * the target namespace.
+	 *
+	 * @param values The target namespace where the merged state should be stored.
+	 *
+	 * @throws Exception The method may forward exception thrown internally (by I/O or functions).
+	 */
+	void update(List<T> values) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/438e4e37/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 7730aec..2b4de54 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -77,13 +77,13 @@ import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import org.apache.commons.io.output.ByteArrayOutputStream;
-import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -1199,15 +1199,18 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		TypeSerializer<String> valueSerializer = kvId.getElementSerializer();
 
 		Joiner joiner = Joiner.on(",");
+
 		// some modifications to the state
 		backend.setCurrentKey(1);
-		assertEquals(null, state.get());
-		assertEquals(null, getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		assertNull(state.get());
+		assertNull(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 		state.add("1");
+
 		backend.setCurrentKey(2);
-		assertEquals(null, state.get());
-		assertEquals(null, getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-		state.add("2");
+		assertNull(state.get());
+		assertNull(getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		state.update(Arrays.asList("2"));
+
 		backend.setCurrentKey(1);
 		assertEquals("1", joiner.join(state.get()));
 		assertEquals("1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
@@ -1218,8 +1221,10 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		// make some more modifications
 		backend.setCurrentKey(1);
 		state.add("u1");
+
 		backend.setCurrentKey(2);
 		state.add("u2");
+
 		backend.setCurrentKey(3);
 		state.add("u3");
 
@@ -1230,9 +1235,11 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		backend.setCurrentKey(1);
 		assertEquals("1,u1", joiner.join(state.get()));
 		assertEquals("1,u1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+
 		backend.setCurrentKey(2);
 		assertEquals("2,u2", joiner.join(state.get()));
 		assertEquals("2,u2", joiner.join(getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+
 		backend.setCurrentKey(3);
 		assertEquals("u3", joiner.join(state.get()));
 		assertEquals("u3", joiner.join(getSerializedList(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
@@ -1249,6 +1256,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		backend.setCurrentKey(1);
 		assertEquals("1", joiner.join(restored1.get()));
 		assertEquals("1", joiner.join(getSerializedList(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+
 		backend.setCurrentKey(2);
 		assertEquals("2", joiner.join(restored1.get()));
 		assertEquals("2", joiner.join(getSerializedList(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
@@ -1265,9 +1273,11 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		backend.setCurrentKey(1);
 		assertEquals("1,u1", joiner.join(restored2.get()));
 		assertEquals("1,u1", joiner.join(getSerializedList(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+
 		backend.setCurrentKey(2);
 		assertEquals("2,u2", joiner.join(restored2.get()));
 		assertEquals("2,u2", joiner.join(getSerializedList(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+
 		backend.setCurrentKey(3);
 		assertEquals("u3", joiner.join(restored2.get()));
 		assertEquals("u3", joiner.join(getSerializedList(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
@@ -1276,7 +1286,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	@Test
-	public void testListStateAddAndGet() throws Exception {
+	public void testListStateAddUpdateAndGet() throws Exception {
 
 		AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
 
@@ -1294,18 +1304,24 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			state.add(17L);
 			state.add(11L);
 			assertThat(state.get(), containsInAnyOrder(17L, 11L));
+			// update(null) should remain the value null
+			state.update(null);
+			assertNull(state.get());
+			// update(emptyList) should remain the value null
+			state.update(Arrays.asList());
+			assertNull(state.get());
+			state.update(Arrays.asList(10L, 16L));
+			assertThat(state.get(), containsInAnyOrder(16L, 10L));
 
 			keyedBackend.setCurrentKey("abc");
 			assertNull(state.get());
 
 			keyedBackend.setCurrentKey("g");
 			assertNull(state.get());
-			state.add(1L);
-			state.add(2L);
+			state.update(Arrays.asList(1L, 2L));
 
 			keyedBackend.setCurrentKey("def");
-			assertThat(state.get(), containsInAnyOrder(11L, 17L));
-
+			assertThat(state.get(), containsInAnyOrder(10L, 16L));
 			state.clear();
 			assertNull(state.get());
 
@@ -1319,7 +1335,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 			keyedBackend.setCurrentKey("g");
 			assertThat(state.get(), containsInAnyOrder(1L, 2L, 3L, 2L, 1L));
-
+			state.update(Arrays.asList(5L, 6L));
+			assertThat(state.get(), containsInAnyOrder(5L, 6L));
 			state.clear();
 
 			// make sure all lists / maps are cleared
@@ -1465,11 +1482,11 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		// some modifications to the state
 		backend.setCurrentKey(1);
-		assertEquals(null, state.get());
+		assertNull(state.get());
 		assertNull(getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 		state.add("1");
 		backend.setCurrentKey(2);
-		assertEquals(null, state.get());
+		assertNull(state.get());
 		assertNull(getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 		state.add("2");
 		backend.setCurrentKey(1);
@@ -2081,12 +2098,12 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		// some modifications to the state
 		backend.setCurrentKey(1);
-		assertEquals(null, state.get());
-		assertEquals(null, getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		assertNull(state.get());
+		assertNull(getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 		state.add(1);
 		backend.setCurrentKey(2);
-		assertEquals(null, state.get());
-		assertEquals(null, getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		assertNull(state.get());
+		assertNull(getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 		state.add(2);
 		backend.setCurrentKey(1);
 		assertEquals("Fold-Initial:,1", state.get());
@@ -2178,12 +2195,12 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		// some modifications to the state
 		backend.setCurrentKey(1);
-		assertEquals(null, state.get(1));
-		assertEquals(null, getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		assertNull(state.get(1));
+		assertNull(getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
 		state.put(1, "1");
 		backend.setCurrentKey(2);
-		assertEquals(null, state.get(2));
-		assertEquals(null, getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		assertNull(state.get(2));
+		assertNull(getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
 		state.put(2, "2");
 		backend.setCurrentKey(1);
 		assertTrue(state.contains(1));
@@ -2225,7 +2242,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		for (Integer key : state.keys()) {
 			keys.add(key);
 		}
-		List<Integer> expectedKeys = new ArrayList<Integer>() {{ add(103); add(1031); add(1032); }};
+		List<Integer> expectedKeys = Arrays.asList(103, 1031, 1032);
 		assertEquals(keys.size(), expectedKeys.size());
 		keys.removeAll(expectedKeys);
 		assertTrue(keys.isEmpty());
@@ -2234,7 +2251,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		for (String value : state.values()) {
 			values.add(value);
 		}
-		List<String> expectedValues = new ArrayList<String>() {{ add("103"); add("1031"); add("1032"); }};
+		List<String> expectedValues = Arrays.asList("103", "1031", "1032");
 		assertEquals(values.size(), expectedValues.size());
 		values.removeAll(expectedValues);
 		assertTrue(values.isEmpty());
@@ -2322,13 +2339,13 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
 		backend.setCurrentKey(1);
-		assertEquals(null, state.value());
+		assertNull(state.value());
 
 		state.update("Ciao");
 		assertEquals("Ciao", state.value());
 
 		state.clear();
-		assertEquals(null, state.value());
+		assertNull(state.value());
 
 		backend.dispose();
 	}
@@ -2426,8 +2443,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		backend.setCurrentKey(1);
 		assertNull(state.get());
 
-		state.add("Ciao");
-		state.add("Bello");
+		state.update(Arrays.asList("Ciao", "Bello"));
 		assertThat(state.get(), containsInAnyOrder("Ciao", "Bello"));
 
 		state.clear();
@@ -3194,7 +3210,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			// test isolation
 			for (int i = 0; i < 20; ++i) {
 				backend.setCurrentKey(i);
-				Assert.assertEquals(i + 1, (int) valueState.value());
+				assertEquals(i + 1, (int) valueState.value());
 			}
 
 		} finally {
@@ -3204,7 +3220,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			}
 		}
 
-		Assert.assertNotNull(stateHandle);
+		assertNotNull(stateHandle);
 
 		backend = null;
 
@@ -3219,11 +3235,11 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 			for (int i = 0; i < 10; ++i) {
 				backend.setCurrentKey(i);
-				Assert.assertEquals(i, (int) valueState.value());
+				assertEquals(i, (int) valueState.value());
 			}
 
 			backend.setCurrentKey(11);
-			Assert.assertEquals(null, valueState.value());
+			assertNull(valueState.value());
 		} finally {
 			if (null != backend) {
 				IOUtils.closeQuietly(backend);