You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/05/22 02:30:57 UTC

[flink] branch master updated: [FLINK-12241][hive] Support Flink functions in catalog function APIs of HiveCatalog

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3813bb9  [FLINK-12241][hive] Support Flink functions in catalog function APIs of HiveCatalog
3813bb9 is described below

commit 3813bb90ba4fb8bc8637b7c1c5de2e24196c4b3b
Author: bowen.li <bo...@gmail.com>
AuthorDate: Tue May 21 14:06:25 2019 -0700

    [FLINK-12241][hive] Support Flink functions in catalog function APIs of HiveCatalog
    
    This PR adds support for Flink functions in HiveCatalog.
    
    This closes #8503.
---
 .../flink/table/catalog/hive/HiveCatalog.java      | 157 ++++++++++++++--
 .../hive/HiveCatalogGenericMetadataTest.java       |  30 +++
 .../catalog/hive/HiveCatalogHiveMetadataTest.java  |  55 ++++++
 .../table/catalog/GenericCatalogFunction.java      |   4 +
 .../table/catalog/GenericInMemoryCatalogTest.java  | 166 +----------------
 .../flink/table/catalog/CatalogTestBase.java       | 203 ++++++++++++++++++++-
 .../flink/table/catalog/CatalogTestUtil.java       |   5 -
 7 files changed, 437 insertions(+), 183 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 13c5fde..07a7159 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.GenericCatalogDatabase;
+import org.apache.flink.table.catalog.GenericCatalogFunction;
 import org.apache.flink.table.catalog.GenericCatalogTable;
 import org.apache.flink.table.catalog.GenericCatalogView;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
@@ -59,9 +60,12 @@ import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.FunctionType;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -91,6 +95,11 @@ public class HiveCatalog implements Catalog {
 	private static final String FLINK_PROPERTY_PREFIX = "flink.";
 	private static final String FLINK_PROPERTY_IS_GENERIC = FLINK_PROPERTY_PREFIX + GenericInMemoryCatalog.FLINK_IS_GENERIC_KEY;
 
+	// Prefix used to distinguish Flink functions from Hive functions.
+	// It's prepended to the Flink function's class name
+	// because Hive's Function object doesn't have properties or any other place to store a flag for Flink functions.
+	private static final String FLINK_FUNCTION_PREFIX = "flink:";
+
 	protected final String catalogName;
 	protected final HiveConf hiveConf;
 
@@ -652,54 +661,176 @@ public class HiveCatalog implements Catalog {
 	@Override
 	public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
 			throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
+		checkNotNull(functionPath, "functionPath cannot be null");
+		checkNotNull(function, "function cannot be null");
+
+		Function hiveFunction;
+		if (function instanceof GenericCatalogFunction) {
+			hiveFunction = instantiateHiveFunction(functionPath, (GenericCatalogFunction) function);
+		} else {
+			throw new CatalogException(
+				String.format("Unsupported catalog function type %s", function.getClass().getName()));
+		}
+
+		try {
+			client.createFunction(hiveFunction);
+		} catch (NoSuchObjectException e) {
+			throw new DatabaseNotExistException(catalogName, functionPath.getDatabaseName(), e);
+		} catch (AlreadyExistsException e) {
+			if (!ignoreIfExists) {
+				throw new FunctionAlreadyExistException(catalogName, functionPath, e);
+			}
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to create function %s", functionPath.getFullName()), e);
+		}
 	}
 
 	@Override
 	public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
 			throws FunctionNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
+		checkNotNull(functionPath, "functionPath cannot be null");
+		checkNotNull(newFunction, "newFunction cannot be null");
+
+		try {
+			CatalogFunction existingFunction = getFunction(functionPath);
+
+			if (existingFunction.getClass() != newFunction.getClass()) {
+				throw new CatalogException(
+					String.format("Function types don't match. Existing function is '%s' and new function is '%s'.",
+						existingFunction.getClass().getName(), newFunction.getClass().getName()));
+			}
+
+			Function hiveFunction;
+			if (existingFunction instanceof GenericCatalogFunction && newFunction instanceof GenericCatalogFunction) {
+					hiveFunction = instantiateHiveFunction(functionPath, (GenericCatalogFunction) newFunction);
+			} else {
+				throw new CatalogException(
+					String.format("Unsupported catalog function type %s", newFunction.getClass().getName()));
+			}
+
+			client.alterFunction(
+				functionPath.getDatabaseName(),
+				functionPath.getObjectName(),
+				hiveFunction);
+		} catch (FunctionNotExistException e) {
+			if (!ignoreIfNotExists) {
+				throw e;
+			}
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to alter function %s", functionPath.getFullName()), e);
+		}
 	}
 
 	@Override
 	public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
 			throws FunctionNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
