You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/06/05 12:17:56 UTC
[flink] branch release-1.11 updated: [FLINK-18076][table sql / client] Use correct classloader when parsing queries
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new e7902bb [FLINK-18076][table sql / client] Use correct classloader when parsing queries
e7902bb is described below
commit e7902bb4d1329833870ee53c782c6431cfc8cb80
Author: Leonard Xu <xb...@gmail.com>
AuthorDate: Thu Jun 4 23:53:07 2020 +0800
[FLINK-18076][table sql / client] Use correct classloader when parsing queries
This closes #12475
---
.../table/client/gateway/local/LocalExecutor.java | 15 ++++++-
.../table/client/gateway/local/DependencyTest.java | 50 ++++++++++++++++------
2 files changed, 51 insertions(+), 14 deletions(-)
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index fd78712..604734e 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -41,6 +41,7 @@ import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.client.config.Environment;
@@ -56,6 +57,7 @@ import org.apache.flink.table.client.gateway.local.result.ChangelogResult;
import org.apache.flink.table.client.gateway.local.result.DynamicResult;
import org.apache.flink.table.client.gateway.local.result.MaterializedResult;
import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
import org.apache.flink.table.types.utils.DataTypeUtils;
@@ -460,7 +462,18 @@ public class LocalExecutor implements Executor {
public Parser getSqlParser(String sessionId) {
final ExecutionContext<?> context = getExecutionContext(sessionId);
final TableEnvironment tableEnv = context.getTableEnvironment();
- return ((TableEnvironmentInternal) tableEnv).getParser();
+ final Parser parser = ((TableEnvironmentInternal) tableEnv).getParser();
+ return new Parser() {
+ @Override
+ public List<Operation> parse(String statement) {
+ return context.wrapClassLoader(() -> parser.parse(statement));
+ }
+
+ @Override
+ public UnresolvedIdentifier parseIdentifier(String identifier) {
+ return context.wrapClassLoader(() -> parser.parseIdentifier(identifier));
+ }
+ };
}
@Override
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
index 33633f3..87f750d 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
@@ -44,10 +44,12 @@ import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
import org.apache.flink.table.client.gateway.utils.TestTableSinkFactoryBase;
import org.apache.flink.table.client.gateway.utils.TestTableSourceFactoryBase;
+import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.ModuleFactory;
import org.apache.flink.table.module.Module;
+import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.DataType;
import org.junit.Test;
@@ -65,6 +67,7 @@ import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATA
import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
import static org.apache.flink.table.descriptors.ModuleDescriptorValidator.MODULE_TYPE;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
/**
* Dependency tests for {@link LocalExecutor}. Mainly for testing classloading of dependencies.
@@ -82,6 +85,39 @@ public class DependencyTest {
@Test
public void testTableFactoryDiscovery() throws Exception {
+ final LocalExecutor executor = createExecutor();
+ final SessionContext session = new SessionContext("test-session", new Environment());
+ String sessionId = executor.openSession(session);
+ try {
+ final TableSchema result = executor.getTableSchema(sessionId, "TableNumber1");
+ final TableSchema expected = TableSchema.builder()
+ .field("IntegerField1", Types.INT())
+ .field("StringField1", Types.STRING())
+ .field("rowtimeField", Types.SQL_TIMESTAMP())
+ .build();
+
+ assertEquals(expected, result);
+ } finally {
+ executor.closeSession(sessionId);
+ }
+ }
+
+ @Test
+ public void testSqlParseWithUserClassLoader() throws Exception {
+ final LocalExecutor executor = createExecutor();
+ final SessionContext session = new SessionContext("test-session", new Environment());
+ String sessionId = executor.openSession(session);
+ try {
+ final Parser sqlParser = executor.getSqlParser(sessionId);
+ List<Operation> operations = sqlParser.parse("SELECT IntegerField1, StringField1 FROM TableNumber1");
+
+ assertTrue(operations != null && operations.size() == 1);
+ } finally {
+ executor.closeSession(sessionId);
+ }
+ }
+
+ private LocalExecutor createExecutor() throws Exception {
// create environment
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_CONNECTOR_TYPE", CONNECTOR_TYPE_VALUE);
@@ -91,24 +127,12 @@ public class DependencyTest {
// create executor with dependencies
final URL dependency = Paths.get("target", TABLE_FACTORY_JAR_FILE).toUri().toURL();
- final LocalExecutor executor = new LocalExecutor(
+ return new LocalExecutor(
env,
Collections.singletonList(dependency),
new Configuration(),
new DefaultCLI(new Configuration()),
new DefaultClusterClientServiceLoader());
-
- final SessionContext session = new SessionContext("test-session", new Environment());
- String sessionId = executor.openSession(session);
-
- final TableSchema result = executor.getTableSchema(sessionId, "TableNumber1");
- final TableSchema expected = TableSchema.builder()
- .field("IntegerField1", Types.INT())
- .field("StringField1", Types.STRING())
- .field("rowtimeField", Types.SQL_TIMESTAMP())
- .build();
-
- assertEquals(expected, result);
}
// --------------------------------------------------------------------------------------------