You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/11/01 17:23:55 UTC

[flink] branch master updated: [FLINK-14221][table] support drop temp system functions and temp catalog functions

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 34a9c10  [FLINK-14221][table] support drop temp system functions and temp catalog functions
34a9c10 is described below

commit 34a9c104f8679cf866a2de01ca6b87bd8be961a2
Author: bowen.li <bo...@gmail.com>
AuthorDate: Wed Oct 30 16:27:27 2019 -0700

    [FLINK-14221][table] support drop temp system functions and temp catalog functions
    
    Support dropping temp functions in FunctionCatalog.
    
    This closes #10054.
---
 .../flink/table/catalog/FunctionCatalog.java       | 37 ++++++++++++++++++++++
 .../flink/table/catalog/FunctionCatalogTest.java   | 37 +++++++++++++++++++---
 2 files changed, 70 insertions(+), 4 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 29208e6..d566453 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.catalog;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
 import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
@@ -194,6 +195,42 @@ public class FunctionCatalog implements FunctionLookup {
 	}
 
 	/**
+	 * Drop a temporary system function.
+	 *
+	 * @param funcName name of the function
+	 * @param ignoreIfNotExist Flag to specify behavior when the function does not exist:
+	 *                         if set to false, throw an exception,
+	 *                         if set to true, do nothing.
+	 */
+	public void dropTempSystemFunction(String funcName, boolean ignoreIfNotExist) {
+		String normalizedName = FunctionIdentifier.normalizeName(funcName);
+
+		FunctionDefinition fd = tempSystemFunctions.remove(normalizedName);
+
+		if (fd == null && !ignoreIfNotExist) {
+			throw new ValidationException(String.format("Temporary system function %s doesn't exist", funcName));
+		}
+	}
+
+	/**
+	 * Drop a temporary catalog function.
+	 *
+	 * @param identifier identifier of the function
+	 * @param ignoreIfNotExist Flag to specify behavior when the function does not exist:
+	 *                         if set to false, throw an exception,
+	 *                         if set to true, do nothing.
+	 */
+	public void dropTempCatalogFunction(ObjectIdentifier identifier, boolean ignoreIfNotExist) {
+		ObjectIdentifier normalizedName = FunctionIdentifier.normalizeObjectIdentifier(identifier);
+
+		FunctionDefinition fd = tempCatalogFunctions.remove(normalizedName);
+
+		if (fd == null && !ignoreIfNotExist) {
+			throw new ValidationException(String.format("Temporary catalog function %s doesn't exist", identifier));
+		}
+	}
+
+	/**
 	 * Get names of all user defined functions, including temp system functions, temp catalog functions and catalog functions
 	 * in the current catalog and current database.
 	 */
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
index ee94c00..fb0205d 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.module.exceptions.ModuleAlreadyExistException;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Optional;
@@ -50,6 +51,10 @@ public class FunctionCatalogTest {
 	private ModuleManager moduleManager;
 
 	private final String testCatalogName = "test";
+	private final ObjectIdentifier oi = ObjectIdentifier.of(
+		testCatalogName,
+		GenericInMemoryCatalog.DEFAULT_DB,
+		TEST_FUNCTION_NAME);
 
 	private static final String TEST_FUNCTION_NAME = "test_function";
 
@@ -104,10 +109,6 @@ public class FunctionCatalogTest {
 
 	@Test
 	public void testAmbiguousFunctionReference() throws FunctionAlreadyExistException, DatabaseNotExistException, ModuleAlreadyExistException {
-		ObjectIdentifier oi = ObjectIdentifier.of(
-			testCatalogName,
-			GenericInMemoryCatalog.DEFAULT_DB,
-			TEST_FUNCTION_NAME);
 
 		// test no function is found
 		assertFalse(functionCatalog.lookupFunction(FunctionIdentifier.of(TEST_FUNCTION_NAME)).isPresent());
@@ -166,6 +167,34 @@ public class FunctionCatalogTest {
 		}
 	}
 
+	@Test
+	public void testRegisterAndDropTempSystemFunction() {
+		assertFalse(Arrays.asList(functionCatalog.getUserDefinedFunctions()).contains(TEST_FUNCTION_NAME));
+
+		functionCatalog.registerTempSystemScalarFunction(TEST_FUNCTION_NAME, new TestFunction1());
+		assertTrue(Arrays.asList(functionCatalog.getUserDefinedFunctions()).contains(TEST_FUNCTION_NAME));
+
+		functionCatalog.dropTempSystemFunction(TEST_FUNCTION_NAME, false);
+		assertFalse(Arrays.asList(functionCatalog.getUserDefinedFunctions()).contains(TEST_FUNCTION_NAME));
+
+		functionCatalog.dropTempSystemFunction(TEST_FUNCTION_NAME, true);
+		assertFalse(Arrays.asList(functionCatalog.getUserDefinedFunctions()).contains(TEST_FUNCTION_NAME));
+	}
+
+	@Test
+	public void testRegisterAndDropTempCatalogFunction() {
+		assertFalse(Arrays.asList(functionCatalog.getUserDefinedFunctions()).contains(TEST_FUNCTION_NAME));
+
+		functionCatalog.registerTempCatalogScalarFunction(oi, new TestFunction1());
+		assertTrue(Arrays.asList(functionCatalog.getUserDefinedFunctions()).contains(oi.getObjectName()));
+
+		functionCatalog.dropTempCatalogFunction(oi, false);
+		assertFalse(Arrays.asList(functionCatalog.getUserDefinedFunctions()).contains(oi.getObjectName()));
+
+		functionCatalog.dropTempCatalogFunction(oi, true);
+		assertFalse(Arrays.asList(functionCatalog.getUserDefinedFunctions()).contains(oi.getObjectName()));
+	}
+
 	/**
 	 * Testing function.
 	 */