+		checkNotNull(functionPath, "functionPath cannot be null");
+
+		try {
+			client.dropFunction(functionPath.getDatabaseName(), functionPath.getObjectName());
+		} catch (NoSuchObjectException e) {
+			if (!ignoreIfNotExists) {
+				throw new FunctionNotExistException(catalogName, functionPath, e);
+			}
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to drop function %s", functionPath.getFullName()), e);
+		}
 	}
 
 	@Override
-	public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+	public List<String> listFunctions(String databaseName) throws DatabaseNotExistException, CatalogException {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty");
 
+		// client.getFunctions() returns empty list when the database doesn't exist
+		// thus we need to explicitly check whether the database exists or not
+		if (!databaseExists(databaseName)) {
+			throw new DatabaseNotExistException(catalogName, databaseName);
+		}
+
+		try {
+			return client.getFunctions(databaseName, null);
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to list functions in database %s", databaseName), e);
+		}
 	}
 
 	@Override
-	public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+	public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
+		checkNotNull(functionPath, "functionPath cannot be null or empty");
+
+		try {
+			Function function = client.getFunction(functionPath.getDatabaseName(), functionPath.getObjectName());
+
+			if (function.getClassName().startsWith(FLINK_FUNCTION_PREFIX)) {
+				// TODO: extract more properties from Hive function and add to CatalogFunction's properties
 
+				Map<String, String> properties = new HashMap<>();
+				properties.put(GenericInMemoryCatalog.FLINK_IS_GENERIC_KEY, GenericInMemoryCatalog.FLINK_IS_GENERIC_VALUE);
+
+				return new GenericCatalogFunction(
+					function.getClassName().substring(FLINK_FUNCTION_PREFIX.length()), properties);
+			} else {
+				throw new CatalogException("Hive function is not supported yet");
+			}
+		} catch (NoSuchObjectException e) {
+			throw new FunctionNotExistException(catalogName, functionPath, e);
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to get function %s", functionPath.getFullName()), e);
+		}
 	}
 
 	@Override
-	public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+	public boolean functionExists(ObjectPath functionPath) throws CatalogException {
+		checkNotNull(functionPath, "functionPath cannot be null or empty");
+
+		try {
+			return client.getFunction(functionPath.getDatabaseName(), functionPath.getObjectName()) != null;
+		} catch (NoSuchObjectException e) {
+			return false;
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to check whether function %s exists or not", functionPath.getFullName()), e);
+		}
+	}
 
+	private static Function instantiateHiveFunction(ObjectPath functionPath, GenericCatalogFunction function) {
+		return new Function(
+			functionPath.getObjectName(),
+			functionPath.getDatabaseName(),
+			FLINK_FUNCTION_PREFIX + function.getClassName(),
+			null,			// Owner name
+			PrincipalType.GROUP,	// Temporarily set to GROUP type because it's required by Hive. May change later
+			(int) (System.currentTimeMillis() / 1000),
+			FunctionType.JAVA,		// FunctionType only has JAVA now
+			new ArrayList<>()		// Resource URIs
+		);
 	}
 
+	// ------ stats ------
+
 	@Override
-	public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+	public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
 
 	}
 
 	@Override
-	public List<String> listFunctions(String dbName) throws DatabaseNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
+	public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+
 	}
 
 	@Override
-	public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
+	public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+
 	}
 
 	@Override
