You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/25 17:46:52 UTC

[GitHub] asfgit closed pull request #6725: [FLINK-10263] [sql-client] Fix classloader issues in SQL Client

asfgit closed pull request #6725: [FLINK-10263] [sql-client] Fix classloader issues in SQL Client
URL: https://github.com/apache/flink/pull/6725
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would otherwise no longer display it after the merge):

diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
index b5830725db1..ca0251365e6 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -212,6 +212,8 @@ tables:
         type: VARCHAR
       - name: duplicate_count
         type: BIGINT
+      - name: constant
+        type: VARCHAR
     connector:
       type: filesystem
       path: $RESULT
@@ -226,6 +228,8 @@ tables:
           type: VARCHAR
         - name: duplicate_count
           type: BIGINT
+        - name: constant
+          type: VARCHAR
 
 functions:
   - name: RegReplace
@@ -261,7 +265,7 @@ $FLINK_DIR/bin/sql-client.sh embedded \
 
 read -r -d '' SQL_STATEMENT_2 << EOF
 INSERT INTO CsvSinkTable
-  SELECT *
+  SELECT AvroBothTable.*, RegReplace('Test constant folding.', 'Test', 'Success') AS constant
   FROM AvroBothTable
 EOF
 
@@ -285,4 +289,4 @@ for i in {1..10}; do
   sleep 5
 done
 
-check_result_hash "SQLClient" $RESULT "dca08a82cc09f6b19950291dbbef16bb"
+check_result_hash "SQLClient" $RESULT "0a1bf8bf716069b7269f575f87a802c0"
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 85b3e9265a8..552d0b37dca 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -75,6 +75,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 
 /**
  * Context for executing table programs. This class caches everything that can be cached across
@@ -183,6 +184,19 @@ public EnvironmentInstance createEnvironmentInstance() {
 		return tableSinks;
 	}
 
+	/**
+	 * Executes the given supplier using the execution context's classloader as thread classloader.
+	 */
+	public <R> R wrapClassLoader(Supplier<R> supplier) {
+		final ClassLoader previousClassloader = Thread.currentThread().getContextClassLoader();
+		Thread.currentThread().setContextClassLoader(classLoader);
+		try {
+			return supplier.get();
+		} finally {
+			Thread.currentThread().setContextClassLoader(previousClassloader);
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	private static CommandLine createCommandLine(Deployment deployment, Options commandLineOptions) {
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 3b9e8e99b82..1318043faf1 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -219,14 +219,16 @@ public TableSchema getTableSchema(SessionContext session, String name) throws Sq
 
 	@Override
 	public String explainStatement(SessionContext session, String statement) throws SqlExecutionException {
-		final TableEnvironment tableEnv = getOrCreateExecutionContext(session)
+		final ExecutionContext<?> context = getOrCreateExecutionContext(session);
+		final TableEnvironment tableEnv = context
 			.createEnvironmentInstance()
 			.getTableEnvironment();
 
 		// translate
 		try {
 			final Table table = createTable(tableEnv, statement);
-			return tableEnv.explain(table);
+			// explanation requires an optimization step that might reference UDFs during code compilation
+			return context.wrapClassLoader(() -> tableEnv.explain(table));
 		} catch (Throwable t) {
 			// catch everything such that the query does not crash the executor
 			throw new SqlExecutionException("Invalid SQL statement.", t);
@@ -242,7 +244,7 @@ public ResultDescriptor executeQuery(SessionContext session, String query) throw
 	@Override
 	public TypedResult<List<Tuple2<Boolean, Row>>> retrieveResultChanges(SessionContext session,
 			String resultId) throws SqlExecutionException {
-		final DynamicResult result = resultStore.getResult(resultId);
+		final DynamicResult<?> result = resultStore.getResult(resultId);
 		if (result == null) {
 			throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'.");
 		}
@@ -254,7 +256,7 @@ public ResultDescriptor executeQuery(SessionContext session, String query) throw
 
 	@Override
 	public TypedResult<Integer> snapshotResult(SessionContext session, String resultId, int pageSize) throws SqlExecutionException {
-		final DynamicResult result = resultStore.getResult(resultId);
+		final DynamicResult<?> result = resultStore.getResult(resultId);
 		if (result == null) {
 			throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'.");
 		}
@@ -266,7 +268,7 @@ public ResultDescriptor executeQuery(SessionContext session, String query) throw
 
 	@Override
 	public List<Row> retrieveResultPage(String resultId, int page) throws SqlExecutionException {
-		final DynamicResult result = resultStore.getResult(resultId);
+		final DynamicResult<?> result = resultStore.getResult(resultId);
 		if (result == null) {
 			throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'.");
 		}
@@ -350,7 +352,7 @@ public void stop(SessionContext session) {
 	private <C> ProgramTargetDescriptor executeUpdateInternal(ExecutionContext<C> context, String statement) {
 		final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();
 
-		applyUpdate(envInst.getTableEnvironment(), envInst.getQueryConfig(), statement);
+		applyUpdate(context, envInst.getTableEnvironment(), envInst.getQueryConfig(), statement);
 
 		// create job graph with dependencies
 		final String jobName = context.getSessionContext().getName() + ": " + statement;
@@ -392,7 +394,11 @@ public void stop(SessionContext session) {
 		final String jobName = context.getSessionContext().getName() + ": " + query;
 		final JobGraph jobGraph;
 		try {
-			table.writeToSink(result.getTableSink(), envInst.getQueryConfig());
+			// writing to a sink requires an optimization step that might reference UDFs during code compilation
+			context.wrapClassLoader(() -> {
+				table.writeToSink(result.getTableSink(), envInst.getQueryConfig());
+				return null;
+			});
 			jobGraph = envInst.createJobGraph(jobName);
 		} catch (Throwable t) {
 			// the result needs to be closed as long as
@@ -435,10 +441,14 @@ private Table createTable(TableEnvironment tableEnv, String selectQuery) {
 	/**
 	 * Applies the given update statement to the given table environment with query configuration.
 	 */
-	private void applyUpdate(TableEnvironment tableEnv, QueryConfig queryConfig, String updateStatement) {
+	private <C> void applyUpdate(ExecutionContext<C> context, TableEnvironment tableEnv, QueryConfig queryConfig, String updateStatement) {
 		// parse and validate statement
 		try {
-			tableEnv.sqlUpdate(updateStatement, queryConfig);
+			// update statement requires an optimization step that might reference UDFs during code compilation
+			context.wrapClassLoader(() -> {
+				tableEnv.sqlUpdate(updateStatement, queryConfig);
+				return null;
+			});
 		} catch (Throwable t) {
 			// catch everything such that the statement does not crash the executor
 			throw new SqlExecutionException("Invalid SQL update statement.", t);
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
index f58e12cfea2..2b50bb92c9a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
@@ -23,7 +23,6 @@ import java.util
 import org.apache.calcite.plan.RelOptPlanner
 import org.apache.calcite.rex.{RexBuilder, RexNode}
 import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.commons.lang3.StringEscapeUtils
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.java.typeutils.RowTypeInfo
@@ -101,7 +100,10 @@ class ExpressionReducer(config: TableConfig)
         |""".stripMargin,
       resultType)
 
-    val clazz = compile(getClass.getClassLoader, generatedFunction.name, generatedFunction.code)
+    val clazz = compile(
+      Thread.currentThread().getContextClassLoader,
+      generatedFunction.name,
+      generatedFunction.code)
     val function = clazz.newInstance()
 
     // execute


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services