You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/02/11 17:48:12 UTC

[flink] 04/04: [FLINK-21138] Explicit Classloader in QueryableStateClient

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6cc60de8cd5415ff40d3eae6467c756a54506973
Author: Maciek Próchniak <mp...@touk.pl>
AuthorDate: Wed Feb 10 09:11:37 2021 +0100

    [FLINK-21138] Explicit Classloader in QueryableStateClient
---
 .../client/QueryableStateClient.java               | 22 ++++++++++--
 .../itcases/AbstractQueryableStateTestBase.java    | 39 ++++++++++------------
 2 files changed, 38 insertions(+), 23 deletions(-)

diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
index 8bf601f..ecb3f39 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -115,6 +115,9 @@ public class QueryableStateClient {
     /** The execution configuration used to instantiate the different (de-)serializers. */
     private ExecutionConfig executionConfig;
 
+    /** The user code classloader, used in loading serializers. */
+    private ClassLoader userClassLoader;
+
     /**
      * Create the Queryable State Client.
      *
@@ -198,6 +201,18 @@ public class QueryableStateClient {
     }
 
     /**
+     * Replaces the existing {@link ClassLoader} (possibly {@code null}) with the provided one.
+     *
+     * @param userClassLoader The new {@code userClassLoader}; may be {@code null}.
+     * @return The old classloader, or {@code null} if none was specified.
+     */
+    public ClassLoader setUserClassLoader(ClassLoader userClassLoader) {
+        ClassLoader prev = this.userClassLoader;
+        this.userClassLoader = userClassLoader;
+        return prev;
+    }
+
+    /**
      * Returns a future holding the request result.
      *
      * @param jobId JobID of the job the queryable state belongs to.
@@ -293,12 +308,15 @@ public class QueryableStateClient {
             return FutureUtils.getFailedFuture(e);
         }
 
-        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
+        ClassLoader classLoaderToUse =
+                userClassLoader != null
+                        ? userClassLoader
+                        : Thread.currentThread().getContextClassLoader();
         return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace)
                 .thenApply(
                         stateResponse ->
                                 LambdaUtil.withContextClassLoader(
-                                        contextLoader,
+                                        classLoaderToUse,
                                         () -> createState(stateResponse, stateDescriptor)));
     }
 
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index 7c15a22..368dd57 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -67,7 +67,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.testutils.ClassLoaderUtils;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.LambdaUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
@@ -424,11 +423,13 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
             jobGraph.setClasspaths(Arrays.asList(userClassLoader.getURLs()));
 
             clusterClient.submitJob(jobGraph).get();
-            LambdaUtil.withContextClassLoader(
-                    userClassLoader,
-                    () ->
-                            executeValueQuery(
-                                    deadline, client, jobId, stateName, valueState, numElements));
+
+            try {
+                client.setUserClassLoader(userClassLoader);
+                executeValueQuery(deadline, client, jobId, stateName, valueState, numElements);
+            } finally {
+                client.setUserClassLoader(null);
+            }
         }
     }
 
@@ -1389,7 +1390,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
         if (!resultFuture.isDone()) {
             CompletableFuture<S> expected =
                     client.getKvState(jobId, queryName, key, keyTypeInfo, stateDescriptor);
-            ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
             expected.whenCompleteAsync(
                     (result, throwable) -> {
                         if (throwable != null) {
@@ -1400,20 +1400,17 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
                                                     instanceof UnknownKeyOrNamespaceException)) {
                                 resultFuture.completeExceptionally(throwable.getCause());
                             } else if (deadline.hasTimeLeft()) {
-                                LambdaUtil.withContextClassLoader(
-                                        contextLoader,
-                                        () ->
-                                                getKvStateIgnoringCertainExceptions(
-                                                        deadline,
-                                                        resultFuture,
-                                                        client,
-                                                        jobId,
-                                                        queryName,
-                                                        key,
-                                                        keyTypeInfo,
-                                                        stateDescriptor,
-                                                        failForUnknownKeyOrNamespace,
-                                                        executor));
+                                getKvStateIgnoringCertainExceptions(
+                                        deadline,
+                                        resultFuture,
+                                        client,
+                                        jobId,
+                                        queryName,
+                                        key,
+                                        keyTypeInfo,
+                                        stateDescriptor,
+                                        failForUnknownKeyOrNamespace,
+                                        executor);
                             }
                         } else {
                             resultFuture.complete(result);