You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/09/26 08:14:40 UTC

[flink] branch release-1.6 updated: [FLINK-10263] [sql-client] Fix classloader issues in SQL Client

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
     new 4acb904  [FLINK-10263] [sql-client] Fix classloader issues in SQL Client
4acb904 is described below

commit 4acb90474d18427e45ee0a274fd88cf963d9220c
Author: Timo Walther <tw...@apache.org>
AuthorDate: Thu Sep 20 10:28:22 2018 +0200

    [FLINK-10263] [sql-client] Fix classloader issues in SQL Client
    
    Fixes classloading issues when using a UDF with constant parameters. Every
    optimization might need to compile code (e.g. for constant folding) and thus
    needs access to the user-code classloader.
    
    This closes #6725.
---
 .../test-scripts/test_sql_client.sh                |  8 +++++--
 .../client/gateway/local/ExecutionContext.java     | 14 +++++++++++
 .../table/client/gateway/local/LocalExecutor.java  | 28 +++++++++++++++-------
 .../flink/table/codegen/ExpressionReducer.scala    |  5 +++-
 4 files changed, 43 insertions(+), 12 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
index b583072..ca02513 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -212,6 +212,8 @@ tables:
         type: VARCHAR
       - name: duplicate_count
         type: BIGINT
+      - name: constant
+        type: VARCHAR
     connector:
       type: filesystem
       path: $RESULT
@@ -226,6 +228,8 @@ tables:
           type: VARCHAR
         - name: duplicate_count
           type: BIGINT
+        - name: constant
+          type: VARCHAR
 
 functions:
   - name: RegReplace
@@ -261,7 +265,7 @@ $FLINK_DIR/bin/sql-client.sh embedded \
 
 read -r -d '' SQL_STATEMENT_2 << EOF
 INSERT INTO CsvSinkTable
-  SELECT *
+  SELECT AvroBothTable.*, RegReplace('Test constant folding.', 'Test', 'Success') AS constant
   FROM AvroBothTable
 EOF
 
@@ -285,4 +289,4 @@ for i in {1..10}; do
   sleep 5
 done
 
