You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/04/20 16:00:28 UTC

[GitHub] [flink] dawidwys commented on a change in pull request #11785: [FLINK-17206][table] refactor function catalog to support delayed UDF initialization.

dawidwys commented on a change in pull request #11785:
URL: https://github.com/apache/flink/pull/11785#discussion_r411497262



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
##########
@@ -622,4 +612,90 @@ private void registerTempCatalogFunction(ObjectIdentifier oi, FunctionDefinition
 			Optional.of(new FunctionLookup.Result(FunctionIdentifier.of(funcName), fd)
 		)).orElseGet(() -> resolvePreciseFunctionReference(oi));
 	}
+
+	private void validateAndPrepareFunction(CatalogFunction function) throws ClassNotFoundException {
+		// If the input is instance of UserDefinedFunction, it means it uses the new type inference.
+		// In this situation the UDF have not been validated and cleaned, so we need to validate it
+		// and clean its closure here.
+		// If the input is instance of `ScalarFunctionDefinition`, `TableFunctionDefinition` and so on,
+		// it means it uses the old type inference. We assume that they have been validated before being
+		// wrapped.
+		if (function instanceof InlineCatalogFunction &&
+			((InlineCatalogFunction) function).getDefinition() instanceof UserDefinedFunction) {
+
+			FunctionDefinition definition = ((InlineCatalogFunction) function).getDefinition();
+			UserDefinedFunctionHelper.prepareInstance(config, (UserDefinedFunction) definition);
+		} else if (function.getFunctionLanguage() == FunctionLanguage.JAVA) {
+			ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+			UserDefinedFunctionHelper.validateClass(
+				(Class<? extends UserDefinedFunction>) contextClassLoader
+					.loadClass(function.getClassName()));
+		}
+	}
+
+	private FunctionDefinition getFunctionDefinition(String name, CatalogFunction function) {
+		if (function instanceof InlineCatalogFunction) {
+			// The instantiated UDFs have been validated and cleaned when registering, just return them
+			// directly.
+			return ((InlineCatalogFunction) function).getDefinition();
+		}
+		// Currently the uninstantiated functions are all from sql and catalog that use the old type inference,
+		// so using FunctionDefinitionUtil to instantiate them and wrap them with `ScalarFunctionDefinition`,
+		// `TableFunctionDefinition`, etc. If the new type inference is fully functional, this should be
+		// changed to use `UserDefinedFunctionHelper#instantiateFunction`.
+		return FunctionDefinitionUtil.createFunctionDefinition(name, function.getClassName());
+	}
+
+	/**
+	 * The CatalogFunction which holds a instantiated UDF.
+	 */
+	private static class InlineCatalogFunction implements CatalogFunction {
+
+		private final FunctionDefinition definition;
+
+		InlineCatalogFunction(FunctionDefinition definition) {
+			this.definition = definition;
+		}
+
+		@Override
+		public String getClassName() {
+			// Not all instantiated UDFs have a class name, such as Python Lambda UDF. Even if the UDF
+			// has a class name, there is no guarantee that the new UDF object constructed from the
+			// class name is the same as the UDF held by this object. To reduce the chance of making
+			// mistakes, UnsupportedOperationException is thrown here.
+			throw new UnsupportedOperationException(
+				"This CatalogFunction is a InlineCatalogFunction. This method should not be called.");
+		}
+
+		@Override
+		public CatalogFunction copy() {
+			return new InlineCatalogFunction(definition);
+		}
+
+		@Override
+		public Optional<String> getDescription() {
+			return Optional.empty();
+		}
+
+		@Override
+		public Optional<String> getDetailedDescription() {
+			return Optional.empty();
+		}
+
+		@Override
+		public boolean isGeneric() {
+			throw new UnsupportedOperationException(
+				"This CatalogFunction is a InlineCatalogFunction. This method should not be called.");
+		}
+
+		@Override
+		public FunctionLanguage getFunctionLanguage() {
+			throw new UnsupportedOperationException(

Review comment:
       Can't we support this method? I think it should be rather straightforward to pass the language from which the function originated. For now it will always be `JAVA` if I am correct.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org