You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/09/25 17:45:02 UTC
[flink] branch master updated: [FLINK-10263] [sql-client] Fix
classloader issues in SQL Client
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 116347e [FLINK-10263] [sql-client] Fix classloader issues in SQL Client
116347e is described below
commit 116347ea179bdcf0a8eb33581c744143374057a0
Author: Timo Walther <tw...@apache.org>
AuthorDate: Thu Sep 20 10:28:22 2018 +0200
[FLINK-10263] [sql-client] Fix classloader issues in SQL Client
Fixes classloading issues when using a UDF with constant parameters. Every
optimization might need to compile code (e.g., for constant folding) and
thus needs access to the user-code classloader.
This closes #6725.
---
.../test-scripts/test_sql_client.sh | 8 +++++--
.../client/gateway/local/ExecutionContext.java | 14 +++++++++++
.../table/client/gateway/local/LocalExecutor.java | 28 +++++++++++++++-------
.../flink/table/codegen/ExpressionReducer.scala | 6 +++--
4 files changed, 43 insertions(+), 13 deletions(-)
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
index b583072..ca02513 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -212,6 +212,8 @@ tables:
type: VARCHAR
- name: duplicate_count
type: BIGINT
+ - name: constant
+ type: VARCHAR
connector:
type: filesystem
path: $RESULT
@@ -226,6 +228,8 @@ tables:
type: VARCHAR
- name: duplicate_count
type: BIGINT
+ - name: constant
+ type: VARCHAR
functions:
- name: RegReplace
@@ -261,7 +265,7 @@ $FLINK_DIR/bin/sql-client.sh embedded \
read -r -d '' SQL_STATEMENT_2 << EOF
INSERT INTO CsvSinkTable
- SELECT *
+ SELECT AvroBothTable.*, RegReplace('Test constant folding.', 'Test', 'Success') AS constant
FROM AvroBothTable
EOF
@@ -285,4 +289,4 @@ for i in {1..10}; do
sleep 5
done
-check_result_hash "SQLClient" $RESULT "dca08a82cc09f6b19950291dbbef16bb"
+check_result_hash "SQLClient" $RESULT "0a1bf8bf716069b7269f575f87a802c0"
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 85b3e92..552d0b3 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -75,6 +75,7 @@ import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Supplier;
/**
* Context for executing table programs. This class caches everything that can be cached across
@@ -183,6 +184,19 @@ public class ExecutionContext<T> {
return tableSinks;
}
+ /**
+ * Executes the given supplier using the execution context's classloader as thread classloader.
+ */
+ public <R> R wrapClassLoader(Supplier<R> supplier) {
+ final ClassLoader previousClassloader = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(classLoader);
+ try {
+ return supplier.get();
+ } finally {
+ Thread.currentThread().setContextClassLoader(previousClassloader);
+ }
+ }
+
// --------------------------------------------------------------------------------------------
private static CommandLine createCommandLine(Deployment deployment, Options commandLineOptions) {
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 3b9e8e9..1318043 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -219,14 +219,16 @@ public class LocalExecutor implements Executor {
@Override
public String explainStatement(SessionContext session, String statement) throws SqlExecutionException {
- final TableEnvironment tableEnv = getOrCreateExecutionContext(session)
+ final ExecutionContext<?> context = getOrCreateExecutionContext(session);
+ final TableEnvironment tableEnv = context
.createEnvironmentInstance()
.getTableEnvironment();
// translate
try {
final Table table = createTable(tableEnv, statement);
- return tableEnv.explain(table);
+ // explanation requires an optimization step that might reference UDFs during code compilation
+ return context.wrapClassLoader(() -> tableEnv.explain(table));
} catch (Throwable t) {
// catch everything such that the query does not crash the executor
throw new SqlExecutionException("Invalid SQL statement.", t);
@@ -242,7 +244,7 @@ public class LocalExecutor implements Executor {
@Override
public TypedResult<List<Tuple2<Boolean, Row>>> retrieveResultChanges(SessionContext session,
String resultId) throws SqlExecutionException {
- final DynamicResult result = resultStore.getResult(resultId);
+ final DynamicResult<?> result = resultStore.getResult(resultId);
if (result == null) {
throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'.");
}
@@ -254,7 +256,7 @@ public class LocalExecutor implements Executor {
@Override
public TypedResult<Integer> snapshotResult(SessionContext session, String resultId, int pageSize) throws SqlExecutionException {
- final DynamicResult result = resultStore.getResult(resultId);
+ final DynamicResult<?> result = resultStore.getResult(resultId);
if (result == null) {
throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'.");
}
@@ -266,7 +268,7 @@ public class LocalExecutor implements Executor {
@Override
public List<Row> retrieveResultPage(String resultId, int page) throws SqlExecutionException {
- final DynamicResult result = resultStore.getResult(resultId);
+ final DynamicResult<?> result = resultStore.getResult(resultId);
if (result == null) {
throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'.");
}
@@ -350,7 +352,7 @@ public class LocalExecutor implements Executor {
private <C> ProgramTargetDescriptor executeUpdateInternal(ExecutionContext<C> context, String statement) {
final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();
- applyUpdate(envInst.getTableEnvironment(), envInst.getQueryConfig(), statement);
+ applyUpdate(context, envInst.getTableEnvironment(), envInst.getQueryConfig(), statement);
// create job graph with dependencies
final String jobName = context.getSessionContext().getName() + ": " + statement;
@@ -392,7 +394,11 @@ public class LocalExecutor implements Executor {
final String jobName = context.getSessionContext().getName() + ": " + query;
final JobGraph jobGraph;
try {
- table.writeToSink(result.getTableSink(), envInst.getQueryConfig());
+ // writing to a sink requires an optimization step that might reference UDFs during code compilation
+ context.wrapClassLoader(() -> {
+ table.writeToSink(result.getTableSink(), envInst.getQueryConfig());
+ return null;
+ });
jobGraph = envInst.createJobGraph(jobName);
} catch (Throwable t) {
// the result needs to be closed as long as
@@ -435,10 +441,14 @@ public class LocalExecutor implements Executor {
/**
* Applies the given update statement to the given table environment with query configuration.
*/
- private void applyUpdate(TableEnvironment tableEnv, QueryConfig queryConfig, String updateStatement) {
+ private <C> void applyUpdate(ExecutionContext<C> context, TableEnvironment tableEnv, QueryConfig queryConfig, String updateStatement) {
// parse and validate statement
try {
- tableEnv.sqlUpdate(updateStatement, queryConfig);
+ // update statement requires an optimization step that might reference UDFs during code compilation
+ context.wrapClassLoader(() -> {
+ tableEnv.sqlUpdate(updateStatement, queryConfig);
+ return null;
+ });
} catch (Throwable t) {
// catch everything such that the statement does not crash the executor
throw new SqlExecutionException("Invalid SQL update statement.", t);
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
index f58e12c..2b50bb9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
@@ -23,7 +23,6 @@ import java.util
import org.apache.calcite.plan.RelOptPlanner
import org.apache.calcite.rex.{RexBuilder, RexNode}
import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.commons.lang3.StringEscapeUtils
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.typeutils.RowTypeInfo
@@ -101,7 +100,10 @@ class ExpressionReducer(config: TableConfig)
|""".stripMargin,
resultType)
- val clazz = compile(getClass.getClassLoader, generatedFunction.name, generatedFunction.code)
+ val clazz = compile(
+ Thread.currentThread().getContextClassLoader,
+ generatedFunction.name,
+ generatedFunction.code)
val function = clazz.newInstance()
// execute