You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/17 21:09:40 UTC

[flink] branch master updated: [FLINK-13296][table] FunctionCatalog.lookupFunction() should check in memory functions if the target function doesn't exist in catalog

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f4e71f9  [FLINK-13296][table] FunctionCatalog.lookupFunction() should check in memory functions if the target function doesn't exist in catalog
f4e71f9 is described below

commit f4e71f92957ca382e4631ebef795711195d742f7
Author: bowen.li <bo...@gmail.com>
AuthorDate: Tue Jul 16 11:30:31 2019 -0700

    [FLINK-13296][table] FunctionCatalog.lookupFunction() should check in memory functions if the target function doesn't exist in catalog
    
    Currently, the logic to look up a function checks either the catalog or the in-memory functions. The correct logic is to first check the catalog and, if the function doesn't exist there, then check the in-memory functions. There should be a defined resolution order.
    
    This closes #9135.
---
 .../client/gateway/local/LocalExecutorITCase.java  |  2 -
 .../flink/table/catalog/FunctionCatalog.java       | 65 +++++++++++-----------
 2 files changed, 34 insertions(+), 33 deletions(-)

diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 18d794c..ac1a7ae 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -49,7 +49,6 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
@@ -430,7 +429,6 @@ public class LocalExecutorITCase extends TestLogger {
 		}
 	}
 
-	@Ignore
 	@Test
 	public void testUseCatalogAndUseDatabase() throws Exception {
 		final String csvOutputPath = new File(tempFolder.newFolder().getAbsolutePath(), "test-out.csv").toURI().toString();
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index af7f21f..01639b8 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -159,56 +159,59 @@ public class FunctionCatalog implements FunctionLookup {
 	public Optional<FunctionLookup.Result> lookupFunction(String name) {
 		String functionName = normalizeName(name);
 
-		FunctionDefinition userCandidate = null;
+		FunctionDefinition userCandidate;
 
 		Catalog catalog = catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get();
 
-		if (catalog.getTableFactory().isPresent() &&
+		try {
+			CatalogFunction catalogFunction = catalog.getFunction(
+				new ObjectPath(catalogManager.getCurrentDatabase(), functionName));
+
+			if (catalog.getTableFactory().isPresent() &&
 				catalog.getTableFactory().get() instanceof FunctionDefinitionFactory) {
-			try {
-				CatalogFunction catalogFunction = catalog.getFunction(
-					new ObjectPath(catalogManager.getCurrentDatabase(), functionName));
 
 				FunctionDefinitionFactory factory = (FunctionDefinitionFactory) catalog.getTableFactory().get();
 
 				userCandidate = factory.createFunctionDefinition(functionName, catalogFunction);
-			} catch (FunctionNotExistException e) {
-				// Ignore
-			}
 
-			return Optional.of(
+				return Optional.of(
 					new FunctionLookup.Result(
 						ObjectIdentifier.of(catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase(), name),
 						userCandidate)
 				);
-		} else {
-			// Else, check in-memory functions
-			userCandidate = userFunctions.get(functionName);
-
-			final Optional<FunctionDefinition> foundDefinition;
-			if (userCandidate != null) {
-				foundDefinition = Optional.of(userCandidate);
 			} else {
+				// TODO: should go thru function definition discover service
+			}
+		} catch (FunctionNotExistException e) {
+			// Ignore
+		}
 
-				// TODO once we connect this class with the Catalog APIs we need to make sure that
-				//  built-in functions are present in "root" built-in catalog. This allows to
-				//  overwrite built-in functions but also fallback to the "root" catalog. It should be
-				//  possible to disable the "root" catalog if that is desired.
+		// If no corresponding function is found in catalog, check in-memory functions
+		userCandidate = userFunctions.get(functionName);
 
-				foundDefinition = BuiltInFunctionDefinitions.getDefinitions()
-					.stream()
-					.filter(f -> functionName.equals(normalizeName(f.getName())))
-					.findFirst()
-					.map(Function.identity());
-			}
+		final Optional<FunctionDefinition> foundDefinition;
+		if (userCandidate != null) {
+			foundDefinition = Optional.of(userCandidate);
+		} else {
 
-			String defaultCatalogName = catalogManager.getDefaultCatalogName();
+			// TODO once we connect this class with the Catalog APIs we need to make sure that
+			//  built-in functions are present in "root" built-in catalog. This allows to
+			//  overwrite built-in functions but also fallback to the "root" catalog. It should be
+			//  possible to disable the "root" catalog if that is desired.
 
-			return foundDefinition.map(definition -> new FunctionLookup.Result(
-				ObjectIdentifier.of(defaultCatalogName, catalogManager.getCatalog(defaultCatalogName).get().getDefaultDatabase(), name),
-				definition)
-			);
+			foundDefinition = BuiltInFunctionDefinitions.getDefinitions()
+				.stream()
+				.filter(f -> functionName.equals(normalizeName(f.getName())))
+				.findFirst()
+				.map(Function.identity());
 		}
+
+		String defaultCatalogName = catalogManager.getDefaultCatalogName();
+
+		return foundDefinition.map(definition -> new FunctionLookup.Result(
+			ObjectIdentifier.of(defaultCatalogName, catalogManager.getCatalog(defaultCatalogName).get().getDefaultDatabase(), name),
+			definition)
+		);
 	}
 
 	@Override