-check_result_hash "SQLClient" $RESULT "dca08a82cc09f6b19950291dbbef16bb"
+check_result_hash "SQLClient" $RESULT "0a1bf8bf716069b7269f575f87a802c0"
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 4283953..4361c37 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -74,6 +74,7 @@ import java.net.URL;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 
 /**
  * Context for executing table programs. This class caches everything that can be cached across
@@ -182,6 +183,19 @@ public class ExecutionContext<T> {
 		return tableSinks;
 	}
 
+	/**
+	 * Executes the given supplier using the execution context's classloader as thread classloader.
+	 */
+	public <R> R wrapClassLoader(Supplier<R> supplier) {
+		final ClassLoader previousClassloader = Thread.currentThread().getContextClassLoader();
+		Thread.currentThread().setContextClassLoader(classLoader);
+		try {
+			return supplier.get();
+		} finally {
+			Thread.currentThread().setContextClassLoader(previousClassloader);
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	private static CommandLine createCommandLine(Deployment deployment, Options commandLineOptions) {
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index b2e8271..c44e3ae 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -219,14 +219,16 @@ public class LocalExecutor implements Executor {
 
 	@Override
 	public String explainStatement(SessionContext session, String statement) throws SqlExecutionException {
-		final TableEnvironment tableEnv = getOrCreateExecutionContext(session)
+		final ExecutionContext<?> context = getOrCreateExecutionContext(session);
+		final TableEnvironment tableEnv = context
 			.createEnvironmentInstance()
 			.getTableEnvironment();
 
 		// translate
 		try {
 			final Table table = createTable(tableEnv, statement);
-			return tableEnv.explain(table);
+			// explanation requires an optimization step that might reference UDFs during code compilation
+			return context.wrapClassLoader(() -> tableEnv.explain(table));
 		} catch (Throwable t) {
 			// catch everything such that the query does not crash the executor
 			throw new SqlExecutionException("Invalid SQL statement.", t);
@@ -242,7 +244,7 @@ public class LocalExecutor implements Executor {
 	@Override
 	public TypedResult<List<Tuple2<Boolean, Row>>> retrieveResultChanges(SessionContext session,
 			String resultId) throws SqlExecutionException {
-		final DynamicResult result = resultStore.getResult(resultId);
+		final DynamicResult<?> result = resultStore.getResult(resultId);
 		if (result == null) {
 			throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'.");
 		}
@@ -254,7 +256,7 @@ public class LocalExecutor implements Executor {
 
 	@Override
 	public TypedResult<Integer> snapshotResult(SessionContext session, String resultId, int pageSize) throws SqlExecutionException {
-		final DynamicResult result = resultStore.getResult(resultId);
+		final DynamicResult<?> result = resultStore.getResult(resultId);
 		if (result == null) {
 			throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'.");
 		}
@@ -266,7 +268,7 @@ public class LocalExecutor implements Executor {
 
 	@Override
 	public List<Row> retrieveResultPage(String resultId, int page) throws SqlExecutionException {
-		final DynamicResult result = resultStore.getResult(resultId);
+		final DynamicResult<?> result = resultStore.getResult(resultId);
 		if (result == null) {
 			throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'.");
 		}
@@ -344,7 +346,7 @@ public class LocalExecutor implements Executor {
 	private <C> ProgramTargetDescriptor executeUpdateInternal(ExecutionContext<C> context, String statement) {
 		final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();
 
-		applyUpdate(envInst.getTableEnvironment(), envInst.getQueryConfig(), statement);
+		applyUpdate(context, envInst.getTableEnvironment(), envInst.getQueryConfig(), statement);
 
 		// create job graph with dependencies
 		final String jobName = context.getSessionContext().getName() + ": " + statement;
@@ -386,7 +388,11 @@ public class LocalExecutor implements Executor {
 		final String jobName = context.getSessionContext().getName() + ": " + query;
 		final JobGraph jobGraph;
 		try {
-			table.writeToSink(result.getTableSink(), envInst.getQueryConfig());
+			// writing to a sink requires an optimization step that might reference UDFs during code compilation
+			context.wrapClassLoader(() -> {
+				table.writeToSink(result.getTableSink(), envInst.getQueryConfig());
+				return null;
+			});
 			jobGraph = envInst.createJobGraph(jobName);
 		} catch (Throwable t) {
 			// the result needs to be closed as long as
@@ -429,10 +435,14 @@ public class LocalExecutor implements Executor {
 	/**
 	 * Applies the given update statement to the given table environment with query configuration.
 	 */
-	private void applyUpdate(TableEnvironment tableEnv, QueryConfig queryConfig, String updateStatement) {
+	private <C> void applyUpdate(ExecutionContext<C> context, TableEnvironment tableEnv, QueryConfig queryConfig, String updateStatement) {
 		// parse and validate statement
 		try {
-			tableEnv.sqlUpdate(updateStatement, queryConfig);
+			// update statement requires an optimization step that might reference UDFs during code compilation
+			context.wrapClassLoader(() -> {
+				tableEnv.sqlUpdate(updateStatement, queryConfig);
+				return null;
+			});
 		} catch (Throwable t) {
 			// catch everything such that the statement does not crash the executor
 			throw new SqlExecutionException("Invalid SQL update statement.", t);
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
index a9dbf19..eaec52c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
@@ -101,7 +101,10 @@ class ExpressionReducer(config: TableConfig)
         |""".stripMargin,
       resultType)
 
-    val clazz = compile(getClass.getClassLoader, generatedFunction.name, generatedFunction.code)
+    val clazz = compile(
+      Thread.currentThread().getContextClassLoader,
+      generatedFunction.name,
+      generatedFunction.code)
     val function = clazz.newInstance()
 
     // execute