You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/08/12 18:06:06 UTC

[flink] branch master updated: [FLINK-13526][sql-client] Switching to a non existing catalog or database crashes sql-client

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a8fb572  [FLINK-13526][sql-client] Switching to a non existing catalog or database crashes sql-client
a8fb572 is described below

commit a8fb572054c5f2294b46200d64729bc3e20c301b
Author: Rui Li <li...@apache.org>
AuthorDate: Fri Aug 9 11:34:15 2019 +0800

    [FLINK-13526][sql-client] Switching to a non existing catalog or database crashes sql-client
    
    Avoid crashing sql-client when switching to a non-existing catalog or database.
    
    This closes #9399.
---
 .../table/client/gateway/local/LocalExecutor.java  | 13 +++++-
 .../flink/table/client/cli/CliClientTest.java      | 54 ++++++++++++++++++++++
 .../client/gateway/local/LocalExecutorITCase.java  | 23 +++++++++
 3 files changed, 88 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 3394df7..43d0dc3 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -39,6 +39,7 @@ import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.gateway.Executor;
@@ -240,7 +241,11 @@ public class LocalExecutor implements Executor {
 
 		context.wrapClassLoader(() -> {
 			// Rely on TableEnvironment/CatalogManager to validate input
-			tableEnv.useCatalog(catalogName);
+			try {
+				tableEnv.useCatalog(catalogName);
+			} catch (CatalogException e) {
+				throw new SqlExecutionException("Failed to switch to catalog " + catalogName, e);
+			}
 			session.setCurrentCatalog(catalogName);
 			session.setCurrentDatabase(tableEnv.getCurrentDatabase());
 			return null;
@@ -256,7 +261,11 @@ public class LocalExecutor implements Executor {
 
 		context.wrapClassLoader(() -> {
 			// Rely on TableEnvironment/CatalogManager to validate input
-			tableEnv.useDatabase(databaseName);
+			try {
+				tableEnv.useDatabase(databaseName);
+			} catch (CatalogException e) {
+				throw new SqlExecutionException("Failed to switch to database " + databaseName, e);
+			}
 			session.setCurrentDatabase(databaseName);
 			return null;
 		});
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
index 4ab1d41..7266bcb 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
@@ -37,9 +37,13 @@ import org.jline.reader.LineReaderBuilder;
 import org.jline.reader.ParsedLine;
 import org.jline.reader.Parser;
 import org.jline.terminal.Terminal;
+import org.jline.terminal.impl.DumbTerminal;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -49,6 +53,10 @@ import java.util.Map;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 /**
  * Tests for the {@link CliClient}.
@@ -84,6 +92,52 @@ public class CliClientTest extends TestLogger {
 		verifySqlCompletion("show t ", 6, Collections.emptyList(), Collections.singletonList("SET"));
 	}
 
+	@Test
+	public void testUseNonExistingDB() throws Exception {
+		Executor executor = mock(Executor.class);
+		doThrow(new SqlExecutionException("mocked exception")).when(executor).useDatabase(any(), any());
+		InputStream inputStream = new ByteArrayInputStream("use db;\n".getBytes());
+		// don't care about the output
+		OutputStream outputStream = new OutputStream() {
+			@Override
+			public void write(int b) throws IOException {
+			}
+		};
+		CliClient cliClient = null;
+		try (Terminal terminal = new DumbTerminal(inputStream, outputStream)) {
+			cliClient = new CliClient(terminal, new SessionContext("test-session", new Environment()), executor);
+			cliClient.open();
+			verify(executor).useDatabase(any(), any());
+		} finally {
+			if (cliClient != null) {
+				cliClient.close();
+			}
+		}
+	}
+
+	@Test
+	public void testUseNonExistingCatalog() throws Exception {
+		Executor executor = mock(Executor.class);
+		doThrow(new SqlExecutionException("mocked exception")).when(executor).useCatalog(any(), any());
+		InputStream inputStream = new ByteArrayInputStream("use catalog cat;\n".getBytes());
+		// don't care about the output
+		OutputStream outputStream = new OutputStream() {
+			@Override
+			public void write(int b) throws IOException {
+			}
+		};
+		CliClient cliClient = null;
+		try (Terminal terminal = new DumbTerminal(inputStream, outputStream)) {
+			cliClient = new CliClient(terminal, new SessionContext("test-session", new Environment()), executor);
+			cliClient.open();
+			verify(executor).useCatalog(any(), any());
+		} finally {
+			if (cliClient != null) {
+				cliClient.close();
+			}
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	private void verifyUpdateSubmission(String statement, boolean failExecution, boolean testFailure) {
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 963314b..cbae581 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -51,7 +51,9 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -126,6 +128,9 @@ public class LocalExecutorITCase extends TestLogger {
 	@Parameter
 	public String planner;
 
+	@Rule
+	public ExpectedException exception = ExpectedException.none();
+
 	@Test
 	public void testValidateSession() throws Exception {
 		final Executor executor = createDefaultExecutor(clusterClient);
@@ -506,6 +511,24 @@ public class LocalExecutorITCase extends TestLogger {
 		}
 	}
 
+	@Test
+	public void testUseNonExistingDatabase() throws Exception {
+		final Executor executor = createDefaultExecutor(clusterClient);
+		final SessionContext session = new SessionContext("test-session", new Environment());
+
+		exception.expect(SqlExecutionException.class);
+		executor.useDatabase(session, "nonexistingdb");
+	}
+
+	@Test
+	public void testUseNonExistingCatalog() throws Exception {
+		final Executor executor = createDefaultExecutor(clusterClient);
+		final SessionContext session = new SessionContext("test-session", new Environment());
+
+		exception.expect(SqlExecutionException.class);
+		executor.useCatalog(session, "nonexistingcatalog");
+	}
+
 	private void executeStreamQueryTable(
 			Map<String, String> replaceVars,
 			String query,