You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/08/21 22:58:59 UTC

[flink] branch master updated: [FLINK-13741][table] 'SHOW FUNCTIONS' should include Flink built-in functions' names

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ae8acd6  [FLINK-13741][table] 'SHOW FUNCTIONS' should include Flink built-in functions' names
ae8acd6 is described below

commit ae8acd6c53bd925c99185db9d40721603fbf74a5
Author: bowen.li <bo...@gmail.com>
AuthorDate: Fri Aug 16 15:12:13 2019 -0700

    [FLINK-13741][table] 'SHOW FUNCTIONS' should include Flink built-in functions' names
    
    This closes #9457.
---
 flink-python/pyflink/table/table_environment.py    | 10 +++++
 .../apache/flink/table/client/cli/CliClient.java   |  4 +-
 .../flink/table/client/gateway/Executor.java       |  5 +++
 .../table/client/gateway/local/LocalExecutor.java  |  9 ++++
 .../flink/table/client/cli/CliClientTest.java      |  5 +++
 .../flink/table/client/cli/CliResultViewTest.java  |  5 +++
 .../apache/flink/table/api/TableEnvironment.java   |  5 +++
 .../table/api/internal/TableEnvironmentImpl.java   |  5 +++
 .../flink/table/catalog/FunctionCatalog.java       | 32 ++++++++++---
 .../flink/table/catalog/FunctionCatalogTest.java   | 52 ++++++++++++++++++++++
 .../flink/table/api/internal/TableEnvImpl.scala    |  2 +
 .../flink/table/utils/MockTableEnvironment.scala   |  2 +
 12 files changed, 129 insertions(+), 7 deletions(-)

diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 915f9ab..33160e0 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -236,6 +236,16 @@ class TableEnvironment(object):
         j_udf_name_array = self._j_tenv.listUserDefinedFunctions()
         return [item for item in j_udf_name_array]
 