-	public boolean functionExists(ObjectPath functionPath) throws CatalogException {
-		throw new UnsupportedOperationException();
+	public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
index ad15b9f..f690fc1 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
@@ -24,12 +24,15 @@ import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTestBase;
 import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.GenericCatalogDatabase;
+import org.apache.flink.table.catalog.GenericCatalogFunction;
 import org.apache.flink.table.catalog.GenericCatalogTable;
 import org.apache.flink.table.catalog.GenericCatalogView;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -90,6 +93,23 @@ public class HiveCatalogGenericMetadataTest extends CatalogTestBase {
 		checkEquals(table, (CatalogTable) catalog.getTable(path1));
 	}
 
+	// ------ functions ------
+
+	@Test
+	public void testAlterFunction_differentTypedFunction() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createFunction(path1, createFunction(), false);
+
+		exception.expect(CatalogException.class);
+		exception.expectMessage(
+			"Function types don't match. " +
+				"Existing function is 'org.apache.flink.table.catalog.GenericCatalogFunction' and " +
+				"new function is 'org.apache.flink.table.catalog.CatalogTestBase$TestFunction'.");
+		catalog.alterFunction(path1, new TestFunction(), false);
+	}
+
+	// ------ test utils ------
+
 	@Override
 	public CatalogDatabase createDb() {
 		return new GenericCatalogDatabase(
@@ -169,4 +189,14 @@ public class HiveCatalogGenericMetadataTest extends CatalogTestBase {
 			new HashMap<>(),
 			"This is another view");
 	}
+
+	@Override
+	protected CatalogFunction createFunction() {
+		return new GenericCatalogFunction(MyScalarFunction.class.getName());
+	}
+
+	@Override
+	protected CatalogFunction createAnotherFunction() {
+		return new GenericCatalogFunction(MyOtherScalarFunction.class.getName());
+	}
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index 59ea607..3b221c0 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTestBase;
 import org.apache.flink.table.catalog.CatalogView;
@@ -49,6 +50,50 @@ public class HiveCatalogHiveMetadataTest extends CatalogTestBase {
 	public void testCreateTable_Streaming() throws Exception {
 	}
 
+	// ------ functions ------
+
+	public void testCreateFunction() throws Exception {
+	}
+
+	public void testCreateFunction_DatabaseNotExistException() throws Exception {
+	}
+
+	public void testCreateFunction_FunctionAlreadyExistException() throws Exception {
+	}
+
+	public void testCreateFunction_FunctionAlreadyExist_ignored() throws Exception {
+	}
+
+	public void testAlterFunction() throws Exception {
+	}
+
+	public void testAlterFunction_FunctionNotExistException() throws Exception {
+	}
+
+	public void testAlterFunction_FunctionNotExist_ignored() throws Exception {
+	}
+
+	public void testListFunctions() throws Exception {
+	}
+
+	public void testListFunctions_DatabaseNotExistException() throws Exception{
+	}
+
+	public void testGetFunction_FunctionNotExistException() throws Exception {
+	}
+
+	public void testGetFunction_FunctionNotExistException_NoDb() throws Exception {
+	}
+
+	public void testDropFunction() throws Exception {
+	}
+
+	public void testDropFunction_FunctionNotExistException() throws Exception {
+	}
+
+	public void testDropFunction_FunctionNotExist_ignored() throws Exception {
+	}
+
 	// ------ utils ------
 
 	@Override
@@ -133,6 +178,16 @@ public class HiveCatalogHiveMetadataTest extends CatalogTestBase {
 	}
 
 	@Override
+	protected CatalogFunction createFunction() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	protected CatalogFunction createAnotherFunction() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
 	public void checkEquals(CatalogTable t1, CatalogTable t2) {
 		assertEquals(t1.getSchema(), t2.getSchema());
 		assertEquals(t1.getComment(), t2.getComment());
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogFunction.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogFunction.java
index fbfe271..d5056dc 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogFunction.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogFunction.java
@@ -27,6 +27,10 @@ import java.util.Optional;
  */
 public class GenericCatalogFunction extends AbstractCatalogFunction {
 
+	public GenericCatalogFunction(String className) {
+		this(className, new HashMap<>());
+	}
+
 	public GenericCatalogFunction(String className, Map<String, String> properties) {
 		super(className, properties);
 		properties.put(GenericInMemoryCatalog.FLINK_IS_GENERIC_KEY, GenericInMemoryCatalog.FLINK_IS_GENERIC_VALUE);
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index f3d144f..342eb14 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -19,9 +19,6 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
 import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
@@ -39,7 +36,6 @@ import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.catalog.stats.Date;
 import org.apache.flink.table.functions.ScalarFunction;
 
-import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -49,7 +45,6 @@ import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -64,13 +59,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 		catalog.open();
 	}
 
-	@After
-	public void close() throws Exception {
-		if (catalog.functionExists(path1)) {
-			catalog.dropFunction(path1, true);
-		}
-	}
-
 	// ------ tables ------
 
 	@Test
@@ -455,158 +443,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 		assertFalse(catalog.partitionExists(ObjectPath.fromString("non.exist"), createPartitionSpec()));
 	}
 
-	// ------ functions ------
-
-	@Test
-	public void testCreateFunction() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-
-		assertFalse(catalog.functionExists(path1));
-
-		catalog.createFunction(path1, createFunction(), false);
-
-		assertTrue(catalog.functionExists(path1));
-
-		catalog.dropFunction(path1, false);
-		catalog.dropDatabase(db1, false);
-	}
-
-	@Test
-	public void testCreateFunction_DatabaseNotExistException() throws Exception {
-		assertFalse(catalog.databaseExists(db1));
-
-		exception.expect(DatabaseNotExistException.class);
-		exception.expectMessage("Database db1 does not exist in Catalog");
-		catalog.createFunction(path1, createFunction(), false);
-	}
-
-	@Test
-	public void testCreateFunction_FunctionAlreadyExistException() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.createFunction(path1, createFunction(), false);
-
-		exception.expect(FunctionAlreadyExistException.class);
-		exception.expectMessage("Function db1.t1 already exists in Catalog");
-		catalog.createFunction(path1, createFunction(), false);
-	}
-
-	@Test
-	public void testCreateFunction_FunctionAlreadyExist_ignored() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-
-		CatalogFunction func = createFunction();
-		catalog.createFunction(path1, func, false);
-
-		CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
-
-		catalog.createFunction(path1, createAnotherFunction(), true);
-
-		CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
-
-		catalog.dropFunction(path1, false);
-		catalog.dropDatabase(db1, false);
-	}
-
-	@Test
-	public void testAlterFunction() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-
-		CatalogFunction func = createFunction();
-		catalog.createFunction(path1, func, false);
-
-		CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
-
-		CatalogFunction newFunc = createAnotherFunction();
-		catalog.alterFunction(path1, newFunc, false);
-
-		assertNotEquals(func, catalog.getFunction(path1));
-		CatalogTestUtil.checkEquals(newFunc, catalog.getFunction(path1));
-
-		catalog.dropFunction(path1, false);
-		catalog.dropDatabase(db1, false);
-	}
-
-	@Test
-	public void testAlterFunction_FunctionNotExistException() throws Exception {
-		exception.expect(FunctionNotExistException.class);
-		exception.expectMessage("Function db1.nonexist does not exist in Catalog");
-		catalog.alterFunction(nonExistObjectPath, createFunction(), false);
-	}
-
-	@Test
-	public void testAlterFunction_FunctionNotExist_ignored() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.alterFunction(nonExistObjectPath, createFunction(), true);
-
-		assertFalse(catalog.functionExists(nonExistObjectPath));
-
-		catalog.dropDatabase(db1, false);
-	}
-
-	@Test
-	public void testListFunctions() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-
-		CatalogFunction func = createFunction();
-		catalog.createFunction(path1, func, false);
-
-		assertEquals(path1.getObjectName(), catalog.listFunctions(db1).get(0));
-
-		catalog.dropFunction(path1, false);
-		catalog.dropDatabase(db1, false);
-	}
-
-	@Test
-	public void testListFunctions_DatabaseNotExistException() throws Exception{
-		exception.expect(DatabaseNotExistException.class);
-		exception.expectMessage("Database db1 does not exist in Catalog");
-		catalog.listFunctions(db1);
-	}
-
-	@Test
-	public void testGetFunction_FunctionNotExistException() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-
-		exception.expect(FunctionNotExistException.class);
-		exception.expectMessage("Function db1.nonexist does not exist in Catalog");
-		catalog.getFunction(nonExistObjectPath);
-	}
-
-	@Test
-	public void testGetFunction_FunctionNotExistException_NoDb() throws Exception {
-		exception.expect(FunctionNotExistException.class);
-		exception.expectMessage("Function db1.nonexist does not exist in Catalog");
-		catalog.getFunction(nonExistObjectPath);
-	}
-
-	@Test
-	public void testDropFunction() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.createFunction(path1, createFunction(), false);
-
-		assertTrue(catalog.functionExists(path1));
-
-		catalog.dropFunction(path1, false);
-
-		assertFalse(catalog.functionExists(path1));
-
-		catalog.dropDatabase(db1, false);
-	}
-
-	@Test
-	public void testDropFunction_FunctionNotExistException() throws Exception {
-		exception.expect(FunctionNotExistException.class);
-		exception.expectMessage("Function non.exist does not exist in Catalog");
-		catalog.dropFunction(nonExistDbPath, false);
-	}
-
-	@Test
-	public void testDropFunction_FunctionNotExist_ignored() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.dropFunction(nonExistObjectPath, true);
-		catalog.dropDatabase(db1, false);
-	}
-
 	// ------ statistics ------
 
 	@Test
