You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/08/21 22:58:59 UTC
[flink] branch master updated: [FLINK-13741][table] 'SHOW
FUNCTIONS' should include Flink built-in functions' names
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ae8acd6 [FLINK-13741][table] 'SHOW FUNCTIONS' should include Flink built-in functions' names
ae8acd6 is described below
commit ae8acd6c53bd925c99185db9d40721603fbf74a5
Author: bowen.li <bo...@gmail.com>
AuthorDate: Fri Aug 16 15:12:13 2019 -0700
[FLINK-13741][table] 'SHOW FUNCTIONS' should include Flink built-in functions' names
This closes #9457.
---
flink-python/pyflink/table/table_environment.py | 10 +++++
.../apache/flink/table/client/cli/CliClient.java | 4 +-
.../flink/table/client/gateway/Executor.java | 5 +++
.../table/client/gateway/local/LocalExecutor.java | 9 ++++
.../flink/table/client/cli/CliClientTest.java | 5 +++
.../flink/table/client/cli/CliResultViewTest.java | 5 +++
.../apache/flink/table/api/TableEnvironment.java | 5 +++
.../table/api/internal/TableEnvironmentImpl.java | 5 +++
.../flink/table/catalog/FunctionCatalog.java | 32 ++++++++++---
.../flink/table/catalog/FunctionCatalogTest.java | 52 ++++++++++++++++++++++
.../flink/table/api/internal/TableEnvImpl.scala | 2 +
.../flink/table/utils/MockTableEnvironment.scala | 2 +
12 files changed, 129 insertions(+), 7 deletions(-)
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 915f9ab..33160e0 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -236,6 +236,16 @@ class TableEnvironment(object):
j_udf_name_array = self._j_tenv.listUserDefinedFunctions()
return [item for item in j_udf_name_array]
+ def list_functions(self):
+ """
+ Gets the names of all functions in this environment.
+
+ :return: List of the names of all functions in this environment.
+ :rtype: list[str]
+ """
+ j_function_name_array = self._j_tenv.listFunctions()
+ return [item for item in j_function_name_array]
+
def explain(self, table=None, extended=False):
"""
Returns the AST of the specified Table API and SQL queries and the execution plan to compute
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index d1fed89..e956039 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -49,6 +49,7 @@ import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -406,7 +407,7 @@ public class CliClient {
private void callShowFunctions() {
final List<String> functions;
try {
- functions = executor.listUserDefinedFunctions(context);
+ functions = executor.listFunctions(context);
} catch (SqlExecutionException e) {
printExecutionException(e);
return;
@@ -414,6 +415,7 @@ public class CliClient {
if (functions.isEmpty()) {
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_EMPTY).toAnsi());
} else {
+ Collections.sort(functions);
functions.forEach((v) -> terminal.writer().println(v));
}
terminal.flush();
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
index f95339a..1ad24e4 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
@@ -61,6 +61,11 @@ public interface Executor {
List<String> listUserDefinedFunctions(SessionContext session) throws SqlExecutionException;
/**
+ * Lists all functions known to the executor.
+ */
+ List<String> listFunctions(SessionContext session) throws SqlExecutionException;
+
+ /**
* Sets a catalog with given name as the current catalog.
*/
void useCatalog(SessionContext session, String catalogName) throws SqlExecutionException;
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 43d0dc3..78d742f 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -233,6 +233,15 @@ public class LocalExecutor implements Executor {
}
@Override
+ public List<String> listFunctions(SessionContext session) throws SqlExecutionException {
+ final ExecutionContext<?> context = getOrCreateExecutionContext(session);
+ final TableEnvironment tableEnv = context
+ .createEnvironmentInstance()
+ .getTableEnvironment();
+ return context.wrapClassLoader(() -> Arrays.asList(tableEnv.listFunctions()));
+ }
+
+ @Override
public void useCatalog(SessionContext session, String catalogName) throws SqlExecutionException {
final ExecutionContext<?> context = getOrCreateExecutionContext(session);
final TableEnvironment tableEnv = context
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
index 7266bcb..ec466e3 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
@@ -233,6 +233,11 @@ public class CliClientTest extends TestLogger {
}
@Override
+ public List<String> listFunctions(SessionContext session) throws SqlExecutionException {
+ return null;
+ }
+
+ @Override
public void useCatalog(SessionContext session, String catalogName) throws SqlExecutionException {
}
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java
index 8967a8d..8eff36a 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java
@@ -146,6 +146,11 @@ public class CliResultViewTest {
}
@Override
+ public List<String> listFunctions(SessionContext session) throws SqlExecutionException {
+ return null;
+ }
+
+ @Override
public void useCatalog(SessionContext session, String catalogName) throws SqlExecutionException {
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index 6c3acde..23589b3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -257,6 +257,11 @@ public interface TableEnvironment {
String[] listUserDefinedFunctions();
/**
+ * Gets the names of all functions in this environment.
+ */
+ String[] listFunctions();
+
+ /**
* Returns the AST of the specified Table API and SQL queries and the execution plan to compute
* the result of the given {@link Table}.
*
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index eda73e1..88aa167 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -258,6 +258,11 @@ public class TableEnvironmentImpl implements TableEnvironment {
}
@Override
+ public String[] listFunctions() {
+ return functionCatalog.getFunctions();
+ }
+
+ @Override
public String explain(Table table) {
return explain(table, false);
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 1faffde..423b5f3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.catalog;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -39,11 +40,11 @@ import org.apache.flink.table.functions.UserDefinedAggregateFunction;
import org.apache.flink.table.functions.UserFunctionsTypeHelper;
import org.apache.flink.util.Preconditions;
-import java.util.ArrayList;
+import java.util.HashSet;
import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -134,7 +135,25 @@ public class FunctionCatalog implements FunctionLookup {
}
public String[] getUserDefinedFunctions() {
- List<String> result = new ArrayList<>();
+ return getUserDefinedFunctionNames().toArray(new String[0]);
+ }
+
+ public String[] getFunctions() {
+ Set<String> result = getUserDefinedFunctionNames();
+
+ // Get built-in functions
+ result.addAll(
+ BuiltInFunctionDefinitions.getDefinitions()
+ .stream()
+ .map(f -> normalizeName(f.getName()))
+ .collect(Collectors.toSet())
+ );
+
+ return result.toArray(new String[0]);
+ }
+
+ private Set<String> getUserDefinedFunctionNames() {
+ Set<String> result = new HashSet<>();
// Get functions in catalog
Catalog catalog = catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get();
@@ -148,9 +167,9 @@ public class FunctionCatalog implements FunctionLookup {
result.addAll(
userFunctions.values().stream()
.map(FunctionDefinition::toString)
- .collect(Collectors.toList()));
+ .collect(Collectors.toSet()));
- return result.toArray(new String[0]);
+ return result;
}
@Override
@@ -226,7 +245,8 @@ public class FunctionCatalog implements FunctionLookup {
userFunctions.put(normalizeName(name), functionDefinition);
}
- private String normalizeName(String name) {
+ @VisibleForTesting
+ static String normalizeName(String name) {
return name.toUpperCase();
}
}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
new file mode 100644
index 0000000..fb11cbc
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for {@link FunctionCatalog}.
+ */
+public class FunctionCatalogTest {
+
+ @Test
+ public void testGetBuiltInFunctions() {
+ FunctionCatalog functionCatalog = new FunctionCatalog(
+ new CatalogManager("test", new GenericInMemoryCatalog("test")));
+
+ Set<String> actual = new HashSet<>();
+ Collections.addAll(actual, functionCatalog.getFunctions());
+
+ Set<String> expected = BuiltInFunctionDefinitions.getDefinitions()
+ .stream()
+ .map(f -> FunctionCatalog.normalizeName(f.getName()))
+ .collect(Collectors.toSet());
+
+ assertTrue(actual.containsAll(expected));
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 8d35b87..89cc759 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -352,6 +352,8 @@ abstract class TableEnvImpl(
override def listUserDefinedFunctions(): Array[String] = functionCatalog.getUserDefinedFunctions
+ override def listFunctions(): Array[String] = functionCatalog.getFunctions
+
override def explain(table: Table): String
override def getCompletionHints(statement: String, position: Int): Array[String] = {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index 6235c7f..6104724 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -58,6 +58,8 @@ class MockTableEnvironment extends TableEnvironment {
override def listUserDefinedFunctions(): Array[String] = ???
+ override def listFunctions(): Array[String] = ???
+
override def explain(table: Table): String = ???
override def explain(table: Table, extended: Boolean): String = ???