You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/06/05 12:17:56 UTC

[flink] branch release-1.11 updated: [FLINK-18076][table sql / client] Use correct classloader when parsing queries

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new e7902bb  [FLINK-18076][table sql / client] Use correct classloader when parsing queries
e7902bb is described below

commit e7902bb4d1329833870ee53c782c6431cfc8cb80
Author: Leonard Xu <xb...@gmail.com>
AuthorDate: Thu Jun 4 23:53:07 2020 +0800

    [FLINK-18076][table sql / client] Use correct classloader when parsing queries
    
    This closes #12475
---
 .../table/client/gateway/local/LocalExecutor.java  | 15 ++++++-
 .../table/client/gateway/local/DependencyTest.java | 50 ++++++++++++++++------
 2 files changed, 51 insertions(+), 14 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index fd78712..604734e 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -41,6 +41,7 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.client.config.Environment;
@@ -56,6 +57,7 @@ import org.apache.flink.table.client.gateway.local.result.ChangelogResult;
 import org.apache.flink.table.client.gateway.local.result.DynamicResult;
 import org.apache.flink.table.client.gateway.local.result.MaterializedResult;
 import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
 import org.apache.flink.table.types.utils.DataTypeUtils;
@@ -460,7 +462,18 @@ public class LocalExecutor implements Executor {
 	public Parser getSqlParser(String sessionId) {
 		final ExecutionContext<?> context = getExecutionContext(sessionId);
 		final TableEnvironment tableEnv = context.getTableEnvironment();
-		return ((TableEnvironmentInternal) tableEnv).getParser();
+		final Parser parser = ((TableEnvironmentInternal) tableEnv).getParser();
+		return new Parser() {
+			@Override
+			public List<Operation> parse(String statement) {
+				return context.wrapClassLoader(() -> parser.parse(statement));
+			}
+
+			@Override
+			public UnresolvedIdentifier parseIdentifier(String identifier) {
+				return context.wrapClassLoader(() -> parser.parseIdentifier(identifier));
+			}
+		};
 	}
 
 	@Override
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
index 33633f3..87f750d 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
@@ -44,10 +44,12 @@ import org.apache.flink.table.client.gateway.SessionContext;
 import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
 import org.apache.flink.table.client.gateway.utils.TestTableSinkFactoryBase;
 import org.apache.flink.table.client.gateway.utils.TestTableSourceFactoryBase;
+import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.flink.table.factories.ModuleFactory;
 import org.apache.flink.table.module.Module;
+import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.types.DataType;
 
 import org.junit.Test;
@@ -65,6 +67,7 @@ import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATA
 import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
 import static org.apache.flink.table.descriptors.ModuleDescriptorValidator.MODULE_TYPE;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Dependency tests for {@link LocalExecutor}. Mainly for testing classloading of dependencies.
@@ -82,6 +85,39 @@ public class DependencyTest {
 
 	@Test
 	public void testTableFactoryDiscovery() throws Exception {
+		final LocalExecutor executor = createExecutor();
+		final SessionContext session = new SessionContext("test-session", new Environment());
+		String sessionId = executor.openSession(session);
+		try {
+			final TableSchema result = executor.getTableSchema(sessionId, "TableNumber1");
+			final TableSchema expected = TableSchema.builder()
+				.field("IntegerField1", Types.INT())
+				.field("StringField1", Types.STRING())
+				.field("rowtimeField", Types.SQL_TIMESTAMP())
+				.build();
+
+			assertEquals(expected, result);
+		} finally {
+			executor.closeSession(sessionId);
+		}
+	}
+
+	@Test
+	public void testSqlParseWithUserClassLoader() throws Exception {
+		final LocalExecutor executor = createExecutor();
+		final SessionContext session = new SessionContext("test-session", new Environment());
+		String sessionId = executor.openSession(session);
+		try {
+			final Parser sqlParser = executor.getSqlParser(sessionId);
+			List<Operation> operations = sqlParser.parse("SELECT IntegerField1, StringField1 FROM TableNumber1");
+
+			assertTrue(operations != null && operations.size() == 1);
+		} finally {
+			executor.closeSession(sessionId);
+		}
+	}
+
+	private LocalExecutor createExecutor() throws Exception {
 		// create environment
 		final Map<String, String> replaceVars = new HashMap<>();
 		replaceVars.put("$VAR_CONNECTOR_TYPE", CONNECTOR_TYPE_VALUE);
@@ -91,24 +127,12 @@ public class DependencyTest {
 
 		// create executor with dependencies
 		final URL dependency = Paths.get("target", TABLE_FACTORY_JAR_FILE).toUri().toURL();
-		final LocalExecutor executor = new LocalExecutor(
+		return new LocalExecutor(
 			env,
 			Collections.singletonList(dependency),
 			new Configuration(),
 			new DefaultCLI(new Configuration()),
 			new DefaultClusterClientServiceLoader());
-
-		final SessionContext session = new SessionContext("test-session", new Environment());
-		String sessionId = executor.openSession(session);
-
-		final TableSchema result = executor.getTableSchema(sessionId, "TableNumber1");
-		final TableSchema expected = TableSchema.builder()
-			.field("IntegerField1", Types.INT())
-			.field("StringField1", Types.STRING())
-			.field("rowtimeField", Types.SQL_TIMESTAMP())
-			.build();
-
-		assertEquals(expected, result);
 	}
 
 	// --------------------------------------------------------------------------------------------