You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/06/13 20:32:55 UTC

[GitHub] [flink] StefanRRichter commented on a change in pull request #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable

StefanRRichter commented on a change in pull request #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable
URL: https://github.com/apache/flink/pull/8611#discussion_r293556751
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
 ##########
 @@ -198,16 +377,118 @@ public void setMetaInfo(RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo) {
 
 	// Snapshot / Restore -------------------------------------------------------------------------
 
-	public abstract void put(K key, int keyGroup, N namespace, S state);
+	public void put(K key, int keyGroup, N namespace, S state) {
+		checkKeyNamespacePreconditions(key, namespace);
+
+		StateMap<K, N, S> stateMap = getMapForKeyGroup(keyGroup);
+
+		if (stateMap == null) {
+			stateMap = createStateMap();
+			setMapForKeyGroup(keyGroup, stateMap);
+		}
+
+		stateMap.put(key, namespace, state);
+	}
+
+	@Override
+	public Iterator<StateEntry<K, N, S>> iterator() {
+		return Arrays.stream(state)
+			.filter(Objects::nonNull)
+			.map(StateMap::iterator)
+			.flatMap(iter -> StreamSupport.stream(Spliterators.spliteratorUnknownSize(iter, 0), false))
+			.iterator();
+	}
 
 	// For testing --------------------------------------------------------------------------------
 
 	@VisibleForTesting
-	public abstract int sizeOfNamespace(Object namespace);
+	public int sizeOfNamespace(Object namespace) {
+		int count = 0;
+		for (StateMap<K, N, S> stateMap : state) {
+			if (stateMap != null) {
+				count += stateMap.sizeOfNamespace(namespace);
+			}
+		}
+
+		return count;
+	}
 
 	@Nonnull
 	@Override
 	public StateSnapshotKeyGroupReader keyGroupReader(int readVersion) {
 		return StateTableByKeyGroupReaders.readerForVersion(this, readVersion);
 	}
+
+	// StateEntryIterator  ---------------------------------------------------------------------------------------------
+
+	class StateEntryIterator implements StateIncrementalVisitor<K, N, S> {
+
+		final int recommendedMaxNumberOfReturnedRecords;
+
+		int keyGroupIndex;
+
+		StateIncrementalVisitor<K, N, S> stateIncrementalVisitor;
+
+		StateEntryIterator(int recommendedMaxNumberOfReturnedRecords) {
+			this.recommendedMaxNumberOfReturnedRecords = recommendedMaxNumberOfReturnedRecords;
+			this.keyGroupIndex = 0;
+			next();
+		}
+
+		private void next() {
+			while (keyGroupIndex < state.length) {
+				StateMap<K, N, S> stateMap = state[keyGroupIndex++];
+				if (stateMap != null) {
+					StateIncrementalVisitor<K, N, S> visitor =
+						stateMap.getStateIncrementalVisitor(recommendedMaxNumberOfReturnedRecords);
+					if (visitor.hasNext()) {
+						stateIncrementalVisitor = visitor;
+						return;
+					}
+				}
+			}
+		}
+
+		@Override
+		public boolean hasNext() {
+			while (stateIncrementalVisitor == null || !stateIncrementalVisitor.hasNext()) {
+				while (keyGroupIndex < state.length && state[keyGroupIndex] == null) {
+					keyGroupIndex++;
+				}
+				if (keyGroupIndex == state.length) {
+					return false;
+				}
+				StateIncrementalVisitor<K, N, S> visitor =
+					state[keyGroupIndex].getStateIncrementalVisitor(recommendedMaxNumberOfReturnedRecords);
+				if (visitor.hasNext()) {
+					stateIncrementalVisitor = visitor;
+					keyGroupIndex++;
+					break;
+				}
+			}
+			return true;
+		}
+
+		@Override
+		public Collection<StateEntry<K, N, S>> nextEntries() {
+			if (!hasNext()) {
+				return null;
+			}
+
+			Collection<StateEntry<K, N, S>> collection =
+				stateIncrementalVisitor.nextEntries();
+
+			return collection;
 
 Review comment:
   Nit: could just `return stateIncrementalVisitor.nextEntries();`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services