@@ -790,10 +626,12 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 		return new CatalogColumnStatistics(colStatsMap);
 	}
 
+	@Override
 	protected CatalogFunction createFunction() {
 		return new GenericCatalogFunction(MyScalarFunction.class.getName(), new HashMap<>());
 	}
 
+	@Override
 	protected CatalogFunction createAnotherFunction() {
 		return new GenericCatalogFunction(MyOtherScalarFunction.class.getName(), new HashMap<>());
 	}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
index 0d9d5df..1e97947 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -25,8 +25,11 @@ import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.functions.ScalarFunction;
 
 import org.junit.After;
 import org.junit.AfterClass;
@@ -39,6 +42,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -88,7 +92,9 @@ public abstract class CatalogTestBase {
 		if (catalog.tableExists(path4)) {
 			catalog.dropTable(path4, true);
 		}
-
+		if (catalog.functionExists(path1)) {
+			catalog.dropFunction(path1, true);
+		}
 		if (catalog.databaseExists(db1)) {
 			catalog.dropDatabase(db1, true);
 		}
@@ -595,6 +601,134 @@ public abstract class CatalogTestBase {
 		assertTrue(catalog.tableExists(path3));
 	}
 
+	// ------ functions ------
+
+	@Test
+	public void testCreateFunction() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		assertFalse(catalog.functionExists(path1));
+
+		catalog.createFunction(path1, createFunction(), false);
+
+		assertTrue(catalog.functionExists(path1));
+	}
+
+	@Test
+	public void testCreateFunction_DatabaseNotExistException() throws Exception {
+		assertFalse(catalog.databaseExists(db1));
+
+		exception.expect(DatabaseNotExistException.class);
+		exception.expectMessage("Database db1 does not exist in Catalog");
+		catalog.createFunction(path1, createFunction(), false);
+	}
+
+	@Test
+	public void testCreateFunction_FunctionAlreadyExistException() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createFunction(path1, createFunction(), false);
+
+		assertTrue(catalog.functionExists(path1));
+
+		// test 'ignoreIfExist' flag
+		catalog.createFunction(path1, createAnotherFunction(), true);
+
+		exception.expect(FunctionAlreadyExistException.class);
+		exception.expectMessage("Function db1.t1 already exists in Catalog");
+		catalog.createFunction(path1, createFunction(), false);
+	}
+
+	@Test
+	public void testAlterFunction() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		CatalogFunction func = createFunction();
+		catalog.createFunction(path1, func, false);
+
+		checkEquals(func, catalog.getFunction(path1));
+
+		CatalogFunction newFunc = createAnotherFunction();
+		catalog.alterFunction(path1, newFunc, false);
+		CatalogFunction actual = catalog.getFunction(path1);
+
+		assertFalse(func.getClassName().equals(actual.getClassName()));
+		checkEquals(newFunc, actual);
+	}
+
+	@Test
+	public void testAlterFunction_FunctionNotExistException() throws Exception {
+		exception.expect(FunctionNotExistException.class);
+		exception.expectMessage("Function db1.nonexist does not exist in Catalog");
+		catalog.alterFunction(nonExistObjectPath, createFunction(), false);
+	}
+
+	@Test
+	public void testAlterFunction_FunctionNotExist_ignored() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.alterFunction(nonExistObjectPath, createFunction(), true);
+
+		assertFalse(catalog.functionExists(nonExistObjectPath));
+	}
+
+	@Test
+	public void testListFunctions() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		CatalogFunction func = createFunction();
+		catalog.createFunction(path1, func, false);
+
+		assertEquals(path1.getObjectName(), catalog.listFunctions(db1).get(0));
+	}
+
+	@Test
+	public void testListFunctions_DatabaseNotExistException() throws Exception{
+		exception.expect(DatabaseNotExistException.class);
+		exception.expectMessage("Database db1 does not exist in Catalog");
+		catalog.listFunctions(db1);
+	}
+
+	@Test
+	public void testGetFunction_FunctionNotExistException() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		exception.expect(FunctionNotExistException.class);
+		exception.expectMessage("Function db1.nonexist does not exist in Catalog");
+		catalog.getFunction(nonExistObjectPath);
+	}
+
+	@Test
+	public void testGetFunction_FunctionNotExistException_NoDb() throws Exception {
+		exception.expect(FunctionNotExistException.class);
+		exception.expectMessage("Function db1.nonexist does not exist in Catalog");
+		catalog.getFunction(nonExistObjectPath);
+	}
+
+	@Test
+	public void testDropFunction() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createFunction(path1, createFunction(), false);
+
+		assertTrue(catalog.functionExists(path1));
+
+		catalog.dropFunction(path1, false);
+
+		assertFalse(catalog.functionExists(path1));
+	}
+
+	@Test
+	public void testDropFunction_FunctionNotExistException() throws Exception {
+		exception.expect(FunctionNotExistException.class);
+		exception.expectMessage("Function non.exist does not exist in Catalog");
+		catalog.dropFunction(nonExistDbPath, false);
+	}
+
+	@Test
+	public void testDropFunction_FunctionNotExist_ignored() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.dropFunction(nonExistObjectPath, true);
+		catalog.dropDatabase(db1, false);
+	}
+
 	// ------ utilities ------
 
 	/**
@@ -660,6 +794,20 @@ public abstract class CatalogTestBase {
 	 */
 	public abstract CatalogView createAnotherView();
 
