You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/31 15:40:07 UTC

[jira] [Commented] (FLINK-9058) Relax ListState.addAll() and ListState.update() to take Iterable

    [ https://issues.apache.org/jira/browse/FLINK-9058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16670257#comment-16670257 ] 

ASF GitHub Bot commented on FLINK-9058:
---------------------------------------

aljoscha closed pull request #5749: [FLINK-9058] Relax ListState.addAll() and ListState.update() to take Iterable
URL: https://github.com/apache/flink/pull/5749
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index b226ff1360a..5c12793ce2c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -717,18 +717,20 @@ boolean isClearCalled() {
 		}
 
 		@Override
-		public void update(List<T> values) throws Exception {
+		public void update(Iterable<T> values) throws Exception {
 			clear();
 
 			addAll(values);
 		}
 
 		@Override
-		public void addAll(List<T> values) throws Exception {
+		public void addAll(Iterable<T> values) throws Exception {
 			if (values != null) {
 				values.forEach(v -> Preconditions.checkNotNull(v, "You cannot add null to a ListState."));
 
-				list.addAll(values);
+				for (T v : values) {
+					list.add(v);
+				}
 			}
 		}
 	}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
index 254dc1d6140..38b556d64c4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
@@ -20,8 +20,6 @@
 
 import org.apache.flink.annotation.PublicEvolving;
 
-import java.util.List;
-
 /**
  * {@link State} interface for partitioned list state in Operations.
  * The state is accessed and modified by user functions, and checkpointed consistently
@@ -48,7 +46,7 @@
 	 *
 	 * @throws Exception The method may forward exception thrown internally (by I/O or functions).
 	 */
-	void update(List<T> values) throws Exception;
+	void update(Iterable<T> values) throws Exception;
 
 	/**
 	 * Updates the operator state accessible by {@link #get()} by adding the given values
@@ -61,5 +59,5 @@
 	 *
 	 * @throws Exception The method may forward exception thrown internally (by I/O or functions).
 	 */
-	void addAll(List<T> values) throws Exception;
+	void addAll(Iterable<T> values) throws Exception;
 }
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
index 9f1465eafa3..2c10befbd5c 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
@@ -69,12 +69,12 @@ public void clear() {
 	}
 
 	@Override
-	public void update(List<V> values) throws Exception {
+	public void update(Iterable<V> values) throws Exception {
 		throw MODIFICATION_ATTEMPT_ERROR;
 	}
 
 	@Override
-	public void addAll(List<V> values) throws Exception {
+	public void addAll(Iterable<V> values) throws Exception {
 		throw MODIFICATION_ATTEMPT_ERROR;
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 01a397ae6c1..625937dcaf0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -691,17 +691,19 @@ public String toString() {
 		}
 
 		@Override
-		public void update(List<S> values) throws Exception {
+		public void update(Iterable<S> values) {
 			internalList.clear();
 
 			addAll(values);
 		}
 
 		@Override
-		public void addAll(List<S> values) throws Exception {
-			if (values != null && !values.isEmpty()) {
-				internalList.addAll(values);
+		public void addAll(Iterable<S> values) {
+			for (S v : values) {
+				Preconditions.checkNotNull(v, "You cannot add null to a ListState.");
+				internalList.add(v);
 			}
+
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingListState.java
index 71f5aa5a912..60c0549831d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingListState.java
@@ -21,7 +21,6 @@
 import org.apache.flink.api.common.state.ListState;
 
 import java.util.Collections;
-import java.util.List;
 
 /**
  * Simple wrapper list state that exposes empty state properly as an empty list.
@@ -57,12 +56,12 @@ public void clear() {
 	}
 
 	@Override
-	public void update(List<T> values) throws Exception {
+	public void update(Iterable<T> values) throws Exception {
 		originalState.update(values);
 	}
 
 	@Override
-	public void addAll(List<T> values) throws Exception {
+	public void addAll(Iterable<T> values) throws Exception {
 		originalState.addAll(values);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index f7b5cd2d5f0..d2df8199308 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -120,38 +120,35 @@ public void add(V value) {
 	}
 
 	@Override
-	public void update(List<V> values) throws Exception {
+	public void update(Iterable<V> values) {
 		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
 
-		if (values.isEmpty()) {
-			clear();
-			return;
-		}
-
 		List<V> newStateList = new ArrayList<>();
 		for (V v : values) {
 			Preconditions.checkNotNull(v, "You cannot add null to a ListState.");
 			newStateList.add(v);
 		}
 
-		stateTable.put(currentNamespace, newStateList);
+		if (newStateList.isEmpty()) {
+			clear();
+		} else {
+			stateTable.put(currentNamespace, newStateList);
+		}
 	}
 
 	@Override
-	public void addAll(List<V> values) throws Exception {
+	public void addAll(Iterable<V> values) throws Exception {
 		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
 
-		if (!values.isEmpty()) {
-			stateTable.transform(currentNamespace, values, (previousState, value) -> {
-				if (previousState == null) {
-					previousState = new ArrayList<>();
-				}
-				for (V v : value) {
-					Preconditions.checkNotNull(v, "You cannot add null to a ListState.");
-					previousState.add(v);
-				}
-				return previousState;
-			});
-		}
+		stateTable.transform(currentNamespace, values, (previousState, value) -> {
+			if (previousState == null) {
+				previousState = new ArrayList<>();
+			}
+			for (V v : value) {
+				Preconditions.checkNotNull(v, "You cannot add null to a ListState.");
+				previousState.add(v);
+			}
+			return previousState;
+		});
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
index 1e22dc6eea6..52438467460 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
@@ -20,8 +20,6 @@
 
 import org.apache.flink.api.common.state.ListState;
 
-import java.util.List;
-
 /**
  * The peer to the {@link ListState} in the internal state type hierarchy.
  * 
@@ -42,7 +40,7 @@
 	 *
 	 * @throws Exception The method may forward exception thrown internally (by I/O or functions).
 	 */
-	void update(List<T> values) throws Exception;
+	void update(Iterable<T> values) throws Exception;
 
 	/**
 	 * Updates the operator state accessible by {@link #get()} by adding the given values
@@ -55,5 +53,5 @@
 	 *
 	 * @throws Exception The method may forward exception thrown internally (by I/O or functions).
 	 */
-	void addAll(List<T> values) throws Exception;
+	void addAll(Iterable<T> values) throws Exception;
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 11ae389da73..1ef91e25d87 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -1353,7 +1353,7 @@ public void testListStateAddNull() throws Exception {
 
 	/**
 	 * This test verifies that all ListState implementations are consistent in not allowing
-	 * {@link ListState#addAll(List)} to be called with {@code null} entries in the list of entries
+	 * {@link ListState#addAll(Iterable)} to be called with {@code null} entries in the list of entries
 	 * to add.
 	 */
 	@Test
@@ -1387,7 +1387,7 @@ public void testListStateAddAllNullEntries() throws Exception {
 
 	/**
 	 * This test verifies that all ListState implementations are consistent in not allowing
-	 * {@link ListState#addAll(List)} to be called with {@code null}.
+	 * {@link ListState#addAll(Iterable)} to be called with {@code null}.
 	 */
 	@Test
 	public void testListStateAddAllNull() throws Exception {
@@ -1415,7 +1415,7 @@ public void testListStateAddAllNull() throws Exception {
 
 	/**
 	 * This test verifies that all ListState implementations are consistent in not allowing
-	 * {@link ListState#addAll(List)} to be called with {@code null} entries in the list of entries
+	 * {@link ListState#addAll(Iterable)} to be called with {@code null} entries in the list of entries
 	 * to add.
 	 */
 	@Test
@@ -1449,7 +1449,7 @@ public void testListStateUpdateNullEntries() throws Exception {
 
 	/**
 	 * This test verifies that all ListState implementations are consistent in not allowing
-	 * {@link ListState#addAll(List)} to be called with {@code null}.
+	 * {@link ListState#addAll(Iterable)} to be called with {@code null}.
 	 */
 	@Test
 	public void testListStateUpdateNull() throws Exception {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index 62c169b059e..9b446c09116 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -157,50 +157,46 @@ public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
 	}
 
 	@Override
-	public void update(List<V> values) throws Exception {
+	public void update(Iterable<V> values) throws Exception {
 		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
 
 		clear();
 
-		if (!values.isEmpty()) {
-			try {
-				writeCurrentKeyWithGroupAndNamespace();
-				byte[] key = keySerializationStream.toByteArray();
+		try {
+			writeCurrentKeyWithGroupAndNamespace();
+			byte[] key = keySerializationStream.toByteArray();
 
-				byte[] premerge = getPreMergedValue(values);
-				if (premerge != null) {
-					backend.db.put(columnFamily, writeOptions, key, premerge);
-				} else {
-					throw new IOException("Failed pre-merge values in update()");
-				}
-			} catch (IOException | RocksDBException e) {
-				throw new RuntimeException("Error while updating data to RocksDB", e);
+			byte[] premerge = getPreMergedValue(values);
+			if (premerge != null) {
+				backend.db.put(columnFamily, writeOptions, key, premerge);
+			} else {
+				throw new IOException("Failed pre-merge values in update()");
 			}
+		} catch (IOException | RocksDBException e) {
+			throw new RuntimeException("Error while updating data to RocksDB", e);
 		}
 	}
 
 	@Override
-	public void addAll(List<V> values) throws Exception {
+	public void addAll(Iterable<V> values) throws Exception {
 		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
 
-		if (!values.isEmpty()) {
-			try {
-				writeCurrentKeyWithGroupAndNamespace();
-				byte[] key = keySerializationStream.toByteArray();
+		try {
+			writeCurrentKeyWithGroupAndNamespace();
+			byte[] key = keySerializationStream.toByteArray();
 
-				byte[] premerge = getPreMergedValue(values);
-				if (premerge != null) {
-					backend.db.merge(columnFamily, writeOptions, key, premerge);
-				} else {
-					throw new IOException("Failed pre-merge values in addAll()");
-				}
-			} catch (IOException | RocksDBException e) {
-				throw new RuntimeException("Error while updating data to RocksDB", e);
+			byte[] premerge = getPreMergedValue(values);
+			if (premerge != null) {
+				backend.db.merge(columnFamily, writeOptions, key, premerge);
+			} else {
+				throw new IOException("Failed pre-merge values in addAll()");
 			}
+		} catch (IOException | RocksDBException e) {
+			throw new RuntimeException("Error while updating data to RocksDB", e);
 		}
 	}
 
-	private byte[] getPreMergedValue(List<V> values) throws IOException {
+	private byte[] getPreMergedValue(Iterable<V> values) throws IOException {
 		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
 
 		keySerializationStream.reset();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Relax ListState.addAll() and ListState.update() to take Iterable
> ----------------------------------------------------------------
>
>                 Key: FLINK-9058
>                 URL: https://issues.apache.org/jira/browse/FLINK-9058
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0, 1.6.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Major
>              Labels: pull-request-available
>
> [~srichter] What do you think about this. None of the implementations require the parameter to actually be a list and allowing an {{Iterable}} there allows calling it in situations where all you have is an {{Iterable}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)