Posted to issues@flink.apache.org by kl0u <gi...@git.apache.org> on 2018/03/13 13:03:19 UTC

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

GitHub user kl0u opened a pull request:

    https://github.com/apache/flink/pull/5691

    [FLINK-8802] [QS] Fix concurrent access to non-duplicated serializers.

    ## What is the purpose of the change
    
    This fixes the problem of multiple threads using the same serializer instance to concurrently deserialize requests.
    
    ## Brief change log
    
    Now the `KvStateRegistry` holds a `KvStateEntry` for each registered `InternalKvState` and in that entry we have a cache that holds a copy of the serializer per thread, if the serializer is stateful. If it is stateless, we do not need to copy the serializer.
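
The per-thread cache described above can be sketched roughly as follows. This is a minimal illustration, not the code from the pull request: `PerThreadCache` and its `duplicator` parameter are hypothetical names, and the sketch assumes the convention that duplicating a stateless object returns the same instance.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.UnaryOperator;

/** Hypothetical stand-in for a per-thread serializer cache; not Flink code. */
final class PerThreadCache<T> {

    private final T original;
    private final UnaryOperator<T> duplicator;
    private final boolean stateless;
    private final ConcurrentMap<Thread, T> copies = new ConcurrentHashMap<>();

    PerThreadCache(T original, UnaryOperator<T> duplicator) {
        this.original = original;
        this.duplicator = duplicator;
        // Assumed convention: duplicating a stateless object returns the same instance.
        this.stateless = duplicator.apply(original) == original;
    }

    /** Returns the shared instance if stateless, otherwise a copy owned by the calling thread. */
    T getForCurrentThread() {
        if (stateless) {
            return original;
        }
        return copies.computeIfAbsent(Thread.currentThread(), t -> duplicator.apply(original));
    }

    /** Number of per-thread copies currently cached. */
    int size() {
        return copies.size();
    }

    /** Drops all per-thread copies; may be called from any thread. */
    void clear() {
        copies.clear();
    }
}
```

Keying the map by `Thread` means a copy is created at most once per thread, which matters if duplication is expensive.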
    
    ## Verifying this change
    
    A test is added in `KvStateRegistryTest` (`KvStateRegistryTest#testKvStateEntry()`), and I have also tested the change with an actual job (run locally only).
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
      - The S3 file system connector: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? no
      - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink improved-qs-inv

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5691.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5691
    
----
commit 595a59c30dfc778945c43351edf8dbc2887b6a87
Author: kkloudas <kk...@...>
Date:   2018-03-09T11:05:38Z

    [FLINK-8908] Do not create copy when MapSerializer stateless.

commit 1a8c0415cec46ed6683768b793c295ab478b2eb8
Author: kkloudas <kk...@...>
Date:   2018-03-12T11:12:06Z

    [FLINK-8928] [QS] Improve server binding error message.

commit 67b6d7ad2d128a14e001afc34eb14cd6c62d90c4
Author: kkloudas <kk...@...>
Date:   2018-03-09T21:47:35Z

    [FLINK-8802] [QS] Fix concurrent access to non-duplicated serializers.

commit b86657f630363e59971edd29464130c62bd4b3a7
Author: kkloudas <kk...@...>
Date:   2018-03-12T12:19:23Z

    [FLINK-8926] [QS] Shutdown client proxy after test ends.

----


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5691


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174383199
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateInfo.java ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.query;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalKvState;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Objects;
    +
    +/**
    + * Metadata about a {@link InternalKvState}. This includes the serializers for
    + * the key, the namespace, and the values kept in the state.
    + *
    + * @param <K>	The type of key the state is associated to
    + * @param <N>	The type of the namespace the state is associated to
    + * @param <V>	The type of values kept internally in state
    + */
    +public class KvStateInfo<K, N, V> {
    +
    +	private final TypeSerializer<K> keySerializer;
    +	private final TypeSerializer<N> namespaceSerializer;
    +	private final TypeSerializer<V> stateValueSerializer;
    +
    +	public KvStateInfo(
    +			final TypeSerializer<K> keySerializer,
    +			final TypeSerializer<N> namespaceSerializer,
    +			final TypeSerializer<V> stateValueSerializer
    +	) {
    +		this.keySerializer = Preconditions.checkNotNull(keySerializer);
    +		this.namespaceSerializer = Preconditions.checkNotNull(namespaceSerializer);
    +		this.stateValueSerializer = Preconditions.checkNotNull(stateValueSerializer);
    +	}
    +
    +	/**
    +	 * @return The serializer for the key the state is associated to.
    +	 */
    +	public TypeSerializer<K> getKeySerializer() {
    +		return keySerializer;
    +	}
    +
    +	/**
    +	 * @return The serializer for the namespace the state is associated to.
    +	 */
    +	public TypeSerializer<N> getNamespaceSerializer() {
    +		return namespaceSerializer;
    +	}
    +
    +	/**
    +	 * @return The serializer for the values kept in the state.
    +	 */
    +	public TypeSerializer<V> getStateValueSerializer() {
    +		return stateValueSerializer;
    +	}
    +
    +	/**
    +	 * Creates a deep copy of the current {@link KvStateInfo} by duplicating
    +	 * all the included serializers.
    +	 *
    +	 * <p>This method assumes correct implementation of the {@link TypeSerializer#duplicate()}
    +	 * method of the included serializers.
    +	 */
    +	public KvStateInfo<K, N, V> duplicate() {
    +		final TypeSerializer<K> dupKeySerializer = keySerializer.duplicate();
    +		final TypeSerializer<N> dupNamespaceSerializer = namespaceSerializer.duplicate();
    +		final TypeSerializer<V> dupSVSerializer = stateValueSerializer.duplicate();
    +
    +		if (
    +				dupKeySerializer == keySerializer &&
    --- End diff --
    
    odd formatting


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174402053
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java ---
    @@ -40,6 +54,81 @@
      */
     public class KvStateRegistryTest extends TestLogger {
     
    +	@Test
    +	public void testKvStateEntry() throws InterruptedException {
    +		final int threads = 10;
    +
    +		final CountDownLatch latch1 = new CountDownLatch(threads);
    +		final CountDownLatch latch2 = new CountDownLatch(1);
    +
    +		final List<KvStateInfo<?, ?, ?>> infos = Collections.synchronizedList(new ArrayList<>());
    +
    +		final JobID jobID = new JobID();
    +
    +		final JobVertexID jobVertexId = new JobVertexID();
    +		final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
    +		final String registrationName = "foobar";
    +
    +		final KvStateRegistry kvStateRegistry = new KvStateRegistry();
    +		final KvStateID stateID = kvStateRegistry.registerKvState(
    +				jobID,
    +				jobVertexId,
    +				keyGroupRange,
    +				registrationName,
    +				new DummyKvState()
    +		);
    +
    +		for (int i = 0; i < threads; i++) {
    +			new Thread(() -> {
    +				final KvStateEntry<?, ?, ?> kvState = kvStateRegistry.getKvState(stateID);
    +				final KvStateInfo<?, ?, ?> stateInfo = kvState.getInfoForCurrentThread();
    +				infos.add(stateInfo);
    +
    +				latch1.countDown();
    +				try {
    +					latch2.await();
    +				} catch (InterruptedException e) {
    +					Assert.fail(e.getMessage());
    +				}
    +
    +			}).start();
    +		}
    +
    +		latch1.await();
    +
    +		final KvStateEntry<?, ?, ?> kvState = kvStateRegistry.getKvState(stateID);
    +
    +		// verify that all the threads are done correctly.
    +		Assert.assertEquals(threads, infos.size());
    +		Assert.assertEquals(threads, kvState.getCacheSize());
    +
    +		latch2.countDown();
    +
    +		for (KvStateInfo<?, ?, ?> infoA: infos) {
    +			boolean found = false;
    +			for (KvStateInfo<?, ?, ?> infoB: infos) {
    +				if (infoA == infoB) {
    +					if (found) {
    +						Assert.fail("Already found");
    +					}
    +					found = true;
    +				} else {
    +					Assert.assertTrue(infoA != infoB && infoA.equals(infoB));
    +				}
    +			}
    +		}
    +
    +		kvStateRegistry.unregisterKvState(
    +				jobID,
    +				jobVertexId,
    +				keyGroupRange,
    +				registrationName,
    +				stateID);
    +
    +		// we have to call for garbage collection to be sure that everything is cleared up.
    --- End diff --
    
    This is left over from previous experiments that used a weak hash map. I will remove it.


---

[GitHub] flink issue #5691: [FLINK-8802] [QS] Fix concurrent access to non-duplicated...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/5691
  
    @StefanRRichter and @zentol  I will ping you when I am done with the refactoring.


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174473498
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java ---
    @@ -70,10 +88,18 @@
     	 * <p>If no value is associated with key and namespace, <code>null</code>
     	 * is returned.
     	 *
    +	 * <p><b>TO IMPLEMENTERS:</b> This method is called by multiple threads. Anything
    +	 * stateful (e.g. serializers) should be either duplicated or protected from undesired
    +	 * consequences of concurrent invocations.
    +	 *
     	 * @param serializedKeyAndNamespace Serialized key and namespace
     	 * @return Serialized value or <code>null</code> if no value is associated with the key and namespace.
     	 * 
     	 * @throws Exception Exceptions during serialization are forwarded
     	 */
    -	byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception;
    +	byte[] getSerializedValue(
    --- End diff --
    
    I would suggest storing the serializer in a thread-local variable. The current solution is a bit confusing because this interface suddenly exposes serializers, and callers have to provide the serializers in the `getSerializedValue` method. In my opinion the interface does not make much sense this way. Furthermore, the serializers are copied externally into something that looks like a custom-built thread local. I suggest keeping the serializers thread local in the base class and bringing this interface back to its original form. There is also only one thread pool, dedicated to queryable state, that would hold the serializers, and even the current solution has a dedicated cleanup method. In that place, we can just clean the thread locals.
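
A rough sketch of the thread-local alternative suggested here, under stated assumptions: `ThreadLocalSerializerHolder` and `ExampleState` are hypothetical names, and a `StringBuilder` stands in for a stateful serializer. Only `ThreadLocal.withInitial`, `get` and `remove` are real JDK calls.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

/** Hypothetical base class that keeps one serializer copy per thread; not Flink code. */
abstract class ThreadLocalSerializerHolder<S> {

    private final ThreadLocal<S> localSerializer;

    ThreadLocalSerializerHolder(Supplier<S> duplicator) {
        // Each thread lazily receives its own copy on first access.
        this.localSerializer = ThreadLocal.withInitial(duplicator);
    }

    protected S serializer() {
        return localSerializer.get();
    }

    /** Drops only the calling thread's copy. */
    void releaseForCurrentThread() {
        localSerializer.remove();
    }
}

/** Illustrative state whose query method takes only the serialized key and namespace. */
final class ExampleState extends ThreadLocalSerializerHolder<StringBuilder> {

    ExampleState() {
        super(StringBuilder::new); // StringBuilder stands in for a stateful serializer
    }

    byte[] getSerializedValue(byte[] serializedKeyAndNamespace) {
        StringBuilder scratch = serializer(); // owned by the current thread
        scratch.setLength(0);
        scratch.append(serializedKeyAndNamespace.length);
        return scratch.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```

With this shape the query method keeps a single-argument signature, and the per-thread copy is an implementation detail of the base class.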


---

[GitHub] flink issue #5691: [FLINK-8802] [QS] Fix concurrent access to non-duplicated...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/5691
  
    That is because cleaning up the cache is happening from another thread, other than the one 
    accessing the serializers, and ThreadLocal does not have a clear() method that you can call
    from another thread and clean all the state in it. Each thread can only clean its own state.
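
The limitation described above can be reproduced with a small, self-contained sketch. `CleanupDemo` is a hypothetical class written for illustration: a worker thread stores a value in both a `ThreadLocal` and a `ConcurrentMap` keyed by thread, and the calling thread then tries to clean up both.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;

/** Hypothetical demo comparing cleanup of a ThreadLocal and a thread-keyed map. */
final class CleanupDemo {

    static final ThreadLocal<Object> LOCAL = new ThreadLocal<>();
    static final ConcurrentMap<Thread, Object> CACHE = new ConcurrentHashMap<>();

    /** Returns true if the worker still sees its ThreadLocal value after the caller's cleanup. */
    static boolean localSurvivesCleanupFromOtherThread() {
        CountDownLatch populated = new CountDownLatch(1);
        CountDownLatch cleaned = new CountDownLatch(1);
        boolean[] survived = new boolean[1];

        Thread worker = new Thread(() -> {
            LOCAL.set(new Object());
            CACHE.put(Thread.currentThread(), new Object());
            populated.countDown();
            try {
                cleaned.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            survived[0] = LOCAL.get() != null;
        });
        worker.start();

        try {
            populated.await();
            LOCAL.remove();  // affects only the calling thread's value
            CACHE.clear();   // drops the entries of every thread
            cleaned.countDown();
            worker.join();   // join makes the worker's write to survived[0] visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return survived[0];
    }
}
```

After the call, the worker's `ThreadLocal` value is still present while the map is empty, which is the asymmetry the comment refers to.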
    
    > On Mar 20, 2018, at 3:34 AM, sihua zhou <no...@github.com> wrote:
    > 
    > @sihuazhou commented on this pull request.
    > 
    > In flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalQueryableKvState.java <https://github.com/apache/flink/pull/5691#discussion_r175640731>:
    > 
    > >  	private final boolean areSerializersStateless;
    >  
    > -	private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> serializerCache;
    > +	private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> serializerCache = new ConcurrentHashMap<>(4);
    >  
    > nit: just wondering why this doesn't use the ThreadLocal<KvStateInfo<K, N, V>> provided by the JDK...
    > 
    



---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174385834
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java ---
    @@ -40,6 +54,81 @@
      */
     public class KvStateRegistryTest extends TestLogger {
     
    +	@Test
    +	public void testKvStateEntry() throws InterruptedException {
    +		final int threads = 10;
    +
    +		final CountDownLatch latch1 = new CountDownLatch(threads);
    +		final CountDownLatch latch2 = new CountDownLatch(1);
    +
    +		final List<KvStateInfo<?, ?, ?>> infos = Collections.synchronizedList(new ArrayList<>());
    +
    +		final JobID jobID = new JobID();
    +
    +		final JobVertexID jobVertexId = new JobVertexID();
    +		final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
    +		final String registrationName = "foobar";
    +
    +		final KvStateRegistry kvStateRegistry = new KvStateRegistry();
    +		final KvStateID stateID = kvStateRegistry.registerKvState(
    +				jobID,
    +				jobVertexId,
    +				keyGroupRange,
    +				registrationName,
    +				new DummyKvState()
    +		);
    +
    +		for (int i = 0; i < threads; i++) {
    +			new Thread(() -> {
    +				final KvStateEntry<?, ?, ?> kvState = kvStateRegistry.getKvState(stateID);
    +				final KvStateInfo<?, ?, ?> stateInfo = kvState.getInfoForCurrentThread();
    +				infos.add(stateInfo);
    +
    +				latch1.countDown();
    +				try {
    +					latch2.await();
    +				} catch (InterruptedException e) {
    +					Assert.fail(e.getMessage());
    +				}
    +
    +			}).start();
    +		}
    +
    +		latch1.await();
    +
    +		final KvStateEntry<?, ?, ?> kvState = kvStateRegistry.getKvState(stateID);
    +
    +		// verify that all the threads are done correctly.
    +		Assert.assertEquals(threads, infos.size());
    +		Assert.assertEquals(threads, kvState.getCacheSize());
    +
    +		latch2.countDown();
    +
    +		for (KvStateInfo<?, ?, ?> infoA: infos) {
    +			boolean found = false;
    +			for (KvStateInfo<?, ?, ?> infoB: infos) {
    +				if (infoA == infoB) {
    +					if (found) {
    --- End diff --
    
    `found` needs a better name or a comment explaining what it means


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174444245
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java ---
    @@ -39,29 +39,27 @@
     	private final InternalKvState<K, N, V> state;
     	private final KvStateInfo<K, N, V> stateInfo;
     
    +	private final boolean isSerializerStateless;
    +
     	private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> serializerCache;
     
     	public KvStateEntry(final InternalKvState<K, N, V> state) {
    -
     		this.state = Preconditions.checkNotNull(state);
     		this.stateInfo = new KvStateInfo<>(
     				state.getKeySerializer(),
     				state.getNamespaceSerializer(),
     				state.getValueSerializer()
     		);
    -
    -		this.serializerCache =
    -				stateInfo.duplicate() == stateInfo
    -						? null							// if the serializers are stateless, we do not need a cache
    -						: new ConcurrentHashMap<>();
    +		this.serializerCache = new ConcurrentHashMap<>();
    +		this.isSerializerStateless = stateInfo.duplicate() == stateInfo;
    --- End diff --
    
    -> areSerializersStateless?


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174383043
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java ---
    @@ -0,0 +1,79 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.query;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.runtime.state.internal.InternalKvState;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ConcurrentMap;
    +
    +/**
    + * An entry holding the {@link InternalKvState} along with its {@link KvStateInfo}.
    + *
    + * @param <K>	The type of key the state is associated to
    + * @param <N>	The type of the namespace the state is associated to
    + * @param <V>	The type of values kept internally in state
    + */
    +@Internal
    +public class KvStateEntry<K, N, V> {
    +
    +	private final InternalKvState<K, N, V> state;
    +	private final KvStateInfo<K, N, V> stateInfo;
    +
    +	private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> serializerCache;
    +
    +	public KvStateEntry(final InternalKvState<K, N, V> state) {
    +
    +		this.state = Preconditions.checkNotNull(state);
    +		this.stateInfo = new KvStateInfo<>(
    +				state.getKeySerializer(),
    +				state.getNamespaceSerializer(),
    +				state.getValueSerializer()
    +		);
    +
    +		this.serializerCache =
    +				stateInfo.duplicate() == stateInfo
    +						? null							// if the serializers are stateless, we do not need a cache
    --- End diff --
    
    An empty map would be acceptable here.


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174380623
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java ---
    @@ -29,20 +29,19 @@
     import java.io.IOException;
     
     /**
    - * Heap-backed partitioned {@link FoldingState} that is
    - * snapshotted into files.
    + * Heap-backed partitioned {@link FoldingState} that is snapshotted into files.
      *
    - * @param <K> The type of the key.
    - * @param <N> The type of the namespace.
    - * @param <T> The type of the values that can be folded into the state.
    - * @param <ACC> The type of the value in the folding state.
    + * @param <K>	The type of the key.
    + * @param <N>	The type of the namespace.
    + * @param <T>	The type of the values that can be folded into the state.
    + * @param <ACC>	The type of the value in the folding state.
    --- End diff --
    
    indentation is funky


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174450032
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java ---
    @@ -140,19 +159,28 @@ public boolean contains(UK userKey) {
     	}
     
     	@Override
    -	public byte[] getSerializedValue(K key, N namespace) throws IOException {
    -		Preconditions.checkState(namespace != null, "No namespace given.");
    -		Preconditions.checkState(key != null, "No key given.");
    +	public byte[] getSerializedValue(
    +			byte[] serializedKeyAndNamespace,
    +			TypeSerializer<K> keySerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			TypeSerializer<HashMap<UK, UV>> valueSerializer) throws Exception {
     
    -		HashMap<UK, UV> result = stateTable.get(key, namespace);
    +		Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
     
    -		if (null == result) {
    +		Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
    +				serializedKeyAndNamespace, keySerializer, namespaceSerializer);
    +
    +		Map<UK, UV> result = stateTable.get(keyAndNamespace.f0, keyAndNamespace.f1);
    +
    +		if (result == null) {
     			return null;
     		}
     
    -		TypeSerializer<UK> userKeySerializer = stateDesc.getKeySerializer();
    -		TypeSerializer<UV> userValueSerializer = stateDesc.getValueSerializer();
    +		final HashMapSerializer<UK, UV> serializer = (HashMapSerializer<UK, UV>) valueSerializer;
    --- End diff --
    
    I mean, why isn't the signature of `KvStateSerializer.serializeMap`:
    ```
    KvStateSerializer.serializeMap(Map<UK, UV> map, TypeSerializer<Map<UK, UV>> serializer);
    ```
    
    with the following implementation:
    ```
    ...
    serializeMap(Map<UK, UV> map, TypeSerializer<Map<UK, UV>> serializer) {
    	if (map != null) {
    		DataOutputSerializer dos = new DataOutputSerializer(32);
    		serializer.serialize(map, dos);
    		return dos.getCopyOfBuffer();
    	} else {
    		return null;
    	}
    }
    ```
    
    Why deal with the map key/value entries at all outside the serializer?
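
For readers without the Flink classes at hand, the same delegation idea can be sketched with JDK types only. Everything here is an assumption made for illustration: `SimpleSerializer` is a hypothetical stand-in for a map serializer, and `DataOutputStream` over a `ByteArrayOutputStream` replaces the `DataOutputSerializer` used in the comment.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;

/** Hypothetical stand-in for a serializer type; not Flink's TypeSerializer. */
interface SimpleSerializer<T> {
    void serialize(T value, DataOutputStream out) throws IOException;
}

final class MapSerialization {

    /** Hands the whole map to the serializer instead of iterating over entries here. */
    static <K, V> byte[] serializeMap(Map<K, V> map, SimpleSerializer<Map<K, V>> serializer) {
        if (map == null) {
            return null;
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream(32);
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            serializer.serialize(map, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

The caller never touches individual keys or values; the entry layout is entirely the serializer's concern.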


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174145613
  
    --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java ---
    @@ -78,13 +75,22 @@ public KvStateServerHandler(
     		final CompletableFuture<KvStateResponse> responseFuture = new CompletableFuture<>();
     
     		try {
    -			final InternalKvState<?> kvState = registry.getKvState(request.getKvStateId());
    +			final KvStateEntry<?, ?, ?> kvState = registry.getKvState(request.getKvStateId());
     			if (kvState == null) {
     				responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId()));
     			} else {
     				byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace();
     
    -				byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace);
    +				// here we remove any type check...
    +				// Ideally we want to keep that the info match the state.
    +				final InternalKvState state = kvState.getState();
    +				final KvStateInfo info = kvState.getInfoForCurrentThread();
    +
    +				byte[] serializedResult = state.getSerializedValue(
    --- End diff --
    
    "Word on the street" :P is that `Kryo` duplication is pretty expensive. This is why I went for this solution.


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r177690430
  
    --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java ---
    @@ -124,10 +134,56 @@ public void testListSerialization() throws Exception {
     			.createListState(VoidNamespaceSerializer.INSTANCE,
     				new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
     
    -		KvStateRequestSerializerTest.testListSerialization(key, listState);
    +		testListSerialization(key, (RocksDBListState<Long, VoidNamespace, Long>) listState);
     		longHeapKeyedStateBackend.dispose();
     	}
     
    +	/**
    +	 * Verifies that the serialization of a list using the given list state
    +	 * matches the deserialization with {@link KvStateSerializer#deserializeList}.
    +	 *
    +	 * @param key
    +	 * 		key of the list state
    +	 * @param listState
    +	 * 		list state using the {@link VoidNamespace}, must also be a {@link RocksDBListState} instance
    +	 *
    +	 * @throws Exception
    +	 */
    +	private void testListSerialization(
    +			final long key,
    +			final RocksDBListState<Long, VoidNamespace, Long> listState) throws Exception {
    +
    +		TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE;
    +		listState.setCurrentNamespace(VoidNamespace.INSTANCE);
    +
    +		// List
    +		final int numElements = 10;
    +
    +		final List<Long> expectedValues = new ArrayList<>();
    +		for (int i = 0; i < numElements; i++) {
    +			final long value = ThreadLocalRandom.current().nextLong();
    +			expectedValues.add(value);
    +			listState.add(value);
    +		}
    +
    +		final byte[] serializedKey =
    +				KvStateSerializer.serializeKeyAndNamespace(
    +						key, LongSerializer.INSTANCE,
    +						VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
    +
    +		final byte[] serializedValues = listState.getSerializedValue(serializedKey);
    +
    +		List<Long> actualValues = KvStateSerializer.deserializeList(serializedValues, valueSerializer);
    +		assertEquals(expectedValues, actualValues);
    +
    +		// Single value
    +		long expectedValue = ThreadLocalRandom.current().nextLong();
    --- End diff --
    
    nit: why is this using `ThreadLocalRandom`?


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174384860
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java ---
    @@ -40,6 +54,81 @@
      */
     public class KvStateRegistryTest extends TestLogger {
     
    +	@Test
    +	public void testKvStateEntry() throws InterruptedException {
    +		final int threads = 10;
    +
    +		final CountDownLatch latch1 = new CountDownLatch(threads);
    +		final CountDownLatch latch2 = new CountDownLatch(1);
    +
    +		final List<KvStateInfo<?, ?, ?>> infos = Collections.synchronizedList(new ArrayList<>());
    +
    +		final JobID jobID = new JobID();
    +
    +		final JobVertexID jobVertexId = new JobVertexID();
    +		final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
    +		final String registrationName = "foobar";
    +
    +		final KvStateRegistry kvStateRegistry = new KvStateRegistry();
    +		final KvStateID stateID = kvStateRegistry.registerKvState(
    +				jobID,
    +				jobVertexId,
    +				keyGroupRange,
    +				registrationName,
    +				new DummyKvState()
    +		);
    +
    +		for (int i = 0; i < threads; i++) {
    +			new Thread(() -> {
    +				final KvStateEntry<?, ?, ?> kvState = kvStateRegistry.getKvState(stateID);
    +				final KvStateInfo<?, ?, ?> stateInfo = kvState.getInfoForCurrentThread();
    +				infos.add(stateInfo);
    +
    +				latch1.countDown();
    +				try {
    +					latch2.await();
    +				} catch (InterruptedException e) {
    +					Assert.fail(e.getMessage());
    +				}
    +
    +			}).start();
    +		}
    +
    +		latch1.await();
    +
    +		final KvStateEntry<?, ?, ?> kvState = kvStateRegistry.getKvState(stateID);
    +
    +		// verify that all the threads are done correctly.
    +		Assert.assertEquals(threads, infos.size());
    +		Assert.assertEquals(threads, kvState.getCacheSize());
    +
    +		latch2.countDown();
    +
    +		for (KvStateInfo<?, ?, ?> infoA: infos) {
    +			boolean found = false;
    +			for (KvStateInfo<?, ?, ?> infoB: infos) {
    +				if (infoA == infoB) {
    +					if (found) {
    +						Assert.fail("Already found");
    +					}
    +					found = true;
    +				} else {
    +					Assert.assertTrue(infoA != infoB && infoA.equals(infoB));
    --- End diff --
    
    `infoA != infoB` is redundant
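
One possible way to express what the nested loops are checking, namely that every thread got its own instance and that all instances are equal, is sketched below. `CopyChecks.distinctButEqual` is a hypothetical helper, not part of the pull request, and comparing every element against the first assumes a well-behaved (symmetric, transitive) `equals`.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

/** Hypothetical helper for checking per-thread copies in a test. */
final class CopyChecks {

    /** True if no instance appears twice in the list and all elements are equal to the first. */
    static <T> boolean distinctButEqual(List<T> items) {
        // The set compares by reference, so a repeated instance shrinks its size.
        Set<T> identities = Collections.newSetFromMap(new IdentityHashMap<>());
        identities.addAll(items);
        if (identities.size() != items.size()) {
            return false;
        }
        for (T item : items) {
            if (!item.equals(items.get(0))) {
                return false;
            }
        }
        return true;
    }
}
```

A test could then assert `CopyChecks.distinctButEqual(infos)` once, which avoids both the `found` flag and the redundant reference comparison.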


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174380673
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java ---
    @@ -30,18 +30,17 @@
     import java.io.IOException;
     
     /**
    - * Heap-backed partitioned {@link ReducingState} that is
    - * snapshotted into files.
    + * Heap-backed partitioned {@link ReducingState} that is snapshotted into files.
      *
    - * @param <K> The type of the key.
    - * @param <N> The type of the namespace.
    - * @param <IN> The type of the value added to the state.
    - * @param <ACC> The type of the value stored in the state (the accumulator type).
    - * @param <OUT> The type of the value returned from the state.
    + * @param <K>	The type of the key.
    + * @param <N>	The type of the namespace.
    + * @param <IN>	The type of the value added to the state.
    + * @param <ACC>	The type of the value stored in the state (the accumulator type).
    + * @param <OUT>	The type of the value returned from the state.
    --- End diff --
    
    indentation


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174385024
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java ---
    @@ -40,6 +54,81 @@
      */
     public class KvStateRegistryTest extends TestLogger {
     
    +	@Test
    +	public void testKvStateEntry() throws InterruptedException {
    +		final int threads = 10;
    +
    +		final CountDownLatch latch1 = new CountDownLatch(threads);
    +		final CountDownLatch latch2 = new CountDownLatch(1);
    +
    +		final List<KvStateInfo<?, ?, ?>> infos = Collections.synchronizedList(new ArrayList<>());
    +
    +		final JobID jobID = new JobID();
    +
    +		final JobVertexID jobVertexId = new JobVertexID();
    +		final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
    +		final String registrationName = "foobar";
    +
    +		final KvStateRegistry kvStateRegistry = new KvStateRegistry();
    +		final KvStateID stateID = kvStateRegistry.registerKvState(
    +				jobID,
    +				jobVertexId,
    +				keyGroupRange,
    +				registrationName,
    +				new DummyKvState()
    +		);
    +
    +		for (int i = 0; i < threads; i++) {
    +			new Thread(() -> {
    +				final KvStateEntry<?, ?, ?> kvState = kvStateRegistry.getKvState(stateID);
    +				final KvStateInfo<?, ?, ?> stateInfo = kvState.getInfoForCurrentThread();
    +				infos.add(stateInfo);
    +
    +				latch1.countDown();
    +				try {
    +					latch2.await();
    +				} catch (InterruptedException e) {
    +					Assert.fail(e.getMessage());
    +				}
    +
    +			}).start();
    +		}
    +
    +		latch1.await();
    +
    +		final KvStateEntry<?, ?, ?> kvState = kvStateRegistry.getKvState(stateID);
    +
    +		// verify that all the threads are done correctly.
    +		Assert.assertEquals(threads, infos.size());
    +		Assert.assertEquals(threads, kvState.getCacheSize());
    +
    +		latch2.countDown();
    +
    +		for (KvStateInfo<?, ?, ?> infoA: infos) {
    +			boolean found = false;
    +			for (KvStateInfo<?, ?, ?> infoB: infos) {
    +				if (infoA == infoB) {
    +					if (found) {
    +						Assert.fail("Already found");
    --- End diff --
    
    needs a better error message


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174380744
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java ---
    @@ -28,18 +28,19 @@
     import java.util.Collection;
     
     /**
    - * Base class for {@link MergingState} ({@link org.apache.flink.runtime.state.internal.InternalMergingState})
    - * that is stored on the heap.
    + * Base class for {@link MergingState} ({@link InternalMergingState}) that is stored on the heap.
      *
    - * @param <K> The type of the key.
    - * @param <N> The type of the namespace.
    - * @param <SV> The type of the values in the state.
    - * @param <S> The type of State
    - * @param <SD> The type of StateDescriptor for the State S
    + * @param <K>	The type of the key.
    + * @param <N>	The type of the namespace.
    + * @param <IN>	The type of the input elements.
    + * @param <SV>	The type of the values in the state.
    + * @param <OUT>	The type of the output elements.
    --- End diff --
    
    indentation


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174381716
  
    --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java ---
    @@ -686,7 +686,7 @@ public void testClientServerIntegration() throws Throwable {
     				state.update(201 + i);
     
     				// we know it must be a KvStat but this is not exposed to the user via State
    --- End diff --
    
    typo: KvStat


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174381475
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java ---
    @@ -136,13 +138,13 @@ public void unregisterKvState(
     	}
     
     	/**
    -	 * Returns the KvState instance identified by the given KvStateID or
    -	 * <code>null</code> if none is registered.
    +	 * Returns the KvState instance identified by the given KvStateID along with
    --- End diff --
    
    would rephrase to `KvStateEntry` containing the `KvState`


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174444070
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java ---
    @@ -0,0 +1,77 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.query;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.runtime.state.internal.InternalKvState;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ConcurrentMap;
    +
    +/**
    + * An entry holding the {@link InternalKvState} along with its {@link KvStateInfo}.
    + *
    + * @param <K>	The type of key the state is associated to
    + * @param <N>	The type of the namespace the state is associated to
    + * @param <V>	The type of values kept internally in state
    + */
    +@Internal
    +public class KvStateEntry<K, N, V> {
    +
    +	private final InternalKvState<K, N, V> state;
    +	private final KvStateInfo<K, N, V> stateInfo;
    +
    +	private final boolean isSerializerStateless;
    +
    +	private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> serializerCache;
    +
    +	public KvStateEntry(final InternalKvState<K, N, V> state) {
    +		this.state = Preconditions.checkNotNull(state);
    +		this.stateInfo = new KvStateInfo<>(
    +				state.getKeySerializer(),
    +				state.getNamespaceSerializer(),
    +				state.getValueSerializer()
    +		);
    +		this.serializerCache = new ConcurrentHashMap<>();
    +		this.isSerializerStateless = stateInfo.duplicate() == stateInfo;
    +	}
    +
    +	public InternalKvState<K, N, V> getState() {
    +		return state;
    +	}
    +
    +	public KvStateInfo<K, N, V> getInfoForCurrentThread() {
    +		return isSerializerStateless
    +				? stateInfo
    +				: serializerCache.computeIfAbsent(Thread.currentThread(), t -> stateInfo.duplicate());
    +	}
    +
    +	public void clear() {
    +		if (serializerCache != null) {
    --- End diff --
    
    unnecessary null check


---

[GitHub] flink issue #5691: [FLINK-8802] [QS] Fix concurrent access to non-duplicated...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/5691
  
    Can someone have a look at this? 


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174454169
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java ---
    @@ -140,19 +159,28 @@ public boolean contains(UK userKey) {
     	}
     
     	@Override
    -	public byte[] getSerializedValue(K key, N namespace) throws IOException {
    -		Preconditions.checkState(namespace != null, "No namespace given.");
    -		Preconditions.checkState(key != null, "No key given.");
    +	public byte[] getSerializedValue(
    +			byte[] serializedKeyAndNamespace,
    +			TypeSerializer<K> keySerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			TypeSerializer<HashMap<UK, UV>> valueSerializer) throws Exception {
     
    -		HashMap<UK, UV> result = stateTable.get(key, namespace);
    +		Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
     
    -		if (null == result) {
    +		Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
    +				serializedKeyAndNamespace, keySerializer, namespaceSerializer);
    +
    +		Map<UK, UV> result = stateTable.get(keyAndNamespace.f0, keyAndNamespace.f1);
    +
    +		if (result == null) {
     			return null;
     		}
     
    -		TypeSerializer<UK> userKeySerializer = stateDesc.getKeySerializer();
    -		TypeSerializer<UV> userValueSerializer = stateDesc.getValueSerializer();
    +		final HashMapSerializer<UK, UV> serializer = (HashMapSerializer<UK, UV>) valueSerializer;
    --- End diff --
    
    The reason is that RocksDB returns an iterator that gets lazily populated as you call `next()`, while the `serialize()` of the `MapSerializer` expects a `Map`. If we were to go with your option, we would have to iterate over the entries twice: once to create the map, and once more to serialize it.
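A minimal sketch of the single-pass versus two-pass trade-off described above, using only the JDK. The class name, the `String`/`Integer` entry types, and the byte layout are assumptions made for illustration; this is not Flink's `KvStateSerializer` or its wire format.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in: serializes map entries either straight from an iterator
// or after first copying them into a Map.
final class EntryStreamSketch {

    /** Single pass: writes each entry as soon as the iterator yields it. */
    static byte[] serializeEntries(Iterator<Map.Entry<String, Integer>> entries) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            while (entries.hasNext()) {
                Map.Entry<String, Integer> e = entries.next();
                out.writeUTF(e.getKey());
                out.writeInt(e.getValue());
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bytes.toByteArray();
    }

    /** Two passes: drain the iterator into a Map first, then walk the Map to serialize it. */
    static byte[] materializeThenSerialize(Iterator<Map.Entry<String, Integer>> entries) {
        Map<String, Integer> copy = new LinkedHashMap<>();
        while (entries.hasNext()) {
            Map.Entry<String, Integer> e = entries.next();
            copy.put(e.getKey(), e.getValue());
        }
        return serializeEntries(copy.entrySet().iterator());
    }

    /** Small fixed input used by the demo. */
    static Map<String, Integer> sample() {
        Map<String, Integer> m = new LinkedHashMap<>();
        m.put("a", 1);
        m.put("b", 2);
        return m;
    }

    /** Both paths produce the same bytes; only the number of passes differs. */
    static boolean sameBytes() {
        return Arrays.equals(
                serializeEntries(sample().entrySet().iterator()),
                materializeThenSerialize(sample().entrySet().iterator()));
    }

    public static void main(String[] args) {
        System.out.println(sameBytes());
    }
}
```

Both methods yield identical output here, so the only cost of the second one is the extra copy and the extra traversal.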


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174383696
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java ---
    @@ -140,19 +159,28 @@ public boolean contains(UK userKey) {
     	}
     
     	@Override
    -	public byte[] getSerializedValue(K key, N namespace) throws IOException {
    -		Preconditions.checkState(namespace != null, "No namespace given.");
    -		Preconditions.checkState(key != null, "No key given.");
    +	public byte[] getSerializedValue(
    +			byte[] serializedKeyAndNamespace,
    +			TypeSerializer<K> keySerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			TypeSerializer<HashMap<UK, UV>> valueSerializer) throws Exception {
     
    -		HashMap<UK, UV> result = stateTable.get(key, namespace);
    +		Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
     
    -		if (null == result) {
    +		Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
    +				serializedKeyAndNamespace, keySerializer, namespaceSerializer);
    +
    +		Map<UK, UV> result = stateTable.get(keyAndNamespace.f0, keyAndNamespace.f1);
    +
    +		if (result == null) {
     			return null;
     		}
     
    -		TypeSerializer<UK> userKeySerializer = stateDesc.getKeySerializer();
    -		TypeSerializer<UV> userValueSerializer = stateDesc.getValueSerializer();
    +		final HashMapSerializer<UK, UV> serializer = (HashMapSerializer<UK, UV>) valueSerializer;
    --- End diff --
    
    This shouldn't be necessary. Why can't we just pass the map serializer into `serializeMap`?


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r175640731
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalQueryableKvState.java ---
    @@ -16,46 +16,42 @@
      * limitations under the License.
      */
     
    -package org.apache.flink.runtime.query;
    +package org.apache.flink.runtime.state.internal;
     
    -import org.apache.flink.annotation.Internal;
     import org.apache.flink.annotation.VisibleForTesting;
    -import org.apache.flink.runtime.state.internal.InternalKvState;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.query.KvStateInfo;
     import org.apache.flink.util.Preconditions;
     
     import java.util.concurrent.ConcurrentHashMap;
     import java.util.concurrent.ConcurrentMap;
     
     /**
    - * An entry holding the {@link InternalKvState} along with its {@link KvStateInfo}.
    + * An abstract base class to be subclassed by states that are expected to be queryable.
    + * Its main task is to keep a "thread-local" copy of the different serializers (if needed).
      *
      * @param <K>	The type of key the state is associated to
      * @param <N>	The type of the namespace the state is associated to
      * @param <V>	The type of values kept internally in state
      */
    -@Internal
    -public class KvStateEntry<K, N, V> {
    +public abstract class InternalQueryableKvState<K, N, V> implements InternalKvState<N> {
     
    -	private final InternalKvState<K, N, V> state;
     	private final KvStateInfo<K, N, V> stateInfo;
    -
     	private final boolean areSerializersStateless;
     
    -	private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> serializerCache;
    +	private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> serializerCache = new ConcurrentHashMap<>(4);
     
    --- End diff --
    
    nit: just wondering why this doesn't use the `ThreadLocal<KvStateInfo<K, N, V>>` provided by the JDK...
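A minimal sketch of the two options being compared, using only the JDK. `PerThreadCopySketch` and its methods are hypothetical names, and `T` stands in for `KvStateInfo`. One observable difference is that a `ConcurrentMap<Thread, T>` lets the owner count and clear every copy (as `getCacheSize()` and `clear()` do in the diff), whereas `ThreadLocal.remove()` only affects the calling thread. The sketch does not claim this is why the PR uses a map.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical stand-in for the per-thread cache; T plays the role of KvStateInfo.
final class PerThreadCopySketch<T> {

    private final Supplier<T> duplicator;
    private final ConcurrentMap<Thread, T> byThread = new ConcurrentHashMap<>();
    private final ThreadLocal<T> threadLocal;

    PerThreadCopySketch(Supplier<T> duplicator) {
        this.duplicator = duplicator;
        this.threadLocal = ThreadLocal.withInitial(duplicator);
    }

    /** Map variant: the owner can count the copies and drop all of them at once. */
    T viaMap() {
        return byThread.computeIfAbsent(Thread.currentThread(), t -> duplicator.get());
    }

    /** ThreadLocal variant: each copy is reachable only from the thread that created it. */
    T viaThreadLocal() {
        return threadLocal.get();
    }

    int mapSize() {
        return byThread.size();
    }

    void clearMap() {
        byThread.clear();
    }

    /** Spawns the given number of threads, lets each fetch its copy, and returns the cache size. */
    static int copiesAfter(int threads) {
        PerThreadCopySketch<Object> cache = new PerThreadCopySketch<>(Object::new);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(cache::viaMap);
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        return cache.mapSize();
    }

    public static void main(String[] args) {
        System.out.println(copiesAfter(3));
    }
}
```

Note that the map variant keeps a strong reference to each `Thread` key until `clearMap()` is called, which is a cost the `ThreadLocal` variant does not have.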


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174128377
  
    --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java ---
    @@ -78,13 +75,22 @@ public KvStateServerHandler(
     		final CompletableFuture<KvStateResponse> responseFuture = new CompletableFuture<>();
     
     		try {
    -			final InternalKvState<?> kvState = registry.getKvState(request.getKvStateId());
    +			final KvStateEntry<?, ?, ?> kvState = registry.getKvState(request.getKvStateId());
     			if (kvState == null) {
     				responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId()));
     			} else {
     				byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace();
     
    -				byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace);
    +				// here we remove any type check...
    +				// Ideally we want to keep that the info match the state.
    +				final InternalKvState state = kvState.getState();
    +				final KvStateInfo info = kvState.getInfoForCurrentThread();
    +
    +				byte[] serializedResult = state.getSerializedValue(
    --- End diff --
    
    Couldn't we synchronize on `kvState` instead of modifying all `InternalKvState` implementations?
    
    This seems like a much safer alternative than baking in the assumption that `getSerializedValue()` can be called concurrently.
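A minimal sketch of the synchronization alternative suggested above, using only the JDK. `StatefulCodec` is a hypothetical stand-in for a serializer that reuses internal state, and the lock target stands in for `kvState`. The sketch trades concurrency for safety: every caller shares one instance, but only one thread may use it at a time.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class SynchronizedAccessSketch {

    /** Hypothetical stateful codec: reuses one scratch buffer across calls, so it is not thread-safe. */
    static final class StatefulCodec {
        private final StringBuilder scratch = new StringBuilder();

        String encode(int value) {
            scratch.setLength(0);
            scratch.append("v=").append(value);
            return scratch.toString();
        }
    }

    /** Encodes while holding the lock on the shared codec, so calls never interleave. */
    static String encodeSynchronized(StatefulCodec codec, int value) {
        synchronized (codec) {
            return codec.encode(value);
        }
    }

    /** Runs several threads against one shared codec and counts wrong results. */
    static int countCorrupted(int threads, int callsPerThread) {
        final StatefulCodec shared = new StatefulCodec();
        final AtomicInteger corrupted = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            final int id = i;
            workers[i] = new Thread(() -> {
                for (int c = 0; c < callsPerThread; c++) {
                    if (!("v=" + id).equals(encodeSynchronized(shared, id))) {
                        corrupted.incrementAndGet();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        return corrupted.get();
    }

    public static void main(String[] args) {
        System.out.println(countCorrupted(4, 1000));
    }
}
```

With the lock in place, no call observes another thread's partial write, at the price of serializing all requests for the same state.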



---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174385129
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java ---
    @@ -40,6 +54,81 @@
      */
     public class KvStateRegistryTest extends TestLogger {
     
    +	@Test
    +	public void testKvStateEntry() throws InterruptedException {
    +		final int threads = 10;
    +
    +		final CountDownLatch latch1 = new CountDownLatch(threads);
    +		final CountDownLatch latch2 = new CountDownLatch(1);
    +
    +		final List<KvStateInfo<?, ?, ?>> infos = Collections.synchronizedList(new ArrayList<>());
    +
    +		final JobID jobID = new JobID();
    +
    +		final JobVertexID jobVertexId = new JobVertexID();
    +		final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
    +		final String registrationName = "foobar";
    +
    +		final KvStateRegistry kvStateRegistry = new KvStateRegistry();
    +		final KvStateID stateID = kvStateRegistry.registerKvState(
    +				jobID,
    +				jobVertexId,
    +				keyGroupRange,
    +				registrationName,
    +				new DummyKvState()
    +		);
    +
    +		for (int i = 0; i < threads; i++) {
    +			new Thread(() -> {
    +				final KvStateEntry<?, ?, ?> kvState = kvStateRegistry.getKvState(stateID);
    +				final KvStateInfo<?, ?, ?> stateInfo = kvState.getInfoForCurrentThread();
    +				infos.add(stateInfo);
    +
    +				latch1.countDown();
    +				try {
    +					latch2.await();
    +				} catch (InterruptedException e) {
    +					Assert.fail(e.getMessage());
    +				}
    +
    +			}).start();
    +		}
    +
    +		latch1.await();
    +
    +		final KvStateEntry<?, ?, ?> kvState = kvStateRegistry.getKvState(stateID);
    +
    +		// verify that all the threads are done correctly.
    +		Assert.assertEquals(threads, infos.size());
    +		Assert.assertEquals(threads, kvState.getCacheSize());
    +
    +		latch2.countDown();
    +
    +		for (KvStateInfo<?, ?, ?> infoA: infos) {
    +			boolean found = false;
    +			for (KvStateInfo<?, ?, ?> infoB: infos) {
    +				if (infoA == infoB) {
    +					if (found) {
    +						Assert.fail("Already found");
    +					}
    +					found = true;
    +				} else {
    +					Assert.assertTrue(infoA != infoB && infoA.equals(infoB));
    +				}
    +			}
    +		}
    +
    +		kvStateRegistry.unregisterKvState(
    +				jobID,
    +				jobVertexId,
    +				keyGroupRange,
    +				registrationName,
    +				stateID);
    +
    +		// we have to call for garbage collection to be sure that everything is cleared up.
    --- End diff --
    
    ?


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174379723
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java ---
    @@ -26,12 +26,14 @@
      * The peer to the {@link MergingState} in the internal state type hierarchy.
      * 
      * See {@link InternalKvState} for a description of the internal state hierarchy.
    - * 
    - * @param <N>   The type of the namespace
    - * @param <IN>  The type of elements added to the state
    - * @param <OUT> The type of elements 
    + *
    + * @param <K>	The type of key the state is associated to
    + * @param <N>	The type of the namespace
    + * @param <IN>	The type of elements added to the state
    + * @param <SV>	The type of elements in the state
    + * @param <OUT>	The type of elements
    --- End diff --
    
    indentation is off and the description deserves an extension


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174382685
  
    --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java ---
    @@ -78,13 +75,22 @@ public KvStateServerHandler(
     		final CompletableFuture<KvStateResponse> responseFuture = new CompletableFuture<>();
     
     		try {
    -			final InternalKvState<?> kvState = registry.getKvState(request.getKvStateId());
    +			final KvStateEntry<?, ?, ?> kvState = registry.getKvState(request.getKvStateId());
     			if (kvState == null) {
     				responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId()));
     			} else {
     				byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace();
     
    -				byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace);
    +				// here we remove any type check...
    +				// Ideally we want to keep that the info match the state.
    --- End diff --
    
    you can retain type safety:
    
    Call from the handler:
    ```
    byte[] serializedResult = getSerializedValue(kvState, serializedKeyAndNamespace);
    ```
    
    Added method:
    ```
    private static <K, N, V> byte[] getSerializedValue(KvStateEntry<K, N, V> entry, byte[] serializedKeyAndNamespace) throws Exception {
    		InternalKvState<K, N, V> state = entry.getState();
    		KvStateInfo<K, N, V> infoForCurrentThread = entry.getInfoForCurrentThread();
    		
    		return state.getSerializedValue(
    			serializedKeyAndNamespace,
    			infoForCurrentThread.getKeySerializer(),
    			infoForCurrentThread.getNamespaceSerializer(),
    			infoForCurrentThread.getStateValueSerializer()
    		);
    	}
    ```


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174423536
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java ---
    @@ -140,19 +159,28 @@ public boolean contains(UK userKey) {
     	}
     
     	@Override
    -	public byte[] getSerializedValue(K key, N namespace) throws IOException {
    -		Preconditions.checkState(namespace != null, "No namespace given.");
    -		Preconditions.checkState(key != null, "No key given.");
    +	public byte[] getSerializedValue(
    +			byte[] serializedKeyAndNamespace,
    +			TypeSerializer<K> keySerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			TypeSerializer<HashMap<UK, UV>> valueSerializer) throws Exception {
     
    -		HashMap<UK, UV> result = stateTable.get(key, namespace);
    +		Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
     
    -		if (null == result) {
    +		Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
    +				serializedKeyAndNamespace, keySerializer, namespaceSerializer);
    +
    +		Map<UK, UV> result = stateTable.get(keyAndNamespace.f0, keyAndNamespace.f1);
    +
    +		if (result == null) {
     			return null;
     		}
     
    -		TypeSerializer<UK> userKeySerializer = stateDesc.getKeySerializer();
    -		TypeSerializer<UV> userValueSerializer = stateDesc.getValueSerializer();
    +		final HashMapSerializer<UK, UV> serializer = (HashMapSerializer<UK, UV>) valueSerializer;
    --- End diff --
    
    The problem is that the Heap Backend uses the `HashMapSerializer` while RocksDB uses the `MapSerializer`. So here it should be something like `TypeSerializer<? extends Map<UK, UV>>`, which would then have to be cast to the appropriate serializer in order to get the key and the value serializers. The solution that we have here is a bit cleaner, although the best would be to remove the `HashMapSerializer` and have a `MapSerializer` in here.


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174444107
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java ---
    @@ -0,0 +1,77 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.query;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.runtime.state.internal.InternalKvState;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ConcurrentMap;
    +
    +/**
    + * An entry holding the {@link InternalKvState} along with its {@link KvStateInfo}.
    + *
    + * @param <K>	The type of key the state is associated to
    + * @param <N>	The type of the namespace the state is associated to
    + * @param <V>	The type of values kept internally in state
    + */
    +@Internal
    +public class KvStateEntry<K, N, V> {
    +
    +	private final InternalKvState<K, N, V> state;
    +	private final KvStateInfo<K, N, V> stateInfo;
    +
    +	private final boolean isSerializerStateless;
    +
    +	private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> serializerCache;
    +
    +	public KvStateEntry(final InternalKvState<K, N, V> state) {
    +		this.state = Preconditions.checkNotNull(state);
    +		this.stateInfo = new KvStateInfo<>(
    +				state.getKeySerializer(),
    +				state.getNamespaceSerializer(),
    +				state.getValueSerializer()
    +		);
    +		this.serializerCache = new ConcurrentHashMap<>();
    +		this.isSerializerStateless = stateInfo.duplicate() == stateInfo;
    +	}
    +
    +	public InternalKvState<K, N, V> getState() {
    +		return state;
    +	}
    +
    +	public KvStateInfo<K, N, V> getInfoForCurrentThread() {
    +		return isSerializerStateless
    +				? stateInfo
    +				: serializerCache.computeIfAbsent(Thread.currentThread(), t -> stateInfo.duplicate());
    +	}
    +
    +	public void clear() {
    +		if (serializerCache != null) {
    +			serializerCache.clear();
    +		}
    +	}
    +
    +	@VisibleForTesting
    +	public int getCacheSize() {
    +		return serializerCache == null ? -1 : serializerCache.size();
    --- End diff --
    
    unnecessary null check


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174380400
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java ---
    @@ -24,9 +24,11 @@
      * The peer to the {@link AppendingState} in the internal state type hierarchy.
      * 
      * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
    - * 
    - * @param <N>   The type of the namespace
    - * @param <IN>  The type of elements added to the state
    - * @param <OUT> The type of the 
    + *
    + * @param <K>	The type of key the state is associated to
    + * @param <N>	The type of the namespace
    + * @param <IN>	The type of elements added to the state
    + * @param <SV>	The type of elements in the state
    + * @param <OUT>	The type of the resulting element in the state
    --- End diff --
    
    indentation is weird


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174385360
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java ---
    @@ -40,6 +54,81 @@
      */
     public class KvStateRegistryTest extends TestLogger {
     
    +	@Test
    +	public void testKvStateEntry() throws InterruptedException {
    +		final int threads = 10;
    +
    +		final CountDownLatch latch1 = new CountDownLatch(threads);
    +		final CountDownLatch latch2 = new CountDownLatch(1);
    +
    +		final List<KvStateInfo<?, ?, ?>> infos = Collections.synchronizedList(new ArrayList<>());
    +
    +		final JobID jobID = new JobID();
    +
    +		final JobVertexID jobVertexId = new JobVertexID();
    +		final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
    +		final String registrationName = "foobar";
    +
    +		final KvStateRegistry kvStateRegistry = new KvStateRegistry();
    +		final KvStateID stateID = kvStateRegistry.registerKvState(
    +				jobID,
    +				jobVertexId,
    +				keyGroupRange,
    +				registrationName,
    +				new DummyKvState()
    +		);
    +
    +		for (int i = 0; i < threads; i++) {
    +			new Thread(() -> {
    +				final KvStateEntry<?, ?, ?> kvState = kvStateRegistry.getKvState(stateID);
    +				final KvStateInfo<?, ?, ?> stateInfo = kvState.getInfoForCurrentThread();
    +				infos.add(stateInfo);
    +
    +				latch1.countDown();
    +				try {
    +					latch2.await();
    +				} catch (InterruptedException e) {
    +					Assert.fail(e.getMessage());
    --- End diff --
    
    this would never directly fail the test and we may lose the exception
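A minimal sketch of the concern, using only the JDK (class and method names are hypothetical). An `AssertionError` thrown inside a spawned thread ends only that thread; the test thread does not see it unless the failure is handed over explicitly, here through an `AtomicReference`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

final class WorkerFailureSketch {

    /** Runs a failing worker and returns the error it recorded, or null if there was none. */
    static Throwable runWorker() {
        final AtomicReference<Throwable> failure = new AtomicReference<>();
        final CountDownLatch done = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            try {
                // Stand-in for Assert.fail(...) being called inside the worker thread.
                throw new AssertionError("failed inside worker");
            } catch (Throwable t) {
                // Record the failure so the main thread can inspect it later.
                failure.set(t);
            } finally {
                done.countDown();
            }
        });
        worker.start();

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return failure.get();
    }

    public static void main(String[] args) {
        Throwable t = runWorker();
        if (t != null) {
            // Rethrow on the main thread, where a test runner would actually notice it.
            throw new AssertionError("worker failed", t);
        }
    }
}
```

In a test, the main thread would check the recorded failure after the workers finish and fail from there.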


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174144247
  
    --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java ---
    @@ -78,13 +75,22 @@ public KvStateServerHandler(
     		final CompletableFuture<KvStateResponse> responseFuture = new CompletableFuture<>();
     
     		try {
    -			final InternalKvState<?> kvState = registry.getKvState(request.getKvStateId());
    +			final KvStateEntry<?, ?, ?> kvState = registry.getKvState(request.getKvStateId());
     			if (kvState == null) {
     				responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId()));
     			} else {
     				byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace();
     
    -				byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace);
    +				// here we remove any type check...
    +				// Ideally we want to keep that the info match the state.
    +				final InternalKvState state = kvState.getState();
    +				final KvStateInfo info = kvState.getInfoForCurrentThread();
    +
    +				byte[] serializedResult = state.getSerializedValue(
    --- End diff --
    
    Ah, I mistakenly thought that `getSerializedValue()` is also called outside of QS.
    
    What would be the overhead of always duplicating serializers within `getSerializedValue()`?


---

[GitHub] flink issue #5691: [FLINK-8802] [QS] Fix concurrent access to non-duplicated...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/5691
  
    Thanks for the review, @zentol! I updated the PR, either integrating your comments or explaining why I did not. Please have a look and let me know if you have more comments.


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174380197
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java ---
    @@ -20,13 +20,16 @@
     
     import org.apache.flink.api.common.state.MapState;
     
    +import java.util.Map;
    +
     /**
      * The peer to the {@link MapState} in the internal state type hierarchy.
      *
      * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
      *
    - * @param <N> The type of the namespace
    - * @param <UK> Type of the values folded into the state
    - * @param <UV> Type of the value in the state
    + * @param <K>	The type of key the state is associated to
    + * @param <N>	The type of the namespace
    + * @param <UK>	Type of the values folded into the state
    --- End diff --
    
    The description seems like a copy & paste from `FoldingState`.


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174150658
  
    --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java ---
    @@ -78,13 +75,22 @@ public KvStateServerHandler(
     		final CompletableFuture<KvStateResponse> responseFuture = new CompletableFuture<>();
     
     		try {
    -			final InternalKvState<?> kvState = registry.getKvState(request.getKvStateId());
    +			final KvStateEntry<?, ?, ?> kvState = registry.getKvState(request.getKvStateId());
     			if (kvState == null) {
     				responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId()));
     			} else {
     				byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace();
     
    -				byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace);
    +				// here we remove any type check...
    +				// Ideally we want to keep that the info match the state.
    +				final InternalKvState state = kvState.getState();
    +				final KvStateInfo info = kvState.getInfoForCurrentThread();
    +
    +				byte[] serializedResult = state.getSerializedValue(
    --- End diff --
    
    True, but after also discussing it with Stephan and Aljoscha, we concluded that it is probably not the right place, purely from a "separation-of-concerns" point of view.
    
    That said, the more I worked on it, the more I thought that the `InternalKvStates` could also be a suitable place. But we can leave this for a future discussion when we re-prioritize QS.


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r178075205
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java ---
    @@ -31,14 +31,13 @@
      * Base class for partitioned {@link State} implementations that are backed by a regular
      * heap hash map. The concrete implementations define how the state is checkpointed.
      *
    - * @param <K> The type of the key.
    - * @param <N> The type of the namespace.
    - * @param <SV> The type of the values in the state.
    - * @param <S> The type of State
    - * @param <SD> The type of StateDescriptor for the State S
    + * @param <K>	The type of the key.
    --- End diff --
    
    All of the generic parameters seem to have a tab in them. That's why they look wonky here. Maybe we should fix those.


---

[GitHub] flink issue #5691: [FLINK-8802] [QS] Fix concurrent access to non-duplicated...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/5691
  
    This is ready for another review, @StefanRRichter and @zentol.


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174148331
  
    --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java ---
    @@ -78,13 +75,22 @@ public KvStateServerHandler(
     		final CompletableFuture<KvStateResponse> responseFuture = new CompletableFuture<>();
     
     		try {
    -			final InternalKvState<?> kvState = registry.getKvState(request.getKvStateId());
    +			final KvStateEntry<?, ?, ?> kvState = registry.getKvState(request.getKvStateId());
     			if (kvState == null) {
     				responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId()));
     			} else {
     				byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace();
     
    -				byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace);
    +				// here we remove any type check...
    +				// Ideally we want to keep that the info match the state.
    +				final InternalKvState state = kvState.getState();
    +				final KvStateInfo info = kvState.getInfoForCurrentThread();
    +
    +				byte[] serializedResult = state.getSerializedValue(
    --- End diff --
    
    Well, we could move the thread-caching into the `InternalKvStates`.


---

[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174133415
  
    --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java ---
    @@ -78,13 +75,22 @@ public KvStateServerHandler(
     		final CompletableFuture<KvStateResponse> responseFuture = new CompletableFuture<>();
     
     		try {
    -			final InternalKvState<?> kvState = registry.getKvState(request.getKvStateId());
    +			final KvStateEntry<?, ?, ?> kvState = registry.getKvState(request.getKvStateId());
     			if (kvState == null) {
     				responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId()));
     			} else {
     				byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace();
     
    -				byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace);
    +				// here we remove any type check...
    +				// Ideally we want to keep that the info match the state.
    +				final InternalKvState state = kvState.getState();
    +				final KvStateInfo info = kvState.getInfoForCurrentThread();
    +
    +				byte[] serializedResult = state.getSerializedValue(
    --- End diff --
    
    It is not an assumption. It is accessed concurrently from the thread pool. Synchronizing on the `kvState` would serialize all accesses, which would slow down performance considerably.
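
    The alternative to synchronizing, as described in the PR summary, is to keep one serializer copy per thread and to skip the copy when the serializer is stateless. A minimal sketch of that idea follows. `Serializer`, `BufferingIntSerializer` and `PerThreadSerializerCache` are hypothetical names for illustration, not the PR's `KvStateEntry` or Flink's serializer classes, and detecting statelessness by checking whether `duplicate()` returns the same instance is an assumption of this sketch.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for a serializer; not Flink's actual TypeSerializer.
interface Serializer<T> {
    byte[] serialize(T value);

    // Returns this if stateless, otherwise an independent copy.
    Serializer<T> duplicate();
}

// Stateful: reuses an internal buffer, so it is unsafe to share across threads.
final class BufferingIntSerializer implements Serializer<Integer> {
    private final byte[] scratch = new byte[4];

    public byte[] serialize(Integer value) {
        int v = value;
        scratch[0] = (byte) (v >>> 24);
        scratch[1] = (byte) (v >>> 16);
        scratch[2] = (byte) (v >>> 8);
        scratch[3] = (byte) v;
        return scratch.clone();
    }

    public Serializer<Integer> duplicate() {
        return new BufferingIntSerializer();
    }
}

// Hands each calling thread its own serializer copy without any locking
// on the request path beyond the concurrent map lookup.
final class PerThreadSerializerCache<T> {
    private final Serializer<T> original;
    private final boolean stateless;
    private final ConcurrentMap<Thread, Serializer<T>> copies = new ConcurrentHashMap<>();

    PerThreadSerializerCache(Serializer<T> original) {
        this.original = original;
        // Assumption: a serializer that returns itself from duplicate() is stateless.
        this.stateless = original.duplicate() == original;
    }

    Serializer<T> forCurrentThread() {
        if (stateless) {
            return original; // safe to share, no copy needed
        }
        // The first call from a thread creates its copy; later calls reuse it.
        return copies.computeIfAbsent(Thread.currentThread(), t -> original.duplicate());
    }

    int cachedCopies() {
        return copies.size();
    }
}
```

    A map keyed by `Thread` keeps entries alive for as long as the cache itself, which is acceptable for a fixed-size thread pool but would need cleanup if threads were created and discarded frequently.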


---