Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/06 17:04:05 UTC

[GitHub] [flink] dawidwys opened a new pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

dawidwys opened a new pull request #13550:
URL: https://github.com/apache/flink/pull/13550


   ## What is the purpose of the change
   
   This commit introduces a `SingleKeyStateBackend`, a simplified state backend
   intended for the BATCH runtime mode. It requires the input to be sorted (or at
   least grouped) by key, because it only ever remembers the current key: when the
   key changes, all state held for the previous key is discarded. Moreover, this
   state backend does not support checkpointing.
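A minimal, dependency-free sketch of the single-key behavior described above might look like the following. `SingleKeyValueStore` and `countRunning` are hypothetical names, not classes from this PR; the sketch only assumes what the description states (state is kept per namespace for the current key and discarded when the key changes) and ignores key-selection listeners, priority queues and serializers.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical illustration of the single-key idea; not Flink's SingleKeyKeyedStateBackend.
 * Values are stored per namespace for the current key only.
 */
public class SingleKeyValueStore<K, N, V> {
    private K currentKey;
    private final Map<N, V> valuesByNamespace = new HashMap<>();

    public void setCurrentKey(K newKey) {
        // Clear only when the key actually changes; re-selecting the same key keeps its state.
        if (!Objects.equals(newKey, currentKey)) {
            valuesByNamespace.clear();
            currentKey = newKey;
        }
    }

    public K getCurrentKey() {
        return currentKey;
    }

    public V get(N namespace) {
        return valuesByNamespace.get(namespace);
    }

    public void put(N namespace, V value) {
        valuesByNamespace.put(namespace, value);
    }

    /** Counts records per key, printing the running count after each record. */
    static void countRunning(String[] keys) {
        SingleKeyValueStore<String, String, Integer> store = new SingleKeyValueStore<>();
        for (String key : keys) {
            store.setCurrentKey(key);
            Integer count = store.get("count");
            store.put("count", count == null ? 1 : count + 1);
            System.out.println(key + " -> " + store.get("count"));
        }
    }

    public static void main(String[] args) {
        countRunning(new String[] {"a", "a", "b"}); // a -> 1, a -> 2, b -> 1
        countRunning(new String[] {"a", "b", "a"}); // a -> 1, b -> 1, a -> 1
    }
}
```

The second call shows why the input must be sorted or grouped: once `"b"` arrives, the count for `"a"` is dropped, so the later `"a"` record starts again from 1.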
   
   
   ## Verifying this change
   
   This change adds the following tests:
   * org.apache.flink.streaming.api.operators.sorted.state.SingleKeyStateBackendVerificationTest
   * org.apache.flink.streaming.api.operators.sorted.state.SingleKeyStateBackendTest
   * org.apache.flink.streaming.api.operators.sorted.state.SingleKeyKeyGroupedInternalPriorityQueueTest
   
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (**yes** / no / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (**not applicable** / docs / **JavaDocs** / not documented)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13550:
URL: https://github.com/apache/flink/pull/13550#issuecomment-704399024


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5e658efd970cd23de2637876e59becdbf00a8921",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5e658efd970cd23de2637876e59becdbf00a8921",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5e658efd970cd23de2637876e59becdbf00a8921 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] aljoscha commented on a change in pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #13550:
URL: https://github.com/apache/flink/pull/13550#discussion_r502326914



##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/StateBackendBenchmarkUtils.java
##########
@@ -74,6 +75,24 @@
 			case ROCKSDB:
 				rootDir = prepareDirectory(rootDirName, null);
 				return createRocksDBKeyedStateBackend(rootDir);
+			case SINGLE_KEY:
+				try {
+					return new SingleKeyStateBackend().createKeyedStateBackend(

Review comment:
       This should also be extracted into a method, like for the other backends.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/SingleKeyKeyGroupedInternalPriorityQueue.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sorted.state;
+
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.PriorityComparator;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueue;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Very similar implementation to {@link org.apache.flink.runtime.state.heap.HeapPriorityQueueSet}. The only difference
+ * is it keeps track of elements for a single key at a time.
+ */
+class SingleKeyKeyGroupedInternalPriorityQueue<T extends HeapPriorityQueueElement>
+		extends HeapPriorityQueue<T>
+		implements KeyGroupedInternalPriorityQueue<T> {
+
+	private final Map<T, T> dedupMap = new HashMap<>();

Review comment:
       Why do we need to additionally deduplicate?
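The archive does not record the author's answer to this question. One plausible reason, assuming the class mirrors the set semantics of `HeapPriorityQueueSet` named in its Javadoc, is that adding an element equal to one already queued (for example, the same timer registered twice) must not enqueue it a second time. The sketch below illustrates that idea with `java.util.PriorityQueue` and a side `Map<T, T>`; `DedupPriorityQueue` is a hypothetical name, and unlike Flink's `HeapPriorityQueue` it does not track element indices, so `remove` is O(n).

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

/**
 * Hypothetical priority queue with set semantics; not the PR's code.
 * The side map rejects duplicates and maps an equal element to the stored instance.
 */
public class DedupPriorityQueue<T> {
    private final PriorityQueue<T> heap;
    private final Map<T, T> dedupMap = new HashMap<>();

    public DedupPriorityQueue(Comparator<? super T> comparator) {
        this.heap = new PriorityQueue<>(comparator);
    }

    /** Returns true if newly added, false if an equal element is already queued. */
    public boolean add(T element) {
        if (dedupMap.putIfAbsent(element, element) != null) {
            return false;
        }
        heap.add(element);
        return true;
    }

    /** Removes the stored instance equal to the given element, if any. */
    public boolean remove(T element) {
        T stored = dedupMap.remove(element);
        return stored != null && heap.remove(stored);
    }

    /** Removes and returns the smallest element, keeping the side map in sync. */
    public T poll() {
        T head = heap.poll();
        if (head != null) {
            dedupMap.remove(head);
        }
        return head;
    }

    public int size() {
        return heap.size();
    }

    public static void main(String[] args) {
        DedupPriorityQueue<Long> timers = new DedupPriorityQueue<>(Comparator.<Long>naturalOrder());
        timers.add(20L);
        timers.add(10L);
        timers.add(10L); // duplicate registration is ignored
        System.out.println(timers.size()); // 2
        System.out.println(timers.poll()); // 10
        System.out.println(timers.poll()); // 20
    }
}
```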

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/SingleKeyKeyedStateBackend.java
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sorted.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.PriorityComparator;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A simple {@link CheckpointableKeyedStateBackend} which keeps values for a single key at a time.

Review comment:
       ```suggestion
    * A {@link CheckpointableKeyedStateBackend} which keeps values for a single key at a time.
   ```
   
   I think `simple` rarely adds much information.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/SingleKeyKeyedStateBackend.java
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sorted.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.PriorityComparator;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A simple {@link CheckpointableKeyedStateBackend} which keeps values for a single key at a time.
+ *
+ * <p><b>IMPORTANT:</b> Requires the incoming records to be sorted/grouped by the key. Used in a BATCH style execution.
+ */
+class SingleKeyKeyedStateBackend<K> implements CheckpointableKeyedStateBackend<K> {
+	@SuppressWarnings("rawtypes")
+	private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+		Stream.of(
+			Tuple2.of(ValueStateDescriptor.class, (StateFactory) SingleKeyValueState::create),
+			Tuple2.of(ListStateDescriptor.class, (StateFactory) SingleKeyListState::create),
+			Tuple2.of(MapStateDescriptor.class, (StateFactory) SingleKeyMapState::create),
+			Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) SingleKeyAggregatingState::create),
+			Tuple2.of(ReducingStateDescriptor.class, (StateFactory) SingleKeyReducingState::create)
+		).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+	private K currentKey = null;
+	private final TypeSerializer<K> keySerializer;
+	private final List<KeySelectionListener<K>> keySelectionListeners = new ArrayList<>();
+	private final Map<String, State> states = new HashMap<>();
+	private final Map<String, KeyGroupedInternalPriorityQueue<?>> priorityQueues = new HashMap<>();
+	private final KeyGroupRange keyGroupRange;
+
+	public SingleKeyKeyedStateBackend(
+			TypeSerializer<K> keySerializer,
+			KeyGroupRange keyGroupRange) {
+		this.keySerializer = keySerializer;
+		this.keyGroupRange = keyGroupRange;
+	}
+
+	@Override
+	public void setCurrentKey(K newKey) {
+		if (!Objects.equals(newKey, currentKey)) {
+			notifyKeySelected(newKey);
+			for (State value : states.values()) {
+				((AbstractSingleKeyState<?, ?, ?>) value).clearAllNamespaces();
+			}
+			for (KeyGroupedInternalPriorityQueue<?> value : priorityQueues.values()) {
+				while (value.poll() != null) {
+					// remove everything for the key
+				}
+			}
+			this.currentKey = newKey;
+		}
+	}
+
+	@Override
+	public K getCurrentKey() {
+		return currentKey;
+	}
+
+	@Override
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	@Override
+	public <N, S extends State, T> void applyToAllKeys(
+			N namespace,
+			TypeSerializer<N> namespaceSerializer,
+			StateDescriptor<S, T> stateDescriptor,
+			KeyedStateFunction<K, S> function) {
+		throw new UnsupportedOperationException("applyToAllKeys() is not supported in BATCH execution mode.");

Review comment:
       I'm wondering about the name here. If we always refer to `BATCH execution mode` in the exceptions, maybe the state backend should be called `BatchExecutionStateBackend`. Or do we think it could be used for any other purpose?
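The diff context above builds `STATE_FACTORIES`, a map from state-descriptor class to a factory. The following sketch shows that dispatch pattern with hypothetical stand-in types (`Descriptor`, `ValueDescriptor`, `ListDescriptor`, `OtherDescriptor`, `CreatedState`) instead of Flink's classes. The exact-class `getClass()` lookup and the error message are assumptions, since the lookup code is not part of the quoted diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch of descriptor-class dispatch; none of these types are Flink's. */
public class StateFactoryDispatch {

    /** Stand-in for a state descriptor that only carries a name. */
    abstract static class Descriptor {
        final String name;
        Descriptor(String name) { this.name = name; }
    }

    static final class ValueDescriptor extends Descriptor {
        ValueDescriptor(String name) { super(name); }
    }

    static final class ListDescriptor extends Descriptor {
        ListDescriptor(String name) { super(name); }
    }

    /** Deliberately has no registered factory, to show the error path. */
    static final class OtherDescriptor extends Descriptor {
        OtherDescriptor(String name) { super(name); }
    }

    /** Stand-in for a created state object; records which kind was built. */
    static final class CreatedState {
        final String kind;
        final String name;
        CreatedState(String kind, String name) { this.kind = kind; this.name = name; }
    }

    private static final Map<Class<? extends Descriptor>, Function<Descriptor, CreatedState>> FACTORIES =
        new HashMap<>();

    static {
        FACTORIES.put(ValueDescriptor.class, d -> new CreatedState("value", d.name));
        FACTORIES.put(ListDescriptor.class, d -> new CreatedState("list", d.name));
    }

    static CreatedState createState(Descriptor descriptor) {
        // Exact-class lookup: a subclass of a registered descriptor would not match.
        Function<Descriptor, CreatedState> factory = FACTORIES.get(descriptor.getClass());
        if (factory == null) {
            throw new UnsupportedOperationException(
                "No state factory for " + descriptor.getClass().getSimpleName());
        }
        return factory.apply(descriptor);
    }

    public static void main(String[] args) {
        System.out.println(createState(new ValueDescriptor("count")).kind); // value
        System.out.println(createState(new ListDescriptor("buffer")).kind); // list
        try {
            createState(new OtherDescriptor("lookup"));
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage()); // No state factory for OtherDescriptor
        }
    }
}
```

A map keyed by class keeps adding a new state type to a one-line registration, at the cost of not matching descriptor subclasses.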

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/SingleKeyKeyGroupedInternalPriorityQueue.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sorted.state;
+
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.PriorityComparator;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueue;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Very similar implementation to {@link org.apache.flink.runtime.state.heap.HeapPriorityQueueSet}. The only difference
+ * is it keeps track of elements for a single key at a time.
+ */
+class SingleKeyKeyGroupedInternalPriorityQueue<T extends HeapPriorityQueueElement>
+		extends HeapPriorityQueue<T>
+		implements KeyGroupedInternalPriorityQueue<T> {
+
+	private final Map<T, T> dedupMap = new HashMap<>();

Review comment:
       Ok, thanks for the explanation!







[GitHub] [flink] aljoscha commented on a change in pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #13550:
URL: https://github.com/apache/flink/pull/13550#discussion_r502398890



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/SingleKeyKeyGroupedInternalPriorityQueue.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sorted.state;
+
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.PriorityComparator;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueue;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Very similar implementation to {@link org.apache.flink.runtime.state.heap.HeapPriorityQueueSet}. The only difference
+ * is it keeps track of elements for a single key at a time.
+ */
+class SingleKeyKeyGroupedInternalPriorityQueue<T extends HeapPriorityQueueElement>
+		extends HeapPriorityQueue<T>
+		implements KeyGroupedInternalPriorityQueue<T> {
+
+	private final Map<T, T> dedupMap = new HashMap<>();

Review comment:
       Ok, thanks for the explanation!







[GitHub] [flink] flinkbot edited a comment on pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13550:
URL: https://github.com/apache/flink/pull/13550#issuecomment-704399024


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5e658efd970cd23de2637876e59becdbf00a8921",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7250",
       "triggerID" : "5e658efd970cd23de2637876e59becdbf00a8921",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5e658efd970cd23de2637876e59becdbf00a8921 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7250) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] aljoscha commented on a change in pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #13550:
URL: https://github.com/apache/flink/pull/13550#discussion_r502326914



##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/StateBackendBenchmarkUtils.java
##########
@@ -74,6 +75,24 @@
 			case ROCKSDB:
 				rootDir = prepareDirectory(rootDirName, null);
 				return createRocksDBKeyedStateBackend(rootDir);
+			case SINGLE_KEY:
+				try {
+					return new SingleKeyStateBackend().createKeyedStateBackend(

Review comment:
       Should also be in a method like for the other backends.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/SingleKeyKeyGroupedInternalPriorityQueue.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sorted.state;
+
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.PriorityComparator;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueue;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Very similar implementation to {@link org.apache.flink.runtime.state.heap.HeapPriorityQueueSet}. The only difference
+ * is it keeps track of elements for a single key at a time.
+ */
+class SingleKeyKeyGroupedInternalPriorityQueue<T extends HeapPriorityQueueElement>
+		extends HeapPriorityQueue<T>
+		implements KeyGroupedInternalPriorityQueue<T> {
+
+	private final Map<T, T> dedupMap = new HashMap<>();

Review comment:
       Why do we need to additionally deduplicate?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/SingleKeyKeyedStateBackend.java
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sorted.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.PriorityComparator;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A simple {@link CheckpointableKeyedStateBackend} which keeps values for a single key at a time.

Review comment:
       ```suggestion
    * A {@link CheckpointableKeyedStateBackend} which keeps values for a single key at a time.
   ```
   
   I think `simple` rarely adds much information.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/SingleKeyKeyedStateBackend.java
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sorted.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.PriorityComparator;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A simple {@link CheckpointableKeyedStateBackend} which keeps values for a single key at a time.
+ *
+ * <p><b>IMPORTANT:</b> Requires the incoming records to be sorted/grouped by the key. Used in a BATCH style execution.
+ */
+class SingleKeyKeyedStateBackend<K> implements CheckpointableKeyedStateBackend<K> {
+	@SuppressWarnings("rawtypes")
+	private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+		Stream.of(
+			Tuple2.of(ValueStateDescriptor.class, (StateFactory) SingleKeyValueState::create),
+			Tuple2.of(ListStateDescriptor.class, (StateFactory) SingleKeyListState::create),
+			Tuple2.of(MapStateDescriptor.class, (StateFactory) SingleKeyMapState::create),
+			Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) SingleKeyAggregatingState::create),
+			Tuple2.of(ReducingStateDescriptor.class, (StateFactory) SingleKeyReducingState::create)
+		).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+	private K currentKey = null;
+	private final TypeSerializer<K> keySerializer;
+	private final List<KeySelectionListener<K>> keySelectionListeners = new ArrayList<>();
+	private final Map<String, State> states = new HashMap<>();
+	private final Map<String, KeyGroupedInternalPriorityQueue<?>> priorityQueues = new HashMap<>();
+	private final KeyGroupRange keyGroupRange;
+
+	public SingleKeyKeyedStateBackend(
+			TypeSerializer<K> keySerializer,
+			KeyGroupRange keyGroupRange) {
+		this.keySerializer = keySerializer;
+		this.keyGroupRange = keyGroupRange;
+	}
+
+	@Override
+	public void setCurrentKey(K newKey) {
+		if (!Objects.equals(newKey, currentKey)) {
+			notifyKeySelected(newKey);
+			for (State value : states.values()) {
+				((AbstractSingleKeyState<?, ?, ?>) value).clearAllNamespaces();
+			}
+			for (KeyGroupedInternalPriorityQueue<?> value : priorityQueues.values()) {
+				while (value.poll() != null) {
+					// remove everything for the key
+				}
+			}
+			this.currentKey = newKey;
+		}
+	}
+
+	@Override
+	public K getCurrentKey() {
+		return currentKey;
+	}
+
+	@Override
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	@Override
+	public <N, S extends State, T> void applyToAllKeys(
+			N namespace,
+			TypeSerializer<N> namespaceSerializer,
+			StateDescriptor<S, T> stateDescriptor,
+			KeyedStateFunction<K, S> function) {
+		throw new UnsupportedOperationException("applyToAllKeys() is not supported in BATCH execution mode.");

Review comment:
       I'm wondering about the name here. If we always refer to `BATCH execution mode` in the exceptions, maybe the state backend should be called `BatchExecutionStateBackend`. Or do we think it could be used for any other purpose?
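   The contract behind the quoted `setCurrentKey` can be illustrated with a minimal, hypothetical Java sketch. It is not Flink code: state lives only for the current key and is discarded whenever a different key is set. `SingleKeyStore`, `countPerKey` and the `"count"` state name are illustrative assumptions. The sketch also shows why sorted or grouped input is required. If a key reappears after another key, its earlier state is gone.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical illustration of the single-key contract; not Flink's SingleKeyKeyedStateBackend.
public class SingleKeyStoreDemo {

    /** Keeps named values for the current key only; switching keys discards them. */
    public static final class SingleKeyStore<K, V> {
        private K currentKey;
        // state name -> value, valid only for currentKey
        private final Map<String, V> values = new HashMap<>();

        public void setCurrentKey(K newKey) {
            if (!Objects.equals(newKey, currentKey)) {
                values.clear(); // drop everything that belonged to the previous key
                currentKey = newKey;
            }
        }

        public V get(String name) {
            return values.get(name);
        }

        public void put(String name, V value) {
            values.put(name, value);
        }
    }

    /** Counts records per key, assuming the input is grouped by key. */
    public static Map<String, Integer> countPerKey(String[] keys) {
        SingleKeyStore<String, Integer> store = new SingleKeyStore<>();
        Map<String, Integer> result = new HashMap<>();
        for (String key : keys) {
            store.setCurrentKey(key);
            Integer count = store.get("count");
            int updated = (count == null ? 0 : count) + 1;
            store.put("count", updated);
            result.put(key, updated); // the last value written for a key wins
        }
        return result;
    }

    public static void main(String[] args) {
        // Grouped input: every key's records are contiguous.
        System.out.println(countPerKey(new String[] {"a", "a", "b", "b", "b"}));
        // Ungrouped input: "a" reappears after "b", so its earlier count is lost.
        System.out.println(countPerKey(new String[] {"a", "b", "a"}));
    }
}
```

   With the grouped input `a, a, b, b, b`, the counts come out as 2 and 3. With `a, b, a`, the count for `a` is 1, because its state was cleared when `b` was selected.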




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13550:
URL: https://github.com/apache/flink/pull/13550#issuecomment-704399024


   ## CI report:
   
   * 5e658efd970cd23de2637876e59becdbf00a8921 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7250) 
   * b7d64e6761396fda8ace8c17d400a9e6d925d527 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7328) 
   * f8f50916ab86dc0db496722194a6ef16abffbc88 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13550:
URL: https://github.com/apache/flink/pull/13550#issuecomment-704399024


   ## CI report:
   
   * 5e658efd970cd23de2637876e59becdbf00a8921 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7250) 
   * b7d64e6761396fda8ace8c17d400a9e6d925d527 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7328) 
   * f8f50916ab86dc0db496722194a6ef16abffbc88 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7330) 
   * 804b8f73417efce2dceae397211094b8991e8590 UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13550:
URL: https://github.com/apache/flink/pull/13550#issuecomment-704399024









[GitHub] [flink] flinkbot edited a comment on pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13550:
URL: https://github.com/apache/flink/pull/13550#issuecomment-704399024


   ## CI report:
   
   * 5e658efd970cd23de2637876e59becdbf00a8921 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7250) 
   * b7d64e6761396fda8ace8c17d400a9e6d925d527 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7328) 
   * f8f50916ab86dc0db496722194a6ef16abffbc88 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7330) 
   * 804b8f73417efce2dceae397211094b8991e8590 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7336) 
   





[GitHub] [flink] dawidwys commented on a change in pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #13550:
URL: https://github.com/apache/flink/pull/13550#discussion_r502383634



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/SingleKeyKeyGroupedInternalPriorityQueue.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sorted.state;
+
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.PriorityComparator;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueue;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Very similar implementation to {@link org.apache.flink.runtime.state.heap.HeapPriorityQueueSet}. The only difference
+ * is it keeps track of elements for a single key at a time.
+ */
+class SingleKeyKeyGroupedInternalPriorityQueue<T extends HeapPriorityQueueElement>
+		extends HeapPriorityQueue<T>
+		implements KeyGroupedInternalPriorityQueue<T> {
+
+	private final Map<T, T> dedupMap = new HashMap<>();

Review comment:
       Good question.
   
   The reason is that we always use the `*PriorityQueue` as a `Set`. We use the `PriorityQueue` to store timers, which is why we need Set semantics: we want to fire only a single timer for a given timestamp. By the way, the logic is copied over from `HeapPriorityQueueSet`.
   
   However, to make this clearer, I will rename the class to `BatchExecutionInternalPriorityQueueSet`.
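   The Set semantics described above can be sketched in a minimal, hypothetical form: a min-heap paired with a `dedupMap`, so that adding an element that is already queued is a no-op. This sketch uses `java.util.PriorityQueue` instead of Flink's `HeapPriorityQueue`, and plain `Long` timestamps stand in for real timer elements, which in Flink also carry a key and a namespace. `DedupPriorityQueue` is an illustrative name, not a Flink class.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical sketch of a priority queue with Set semantics; not Flink's HeapPriorityQueue.
public class DedupTimerQueueDemo {

    /** Min-heap that ignores elements already present, in the spirit of a dedupMap. */
    public static final class DedupPriorityQueue<T extends Comparable<T>> {
        private final PriorityQueue<T> heap = new PriorityQueue<>();
        // element -> the instance actually stored in the heap
        private final Map<T, T> dedupMap = new HashMap<>();

        /** Returns true if the element was added, false if an equal element was already queued. */
        public boolean add(T element) {
            if (dedupMap.putIfAbsent(element, element) != null) {
                return false;
            }
            heap.add(element);
            return true;
        }

        /** Removes and returns the smallest element, or null if the queue is empty. */
        public T poll() {
            T head = heap.poll();
            if (head != null) {
                dedupMap.remove(head);
            }
            return head;
        }

        public int size() {
            return heap.size();
        }
    }

    public static void main(String[] args) {
        DedupPriorityQueue<Long> timers = new DedupPriorityQueue<>();
        timers.add(30L);
        timers.add(10L);
        timers.add(10L); // same timestamp again: ignored, so it fires only once
        Long timestamp;
        while ((timestamp = timers.poll()) != null) {
            System.out.println("fire timer @ " + timestamp);
        }
    }
}
```

   Running `main` fires the timer at 10 once and then the timer at 30. In this sketch, draining with `poll()` until it returns `null`, as the quoted `setCurrentKey` does for each queue, also empties the dedup map.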







[GitHub] [flink] flinkbot edited a comment on pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13550:
URL: https://github.com/apache/flink/pull/13550#issuecomment-704399024


   ## CI report:
   
   * b7d64e6761396fda8ace8c17d400a9e6d925d527 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7328) 
   * f8f50916ab86dc0db496722194a6ef16abffbc88 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7330) 
   * 804b8f73417efce2dceae397211094b8991e8590 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7336) 
   





[GitHub] [flink] dawidwys merged pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
dawidwys merged pull request #13550:
URL: https://github.com/apache/flink/pull/13550


   





[GitHub] [flink] dawidwys commented on pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
dawidwys commented on pull request #13550:
URL: https://github.com/apache/flink/pull/13550#issuecomment-706737508


   @flinkbot run azure





[GitHub] [flink] dawidwys commented on a change in pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #13550:
URL: https://github.com/apache/flink/pull/13550#discussion_r502368491



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/SingleKeyKeyedStateBackend.java
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sorted.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.PriorityComparator;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A simple {@link CheckpointableKeyedStateBackend} which keeps values for a single key at a time.
+ *
+ * <p><b>IMPORTANT:</b> Requires the incoming records to be sorted/grouped by the key. Used in a BATCH style execution.
+ */
+class SingleKeyKeyedStateBackend<K> implements CheckpointableKeyedStateBackend<K> {
+	@SuppressWarnings("rawtypes")
+	private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+		Stream.of(
+			Tuple2.of(ValueStateDescriptor.class, (StateFactory) SingleKeyValueState::create),
+			Tuple2.of(ListStateDescriptor.class, (StateFactory) SingleKeyListState::create),
+			Tuple2.of(MapStateDescriptor.class, (StateFactory) SingleKeyMapState::create),
+			Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) SingleKeyAggregatingState::create),
+			Tuple2.of(ReducingStateDescriptor.class, (StateFactory) SingleKeyReducingState::create)
+		).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+	private K currentKey = null;
+	private final TypeSerializer<K> keySerializer;
+	private final List<KeySelectionListener<K>> keySelectionListeners = new ArrayList<>();
+	private final Map<String, State> states = new HashMap<>();
+	private final Map<String, KeyGroupedInternalPriorityQueue<?>> priorityQueues = new HashMap<>();
+	private final KeyGroupRange keyGroupRange;
+
+	public SingleKeyKeyedStateBackend(
+			TypeSerializer<K> keySerializer,
+			KeyGroupRange keyGroupRange) {
+		this.keySerializer = keySerializer;
+		this.keyGroupRange = keyGroupRange;
+	}
+
+	@Override
+	public void setCurrentKey(K newKey) {
+		if (!Objects.equals(newKey, currentKey)) {
+			notifyKeySelected(newKey);
+			for (State value : states.values()) {
+				((AbstractSingleKeyState<?, ?, ?>) value).clearAllNamespaces();
+			}
+			for (KeyGroupedInternalPriorityQueue<?> value : priorityQueues.values()) {
+				while (value.poll() != null) {
+					// remove everything for the key
+				}
+			}
+			this.currentKey = newKey;
+		}
+	}
+
+	@Override
+	public K getCurrentKey() {
+		return currentKey;
+	}
+
+	@Override
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	@Override
+	public <N, S extends State, T> void applyToAllKeys(
+			N namespace,
+			TypeSerializer<N> namespaceSerializer,
+			StateDescriptor<S, T> stateDescriptor,
+			KeyedStateFunction<K, S> function) {
+		throw new UnsupportedOperationException("applyToAllKeys() is not supported in BATCH execution mode.");

Review comment:
       Personally, I can't think of any other scenarios.
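   The quoted diff builds `STATE_FACTORIES` as a map from state-descriptor class to factory, using `Stream.of(Tuple2.of(...)).collect(Collectors.toMap(...))`. The following minimal, hypothetical Java sketch shows that registry pattern with stand-in types. `Descriptor`, `ValueDescriptor`, `ListDescriptor` and `StateFactory` are not Flink classes, and `AbstractMap.SimpleImmutableEntry` replaces Flink's `Tuple2`. Rejecting unknown descriptors with `UnsupportedOperationException` mirrors the style of the quoted `applyToAllKeys()` error. How the real backend handles unknown descriptors is not shown in this excerpt.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch of a class-keyed factory registry; the types are stand-ins, not Flink's.
public class FactoryRegistryDemo {

    public interface Descriptor {}

    public static final class ValueDescriptor implements Descriptor {}

    public static final class ListDescriptor implements Descriptor {}

    public interface StateFactory {
        String create(Descriptor descriptor);
    }

    // Built once, keyed by the exact descriptor class.
    static final Map<Class<? extends Descriptor>, StateFactory> FACTORIES =
        Stream.of(
            new SimpleImmutableEntry<Class<? extends Descriptor>, StateFactory>(
                ValueDescriptor.class, d -> "value state"),
            new SimpleImmutableEntry<Class<? extends Descriptor>, StateFactory>(
                ListDescriptor.class, d -> "list state")
        ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

    public static String createState(Descriptor descriptor) {
        StateFactory factory = FACTORIES.get(descriptor.getClass());
        if (factory == null) {
            // lookup is by exact runtime class, so unknown (or subclassed) descriptors are rejected
            throw new UnsupportedOperationException(
                "Unsupported descriptor: " + descriptor.getClass().getName());
        }
        return factory.create(descriptor);
    }

    public static void main(String[] args) {
        System.out.println(createState(new ValueDescriptor()));
        System.out.println(createState(new ListDescriptor()));
    }
}
```

   Keying by `getClass()` keeps dispatch to a single map lookup. The trade-off is that a subclass of a registered descriptor does not match its parent's entry.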







[GitHub] [flink] flinkbot edited a comment on pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13550:
URL: https://github.com/apache/flink/pull/13550#issuecomment-704399024


   ## CI report:
   
   * 804b8f73417efce2dceae397211094b8991e8590 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7398) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7336) 
   * eaa536db01b506ec59d94a3120fb93ba81cbe69e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7399) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13550:
URL: https://github.com/apache/flink/pull/13550#issuecomment-704399024


   ## CI report:
   
   * 5e658efd970cd23de2637876e59becdbf00a8921 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7250) 
   * b7d64e6761396fda8ace8c17d400a9e6d925d527 UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13550:
URL: https://github.com/apache/flink/pull/13550#issuecomment-704399024


   ## CI report:
   
   * eaa536db01b506ec59d94a3120fb93ba81cbe69e Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7399) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13550:
URL: https://github.com/apache/flink/pull/13550#issuecomment-704399024


   ## CI report:
   
   * f8f50916ab86dc0db496722194a6ef16abffbc88 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7330) 
   * 804b8f73417efce2dceae397211094b8991e8590 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7336) 
   





[GitHub] [flink] dawidwys commented on a change in pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #13550:
URL: https://github.com/apache/flink/pull/13550#discussion_r502383824



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/SingleKeyKeyedStateBackend.java
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sorted.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.PriorityComparator;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A simple {@link CheckpointableKeyedStateBackend} which keeps values for a single key at a time.
+ *
+ * <p><b>IMPORTANT:</b> Requires the incoming records to be sorted/grouped by the key. Used in a BATCH style execution.
+ */
+class SingleKeyKeyedStateBackend<K> implements CheckpointableKeyedStateBackend<K> {
+	@SuppressWarnings("rawtypes")
+	private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+		Stream.of(
+			Tuple2.of(ValueStateDescriptor.class, (StateFactory) SingleKeyValueState::create),
+			Tuple2.of(ListStateDescriptor.class, (StateFactory) SingleKeyListState::create),
+			Tuple2.of(MapStateDescriptor.class, (StateFactory) SingleKeyMapState::create),
+			Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) SingleKeyAggregatingState::create),
+			Tuple2.of(ReducingStateDescriptor.class, (StateFactory) SingleKeyReducingState::create)
+		).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+	private K currentKey = null;
+	private final TypeSerializer<K> keySerializer;
+	private final List<KeySelectionListener<K>> keySelectionListeners = new ArrayList<>();
+	private final Map<String, State> states = new HashMap<>();
+	private final Map<String, KeyGroupedInternalPriorityQueue<?>> priorityQueues = new HashMap<>();
+	private final KeyGroupRange keyGroupRange;
+
+	public SingleKeyKeyedStateBackend(
+			TypeSerializer<K> keySerializer,
+			KeyGroupRange keyGroupRange) {
+		this.keySerializer = keySerializer;
+		this.keyGroupRange = keyGroupRange;
+	}
+
+	@Override
+	public void setCurrentKey(K newKey) {
+		if (!Objects.equals(newKey, currentKey)) {
+			notifyKeySelected(newKey);
+			for (State value : states.values()) {
+				((AbstractSingleKeyState<?, ?, ?>) value).clearAllNamespaces();
+			}
+			for (KeyGroupedInternalPriorityQueue<?> value : priorityQueues.values()) {
+				while (value.poll() != null) {
+					// remove everything for the key
+				}
+			}
+			this.currentKey = newKey;
+		}
+	}
+
+	@Override
+	public K getCurrentKey() {
+		return currentKey;
+	}
+
+	@Override
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	@Override
+	public <N, S extends State, T> void applyToAllKeys(
+			N namespace,
+			TypeSerializer<N> namespaceSerializer,
+			StateDescriptor<S, T> stateDescriptor,
+			KeyedStateFunction<K, S> function) {
+		throw new UnsupportedOperationException("applyToAllKeys() is not supported in BATCH execution mode.");

Review comment:
       Will rename the classes.
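The `setCurrentKey` method in the hunk above drops every state's values and drains every priority queue as soon as a different key is selected, and it calls `notifyKeySelected(newKey)` before doing so. The sketch below models only that behavior in plain Java. It is not the Flink code: `SingleKeySketch` and `SortedSumExample` are hypothetical names, state is reduced to a `Map<String, Long>`, and listeners are plain `Consumer`s. The idea that listeners run first so they can still read the previous key's state is an interpretation of the call order, not something the hunk states.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

/** Hypothetical sketch (not Flink's API): holds state for one key at a time. */
class SingleKeySketch<K> {
    private K currentKey;
    private final Map<String, Long> state = new HashMap<>();
    private final List<Consumer<K>> keySelectionListeners = new ArrayList<>();

    void registerKeySelectionListener(Consumer<K> listener) {
        keySelectionListeners.add(listener);
    }

    void setCurrentKey(K newKey) {
        if (!Objects.equals(newKey, currentKey)) {
            // Listeners are notified first, so they still see the previous key's state.
            for (Consumer<K> listener : keySelectionListeners) {
                listener.accept(newKey);
            }
            // With sorted/grouped input the previous key never returns: drop its state.
            state.clear();
            currentKey = newKey;
        }
    }

    K getCurrentKey() { return currentKey; }

    Long get(String name) { return state.get(name); }

    void put(String name, long value) { state.put(name, value); }
}

/** Hypothetical usage: per-key sums, emitted when the next key is selected. */
class SortedSumExample {
    static List<String> sumPerKey(String[] keys, long[] values) {
        SingleKeySketch<String> backend = new SingleKeySketch<>();
        List<String> out = new ArrayList<>();
        // Emit the finished key's sum just before its state is discarded.
        backend.registerKeySelectionListener(newKey -> {
            if (backend.getCurrentKey() != null) {
                out.add(backend.getCurrentKey() + "=" + backend.get("sum"));
            }
        });
        for (int i = 0; i < keys.length; i++) {
            backend.setCurrentKey(keys[i]);
            Long sum = backend.get("sum");
            backend.put("sum", (sum == null ? 0L : sum) + values[i]);
        }
        backend.setCurrentKey(null); // sketch-only trick to flush the last key
        return out;
    }
}
```

For grouped input `a,a,b,c,c` with values `1,2,3,4,5`, `sumPerKey` returns `[a=3, b=3, c=9]`. For ungrouped input `a,b,a` it returns `[a=1, b=1, a=1]`: `a` is reported twice because its state was discarded when `b` arrived. That is why the backend requires sorted input, and also why `applyToAllKeys()` has nothing to iterate over, since only the current key's values exist at any time.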







[GitHub] [flink] flinkbot edited a comment on pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13550:
URL: https://github.com/apache/flink/pull/13550#issuecomment-704399024


   ## CI report:
   
   * 804b8f73417efce2dceae397211094b8991e8590 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7336) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13550:
URL: https://github.com/apache/flink/pull/13550#issuecomment-704399024


   ## CI report:
   
   * 5e658efd970cd23de2637876e59becdbf00a8921 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7250) 
   * b7d64e6761396fda8ace8c17d400a9e6d925d527 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7328) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13550:
URL: https://github.com/apache/flink/pull/13550#issuecomment-704399024


   ## CI report:
   
   * 804b8f73417efce2dceae397211094b8991e8590 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7398) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7336) 
   * eaa536db01b506ec59d94a3120fb93ba81cbe69e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7399) 
   





[GitHub] [flink] dawidwys commented on a change in pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #13550:
URL: https://github.com/apache/flink/pull/13550#discussion_r502368225



##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/StateBackendBenchmarkUtils.java
##########
@@ -74,6 +75,24 @@
 			case ROCKSDB:
 				rootDir = prepareDirectory(rootDirName, null);
 				return createRocksDBKeyedStateBackend(rootDir);
+			case SINGLE_KEY:
+				try {
+					return new SingleKeyStateBackend().createKeyedStateBackend(

Review comment:
       Actually, I had already done it and forgot to force-push :facepalm: 

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/SingleKeyKeyedStateBackend.java
##########
@@ -0,0 +1,260 @@
+		throw new UnsupportedOperationException("applyToAllKeys() is not supported in BATCH execution mode.");

Review comment:
       Personally I can't think of other scenarios...
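The `STATE_FACTORIES` map in the `SingleKeyKeyedStateBackend` hunk quoted in this thread maps each supported `StateDescriptor` class to a factory, while the `states` field caches created state by name. Below is a hypothetical, dependency-free Java sketch of that dispatch pattern. `Descriptor`, `ValueDescriptor`, `ListDescriptor`, `BroadcastDescriptor` and `StateRegistry` are illustrative stand-ins rather than Flink types, and the "look up the runtime class, fail if unsupported" step is an assumption: the hunk imports `FlinkRuntimeException`, but the lookup code itself is not in the quoted lines.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins for state descriptors (not Flink's classes).
abstract class Descriptor {
    final String name;
    Descriptor(String name) { this.name = name; }
}

final class ValueDescriptor extends Descriptor {
    ValueDescriptor(String name) { super(name); }
}

final class ListDescriptor extends Descriptor {
    ListDescriptor(String name) { super(name); }
}

/** Deliberately has no registered factory. */
final class BroadcastDescriptor extends Descriptor {
    BroadcastDescriptor(String name) { super(name); }
}

final class StateRegistry {
    // Descriptor class -> factory producing a (here purely textual) state object.
    private static final Map<Class<? extends Descriptor>, Function<Descriptor, String>> FACTORIES =
        new HashMap<>();

    static {
        FACTORIES.put(ValueDescriptor.class, d -> "value-state:" + d.name);
        FACTORIES.put(ListDescriptor.class, d -> "list-state:" + d.name);
    }

    // State name -> created state, analogous to the `states` map in the hunk.
    private final Map<String, String> states = new HashMap<>();

    /** Returns the state registered under the descriptor's name, creating it on first use. */
    String getOrCreate(Descriptor descriptor) {
        // Dispatch on the exact runtime class of the descriptor.
        Function<Descriptor, String> factory = FACTORIES.get(descriptor.getClass());
        if (factory == null) {
            throw new UnsupportedOperationException(
                "State type " + descriptor.getClass().getSimpleName() + " is not supported.");
        }
        return states.computeIfAbsent(descriptor.name, n -> factory.apply(descriptor));
    }
}
```

Keying on the exact runtime class keeps the lookup a single `HashMap` access, but it also means a subclass of a supported descriptor would not match; that is a property of this sketch and may or may not hold for the real backend.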

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/SingleKeyKeyGroupedInternalPriorityQueue.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sorted.state;
+
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.PriorityComparator;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueue;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Very similar implementation to {@link org.apache.flink.runtime.state.heap.HeapPriorityQueueSet}. The only difference
+ * is it keeps track of elements for a single key at a time.
+ */
+class SingleKeyKeyGroupedInternalPriorityQueue<T extends HeapPriorityQueueElement>
+		extends HeapPriorityQueue<T>
+		implements KeyGroupedInternalPriorityQueue<T> {
+
+	private final Map<T, T> dedupMap = new HashMap<>();

Review comment:
       Good question.
   
   The reason is that we always use the `*PriorityQueue` as a `Set`. We store timers in the `PriorityQueue`, and that is why we need `Set` semantics: we want to fire only a single timer per timestamp. BTW, the logic is copied over from `HeapPriorityQueueSet`.
   
   However, to make this clearer, I will rename the class to `BatchExecutionInternalPriorityQueueSet`.
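The reply explains that the queue is really used as a set of timers, so registering the same timer twice must not make it fire twice. A minimal Java sketch of that idea follows, assuming timers can be reduced to `long` timestamps for a single key. `DedupTimerQueue` is a hypothetical name, and a `HashSet` stands in for the `dedupMap` field of the quoted class; this is neither `HeapPriorityQueueSet` nor the class under review.

```java
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;

/** Hypothetical sketch: a min-priority queue of timer timestamps with set semantics. */
class DedupTimerQueue {
    private final PriorityQueue<Long> queue = new PriorityQueue<>();
    // Tracks which timestamps are currently queued (plays the role of dedupMap).
    private final Set<Long> queued = new HashSet<>();

    /** Adds a timer; returns false if an equal timer is already queued. */
    boolean add(long timestamp) {
        if (!queued.add(timestamp)) {
            return false; // duplicate: the timer will still fire only once
        }
        queue.add(timestamp);
        return true;
    }

    /** Removes and returns the earliest timestamp, or null if the queue is empty. */
    Long poll() {
        Long head = queue.poll();
        if (head != null) {
            queued.remove(head);
        }
        return head;
    }

    int size() {
        return queue.size();
    }

    /** Drops all timers, e.g. when the backend switches to a new key. */
    void clear() {
        while (poll() != null) {
            // drain everything, like the loop in setCurrentKey
        }
    }
}
```

Registering timestamp `20` twice leaves a single entry, so `poll()` yields `10` and then `20` exactly once. `clear()` drains the queue the same way `setCurrentKey` in the backend drains its priority queues when the key changes.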

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/SingleKeyKeyedStateBackend.java
##########
@@ -0,0 +1,260 @@
+		throw new UnsupportedOperationException("applyToAllKeys() is not supported in BATCH execution mode.");

Review comment:
       Will rename the classes.







[GitHub] [flink] flinkbot commented on pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13550:
URL: https://github.com/apache/flink/pull/13550#issuecomment-704387404


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 5e658efd970cd23de2637876e59becdbf00a8921 (Tue Oct 06 16:10:54 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.<summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


[GitHub] [flink] flinkbot edited a comment on pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13550:
URL: https://github.com/apache/flink/pull/13550#issuecomment-704399024


   ## CI report:
   
   * 5e658efd970cd23de2637876e59becdbf00a8921 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7250) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] flinkbot edited a comment on pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13550:
URL: https://github.com/apache/flink/pull/13550#issuecomment-704399024


   ## CI report:
   
   * 804b8f73417efce2dceae397211094b8991e8590 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7336) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7398) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] dawidwys commented on a change in pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #13550:
URL: https://github.com/apache/flink/pull/13550#discussion_r502368225



##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/StateBackendBenchmarkUtils.java
##########
@@ -74,6 +75,24 @@
 			case ROCKSDB:
 				rootDir = prepareDirectory(rootDirName, null);
 				return createRocksDBKeyedStateBackend(rootDir);
+			case SINGLE_KEY:
+				try {
+					return new SingleKeyStateBackend().createKeyedStateBackend(

Review comment:
       Actually, I did it and forgot to force push :facepalm: 
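The pull request describes a state backend that keeps state only for the current key, discards that state when the key changes, and therefore requires the input to be sorted by key. The sketch below is a minimal, standalone Java illustration of those semantics under that assumption; it is not Flink's `SingleKeyStateBackend` and does not use Flink's state APIs. The names `SingleKeyValueStore`, `SingleKeyDemo`, and `sumPerKey` are hypothetical and chosen for illustration only.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper, NOT Flink's SingleKeyStateBackend API:
// it retains state for the current key only.
final class SingleKeyValueStore<K, V> {
    private K currentKey;                                         // the only key whose state is retained
    private boolean hasKey = false;                               // distinguishes "no key yet" from a null key
    private final Map<String, V> statesByName = new HashMap<>();  // state name -> value for currentKey

    // Switches to newKey; all state belonging to the previous key is discarded.
    void setCurrentKey(K newKey) {
        if (!hasKey || !Objects.equals(currentKey, newKey)) {
            statesByName.clear();
            currentKey = newKey;
            hasKey = true;
        }
    }

    V get(String stateName) {
        return statesByName.get(stateName);
    }

    void put(String stateName, V value) {
        statesByName.put(stateName, value);
    }
}

public class SingleKeyDemo {
    // Running sum per key; correct only if records with the same key are adjacent (sorted input).
    static Map<String, Integer> sumPerKey(String[] keys, int[] values) {
        SingleKeyValueStore<String, Integer> store = new SingleKeyValueStore<>();
        Map<String, Integer> lastSumPerKey = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            store.setCurrentKey(keys[i]);
            Integer sum = store.get("sum");
            int updated = (sum == null ? 0 : sum) + values[i];
            store.put("sum", updated);
            lastSumPerKey.put(keys[i], updated);
        }
        return lastSumPerKey;
    }

    public static void main(String[] args) {
        // Sorted input: "a" is contiguous, so its state survives until "b" arrives.
        System.out.println(sumPerKey(new String[] {"a", "a", "b"}, new int[] {1, 2, 3})); // prints {a=3, b=3}
        // Unsorted input: returning to "a" starts from empty state, so "a" ends at 3 instead of 4.
        System.out.println(sumPerKey(new String[] {"a", "b", "a"}, new int[] {1, 2, 3})); // prints {a=3, b=2}
    }
}
```

The second call shows why sorted input is required: once the stream leaves key `a`, its state is gone, so a later record for `a` starts from an empty sum.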




[GitHub] [flink] flinkbot edited a comment on pull request #13550: [FLINK-19474] Implement a state backend that holds a single key at a time

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13550:
URL: https://github.com/apache/flink/pull/13550#issuecomment-704399024


   ## CI report:
   
   * 804b8f73417efce2dceae397211094b8991e8590 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7398) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7336) 
   * eaa536db01b506ec59d94a3120fb93ba81cbe69e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

