You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/02/11 17:48:12 UTC
[flink] 04/04: [FLINK-21138] Explicit Classloader in QueryableStateClient
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6cc60de8cd5415ff40d3eae6467c756a54506973
Author: Maciek Próchniak <mp...@touk.pl>
AuthorDate: Wed Feb 10 09:11:37 2021 +0100
[FLINK-21138] Explicit Classloader in QueryableStateClient
---
.../client/QueryableStateClient.java | 22 ++++++++++--
.../itcases/AbstractQueryableStateTestBase.java | 39 ++++++++++------------
2 files changed, 38 insertions(+), 23 deletions(-)
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
index 8bf601f..ecb3f39 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -115,6 +115,9 @@ public class QueryableStateClient {
/** The execution configuration used to instantiate the different (de-)serializers. */
private ExecutionConfig executionConfig;
+ /** The user code classloader, used in loading serializers. */
+ private ClassLoader userClassLoader;
+
/**
* Create the Queryable State Client.
*
@@ -198,6 +201,18 @@ public class QueryableStateClient {
}
/**
+ * Replaces the existing {@link ClassLoader} (possibly {@code null}) with the provided one.
+ *
+ * @param userClassLoader The new {@code userClassLoader}.
+ * @return The old classloader, or {@code null} if none was specified.
+ */
+ public ClassLoader setUserClassLoader(ClassLoader userClassLoader) {
+ ClassLoader prev = this.userClassLoader;
+ this.userClassLoader = userClassLoader;
+ return prev;
+ }
+
+ /**
* Returns a future holding the request result.
*
* @param jobId JobID of the job the queryable state belongs to.
@@ -293,12 +308,15 @@ public class QueryableStateClient {
return FutureUtils.getFailedFuture(e);
}
- ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
+ ClassLoader classLoaderToUse =
+ userClassLoader != null
+ ? userClassLoader
+ : Thread.currentThread().getContextClassLoader();
return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace)
.thenApply(
stateResponse ->
LambdaUtil.withContextClassLoader(
- contextLoader,
+ classLoaderToUse,
() -> createState(stateResponse, stateDescriptor)));
}
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index 7c15a22..368dd57 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -67,7 +67,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.testutils.ClassLoaderUtils;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.LambdaUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
@@ -424,11 +423,13 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
jobGraph.setClasspaths(Arrays.asList(userClassLoader.getURLs()));
clusterClient.submitJob(jobGraph).get();
- LambdaUtil.withContextClassLoader(
- userClassLoader,
- () ->
- executeValueQuery(
- deadline, client, jobId, stateName, valueState, numElements));
+
+ try {
+ client.setUserClassLoader(userClassLoader);
+ executeValueQuery(deadline, client, jobId, stateName, valueState, numElements);
+ } finally {
+ client.setUserClassLoader(null);
+ }
}
}
@@ -1389,7 +1390,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
if (!resultFuture.isDone()) {
CompletableFuture<S> expected =
client.getKvState(jobId, queryName, key, keyTypeInfo, stateDescriptor);
- ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
expected.whenCompleteAsync(
(result, throwable) -> {
if (throwable != null) {
@@ -1400,20 +1400,17 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
instanceof UnknownKeyOrNamespaceException)) {
resultFuture.completeExceptionally(throwable.getCause());
} else if (deadline.hasTimeLeft()) {
- LambdaUtil.withContextClassLoader(
- contextLoader,
- () ->
- getKvStateIgnoringCertainExceptions(
- deadline,
- resultFuture,
- client,
- jobId,
- queryName,
- key,
- keyTypeInfo,
- stateDescriptor,
- failForUnknownKeyOrNamespace,
- executor));
+ getKvStateIgnoringCertainExceptions(
+ deadline,
+ resultFuture,
+ client,
+ jobId,
+ queryName,
+ key,
+ keyTypeInfo,
+ stateDescriptor,
+ failForUnknownKeyOrNamespace,
+ executor);
}
} else {
resultFuture.complete(result);