You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/05/21 22:49:09 UTC

[GitHub] [flink] xuefuz commented on a change in pull request #8503: [FLINK-12241][hive] Support Flink functions in catalog function APIs of HiveCatalog

xuefuz commented on a change in pull request #8503: [FLINK-12241][hive] Support Flink functions in catalog function APIs of HiveCatalog
URL: https://github.com/apache/flink/pull/8503#discussion_r286254255
 
 

 ##########
 File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##########
 @@ -652,54 +661,170 @@ public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partit
 	@Override
 	public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
 			throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
+		checkNotNull(functionPath, "functionPath cannot be null");
+		checkNotNull(function, "function cannot be null");
+
+		Function hiveFunction;
+		if (function instanceof GenericCatalogFunction) {
+			hiveFunction = instantiateHiveFunction(functionPath, (GenericCatalogFunction) function);
+		} else {
+			throw new CatalogException(
+				String.format("Unsupported catalog function type %s", function.getClass().getName()));
+		}
+
+		try {
+			client.createFunction(hiveFunction);
+		} catch (NoSuchObjectException e) {
+			throw new DatabaseNotExistException(catalogName, functionPath.getDatabaseName(), e);
+		} catch (AlreadyExistsException e) {
+			if (!ignoreIfExists) {
+				throw new FunctionAlreadyExistException(catalogName, functionPath, e);
+			}
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to create function %s", functionPath.getFullName()), e);
+		}
 	}
 
 	@Override
 	public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
 			throws FunctionNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
+		checkNotNull(functionPath, "functionPath cannot be null");
+		checkNotNull(newFunction, "newFunction cannot be null");
+
+		try {
+			CatalogFunction existingFunction = getFunction(functionPath);
+
+			if (existingFunction.getClass() != newFunction.getClass()) {
+				throw new CatalogException(
+					String.format("Function types don't match. Existing function is '%s' and new function is '%s'.",
+						existingFunction.getClass().getName(), newFunction.getClass().getName()));
+			}
+
+			Function hiveFunction;
+			if (existingFunction instanceof GenericCatalogFunction && newFunction instanceof GenericCatalogFunction) {
+					hiveFunction = instantiateHiveFunction(functionPath, (GenericCatalogFunction) newFunction);
+			} else {
+				throw new CatalogException(
+					String.format("Unsupported catalog function type %s", newFunction.getClass().getName()));
+			}
+
+			client.alterFunction(
+				functionPath.getDatabaseName(),
+				functionPath.getObjectName(),
+				hiveFunction);
+		} catch (FunctionNotExistException e) {
+			if (!ignoreIfNotExists) {
+				throw e;
+			}
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to alter function %s", functionPath.getFullName()), e);
+		}
 	}
 
 	@Override
 	public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
 			throws FunctionNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
+		checkNotNull(functionPath, "functionPath cannot be null");
+
+		try {
+			client.dropFunction(functionPath.getDatabaseName(), functionPath.getObjectName());
+		} catch (NoSuchObjectException e) {
+			if (!ignoreIfNotExists) {
+				throw new FunctionNotExistException(catalogName, functionPath, e);
+			}
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to drop function %s", functionPath.getFullName()), e);
+		}
 	}
 
 	@Override
-	public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+	public List<String> listFunctions(String dbName) throws DatabaseNotExistException, CatalogException {
 
 Review comment:
   1. To be consistent, dbName -> databaseName
   2. Check dbName

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services