You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/16 11:57:43 UTC

[GitHub] asfgit closed pull request #6791: [FLINK-8865][sql-client] Add CLI query code completion in SQL Client

asfgit closed pull request #6791: [FLINK-8865][sql-client] Add CLI query code completion in SQL Client
URL: https://github.com/apache/flink/pull/6791
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index 7867532e546..d43e82fac96 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -97,10 +97,14 @@ public CliClient(SessionContext context, Executor executor) {
 			.terminal(terminal)
 			.appName(CliStrings.CLI_NAME)
 			.parser(new SqlMultiLineParser())
+			.completer(new SqlCompleter(context, executor))
 			.build();
 		// this option is disabled for now for correct backslash escaping
 		// a "SELECT '\'" query should return a string with a backslash
 		lineReader.option(LineReader.Option.DISABLE_EVENT_EXPANSION, true);
+		// this variable indicates the distance between the words in typoMatcher
+		// when do complete
+		lineReader.setVariable(LineReader.ERRORS, 1);
 
 		// create prompt
 		prompt = new AttributedStringBuilder()
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCompleter.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCompleter.java
new file mode 100644
index 00000000000..0c11ebaec9e
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCompleter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.SessionContext;
+import org.apache.flink.table.client.gateway.local.LocalExecutor;
+
+import org.jline.reader.Candidate;
+import org.jline.reader.Completer;
+import org.jline.reader.LineReader;
+import org.jline.reader.ParsedLine;
+import org.jline.utils.AttributedString;
+
+import java.util.List;
+
+/**
+ * SQL Completer.
+ */
+public class SqlCompleter implements Completer {
+	private SessionContext context;
+	private Executor executor;
+
+	public SqlCompleter(SessionContext context, Executor executor) {
+		this.context = context;
+		this.executor = executor;
+	}
+
+	public void complete(LineReader reader, ParsedLine line, List<Candidate> candidates) {
+		if (executor instanceof LocalExecutor) {
+			try {
+				LocalExecutor localExecutor = (LocalExecutor) executor;
+				List<String> hints = localExecutor.getCompletionHints(context, line.line(), line.cursor());
+				hints.forEach(hint->
+					candidates.add(new Candidate(AttributedString.stripAnsi(hint), hint , null, null, null, null, true)));
+			} catch (Exception e) {
+				//ignore completer exception;
+			}
+		}
+	}
+
+}
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlMultiLineParser.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlMultiLineParser.java
index fb44a1e2ced..ee7e08750a8 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlMultiLineParser.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlMultiLineParser.java
@@ -20,33 +20,26 @@
 
 import org.jline.reader.EOFError;
 import org.jline.reader.ParsedLine;
-import org.jline.reader.Parser;
 import org.jline.reader.impl.DefaultParser;
 
-import java.util.Collections;
-
 /**
  * Multi-line parser for parsing an arbitrary number of SQL lines until a line ends with ';'.
  */
-public class SqlMultiLineParser implements Parser {
+public class SqlMultiLineParser extends DefaultParser {
 
 	private static final String EOF_CHARACTER = ";";
 	private static final String NEW_LINE_PROMPT = ""; // results in simple '>' output
 
 	@Override
 	public ParsedLine parse(String line, int cursor, ParseContext context) {
-		if (!line.trim().endsWith(EOF_CHARACTER)) {
+		if (!line.trim().endsWith(EOF_CHARACTER)
+			&& context != ParseContext.COMPLETE) {
 			throw new EOFError(
 				-1,
 				-1,
 				"New line without EOF character.",
 				NEW_LINE_PROMPT);
 		}
-		return new DefaultParser.ArgumentList(
-			line,
-			Collections.singletonList(line),
-			0,
-			0,
-			cursor);
+		return super.parse(line, cursor, context);
 	}
 }
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 1318043faf1..be9958f1bcc 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -235,6 +235,20 @@ public String explainStatement(SessionContext session, String statement) throws
 		}
 	}
 
+	public List<String> getCompletionHints(SessionContext session, String line, Integer pos) {
+		final TableEnvironment tableEnv = getOrCreateExecutionContext(session)
+				.createEnvironmentInstance()
+				.getTableEnvironment();
+
+		// complete
+		try {
+			return Arrays.asList(tableEnv.getCompletionHints(line, pos));
+		} catch (Exception e) {
+			//ignore completer exception
+		}
+		return Collections.emptyList();
+	}
+
 	@Override
 	public ResultDescriptor executeQuery(SessionContext session, String query) throws SqlExecutionException {
 		final ExecutionContext<?> context = getOrCreateExecutionContext(session);
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 495f5e0d314..970f3130cae 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -31,6 +31,8 @@
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.client.cli.SqlCompleter;
+import org.apache.flink.table.client.cli.SqlMultiLineParser;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.ProgramTargetDescriptor;
@@ -45,6 +47,11 @@
 import org.apache.flink.types.Row;
 import org.apache.flink.util.TestLogger;
 
+import org.jline.reader.Candidate;
+import org.jline.reader.LineReader;
+import org.jline.reader.LineReaderBuilder;
+import org.jline.reader.ParsedLine;
+import org.jline.reader.Parser.ParseContext;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -222,6 +229,46 @@ public void testTableSchema() throws Exception {
 		assertEquals(expectedTableSchema, actualTableSchema);
 	}
 
+	@Test
+	public void testSqlCompleter() throws Exception {
+		final Executor executor = createDefaultExecutor(clusterClient);
+		final SessionContext session = new SessionContext("test-session", new Environment());
+
+		SqlCompleter completer = new SqlCompleter(session, executor);
+		SqlMultiLineParser parser = new SqlMultiLineParser();
+		LineReader reader = LineReaderBuilder.builder().build();
+
+		String line = "SELECT * FROM Ta";
+		ParsedLine parsedLine = parser.parse(line, 16, ParseContext.COMPLETE);
+		List<Candidate> candidates = new ArrayList<Candidate>();
+		List<String> results = new ArrayList<String>();
+		completer.complete(reader, parsedLine, candidates);
+		candidates.forEach(item->results.add(item.value()));
+		List<String> expectedResults = Arrays.asList(
+				"TableNumber1",
+				"TableNumber2",
+				"TableSourceSink");
+		assertTrue(results.containsAll(expectedResults));
+
+		line = "SELECT * FROM TableNumber2 WH";
+		parsedLine = parser.parse(line, 29, ParseContext.COMPLETE);
+		candidates.clear();
+		results.clear();
+		completer.complete(reader, parsedLine, candidates);
+		candidates.forEach(item->results.add(item.value()));
+		expectedResults = Arrays.asList("WHERE");
+		assertEquals(expectedResults, results);
+
+		line = "SELECT * FROM TableNumber1 WHERE Inte";
+		parsedLine = parser.parse(line, 37, ParseContext.COMPLETE);
+		candidates.clear();
+		results.clear();
+		completer.complete(reader, parsedLine, candidates);
+		candidates.forEach(item->results.add(item.value()));
+		expectedResults = Arrays.asList("IntegerField1");
+		assertTrue(results.containsAll(expectedResults));
+	}
+
 	@Test(timeout = 30_000L)
 	public void testStreamQueryExecutionChangelog() throws Exception {
 		final URL url = getClass().getClassLoader().getResource("test-data.csv");
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index d740c3f1f99..204da815f50 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.api
 
 import _root_.java.lang.reflect.Modifier
 import _root_.java.util.concurrent.atomic.AtomicInteger
+import _root_.java.util.Collections
 
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.config.Lex
@@ -27,12 +28,15 @@ import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
 import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
 import org.apache.calcite.plan.{Convention, RelOptPlanner, RelOptUtil, RelTraitSet}
+import org.apache.calcite.prepare.CalciteCatalogReader
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.schema.SchemaPlus
 import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.calcite.sql._
+import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
 import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.sql.util.ChainedSqlOperatorTable
+import org.apache.calcite.sql.validate.SqlConformance
 import org.apache.calcite.tools._
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -45,7 +49,7 @@ import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment =>
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv}
 import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
 import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
-import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
+import org.apache.flink.table.calcite.{CalciteConfig, FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
 import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
 import org.apache.flink.table.codegen.{ExpressionReducer, FunctionCodeGenerator, GeneratedFunction}
 import org.apache.flink.table.descriptors.{ConnectorDescriptor, TableDescriptor}
@@ -210,6 +214,7 @@ abstract class TableEnvironment(val config: TableConfig) {
         SqlParser
           .configBuilder()
           .setLex(Lex.JAVA)
+          .setCaseSensitive(false)
           .build()
 
       case Some(sqlParserConfig) =>
@@ -656,6 +661,28 @@ abstract class TableEnvironment(val config: TableConfig) {
     */
   def explain(table: Table): String
 
+  /**
+   *  Gets completion hints for the sql query
+   */
+  def getCompletionHints(query: String, pos: Integer) : Array[String] = {
+    val catalogReader = new CalciteCatalogReader(
+      CalciteSchema.from(rootSchema),
+      Collections.emptyList(),
+      typeFactory,
+      CalciteConfig.connectionConfig(getSqlParserConfig))
+    val validator : SqlAdvisorValidator = new SqlAdvisorValidator(
+      getSqlOperatorTable,
+      catalogReader,
+      typeFactory,
+      SqlConformance.DEFAULT)
+    val advisor: SqlAdvisor = new SqlAdvisor(validator)
+    val replaced: Array[String] = Array(null)
+    val hints = advisor.getCompletionHints(query, pos, replaced)
+      .asScala
+      .map(item => item.toIdentifier.toString)
+    hints.toArray
+  }
+
   /**
     * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
     *


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services