You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/02/11 17:48:08 UTC

[flink] branch release-1.12 updated (a3ec041 -> 6cc60de)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from a3ec041  [FLINK-21361][table-planner-blink] Match on CatalogTable interface in FlinkRelMdUniqueKeys
     new 86bdfff  [FLINK-21138] - User ClassLoader in KvStateServerHandler
     new 98da61b  [FLINK-21138][tests] Register serializer instance in AbstractQueryableStateTestBase.testCustomKryoSerializerHandling
     new dad7f57  [FLINK-21138] Fix classloader on QueryableStateClient
     new 6cc60de  [FLINK-21138] Explicit Classloader in QueryableStateClient

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../client/QueryableStateClient.java               |  26 +++++-
 .../flink-queryable-state-runtime/pom.xml          |   8 ++
 .../server/KvStateServerHandler.java               |  20 ++--
 .../itcases/AbstractQueryableStateTestBase.java    | 103 +++++++++++++++++++++
 .../flink/queryablestate/network/ClientTest.java   |   3 +-
 .../network/KvStateServerHandlerTest.java          |   3 +-
 .../apache/flink/runtime/query/KvStateEntry.java   |   9 +-
 .../flink/runtime/query/KvStateRegistry.java       |   6 +-
 .../flink/runtime/query/TaskKvStateRegistry.java   |  10 +-
 .../runtime/state/AbstractKeyedStateBackend.java   |   2 +-
 .../flink/runtime/query/KvStateRegistryTest.java   |  24 ++++-
 11 files changed, 194 insertions(+), 20 deletions(-)


[flink] 03/04: [FLINK-21138] Fix classloader on QueryableStateClient

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dad7f57e61d53c076ea9b69a61a71d9ac6edf094
Author: Maciek Próchniak <mp...@touk.pl>
AuthorDate: Tue Feb 9 16:59:56 2021 +0100

    [FLINK-21138] Fix classloader on QueryableStateClient
---
 .../client/QueryableStateClient.java               |  8 +++++-
 .../itcases/AbstractQueryableStateTestBase.java    | 31 +++++++++++++---------
 2 files changed, 26 insertions(+), 13 deletions(-)

diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
index b2c3b37..8bf601f 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -45,6 +45,7 @@ import org.apache.flink.queryablestate.network.Client;
 import org.apache.flink.queryablestate.network.messages.MessageSerializer;
 import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.LambdaUtil;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -292,8 +293,13 @@ public class QueryableStateClient {
             return FutureUtils.getFailedFuture(e);
         }
 
+        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
         return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace)
