You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/17 21:17:30 UTC

[flink] branch release-1.9 updated: [FLINK-13296][table] FunctionCatalog.lookupFunction() should check in memory functions if the target function doesn't exist in catalog

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 243fb4d  [FLINK-13296][table] FunctionCatalog.lookupFunction() should check in memory functions if the target function doesn't exist in catalog
243fb4d is described below

commit 243fb4d1237b86db8f9245954a8150236360034f
Author: bowen.li <bo...@gmail.com>
AuthorDate: Tue Jul 16 11:30:31 2019 -0700

    [FLINK-13296][table] FunctionCatalog.lookupFunction() should check in memory functions if the target function doesn't exist in catalog
    
    Currently, the logic to look up a function checks either the catalog or the in-memory functions. The correct logic is to first check the catalog and, if the function doesn't exist there, then check the in-memory functions. There should be a defined resolution order.
    
    This closes #9135.
---
 .../client/gateway/local/LocalExecutorITCase.java  |  2 -
 .../flink/table/catalog/FunctionCatalog.java       | 65 +++++++++++-----------
 2 files changed, 34 insertions(+), 33 deletions(-)

diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 18d794c..ac1a7ae 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -49,7 +49,6 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
@@ -430,7 +429,6 @@ public class LocalExecutorITCase extends TestLogger {
 		}
 	}
 
-	@Ignore
 	@Test
 	public void testUseCatalogAndUseDatabase() throws Exception {
 		final String csvOutputPath = new File(tempFolder.newFolder().getAbsolutePath(), "test-out.csv").toURI().toString();
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index af7f21f..01639b8 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -159,56 +159,59 @@ public class FunctionCatalog implements FunctionLookup {
 	public Optional<FunctionLookup.Result> lookupFunction(String name) {
 		String functionName = normalizeName(name);
 
-		FunctionDefinition userCandidate = null;
+		FunctionDefinition userCandidate;
 
 		Catalog catalog = catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get();
 
-		if (catalog.getTableFactory().isPresent() &&
+		try {
+			CatalogFunction catalogFunction = catalog.getFunction(
+				new ObjectPath(catalogManager.getCurrentDatabase(), functionName));
+
+			if (catalog.getTableFactory().isPresent() &&
 				catalog.getTableFactory().get() instanceof FunctionDefinitionFactory) {
-			try {
-				CatalogFunction catalogFunction = catalog.getFunction(
-					new ObjectPath(catalogManager.getCurrentDatabase(), functionName));
 
 				FunctionDefinitionFactory factory = (FunctionDefinitionFactory) catalog.getTableFactory().get();
 
 				userCandidate = factory.createFunctionDefinition(functionName, catalogFunction);
-			} catch (FunctionNotExistException e) {
-				// Ignore
-			}
 
-			return Optional.of(
+				return Optional.of(
 					new FunctionLookup.Result(
 						ObjectIdentifier.of(catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase(), name),
 						userCandidate)
 				);
-		} else {
-			// Else, check in-memory functions
-			userCandidate = userFunctions.get(functionName);
-
-			final Optional<FunctionDefinition> foundDefinition;
-			if (userCandidate != null) {
-				foundDefinition = Optional.of(userCandidate);
 			} else {
+				// TODO: should go thru function definition discover service
+			}
+		} catch (FunctionNotExistException e) {
+			// Ignore
+		}
 
-				// TODO once we connect this class with the Catalog APIs we need to make sure that
-				//  built-in functions are present in "root" built-in catalog. This allows to
-				//  overwrite built-in functions but also fallback to the "root" catalog. It should be
-				//  possible to disable the "root" catalog if that is desired.
+		// If no corresponding function is found in catalog, check in-memory functions
+		userCandidate = userFunctions.get(functionName);
 
-				foundDefinition = BuiltInFunctionDefinitions.getDefinitions()
-					.stream()
-					.filter(f -> functionName.equals(normalizeName(f.getName())))
-					.findFirst()
-					.map(Function.identity());
-			}
+		final Optional<FunctionDefinition> foundDefinition;
+		if (userCandidate != null) {
+			foundDefinition = Optional.of(userCandidate);
+		} else {
 
-			String defaultCatalogName = catalogManager.getDefaultCatalogName();
+			// TODO once we connect this class with the Catalog APIs we need to make sure that
+			//  built-in functions are present in "root" built-in catalog. This allows to
+			//  overwrite built-in functions but also fallback to the "root" catalog. It should be
+			//  possible to disable the "root" catalog if that is desired.
 
-			return foundDefinition.map(definition -> new FunctionLookup.Result(
-				ObjectIdentifier.of(defaultCatalogName, catalogManager.getCatalog(defaultCatalogName).get().getDefaultDatabase(), name),
-				definition)
-			);
+			foundDefinition = BuiltInFunctionDefinitions.getDefinitions()
+				.stream()
+				.filter(f -> functionName.equals(normalizeName(f.getName())))
+				.findFirst()
+				.map(Function.identity());
 		}
+
+		String defaultCatalogName = catalogManager.getDefaultCatalogName();
+
+		return foundDefinition.map(definition -> new FunctionLookup.Result(
+			ObjectIdentifier.of(defaultCatalogName, catalogManager.getCatalog(defaultCatalogName).get().getDefaultDatabase(), name),
+			definition)
+		);
 	}
 
 	@Override