You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/05/22 02:30:57 UTC
[flink] branch master updated: [FLINK-12241][hive] Support Flink
functions in catalog function APIs of HiveCatalog
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3813bb9 [FLINK-12241][hive] Support Flink functions in catalog function APIs of HiveCatalog
3813bb9 is described below
commit 3813bb90ba4fb8bc8637b7c1c5de2e24196c4b3b
Author: bowen.li <bo...@gmail.com>
AuthorDate: Tue May 21 14:06:25 2019 -0700
[FLINK-12241][hive] Support Flink functions in catalog function APIs of HiveCatalog
This PR adds support for Flink functions in HiveCatalog.
This closes #8503.
---
.../flink/table/catalog/hive/HiveCatalog.java | 157 ++++++++++++++--
.../hive/HiveCatalogGenericMetadataTest.java | 30 +++
.../catalog/hive/HiveCatalogHiveMetadataTest.java | 55 ++++++
.../table/catalog/GenericCatalogFunction.java | 4 +
.../table/catalog/GenericInMemoryCatalogTest.java | 166 +----------------
.../flink/table/catalog/CatalogTestBase.java | 203 ++++++++++++++++++++-
.../flink/table/catalog/CatalogTestUtil.java | 5 -
7 files changed, 437 insertions(+), 183 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 13c5fde..07a7159 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.GenericCatalogDatabase;
+import org.apache.flink.table.catalog.GenericCatalogFunction;
import org.apache.flink.table.catalog.GenericCatalogTable;
import org.apache.flink.table.catalog.GenericCatalogView;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
@@ -59,9 +60,12 @@ import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.FunctionType;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -91,6 +95,11 @@ public class HiveCatalog implements Catalog {
private static final String FLINK_PROPERTY_PREFIX = "flink.";
private static final String FLINK_PROPERTY_IS_GENERIC = FLINK_PROPERTY_PREFIX + GenericInMemoryCatalog.FLINK_IS_GENERIC_KEY;
+ // Prefix used to distinguish Flink functions from Hive functions.
+ // It's appended to Flink function's class name
+ // because Hive's Function object doesn't have properties or other place to store the flag for Flink functions.
+ private static final String FLINK_FUNCTION_PREFIX = "flink:";
+
protected final String catalogName;
protected final HiveConf hiveConf;
@@ -652,54 +661,176 @@ public class HiveCatalog implements Catalog {
@Override
public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ checkNotNull(functionPath, "functionPath cannot be null");
+ checkNotNull(function, "function cannot be null");
+
+ Function hiveFunction;
+ if (function instanceof GenericCatalogFunction) {
+ hiveFunction = instantiateHiveFunction(functionPath, (GenericCatalogFunction) function);
+ } else {
+ throw new CatalogException(
+ String.format("Unsupported catalog function type %s", function.getClass().getName()));
+ }
+
+ try {
+ client.createFunction(hiveFunction);
+ } catch (NoSuchObjectException e) {
+ throw new DatabaseNotExistException(catalogName, functionPath.getDatabaseName(), e);
+ } catch (AlreadyExistsException e) {
+ if (!ignoreIfExists) {
+ throw new FunctionAlreadyExistException(catalogName, functionPath, e);
+ }
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to create function %s", functionPath.getFullName()), e);
+ }
}
@Override
public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
throws FunctionNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ checkNotNull(functionPath, "functionPath cannot be null");
+ checkNotNull(newFunction, "newFunction cannot be null");
+
+ try {
+ CatalogFunction existingFunction = getFunction(functionPath);
+
+ if (existingFunction.getClass() != newFunction.getClass()) {
+ throw new CatalogException(
+ String.format("Function types don't match. Existing function is '%s' and new function is '%s'.",
+ existingFunction.getClass().getName(), newFunction.getClass().getName()));
+ }
+
+ Function hiveFunction;
+ if (existingFunction instanceof GenericCatalogFunction && newFunction instanceof GenericCatalogFunction) {
+ hiveFunction = instantiateHiveFunction(functionPath, (GenericCatalogFunction) newFunction);
+ } else {
+ throw new CatalogException(
+ String.format("Unsupported catalog function type %s", newFunction.getClass().getName()));
+ }
+
+ client.alterFunction(
+ functionPath.getDatabaseName(),
+ functionPath.getObjectName(),
+ hiveFunction);
+ } catch (FunctionNotExistException e) {
+ if (!ignoreIfNotExists) {
+ throw e;
+ }
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to alter function %s", functionPath.getFullName()), e);
+ }
}
@Override
public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
throws FunctionNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ checkNotNull(functionPath, "functionPath cannot be null");
+
+ try {
+ client.dropFunction(functionPath.getDatabaseName(), functionPath.getObjectName());
+ } catch (NoSuchObjectException e) {
+ if (!ignoreIfNotExists) {
+ throw new FunctionNotExistException(catalogName, functionPath, e);
+ }
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to drop function %s", functionPath.getFullName()), e);
+ }
}
@Override
- public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+ public List<String> listFunctions(String databaseName) throws DatabaseNotExistException, CatalogException {
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty");
+ // client.getFunctions() returns empty list when the database doesn't exist
+ // thus we need to explicitly check whether the database exists or not
+ if (!databaseExists(databaseName)) {
+ throw new DatabaseNotExistException(catalogName, databaseName);
+ }
+
+ try {
+ return client.getFunctions(databaseName, null);
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to list functions in database %s", databaseName), e);
+ }
}
@Override
- public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+ public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
+ checkNotNull(functionPath, "functionPath cannot be null or empty");
+
+ try {
+ Function function = client.getFunction(functionPath.getDatabaseName(), functionPath.getObjectName());
+
+ if (function.getClassName().startsWith(FLINK_FUNCTION_PREFIX)) {
+ // TODO: extract more properties from Hive function and add to CatalogFunction's properties
+ Map<String, String> properties = new HashMap<>();
+ properties.put(GenericInMemoryCatalog.FLINK_IS_GENERIC_KEY, GenericInMemoryCatalog.FLINK_IS_GENERIC_VALUE);
+
+ return new GenericCatalogFunction(
+ function.getClassName().substring(FLINK_FUNCTION_PREFIX.length()), properties);
+ } else {
+ throw new CatalogException("Hive function is not supported yet");
+ }
+ } catch (NoSuchObjectException e) {
+ throw new FunctionNotExistException(catalogName, functionPath, e);
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to get function %s", functionPath.getFullName()), e);
+ }
}
@Override
- public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+ public boolean functionExists(ObjectPath functionPath) throws CatalogException {
+ checkNotNull(functionPath, "functionPath cannot be null or empty");
+
+ try {
+ return client.getFunction(functionPath.getDatabaseName(), functionPath.getObjectName()) != null;
+ } catch (NoSuchObjectException e) {
+ return false;
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to check whether function %s exists or not", functionPath.getFullName()), e);
+ }
+ }
+ private static Function instantiateHiveFunction(ObjectPath functionPath, GenericCatalogFunction function) {
+ return new Function(
+ functionPath.getObjectName(),
+ functionPath.getDatabaseName(),
+ FLINK_FUNCTION_PREFIX + function.getClassName(),
+ null, // Owner name
+ PrincipalType.GROUP, // Temporarily set to GROUP type because it's required by Hive. May change later
+ (int) (System.currentTimeMillis() / 1000),
+ FunctionType.JAVA, // FunctionType only has JAVA now
+ new ArrayList<>() // Resource URIs
+ );
}
+ // ------ stats ------
+
@Override
- public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+ public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
}
@Override
- public List<String> listFunctions(String dbName) throws DatabaseNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+
}
@Override
- public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+
}
@Override
- public boolean functionExists(ObjectPath functionPath) throws CatalogException {
- throw new UnsupportedOperationException();
+ public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+
}
@Override
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
index ad15b9f..f690fc1 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
@@ -24,12 +24,15 @@ import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTestBase;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.GenericCatalogDatabase;
+import org.apache.flink.table.catalog.GenericCatalogFunction;
import org.apache.flink.table.catalog.GenericCatalogTable;
import org.apache.flink.table.catalog.GenericCatalogView;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -90,6 +93,23 @@ public class HiveCatalogGenericMetadataTest extends CatalogTestBase {
checkEquals(table, (CatalogTable) catalog.getTable(path1));
}
+ // ------ functions ------
+
+ @Test
+ public void testAlterFunction_differentTypedFunction() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createFunction(path1, createFunction(), false);
+
+ exception.expect(CatalogException.class);
+ exception.expectMessage(
+ "Function types don't match. " +
+ "Existing function is 'org.apache.flink.table.catalog.GenericCatalogFunction' and " +
+ "new function is 'org.apache.flink.table.catalog.CatalogTestBase$TestFunction'.");
+ catalog.alterFunction(path1, new TestFunction(), false);
+ }
+
+ // ------ test utils ------
+
@Override
public CatalogDatabase createDb() {
return new GenericCatalogDatabase(
@@ -169,4 +189,14 @@ public class HiveCatalogGenericMetadataTest extends CatalogTestBase {
new HashMap<>(),
"This is another view");
}
+
+ @Override
+ protected CatalogFunction createFunction() {
+ return new GenericCatalogFunction(MyScalarFunction.class.getName());
+ }
+
+ @Override
+ protected CatalogFunction createAnotherFunction() {
+ return new GenericCatalogFunction(MyOtherScalarFunction.class.getName());
+ }
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index 59ea607..3b221c0 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.catalog.hive;
import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTestBase;
import org.apache.flink.table.catalog.CatalogView;
@@ -49,6 +50,50 @@ public class HiveCatalogHiveMetadataTest extends CatalogTestBase {
public void testCreateTable_Streaming() throws Exception {
}
+ // ------ functions ------
+
+ public void testCreateFunction() throws Exception {
+ }
+
+ public void testCreateFunction_DatabaseNotExistException() throws Exception {
+ }
+
+ public void testCreateFunction_FunctionAlreadyExistException() throws Exception {
+ }
+
+ public void testCreateFunction_FunctionAlreadyExist_ignored() throws Exception {
+ }
+
+ public void testAlterFunction() throws Exception {
+ }
+
+ public void testAlterFunction_FunctionNotExistException() throws Exception {
+ }
+
+ public void testAlterFunction_FunctionNotExist_ignored() throws Exception {
+ }
+
+ public void testListFunctions() throws Exception {
+ }
+
+ public void testListFunctions_DatabaseNotExistException() throws Exception{
+ }
+
+ public void testGetFunction_FunctionNotExistException() throws Exception {
+ }
+
+ public void testGetFunction_FunctionNotExistException_NoDb() throws Exception {
+ }
+
+ public void testDropFunction() throws Exception {
+ }
+
+ public void testDropFunction_FunctionNotExistException() throws Exception {
+ }
+
+ public void testDropFunction_FunctionNotExist_ignored() throws Exception {
+ }
+
// ------ utils ------
@Override
@@ -133,6 +178,16 @@ public class HiveCatalogHiveMetadataTest extends CatalogTestBase {
}
@Override
+ protected CatalogFunction createFunction() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected CatalogFunction createAnotherFunction() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void checkEquals(CatalogTable t1, CatalogTable t2) {
assertEquals(t1.getSchema(), t2.getSchema());
assertEquals(t1.getComment(), t2.getComment());
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogFunction.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogFunction.java
index fbfe271..d5056dc 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogFunction.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogFunction.java
@@ -27,6 +27,10 @@ import java.util.Optional;
*/
public class GenericCatalogFunction extends AbstractCatalogFunction {
+ public GenericCatalogFunction(String className) {
+ this(className, new HashMap<>());
+ }
+
public GenericCatalogFunction(String className, Map<String, String> properties) {
super(className, properties);
properties.put(GenericInMemoryCatalog.FLINK_IS_GENERIC_KEY, GenericInMemoryCatalog.FLINK_IS_GENERIC_VALUE);
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index f3d144f..342eb14 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -19,9 +19,6 @@
package org.apache.flink.table.catalog;
import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
@@ -39,7 +36,6 @@ import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.catalog.stats.Date;
import org.apache.flink.table.functions.ScalarFunction;
-import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -49,7 +45,6 @@ import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -64,13 +59,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
catalog.open();
}
- @After
- public void close() throws Exception {
- if (catalog.functionExists(path1)) {
- catalog.dropFunction(path1, true);
- }
- }
-
// ------ tables ------
@Test
@@ -455,158 +443,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
assertFalse(catalog.partitionExists(ObjectPath.fromString("non.exist"), createPartitionSpec()));
}
- // ------ functions ------
-
- @Test
- public void testCreateFunction() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
-
- assertFalse(catalog.functionExists(path1));
-
- catalog.createFunction(path1, createFunction(), false);
-
- assertTrue(catalog.functionExists(path1));
-
- catalog.dropFunction(path1, false);
- catalog.dropDatabase(db1, false);
- }
-
- @Test
- public void testCreateFunction_DatabaseNotExistException() throws Exception {
- assertFalse(catalog.databaseExists(db1));
-
- exception.expect(DatabaseNotExistException.class);
- exception.expectMessage("Database db1 does not exist in Catalog");
- catalog.createFunction(path1, createFunction(), false);
- }
-
- @Test
- public void testCreateFunction_FunctionAlreadyExistException() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- catalog.createFunction(path1, createFunction(), false);
-
- exception.expect(FunctionAlreadyExistException.class);
- exception.expectMessage("Function db1.t1 already exists in Catalog");
- catalog.createFunction(path1, createFunction(), false);
- }
-
- @Test
- public void testCreateFunction_FunctionAlreadyExist_ignored() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
-
- CatalogFunction func = createFunction();
- catalog.createFunction(path1, func, false);
-
- CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
-
- catalog.createFunction(path1, createAnotherFunction(), true);
-
- CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
-
- catalog.dropFunction(path1, false);
- catalog.dropDatabase(db1, false);
- }
-
- @Test
- public void testAlterFunction() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
-
- CatalogFunction func = createFunction();
- catalog.createFunction(path1, func, false);
-
- CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
-
- CatalogFunction newFunc = createAnotherFunction();
- catalog.alterFunction(path1, newFunc, false);
-
- assertNotEquals(func, catalog.getFunction(path1));
- CatalogTestUtil.checkEquals(newFunc, catalog.getFunction(path1));
-
- catalog.dropFunction(path1, false);
- catalog.dropDatabase(db1, false);
- }
-
- @Test
- public void testAlterFunction_FunctionNotExistException() throws Exception {
- exception.expect(FunctionNotExistException.class);
- exception.expectMessage("Function db1.nonexist does not exist in Catalog");
- catalog.alterFunction(nonExistObjectPath, createFunction(), false);
- }
-
- @Test
- public void testAlterFunction_FunctionNotExist_ignored() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- catalog.alterFunction(nonExistObjectPath, createFunction(), true);
-
- assertFalse(catalog.functionExists(nonExistObjectPath));
-
- catalog.dropDatabase(db1, false);
- }
-
- @Test
- public void testListFunctions() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
-
- CatalogFunction func = createFunction();
- catalog.createFunction(path1, func, false);
-
- assertEquals(path1.getObjectName(), catalog.listFunctions(db1).get(0));
-
- catalog.dropFunction(path1, false);
- catalog.dropDatabase(db1, false);
- }
-
- @Test
- public void testListFunctions_DatabaseNotExistException() throws Exception{
- exception.expect(DatabaseNotExistException.class);
- exception.expectMessage("Database db1 does not exist in Catalog");
- catalog.listFunctions(db1);
- }
-
- @Test
- public void testGetFunction_FunctionNotExistException() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
-
- exception.expect(FunctionNotExistException.class);
- exception.expectMessage("Function db1.nonexist does not exist in Catalog");
- catalog.getFunction(nonExistObjectPath);
- }
-
- @Test
- public void testGetFunction_FunctionNotExistException_NoDb() throws Exception {
- exception.expect(FunctionNotExistException.class);
- exception.expectMessage("Function db1.nonexist does not exist in Catalog");
- catalog.getFunction(nonExistObjectPath);
- }
-
- @Test
- public void testDropFunction() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- catalog.createFunction(path1, createFunction(), false);
-
- assertTrue(catalog.functionExists(path1));
-
- catalog.dropFunction(path1, false);
-
- assertFalse(catalog.functionExists(path1));
-
- catalog.dropDatabase(db1, false);
- }
-
- @Test
- public void testDropFunction_FunctionNotExistException() throws Exception {
- exception.expect(FunctionNotExistException.class);
- exception.expectMessage("Function non.exist does not exist in Catalog");
- catalog.dropFunction(nonExistDbPath, false);
- }
-
- @Test
- public void testDropFunction_FunctionNotExist_ignored() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- catalog.dropFunction(nonExistObjectPath, true);
- catalog.dropDatabase(db1, false);
- }
-
// ------ statistics ------
@Test
@@ -790,10 +626,12 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
return new CatalogColumnStatistics(colStatsMap);
}
+ @Override
protected CatalogFunction createFunction() {
return new GenericCatalogFunction(MyScalarFunction.class.getName(), new HashMap<>());
}
+ @Override
protected CatalogFunction createAnotherFunction() {
return new GenericCatalogFunction(MyOtherScalarFunction.class.getName(), new HashMap<>());
}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
index 0d9d5df..1e97947 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -25,8 +25,11 @@ import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.functions.ScalarFunction;
import org.junit.After;
import org.junit.AfterClass;
@@ -39,6 +42,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -88,7 +92,9 @@ public abstract class CatalogTestBase {
if (catalog.tableExists(path4)) {
catalog.dropTable(path4, true);
}
-
+ if (catalog.functionExists(path1)) {
+ catalog.dropFunction(path1, true);
+ }
if (catalog.databaseExists(db1)) {
catalog.dropDatabase(db1, true);
}
@@ -595,6 +601,134 @@ public abstract class CatalogTestBase {
assertTrue(catalog.tableExists(path3));
}
+ // ------ functions ------
+
+ @Test
+ public void testCreateFunction() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+
+ assertFalse(catalog.functionExists(path1));
+
+ catalog.createFunction(path1, createFunction(), false);
+
+ assertTrue(catalog.functionExists(path1));
+ }
+
+ @Test
+ public void testCreateFunction_DatabaseNotExistException() throws Exception {
+ assertFalse(catalog.databaseExists(db1));
+
+ exception.expect(DatabaseNotExistException.class);
+ exception.expectMessage("Database db1 does not exist in Catalog");
+ catalog.createFunction(path1, createFunction(), false);
+ }
+
+ @Test
+ public void testCreateFunction_FunctionAlreadyExistException() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createFunction(path1, createFunction(), false);
+
+ assertTrue(catalog.functionExists(path1));
+
+ // test 'ignoreIfExist' flag
+ catalog.createFunction(path1, createAnotherFunction(), true);
+
+ exception.expect(FunctionAlreadyExistException.class);
+ exception.expectMessage("Function db1.t1 already exists in Catalog");
+ catalog.createFunction(path1, createFunction(), false);
+ }
+
+ @Test
+ public void testAlterFunction() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+
+ CatalogFunction func = createFunction();
+ catalog.createFunction(path1, func, false);
+
+ checkEquals(func, catalog.getFunction(path1));
+
+ CatalogFunction newFunc = createAnotherFunction();
+ catalog.alterFunction(path1, newFunc, false);
+ CatalogFunction actual = catalog.getFunction(path1);
+
+ assertFalse(func.getClassName().equals(actual.getClassName()));
+ checkEquals(newFunc, actual);
+ }
+
+ @Test
+ public void testAlterFunction_FunctionNotExistException() throws Exception {
+ exception.expect(FunctionNotExistException.class);
+ exception.expectMessage("Function db1.nonexist does not exist in Catalog");
+ catalog.alterFunction(nonExistObjectPath, createFunction(), false);
+ }
+
+ @Test
+ public void testAlterFunction_FunctionNotExist_ignored() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.alterFunction(nonExistObjectPath, createFunction(), true);
+
+ assertFalse(catalog.functionExists(nonExistObjectPath));
+ }
+
+ @Test
+ public void testListFunctions() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+
+ CatalogFunction func = createFunction();
+ catalog.createFunction(path1, func, false);
+
+ assertEquals(path1.getObjectName(), catalog.listFunctions(db1).get(0));
+ }
+
+ @Test
+ public void testListFunctions_DatabaseNotExistException() throws Exception{
+ exception.expect(DatabaseNotExistException.class);
+ exception.expectMessage("Database db1 does not exist in Catalog");
+ catalog.listFunctions(db1);
+ }
+
+ @Test
+ public void testGetFunction_FunctionNotExistException() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+
+ exception.expect(FunctionNotExistException.class);
+ exception.expectMessage("Function db1.nonexist does not exist in Catalog");
+ catalog.getFunction(nonExistObjectPath);
+ }
+
+ @Test
+ public void testGetFunction_FunctionNotExistException_NoDb() throws Exception {
+ exception.expect(FunctionNotExistException.class);
+ exception.expectMessage("Function db1.nonexist does not exist in Catalog");
+ catalog.getFunction(nonExistObjectPath);
+ }
+
+ @Test
+ public void testDropFunction() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createFunction(path1, createFunction(), false);
+
+ assertTrue(catalog.functionExists(path1));
+
+ catalog.dropFunction(path1, false);
+
+ assertFalse(catalog.functionExists(path1));
+ }
+
+ @Test
+ public void testDropFunction_FunctionNotExistException() throws Exception {
+ exception.expect(FunctionNotExistException.class);
+ exception.expectMessage("Function non.exist does not exist in Catalog");
+ catalog.dropFunction(nonExistDbPath, false);
+ }
+
+ @Test
+ public void testDropFunction_FunctionNotExist_ignored() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.dropFunction(nonExistObjectPath, true);
+ catalog.dropDatabase(db1, false);
+ }
+
// ------ utilities ------
/**
@@ -660,6 +794,20 @@ public abstract class CatalogTestBase {
*/
public abstract CatalogView createAnotherView();
+ /**
+ * Create a CatalogFunction instance by specific catalog implementation.
+ *
+ * @return a CatalogFunction instance
+ */
+ protected abstract CatalogFunction createFunction();
+
+ /**
+ * Create another CatalogFunction instance by specific catalog implementation.
+ *
+ * @return another CatalogFunction instance
+ */
+ protected abstract CatalogFunction createAnotherFunction();
+
protected TableSchema createTableSchema() {
return new TableSchema(
new String[] {"first", "second", "third"},
@@ -698,6 +846,54 @@ public abstract class CatalogTestBase {
}};
}
+ /**
+ * A Flink function for test.
+ */
+ public static class MyScalarFunction extends ScalarFunction {
+ public Integer eval(Integer i) {
+ return i + 1;
+ }
+ }
+
+ /**
+ * Another Flink function for test.
+ */
+ public static class MyOtherScalarFunction extends ScalarFunction {
+ public String eval(Integer i) {
+ return String.valueOf(i);
+ }
+ }
+
+ /**
+ * Test function used to assert on function of different class.
+ */
+ public static class TestFunction implements CatalogFunction {
+ @Override
+ public String getClassName() {
+ return null;
+ }
+
+ @Override
+ public Map<String, String> getProperties() {
+ return null;
+ }
+
+ @Override
+ public CatalogFunction copy() {
+ return null;
+ }
+
+ @Override
+ public Optional<String> getDescription() {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<String> getDetailedDescription() {
+ return Optional.empty();
+ }
+ }
+
// ------ equality check utils ------
// Can be overriden by sub test class
@@ -716,4 +912,9 @@ public abstract class CatalogTestBase {
assertEquals(v1.getOriginalQuery(), v2.getOriginalQuery());
assertEquals(v1.getExpandedQuery(), v2.getExpandedQuery());
}
+
+ protected void checkEquals(CatalogFunction f1, CatalogFunction f2) {
+ assertEquals(f1.getClassName(), f2.getClassName());
+ assertEquals(f1.getProperties(), f2.getProperties());
+ }
}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
index f080bdb..2b98091 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
@@ -50,11 +50,6 @@ public class CatalogTestUtil {
assertEquals(d1.getProperties(), d2.getProperties());
}
- public static void checkEquals(CatalogFunction f1, CatalogFunction f2) {
- assertEquals(f1.getClassName(), f2.getClassName());
- assertEquals(f1.getProperties(), f2.getProperties());
- }
-
public static void checkEquals(CatalogPartition p1, CatalogPartition p2) {
assertEquals(p1.getProperties(), p2.getProperties());
}