-                .thenApply(stateResponse -> createState(stateResponse, stateDescriptor));
+                .thenApply(
+                        stateResponse ->
+                                LambdaUtil.withContextClassLoader(
+                                        contextLoader,
+                                        () -> createState(stateResponse, stateDescriptor)));
     }
 
     private <T, S extends State> S createState(
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index 63fedfa..7c15a22 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -393,7 +393,10 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
 
         // Custom serializer is not needed, it's used just to check if serialization works.
-        env.getConfig().addDefaultKryoSerializer(Byte.class, createSerializer(userClassLoader));
+        env.getConfig()
+                .addDefaultKryoSerializer(
+                        Byte.class,
+                        (Serializer<?> & Serializable) createSerializer(userClassLoader));
 
         // Here we *force* using Kryo, to check if custom serializers are handled correctly WRT
         // classloading
@@ -1386,6 +1389,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
         if (!resultFuture.isDone()) {
             CompletableFuture<S> expected =
                     client.getKvState(jobId, queryName, key, keyTypeInfo, stateDescriptor);
+            ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
             expected.whenCompleteAsync(
                     (result, throwable) -> {
                         if (throwable != null) {
@@ -1396,17 +1400,20 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
                                                     instanceof UnknownKeyOrNamespaceException)) {
                                 resultFuture.completeExceptionally(throwable.getCause());
                             } else if (deadline.hasTimeLeft()) {
-                                getKvStateIgnoringCertainExceptions(
-                                        deadline,
-                                        resultFuture,
-                                        client,
-                                        jobId,
-                                        queryName,
-                                        key,
-                                        keyTypeInfo,
-                                        stateDescriptor,
-                                        failForUnknownKeyOrNamespace,
-                                        executor);
+                                LambdaUtil.withContextClassLoader(
+                                        contextLoader,
+                                        () ->
+                                                getKvStateIgnoringCertainExceptions(
+                                                        deadline,
+                                                        resultFuture,
+                                                        client,
+                                                        jobId,
+                                                        queryName,
+                                                        key,
+                                                        keyTypeInfo,
+                                                        stateDescriptor,
+                                                        failForUnknownKeyOrNamespace,
+                                                        executor));
                             }
                         } else {
                             resultFuture.complete(result);


[flink] 01/04: [FLINK-21138] - User ClassLoader in KvStateServerHandler

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 86bdfff24112fcf2793edb82f27ea5a5c248cc5a
Author: Maciek Próchniak <mp...@touk.pl>
AuthorDate: Mon Feb 8 13:11:00 2021 +0100

    [FLINK-21138] - User ClassLoader in KvStateServerHandler
---
 .../flink-queryable-state-runtime/pom.xml          |  8 ++
 .../server/KvStateServerHandler.java               | 20 +++--
 .../itcases/AbstractQueryableStateTestBase.java    | 92 ++++++++++++++++++++++
 .../flink/queryablestate/network/ClientTest.java   |  3 +-
 .../network/KvStateServerHandlerTest.java          |  3 +-
 .../apache/flink/runtime/query/KvStateEntry.java   |  9 ++-
 .../flink/runtime/query/KvStateRegistry.java       |  6 +-
 .../flink/runtime/query/TaskKvStateRegistry.java   | 10 ++-
 .../runtime/state/AbstractKeyedStateBackend.java   |  2 +-
 .../flink/runtime/query/KvStateRegistryTest.java   | 24 +++++-
 10 files changed, 158 insertions(+), 19 deletions(-)

diff --git a/flink-queryable-state/flink-queryable-state-runtime/pom.xml b/flink-queryable-state/flink-queryable-state-runtime/pom.xml
index ebca132..8405f8b 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/pom.xml
+++ b/flink-queryable-state/flink-queryable-state-runtime/pom.xml
@@ -65,6 +65,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
index 0ffb468..798008e 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.query.KvStateInfo;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.LambdaUtil;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
@@ -109,14 +110,19 @@ public class KvStateServerHandler
             final KvStateEntry<K, N, V> entry, final byte[] serializedKeyAndNamespace)
             throws Exception {
 
-        final InternalKvState<K, N, V> state = entry.getState();
-        final KvStateInfo<K, N, V> infoForCurrentThread = entry.getInfoForCurrentThread();
+        return LambdaUtil.withContextClassLoader(
+                entry.getUserClassLoader(),
+                () -> {
+                    final InternalKvState<K, N, V> state = entry.getState();
+                    final KvStateInfo<K, N, V> infoForCurrentThread =
+                            entry.getInfoForCurrentThread();
 
-        return state.getSerializedValue(
-                serializedKeyAndNamespace,
-                infoForCurrentThread.getKeySerializer(),
-                infoForCurrentThread.getNamespaceSerializer(),
-                infoForCurrentThread.getStateValueSerializer());
+                    return state.getSerializedValue(
+                            serializedKeyAndNamespace,
+                            infoForCurrentThread.getKeySerializer(),
+                            infoForCurrentThread.getNamespaceSerializer(),
+                            infoForCurrentThread.getStateValueSerializer());
+                });
     }
 
     @Override
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index 762a1bf..733c4bd 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -42,6 +42,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.queryablestate.client.QueryableStateClient;
@@ -63,18 +64,26 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.testutils.ClassLoaderUtils;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.LambdaUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
+import com.esotericsoftware.kryo.Serializer;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
+import java.io.IOException;
+import java.net.URLClassLoader;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -116,6 +125,8 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
     protected static int maxParallelism;
 
+    @ClassRule public static TemporaryFolder classloaderFolder = new TemporaryFolder();
+
     @Before
     public void setUp() throws Exception {
         // NOTE: do not use a shared instance for all tests as the tests may break
@@ -361,6 +372,65 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
         }
     }
 
+    /** This test checks if custom Kryo serializers are loaded with correct classloader. */
+    @Test
+    public void testCustomKryoSerializerHandling() throws Exception {
+        final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
+        final long numElements = 1;
+        final String stateName = "teriberka";
+
+        final String customSerializerClassName = "CustomKryo";
+        final URLClassLoader userClassLoader =
+                createLoaderWithCustomKryoSerializer(customSerializerClassName);
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setStateBackend(stateBackend);
+        env.setParallelism(maxParallelism);
+        // Very important, because cluster is shared between tests and we
+        // don't explicitly check that all slots are available before
+        // submitting.
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+        // Custom serializer is not needed, it's used just to check if serialization works.
+        @SuppressWarnings("unchecked")
+        Class<Serializer<?>> customSerializerClass =
+                (Class<Serializer<?>>) userClassLoader.loadClass(customSerializerClassName);
+        env.getConfig().addDefaultKryoSerializer(Byte.class, customSerializerClass);
+
+        // Here we *force* using Kryo, to check if custom serializers are handled correctly WRT
+        // classloading
+        @SuppressWarnings({"rawtypes", "unchecked"})
+        ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
+                new ValueStateDescriptor<>("any", new GenericTypeInfo(Tuple2.class));
+
+        env.addSource(new TestAscendingValueSource(numElements))
+                .keyBy(
+                        new KeySelector<Tuple2<Integer, Long>, Integer>() {
+                            private static final long serialVersionUID = 7662520075515707428L;
+
+                            @Override
+                            public Integer getKey(Tuple2<Integer, Long> value) {
+                                return value.f0;
+                            }
+                        })
+                .asQueryableState(stateName, valueState);
+
+        try (AutoCancellableJob autoCancellableJob =
+                new AutoCancellableJob(deadline, clusterClient, env)) {
+
+            final JobID jobId = autoCancellableJob.getJobId();
+            final JobGraph jobGraph = autoCancellableJob.getJobGraph();
+            jobGraph.setClasspaths(Arrays.asList(userClassLoader.getURLs()));
+
+            clusterClient.submitJob(jobGraph).get();
+            LambdaUtil.withContextClassLoader(
+                    userClassLoader,
+                    () ->
+                            executeValueQuery(
+                                    deadline, client, jobId, stateName, valueState, numElements));
+        }
+    }
+
     /**
      * Tests that the correct exception is thrown if the query contains a wrong jobId or wrong
      * queryable state name.
@@ -1384,4 +1454,26 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
             assertTrue("Did not succeed query", success);
         }
     }
+
+    private static URLClassLoader createLoaderWithCustomKryoSerializer(String className)
+            throws IOException {
+        return ClassLoaderUtils.compileAndLoadJava(
+                classloaderFolder.newFolder(),
+                className + ".java",
+                "import com.esotericsoftware.kryo.Kryo;\n"
+                        + "import com.esotericsoftware.kryo.Serializer;\n"
+                        + "import com.esotericsoftware.kryo.io.Input;\n"
+                        + "import com.esotericsoftware.kryo.io.Output;\n"
+                        + "import java.io.Serializable;\n"
+                        + "public class "
+                        + className
+                        + " extends Serializer<Byte> implements Serializable {\n"
+                        + "    @Override\n"
+                        + "    public void write(Kryo kryo, Output output, Byte testJob) {}\n"
+                        + "    @Override\n"
+                        + "    public Byte read(Kryo kryo, Input input, Class<Byte> aClass) {\n"
+                        + "        return null;\n"
+                        + "    }\n"
+                        + "}\n");
+    }
 }
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
index 6e888cb..e89a14f 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
@@ -741,7 +741,8 @@ public class ClientTest extends TestLogger {
                                 new JobVertexID(),
                                 new KeyGroupRange(0, 0),
                                 "any",
-                                kvState);
+                                kvState,
+                                getClass().getClassLoader());
             }
 
             final Client<KvStateInternalRequest, KvStateResponse> finalClient = client;
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index f156d9a..6df22a8 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -383,7 +383,8 @@ public class KvStateServerHandlerTest extends TestLogger {
                         new JobVertexID(),
                         new KeyGroupRange(0, 0),
                         "vanilla",
-                        kvState);
+                        kvState,
+                        getClass().getClassLoader());
 
         KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, new byte[0]);
         ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java
index 43b8e24..0567d13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java
@@ -43,7 +43,9 @@ public class KvStateEntry<K, N, V> {
 
     private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> serializerCache;
 
-    public KvStateEntry(final InternalKvState<K, N, V> state) {
+    private final ClassLoader userClassLoader;
+
+    public KvStateEntry(final InternalKvState<K, N, V> state, ClassLoader userClassLoader) {
         this.state = Preconditions.checkNotNull(state);
         this.stateInfo =
                 new KvStateInfo<>(
@@ -51,6 +53,7 @@ public class KvStateEntry<K, N, V> {
                         state.getNamespaceSerializer(),
                         state.getValueSerializer());
         this.serializerCache = new ConcurrentHashMap<>();
+        this.userClassLoader = userClassLoader;
         this.areSerializersStateless = stateInfo.duplicate() == stateInfo;
     }
 
@@ -58,6 +61,10 @@ public class KvStateEntry<K, N, V> {
         return state;
     }
 
+    public ClassLoader getUserClassLoader() {
+        return userClassLoader;
+    }
+
     public KvStateInfo<K, N, V> getInfoForCurrentThread() {
         return areSerializersStateless
                 ? stateInfo
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index 1def92a..cf9c7bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -87,11 +87,13 @@ public class KvStateRegistry {
             JobVertexID jobVertexId,
             KeyGroupRange keyGroupRange,
             String registrationName,
-            InternalKvState<?, ?, ?> kvState) {
+            InternalKvState<?, ?, ?> kvState,
+            ClassLoader userClassLoader) {
 
         KvStateID kvStateId = new KvStateID();
 
-        if (registeredKvStates.putIfAbsent(kvStateId, new KvStateEntry<>(kvState)) == null) {
+        if (registeredKvStates.putIfAbsent(kvStateId, new KvStateEntry<>(kvState, userClassLoader))
+                == null) {
             final KvStateRegistryListener listener = getKvStateRegistryListener(jobId);
 
             if (listener != null) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
index f722402..cdd80158 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
@@ -60,10 +60,16 @@ public class TaskKvStateRegistry {
     public void registerKvState(
             KeyGroupRange keyGroupRange,
             String registrationName,
-            InternalKvState<?, ?, ?> kvState) {
+            InternalKvState<?, ?, ?> kvState,
+            ClassLoader userClassLoader) {
         KvStateID kvStateId =
                 registry.registerKvState(
-                        jobId, jobVertexId, keyGroupRange, registrationName, kvState);
+                        jobId,
+                        jobVertexId,
+                        keyGroupRange,
+                        registrationName,
+                        kvState,
+                        userClassLoader);
         registeredKvStates.add(new KvStateInfo(keyGroupRange, registrationName, kvStateId));
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 1f9513d..28c5b03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -281,7 +281,7 @@ public abstract class AbstractKeyedStateBackend<K>
                 throw new IllegalStateException("State backend has not been initialized for job.");
             }
             String name = stateDescriptor.getQueryableStateName();
-            kvStateRegistry.registerKvState(keyGroupRange, name, kvState);
+            kvStateRegistry.registerKvState(keyGroupRange, name, kvState, userCodeClassLoader);
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
index 3f3ed74..f0ac718 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
@@ -71,7 +71,12 @@ public class KvStateRegistryTest extends TestLogger {
         final KvStateRegistry kvStateRegistry = new KvStateRegistry();
         final KvStateID stateID =
                 kvStateRegistry.registerKvState(
-                        jobID, jobVertexId, keyGroupRange, registrationName, new DummyKvState());
+                        jobID,
+                        jobVertexId,
+                        keyGroupRange,
+                        registrationName,
+                        new DummyKvState(),
+                        getClass().getClassLoader());
 
         final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>();
 
@@ -162,7 +167,12 @@ public class KvStateRegistryTest extends TestLogger {
         final String registrationName = "foobar";
         final KvStateID kvStateID =
                 kvStateRegistry.registerKvState(
-                        jobId1, jobVertexId, keyGroupRange, registrationName, new DummyKvState());
+                        jobId1,
+                        jobVertexId,
+                        keyGroupRange,
+                        registrationName,
+                        new DummyKvState(),
+                        getClass().getClassLoader());
 
         assertThat(registeredNotifications1.poll(), equalTo(jobId1));
         assertThat(registeredNotifications2.isEmpty(), is(true));
@@ -176,7 +186,8 @@ public class KvStateRegistryTest extends TestLogger {
                         jobVertexId2,
                         keyGroupRange2,
                         registrationName2,
-                        new DummyKvState());
+                        new DummyKvState(),
+                        getClass().getClassLoader());
 
         assertThat(registeredNotifications2.poll(), equalTo(jobId2));
         assertThat(registeredNotifications1.isEmpty(), is(true));
@@ -222,7 +233,12 @@ public class KvStateRegistryTest extends TestLogger {
         final String registrationName = "registrationName";
         final KvStateID kvStateID =
                 kvStateRegistry.registerKvState(
-                        jobId, jobVertexId, keyGroupRange, registrationName, new DummyKvState());
+                        jobId,
+                        jobVertexId,
+                        keyGroupRange,
+                        registrationName,
+                        new DummyKvState(),
+                        getClass().getClassLoader());
 
         assertThat(stateRegistrationNotifications.poll(), equalTo(jobId));
         // another listener should not have received any notifications


[flink] 04/04: [FLINK-21138] Explicit Classloader in QueryableStateClient

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6cc60de8cd5415ff40d3eae6467c756a54506973
Author: Maciek Próchniak <mp...@touk.pl>
AuthorDate: Wed Feb 10 09:11:37 2021 +0100

    [FLINK-21138] Explicit Classloader in QueryableStateClient
---
 .../client/QueryableStateClient.java               | 22 ++++++++++--
 .../itcases/AbstractQueryableStateTestBase.java    | 39 ++++++++++------------
 2 files changed, 38 insertions(+), 23 deletions(-)

diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
index 8bf601f..ecb3f39 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -115,6 +115,9 @@ public class QueryableStateClient {
     /** The execution configuration used to instantiate the different (de-)serializers. */
     private ExecutionConfig executionConfig;
 
+    /** The user code classloader, used in loading serializers. */
+    private ClassLoader userClassLoader;
+
     /**
      * Create the Queryable State Client.
      *
@@ -198,6 +201,18 @@ public class QueryableStateClient {
     }
 
     /**
+     * * Replaces the existing {@link ClassLoader} (possibly {@code null}), with the provided one.
+     *
+     * @param userClassLoader The new {@code userClassLoader}.
+     * @return The old classloader, or {@code null} if none was specified.
+     */
+    public ClassLoader setUserClassLoader(ClassLoader userClassLoader) {
+        ClassLoader prev = this.userClassLoader;
+        this.userClassLoader = userClassLoader;
+        return prev;
+    }
+
+    /**
      * Returns a future holding the request result.
      *
      * @param jobId JobID of the job the queryable state belongs to.
@@ -293,12 +308,15 @@ public class QueryableStateClient {
             return FutureUtils.getFailedFuture(e);
         }
 
-        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
+        ClassLoader classLoaderToUse =
+                userClassLoader != null
+                        ? userClassLoader
+                        : Thread.currentThread().getContextClassLoader();
         return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace)
                 .thenApply(
                         stateResponse ->
                                 LambdaUtil.withContextClassLoader(
-                                        contextLoader,
+                                        classLoaderToUse,
                                         () -> createState(stateResponse, stateDescriptor)));
     }
 
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index 7c15a22..368dd57 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -67,7 +67,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.testutils.ClassLoaderUtils;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.LambdaUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
@@ -424,11 +423,13 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
             jobGraph.setClasspaths(Arrays.asList(userClassLoader.getURLs()));
 
             clusterClient.submitJob(jobGraph).get();
-            LambdaUtil.withContextClassLoader(
-                    userClassLoader,
-                    () ->
-                            executeValueQuery(
-                                    deadline, client, jobId, stateName, valueState, numElements));
+
+            try {
+                client.setUserClassLoader(userClassLoader);
+                executeValueQuery(deadline, client, jobId, stateName, valueState, numElements);
+            } finally {
+                client.setUserClassLoader(null);
+            }
         }
     }
 
@@ -1389,7 +1390,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
         if (!resultFuture.isDone()) {
             CompletableFuture<S> expected =
                     client.getKvState(jobId, queryName, key, keyTypeInfo, stateDescriptor);
-            ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
             expected.whenCompleteAsync(
                     (result, throwable) -> {
                         if (throwable != null) {
@@ -1400,20 +1400,17 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
                                                     instanceof UnknownKeyOrNamespaceException)) {
                                 resultFuture.completeExceptionally(throwable.getCause());
                             } else if (deadline.hasTimeLeft()) {
-                                LambdaUtil.withContextClassLoader(
-                                        contextLoader,
-                                        () ->
-                                                getKvStateIgnoringCertainExceptions(
-                                                        deadline,
-                                                        resultFuture,
-                                                        client,
-                                                        jobId,
-                                                        queryName,
-                                                        key,
-                                                        keyTypeInfo,
-                                                        stateDescriptor,
-                                                        failForUnknownKeyOrNamespace,
-                                                        executor));
+                                getKvStateIgnoringCertainExceptions(
+                                        deadline,
+                                        resultFuture,
+                                        client,
+                                        jobId,
+                                        queryName,
+                                        key,
+                                        keyTypeInfo,
+                                        stateDescriptor,
+                                        failForUnknownKeyOrNamespace,
+                                        executor);
                             }
                         } else {
                             resultFuture.complete(result);


[flink] 02/04: [FLINK-21138][tests] Register serializer instance in AbstractQueryableStateTestBase.testCustomKryoSerializerHandling

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 98da61b8e6dbd54b09da864e9f7af33eba9e0c40
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Feb 8 19:28:55 2021 +0100

    [FLINK-21138][tests] Register serializer instance in AbstractQueryableStateTestBase.testCustomKryoSerializerHandling
    
    The test AbstractQueryableStateTestBase.testCustomKryoSerializerHandling only works if one registers an actual
    serializer instance.
---
 .../itcases/AbstractQueryableStateTestBase.java           | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index 733c4bd..63fedfa 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -80,6 +80,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.net.URLClassLoader;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -392,10 +393,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
 
         // Custom serializer is not needed, it's used just to check if serialization works.
-        @SuppressWarnings("unchecked")
-        Class<Serializer<?>> customSerializerClass =
-                (Class<Serializer<?>>) userClassLoader.loadClass(customSerializerClassName);
-        env.getConfig().addDefaultKryoSerializer(Byte.class, customSerializerClass);
+        env.getConfig().addDefaultKryoSerializer(Byte.class, createSerializer(userClassLoader));
 
         // Here we *force* using Kryo, to check if custom serializers are handled correctly WRT
         // classloading
@@ -431,6 +429,15 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    <T extends Serializer<Byte> & Serializable> T createSerializer(ClassLoader userClassLoader)
+            throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+        Class<Serializer<?>> customSerializerClass =
+                (Class<Serializer<?>>) userClassLoader.loadClass("CustomKryo");
+
+        return (T) customSerializerClass.newInstance();
+    }
+
     /**
      * Tests that the correct exception is thrown if the query contains a wrong jobId or wrong
      * queryable state name.