+	/**
+	 * Create a CatalogFunction instance by specific catalog implementation.
+	 *
+	 * @return a CatalogFunction instance
+	 */
+	protected abstract CatalogFunction createFunction();
+
+	/**
+	 * Create another CatalogFunction instance by specific catalog implementation.
+	 *
+	 * @return another CatalogFunction instance
+	 */
+	protected abstract CatalogFunction createAnotherFunction();
+
 	protected TableSchema createTableSchema() {
 		return new TableSchema(
 			new String[] {"first", "second", "third"},
@@ -698,6 +846,54 @@ public abstract class CatalogTestBase {
 		}};
 	}
 
+	/**
+	 * A Flink function for test.
+	 */
+	public static class MyScalarFunction extends ScalarFunction {
+		public Integer eval(Integer i) {
+			return i + 1;
+		}
+	}
+
+	/**
+	 * Another Flink function for test.
+	 */
+	public static class MyOtherScalarFunction extends ScalarFunction {
+		public String eval(Integer i) {
+			return String.valueOf(i);
+		}
+	}
+
+	/**
+	 * Test function used to assert on a function of a different class.
+	 */
+	public static class TestFunction implements CatalogFunction {
+		@Override
+		public String getClassName() {
+			return null;
+		}
+
+		@Override
+		public Map<String, String> getProperties() {
+			return null;
+		}
+
+		@Override
+		public CatalogFunction copy() {
+			return null;
+		}
+
+		@Override
+		public Optional<String> getDescription() {
+			return Optional.empty();
+		}
+
+		@Override
+		public Optional<String> getDetailedDescription() {
+			return Optional.empty();
+		}
+	}
+
 	// ------ equality check utils ------
 	// Can be overriden by sub test class
 
@@ -716,4 +912,9 @@ public abstract class CatalogTestBase {
 		assertEquals(v1.getOriginalQuery(), v2.getOriginalQuery());
 		assertEquals(v1.getExpandedQuery(), v2.getExpandedQuery());
 	}
+
+	protected void checkEquals(CatalogFunction f1, CatalogFunction f2) {
+		assertEquals(f1.getClassName(), f2.getClassName());
+		assertEquals(f1.getProperties(), f2.getProperties());
+	}
 }
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
index f080bdb..2b98091 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
@@ -50,11 +50,6 @@ public class CatalogTestUtil {
 		assertEquals(d1.getProperties(), d2.getProperties());
 	}
 
-	public static void checkEquals(CatalogFunction f1, CatalogFunction f2) {
-		assertEquals(f1.getClassName(), f2.getClassName());
-		assertEquals(f1.getProperties(), f2.getProperties());
-	}
-
 	public static void checkEquals(CatalogPartition p1, CatalogPartition p2) {
 		assertEquals(p1.getProperties(), p2.getProperties());
 	}