You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/08/12 18:06:06 UTC
[flink] branch master updated: [FLINK-13526][sql-client] Switching to a non existing catalog or database crashes sql-client
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a8fb572 [FLINK-13526][sql-client] Switching to a non existing catalog or database crashes sql-client
a8fb572 is described below
commit a8fb572054c5f2294b46200d64729bc3e20c301b
Author: Rui Li <li...@apache.org>
AuthorDate: Fri Aug 9 11:34:15 2019 +0800
[FLINK-13526][sql-client] Switching to a non existing catalog or database crashes sql-client
Avoid crashing sql-client when switching to non-existing catalog or database.
This closes #9399.
---
.../table/client/gateway/local/LocalExecutor.java | 13 +++++-
.../flink/table/client/cli/CliClientTest.java | 54 ++++++++++++++++++++++
.../client/gateway/local/LocalExecutorITCase.java | 23 +++++++++
3 files changed, 88 insertions(+), 2 deletions(-)
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 3394df7..43d0dc3 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -39,6 +39,7 @@ import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.gateway.Executor;
@@ -240,7 +241,11 @@ public class LocalExecutor implements Executor {
context.wrapClassLoader(() -> {
// Rely on TableEnvironment/CatalogManager to validate input
- tableEnv.useCatalog(catalogName);
+ try {
+ tableEnv.useCatalog(catalogName);
+ } catch (CatalogException e) {
+ throw new SqlExecutionException("Failed to switch to catalog " + catalogName, e);
+ }
session.setCurrentCatalog(catalogName);
session.setCurrentDatabase(tableEnv.getCurrentDatabase());
return null;
@@ -256,7 +261,11 @@ public class LocalExecutor implements Executor {
context.wrapClassLoader(() -> {
// Rely on TableEnvironment/CatalogManager to validate input
- tableEnv.useDatabase(databaseName);
+ try {
+ tableEnv.useDatabase(databaseName);
+ } catch (CatalogException e) {
+ throw new SqlExecutionException("Failed to switch to database " + databaseName, e);
+ }
session.setCurrentDatabase(databaseName);
return null;
});
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
index 4ab1d41..7266bcb 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
@@ -37,9 +37,13 @@ import org.jline.reader.LineReaderBuilder;
import org.jline.reader.ParsedLine;
import org.jline.reader.Parser;
import org.jline.terminal.Terminal;
+import org.jline.terminal.impl.DumbTerminal;
import org.junit.Test;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -49,6 +53,10 @@ import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
/**
* Tests for the {@link CliClient}.
@@ -84,6 +92,52 @@ public class CliClientTest extends TestLogger {
verifySqlCompletion("show t ", 6, Collections.emptyList(), Collections.singletonList("SET"));
}
+ @Test
+ public void testUseNonExistingDB() throws Exception {
+ Executor executor = mock(Executor.class);
+ doThrow(new SqlExecutionException("mocked exception")).when(executor).useDatabase(any(), any());
+ InputStream inputStream = new ByteArrayInputStream("use db;\n".getBytes());
+ // don't care about the output
+ OutputStream outputStream = new OutputStream() {
+ @Override
+ public void write(int b) throws IOException {
+ }
+ };
+ CliClient cliClient = null;
+ try (Terminal terminal = new DumbTerminal(inputStream, outputStream)) {
+ cliClient = new CliClient(terminal, new SessionContext("test-session", new Environment()), executor);
+ cliClient.open();
+ verify(executor).useDatabase(any(), any());
+ } finally {
+ if (cliClient != null) {
+ cliClient.close();
+ }
+ }
+ }
+
+ @Test
+ public void testUseNonExistingCatalog() throws Exception {
+ Executor executor = mock(Executor.class);
+ doThrow(new SqlExecutionException("mocked exception")).when(executor).useCatalog(any(), any());
+ InputStream inputStream = new ByteArrayInputStream("use catalog cat;\n".getBytes());
+ // don't care about the output
+ OutputStream outputStream = new OutputStream() {
+ @Override
+ public void write(int b) throws IOException {
+ }
+ };
+ CliClient cliClient = null;
+ try (Terminal terminal = new DumbTerminal(inputStream, outputStream)) {
+ cliClient = new CliClient(terminal, new SessionContext("test-session", new Environment()), executor);
+ cliClient.open();
+ verify(executor).useCatalog(any(), any());
+ } finally {
+ if (cliClient != null) {
+ cliClient.close();
+ }
+ }
+ }
+
// --------------------------------------------------------------------------------------------
private void verifyUpdateSubmission(String statement, boolean failExecution, boolean testFailure) {
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 963314b..cbae581 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -51,7 +51,9 @@ import org.apache.flink.util.TestLogger;
import org.junit.BeforeClass;
import org.junit.ClassRule;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -126,6 +128,9 @@ public class LocalExecutorITCase extends TestLogger {
@Parameter
public String planner;
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
@Test
public void testValidateSession() throws Exception {
final Executor executor = createDefaultExecutor(clusterClient);
@@ -506,6 +511,24 @@ public class LocalExecutorITCase extends TestLogger {
}
}
+ @Test
+ public void testUseNonExistingDatabase() throws Exception {
+ final Executor executor = createDefaultExecutor(clusterClient);
+ final SessionContext session = new SessionContext("test-session", new Environment());
+
+ exception.expect(SqlExecutionException.class);
+ executor.useDatabase(session, "nonexistingdb");
+ }
+
+ @Test
+ public void testUseNonExistingCatalog() throws Exception {
+ final Executor executor = createDefaultExecutor(clusterClient);
+ final SessionContext session = new SessionContext("test-session", new Environment());
+
+ exception.expect(SqlExecutionException.class);
+ executor.useCatalog(session, "nonexistingcatalog");
+ }
+
private void executeStreamQueryTable(
Map<String, String> replaceVars,
String query,