+    def list_functions(self):
+        """
+        Gets the names of all functions in this environment.
+
+        :return: List of the names of all functions in this environment.
+        :rtype: list[str]
+        """
+        j_function_name_array = self._j_tenv.listFunctions()
+        return [item for item in j_function_name_array]
+
     def explain(self, table=None, extended=False):
         """
         Returns the AST of the specified Table API and SQL queries and the execution plan to compute
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index d1fed89..e956039 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -49,6 +49,7 @@ import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -406,7 +407,7 @@ public class CliClient {
 	private void callShowFunctions() {
 		final List<String> functions;
 		try {
-			functions = executor.listUserDefinedFunctions(context);
+			functions = executor.listFunctions(context);
 		} catch (SqlExecutionException e) {
 			printExecutionException(e);
 			return;
@@ -414,6 +415,7 @@ public class CliClient {
 		if (functions.isEmpty()) {
 			terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_EMPTY).toAnsi());
 		} else {
+			Collections.sort(functions);
 			functions.forEach((v) -> terminal.writer().println(v));
 		}
 		terminal.flush();
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
index f95339a..1ad24e4 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
@@ -61,6 +61,11 @@ public interface Executor {
 	List<String> listUserDefinedFunctions(SessionContext session) throws SqlExecutionException;
 
 	/**
+	 * Lists all functions known to the executor.
+	 */
+	List<String> listFunctions(SessionContext session) throws SqlExecutionException;
+
+	/**
 	 * Sets a catalog with given name as the current catalog.
 	 */
 	void useCatalog(SessionContext session, String catalogName) throws SqlExecutionException;
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 43d0dc3..78d742f 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -233,6 +233,15 @@ public class LocalExecutor implements Executor {
 	}
 
 	@Override
+	public List<String> listFunctions(SessionContext session) throws SqlExecutionException {
+		final ExecutionContext<?> context = getOrCreateExecutionContext(session);
+		final TableEnvironment tableEnv = context
+			.createEnvironmentInstance()
+			.getTableEnvironment();
+		return context.wrapClassLoader(() -> Arrays.asList(tableEnv.listFunctions()));
+	}
+
+	@Override
 	public void useCatalog(SessionContext session, String catalogName) throws SqlExecutionException {
 		final ExecutionContext<?> context = getOrCreateExecutionContext(session);
 		final TableEnvironment tableEnv = context
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
index 7266bcb..ec466e3 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
@@ -233,6 +233,11 @@ public class CliClientTest extends TestLogger {
 		}
 
 		@Override
+		public List<String> listFunctions(SessionContext session) throws SqlExecutionException {
+			return null;
+		}
+
+		@Override
 		public void useCatalog(SessionContext session, String catalogName) throws SqlExecutionException {
 
 		}
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java
index 8967a8d..8eff36a 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java
@@ -146,6 +146,11 @@ public class CliResultViewTest {
 		}
 
 		@Override
+		public List<String> listFunctions(SessionContext session) throws SqlExecutionException {
+			return null;
+		}
+
+		@Override
 		public void useCatalog(SessionContext session, String catalogName) throws SqlExecutionException {
 
 		}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index 6c3acde..23589b3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -257,6 +257,11 @@ public interface TableEnvironment {
 	String[] listUserDefinedFunctions();
 
 	/**
+	 * Gets the names of all functions in this environment.
+	 */
+	String[] listFunctions();
+
+	/**
 	 * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
 	 * the result of the given {@link Table}.
 	 *
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index eda73e1..88aa167 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -258,6 +258,11 @@ public class TableEnvironmentImpl implements TableEnvironment {
 	}
 
 	@Override
+	public String[] listFunctions() {
+		return functionCatalog.getFunctions();
+	}
+
+	@Override
 	public String explain(Table table) {
 		return explain(table, false);
 	}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 1faffde..423b5f3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -39,11 +40,11 @@ import org.apache.flink.table.functions.UserDefinedAggregateFunction;
 import org.apache.flink.table.functions.UserFunctionsTypeHelper;
 import org.apache.flink.util.Preconditions;
 
-import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -134,7 +135,25 @@ public class FunctionCatalog implements FunctionLookup {
 	}
 
 	public String[] getUserDefinedFunctions() {
-		List<String> result = new ArrayList<>();
+		return getUserDefinedFunctionNames().toArray(new String[0]);
+	}
+
+	public String[] getFunctions() {
+		Set<String> result = getUserDefinedFunctionNames();
+
+		// Get built-in functions
+		result.addAll(
+			BuiltInFunctionDefinitions.getDefinitions()
+				.stream()
+				.map(f -> normalizeName(f.getName()))
+				.collect(Collectors.toSet())
+		);
+
+		return result.toArray(new String[0]);
+	}
+
+	private Set<String> getUserDefinedFunctionNames() {
+		Set<String> result = new HashSet<>();
 
 		// Get functions in catalog
 		Catalog catalog = catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get();
@@ -148,9 +167,9 @@ public class FunctionCatalog implements FunctionLookup {
 		result.addAll(
 			userFunctions.values().stream()
 				.map(FunctionDefinition::toString)
-				.collect(Collectors.toList()));
+				.collect(Collectors.toSet()));
 
-		return result.toArray(new String[0]);
+		return result;
 	}
 
 	@Override
@@ -226,7 +245,8 @@ public class FunctionCatalog implements FunctionLookup {
 		userFunctions.put(normalizeName(name), functionDefinition);
 	}
 
-	private String normalizeName(String name) {
+	@VisibleForTesting
+	static String normalizeName(String name) {
 		return name.toUpperCase();
 	}
 }
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
new file mode 100644
index 0000000..fb11cbc
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for {@link FunctionCatalog}.
+ */
+public class FunctionCatalogTest {
+
+	@Test
+	public void testGetBuiltInFunctions() {
+		FunctionCatalog functionCatalog = new FunctionCatalog(
+			new CatalogManager("test", new GenericInMemoryCatalog("test")));
+
+		Set<String> actual = new HashSet<>();
+		Collections.addAll(actual, functionCatalog.getFunctions());
+
+		Set<String> expected = BuiltInFunctionDefinitions.getDefinitions()
+			.stream()
+			.map(f -> FunctionCatalog.normalizeName(f.getName()))
+			.collect(Collectors.toSet());
+
+		assertTrue(actual.containsAll(expected));
+	}
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 8d35b87..89cc759 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -352,6 +352,8 @@ abstract class TableEnvImpl(
 
   override def listUserDefinedFunctions(): Array[String] = functionCatalog.getUserDefinedFunctions
 
+  override def listFunctions(): Array[String] = functionCatalog.getFunctions
+
   override def explain(table: Table): String
 
   override def getCompletionHints(statement: String, position: Int): Array[String] = {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index 6235c7f..6104724 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -58,6 +58,8 @@ class MockTableEnvironment extends TableEnvironment {
 
   override def listUserDefinedFunctions(): Array[String] = ???
 
+  override def listFunctions(): Array[String] = ???
+
   override def explain(table: Table): String = ???
 
   override def explain(table: Table, extended: Boolean): String = ???