You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/01/24 15:40:25 UTC

[GitHub] [flink] twalthr opened a new pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

twalthr opened a new pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942
 
 
   ## What is the purpose of the change
   
   This updates all catalog related interfaces to FLIP-65. It also continues FLIP-64 by exposing new interfaces in table environment for registering functions of temporary/system/catalog kind. Furthermore, it adds early class validation to the function catalog.
   
   The first functions will be fully functional once the code generator has been updated.
   
   ## Brief change log
   
   - Update of `TableEnvironment`
   - Update of `FunctionCatalog`
   - Update of `FunctionCatalogOperatorTable`
   
   ## Verifying this change
   
   - New `FunctionCatalogTest` cases
   - New `UserDefinedFunctionHelperTest` case
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): yes
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? JavaDocs
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] twalthr commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942#discussion_r372282993
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 ##########
 @@ -273,6 +274,72 @@ public void registerFunction(String name, ScalarFunction function) {
 			function);
 	}
 
+	@Override
+	public void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> functionClass) {
+		final UserDefinedFunction functionInstance = UserDefinedFunctionHelper.instantiateFunction(functionClass);
 
 Review comment:
   Yes, that makes sense. We can do this while integrating the `FunctionCatalog` into `CatalogManager`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] dawidwys commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942#discussion_r371791193
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
 ##########
 @@ -97,7 +97,13 @@
 
 		@Override
 		public List<ResolvedExpression> visit(UnresolvedCallExpression unresolvedCall) {
-			final FunctionDefinition definition = prepareUserDefinedFunction(unresolvedCall.getFunctionDefinition());
+			final FunctionDefinition definition;
+			// clean functions that were not registered in a catalog
+			if (!unresolvedCall.getFunctionIdentifier().isPresent()) {
 
 Review comment:
   Just a comment. The inline functions are extremely tricky in the two types stacks setup imo. They always work with the old stack right?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] twalthr commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942#discussion_r372289321
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
 ##########
 @@ -238,36 +244,40 @@ private ResolvedExpression runLegacyTypeInference(
 		/**
 		 * Validates and cleans an inline, unregistered {@link UserDefinedFunction}.
 		 */
-		private FunctionDefinition prepareUserDefinedFunction(FunctionDefinition definition) {
+		private FunctionDefinition prepareInlineUserDefinedFunction(FunctionDefinition definition) {
 			if (definition instanceof ScalarFunctionDefinition) {
 				final ScalarFunctionDefinition sf = (ScalarFunctionDefinition) definition;
-				UserDefinedFunctionHelper.prepareFunction(resolutionContext.configuration(), sf.getScalarFunction());
+				UserDefinedFunctionHelper.prepareInstance(resolutionContext.configuration(), sf.getScalarFunction());
 				return new ScalarFunctionDefinition(
 					sf.getName(),
 					sf.getScalarFunction());
 			} else if (definition instanceof TableFunctionDefinition) {
 				final TableFunctionDefinition tf = (TableFunctionDefinition) definition;
-				UserDefinedFunctionHelper.prepareFunction(resolutionContext.configuration(), tf.getTableFunction());
+				UserDefinedFunctionHelper.prepareInstance(resolutionContext.configuration(), tf.getTableFunction());
 				return new TableFunctionDefinition(
 					tf.getName(),
 					tf.getTableFunction(),
 					tf.getResultType());
 			} else if (definition instanceof AggregateFunctionDefinition) {
 				final AggregateFunctionDefinition af = (AggregateFunctionDefinition) definition;
-				UserDefinedFunctionHelper.prepareFunction(resolutionContext.configuration(), af.getAggregateFunction());
+				UserDefinedFunctionHelper.prepareInstance(resolutionContext.configuration(), af.getAggregateFunction());
 				return new AggregateFunctionDefinition(
 					af.getName(),
 					af.getAggregateFunction(),
 					af.getResultTypeInfo(),
 					af.getAccumulatorTypeInfo());
 			} else if (definition instanceof TableAggregateFunctionDefinition) {
 				final TableAggregateFunctionDefinition taf = (TableAggregateFunctionDefinition) definition;
-				UserDefinedFunctionHelper.prepareFunction(resolutionContext.configuration(), taf.getTableAggregateFunction());
+				UserDefinedFunctionHelper.prepareInstance(resolutionContext.configuration(), taf.getTableAggregateFunction());
 				return new TableAggregateFunctionDefinition(
 					taf.getName(),
 					taf.getTableAggregateFunction(),
 					taf.getResultTypeInfo(),
 					taf.getAccumulatorTypeInfo());
+			} else if (definition instanceof UserDefinedFunction) {
+				UserDefinedFunctionHelper.prepareInstance(
 
 Review comment:
   Not yet :(

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] dawidwys commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942#discussion_r371888078
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
 ##########
 @@ -81,8 +85,283 @@ public void setPlannerTypeInferenceUtil(PlannerTypeInferenceUtil plannerTypeInfe
 		this.plannerTypeInferenceUtil = plannerTypeInferenceUtil;
 	}
 
+	/**
+	 * Registers a temporary system function.
+	 */
+	public void registerTemporarySystemFunction(
+			String name,
+			FunctionDefinition definition,
+			boolean ignoreIfExists) {
+		final String normalizedName = FunctionIdentifier.normalizeName(name);
+
+		if (definition instanceof UserDefinedFunction) {
+			try {
+				UserDefinedFunctionHelper.prepareInstance(config, (UserDefinedFunction) definition);
+			} catch (Throwable t) {
+				throw new ValidationException(
+					String.format(
+						"Could not register temporary system function '%s' due to implementation errors.",
+						name),
+					t);
+			}
+		}
+
+		if (!tempSystemFunctions.containsKey(normalizedName)) {
+			tempSystemFunctions.put(normalizedName, definition);
+		} else if (!ignoreIfExists) {
+			throw new ValidationException(
+				String.format(
+					"Could not register temporary system function. A function named '%s' does already exist.",
+					name));
+		}
+	}
+
+	/**
+	 * Drops a temporary system function. Returns true if a function was dropped.
+	 */
+	public boolean dropTemporarySystemFunction(
+			String name,
+			boolean ignoreIfNotExist) {
+		final String normalizedName = FunctionIdentifier.normalizeName(name);
+		final FunctionDefinition definition = tempSystemFunctions.remove(normalizedName);
+
+		if (definition == null && !ignoreIfNotExist) {
+			throw new ValidationException(
+				String.format(
+					"Could not drop temporary system function. A function named '%s' doesn't exist.",
+					name));
+		}
+
+		return definition != null;
+	}
+
+	/**
+	 * Registers a temporary catalog function.
+	 */
+	public void registerTemporaryCatalogFunction(
+			UnresolvedIdentifier unresolvedIdentifier,
 
 Review comment:
   We're not being too consistent with the type of identifier that the `FunctionCatalog/CatalogManager` accepts (I'm guilty of that myself).
   
   Some methods accept `UnresolvedIdentifier` e.g. `FunctionCatalog#registerTemporaryCatalogFunction`, `CatalogManager#dropTemporaryView`. Some resolved `ObjectIdentifier` e.g. `CatalogManager#createTemporaryTable`, `CatalogManager#createTable`.
   
   I am not sure which one should we prefer. If we go with the `UnresolvedIdentifier` the benefit is that we always qualify in the `Catalog*`. The downside is that we would no longer qualify in the `*Operations`, (e.g. CreateTableOperation etc.), whereas we said that all Operations should be fully resolved...
   
   We can create a follow up for that.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942#issuecomment-578198793
 
 
   <!--
   Meta data
   Hash:67a8e490524c51740ea2b06b295189c3d19ca358 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/145962993 TriggerType:PUSH TriggerID:67a8e490524c51740ea2b06b295189c3d19ca358
   Hash:67a8e490524c51740ea2b06b295189c3d19ca358 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4610 TriggerType:PUSH TriggerID:67a8e490524c51740ea2b06b295189c3d19ca358
   Hash:67a8e490524c51740ea2b06b295189c3d19ca358 Status:CANCELED URL:https://travis-ci.com/flink-ci/flink/builds/145962993 TriggerType:MANUAL TriggerID:579706265
   Hash:67a8e490524c51740ea2b06b295189c3d19ca358 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4610 TriggerType:MANUAL TriggerID:579706265
   Hash:d0b30c6c17d46b19475e93821369a3d96b2a17b9 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/146576629 TriggerType:PUSH TriggerID:d0b30c6c17d46b19475e93821369a3d96b2a17b9
   Hash:d0b30c6c17d46b19475e93821369a3d96b2a17b9 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4650 TriggerType:PUSH TriggerID:d0b30c6c17d46b19475e93821369a3d96b2a17b9
   -->
   ## CI report:
   
   * 67a8e490524c51740ea2b06b295189c3d19ca358 Travis: [CANCELED](https://travis-ci.com/flink-ci/flink/builds/145962993) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4610) 
   * d0b30c6c17d46b19475e93821369a3d96b2a17b9 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/146576629) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4650) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942#issuecomment-578198793
 
 
   <!--
   Meta data
   Hash:67a8e490524c51740ea2b06b295189c3d19ca358 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/145962993 TriggerType:PUSH TriggerID:67a8e490524c51740ea2b06b295189c3d19ca358
   Hash:67a8e490524c51740ea2b06b295189c3d19ca358 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4610 TriggerType:PUSH TriggerID:67a8e490524c51740ea2b06b295189c3d19ca358
   -->
   ## CI report:
   
   * 67a8e490524c51740ea2b06b295189c3d19ca358 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/145962993) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4610) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] twalthr commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942#discussion_r372303421
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/UserDefinedFunctionHelperTest.java
 ##########
 @@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.util.Collector;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link UserDefinedFunctionHelper}.
+ */
+@RunWith(Parameterized.class)
+@SuppressWarnings("unused")
+public class UserDefinedFunctionHelperTest {
+
+	@Parameterized.Parameters
+	public static List<TestSpec> testData() {
+		return Arrays.asList(
+			TestSpec
+				.forClass(ValidScalarFunction.class)
+				.expectSuccess(),
+
+			TestSpec
+				.forInstance(new ValidScalarFunction())
+				.expectSuccess(),
+
+			TestSpec
+				.forClass(PrivateScalarFunction.class)
+				.expectErrorMessage(
+					"Function class '" + PrivateScalarFunction.class.getName() + "' is not public."),
+
+			TestSpec
+				.forClass(MissingImplementationScalarFunction.class)
+				.expectErrorMessage(
+					"Function class '" + MissingImplementationScalarFunction.class.getName() +
+						"' does not implement a method named 'eval'."),
+
+			TestSpec
+				.forClass(PrivateMethodScalarFunction.class)
+				.expectErrorMessage(
+					"Method 'eval' of function class '" + PrivateMethodScalarFunction.class.getName() +
+						"' is not public."),
+
+			TestSpec
+				.forInstance(new ValidTableAggregateFunction())
+				.expectSuccess(),
+
+			TestSpec
+				.forInstance(new MissingEmitTableAggregateFunction())
+				.expectErrorMessage(
+					"Function class '" + MissingEmitTableAggregateFunction.class.getName() +
+						"' does not implement a method named 'emitUpdateWithRetract' or 'emitValue'."),
+
+			TestSpec
+				.forInstance(new ValidTableFunction())
+				.expectSuccess(),
+
+			TestSpec
+				.forInstance(new ParameterizedTableFunction(12))
+				.expectSuccess(),
+
+			TestSpec
+				.forClass(ParameterizedTableFunction.class)
+				.expectErrorMessage(
+					"Function class '" + ParameterizedTableFunction.class.getName() +
+						"' must have a public default constructor."),
+
+			TestSpec
+				.forClass(HierarchicalTableAggregateFunction.class)
+				.expectSuccess()
+		);
+	}
+
+	@Parameter
+	public TestSpec testSpec;
+
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
+	@Test
+	public void testInstantiation() {
+		if (testSpec.functionClass != null) {
+			if (testSpec.expectedErrorMessage != null) {
+				thrown.expect(ValidationException.class);
+				thrown.expectMessage(testSpec.expectedErrorMessage);
+			}
+			assertThat(
+				UserDefinedFunctionHelper.instantiateFunction(testSpec.functionClass),
+				notNullValue());
+		}
+	}
+
+	@Test
+	public void testValidation() {
+		if (testSpec.expectedErrorMessage != null) {
+			thrown.expect(ValidationException.class);
+			thrown.expectMessage(testSpec.expectedErrorMessage);
+		}
+		if (testSpec.functionClass != null) {
+			UserDefinedFunctionHelper.validateClass(testSpec.functionClass);
+		} else if (testSpec.functionInstance != null) {
+			UserDefinedFunctionHelper.prepareInstance(new TableConfig(), testSpec.functionInstance);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Test utilities
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Test specification shared with the Scala tests.
 
 Review comment:
   nope copy paste error
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] twalthr commented on issue #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

Posted by GitBox <gi...@apache.org>.
twalthr commented on issue #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942#issuecomment-579706265
 
 
   @flinkbot run travis

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot commented on issue #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942#issuecomment-578183603
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 67a8e490524c51740ea2b06b295189c3d19ca358 (Fri Jan 24 15:43:15 UTC 2020)
   
   **Warnings:**
    * **1 pom.xml files were touched**: Check for build and licensing issues.
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] twalthr closed pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

Posted by GitBox <gi...@apache.org>.
twalthr closed pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] twalthr commented on issue #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

Posted by GitBox <gi...@apache.org>.
twalthr commented on issue #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942#issuecomment-579756527
 
 
   @dawidwys I addressed your feedback and Travis gives green light. If you have no objections, I will merge this. Can you take care of opening follow up issues? Thanks.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] dawidwys commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942#discussion_r373963373
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 ##########
 @@ -273,6 +274,72 @@ public void registerFunction(String name, ScalarFunction function) {
 			function);
 	}
 
+	@Override
+	public void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> functionClass) {
+		final UserDefinedFunction functionInstance = UserDefinedFunctionHelper.instantiateFunction(functionClass);
 
 Review comment:
   Created https://issues.apache.org/jira/browse/FLINK-15860

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942#issuecomment-578198793
 
 
   <!--
   Meta data
   Hash:67a8e490524c51740ea2b06b295189c3d19ca358 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/145962993 TriggerType:PUSH TriggerID:67a8e490524c51740ea2b06b295189c3d19ca358
   Hash:67a8e490524c51740ea2b06b295189c3d19ca358 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4610 TriggerType:PUSH TriggerID:67a8e490524c51740ea2b06b295189c3d19ca358
   Hash:67a8e490524c51740ea2b06b295189c3d19ca358 Status:PENDING URL:https://travis-ci.com/flink-ci/flink/builds/145962993 TriggerType:MANUAL TriggerID:579706265
   Hash:d0b30c6c17d46b19475e93821369a3d96b2a17b9 Status:UNKNOWN URL:TBD TriggerType:PUSH TriggerID:d0b30c6c17d46b19475e93821369a3d96b2a17b9
   -->
   ## CI report:
   
   * 67a8e490524c51740ea2b06b295189c3d19ca358 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/145962993) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4610) 
   * d0b30c6c17d46b19475e93821369a3d96b2a17b9 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] twalthr commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942#discussion_r372289180
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
 ##########
 @@ -97,7 +97,13 @@
 
 		@Override
 		public List<ResolvedExpression> visit(UnresolvedCallExpression unresolvedCall) {
-			final FunctionDefinition definition = prepareUserDefinedFunction(unresolvedCall.getFunctionDefinition());
+			final FunctionDefinition definition;
+			// clean functions that were not registered in a catalog
+			if (!unresolvedCall.getFunctionIdentifier().isPresent()) {
 
 Review comment:
   https://issues.apache.org/jira/browse/FLINK-15804

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] dawidwys commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942#discussion_r371791374
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
 ##########
 @@ -238,36 +244,40 @@ private ResolvedExpression runLegacyTypeInference(
 		/**
 		 * Validates and cleans an inline, unregistered {@link UserDefinedFunction}.
 		 */
-		private FunctionDefinition prepareUserDefinedFunction(FunctionDefinition definition) {
+		private FunctionDefinition prepareInlineUserDefinedFunction(FunctionDefinition definition) {
 			if (definition instanceof ScalarFunctionDefinition) {
 				final ScalarFunctionDefinition sf = (ScalarFunctionDefinition) definition;
-				UserDefinedFunctionHelper.prepareFunction(resolutionContext.configuration(), sf.getScalarFunction());
+				UserDefinedFunctionHelper.prepareInstance(resolutionContext.configuration(), sf.getScalarFunction());
 				return new ScalarFunctionDefinition(
 					sf.getName(),
 					sf.getScalarFunction());
 			} else if (definition instanceof TableFunctionDefinition) {
 				final TableFunctionDefinition tf = (TableFunctionDefinition) definition;
-				UserDefinedFunctionHelper.prepareFunction(resolutionContext.configuration(), tf.getTableFunction());
+				UserDefinedFunctionHelper.prepareInstance(resolutionContext.configuration(), tf.getTableFunction());
 				return new TableFunctionDefinition(
 					tf.getName(),
 					tf.getTableFunction(),
 					tf.getResultType());
 			} else if (definition instanceof AggregateFunctionDefinition) {
 				final AggregateFunctionDefinition af = (AggregateFunctionDefinition) definition;
-				UserDefinedFunctionHelper.prepareFunction(resolutionContext.configuration(), af.getAggregateFunction());
+				UserDefinedFunctionHelper.prepareInstance(resolutionContext.configuration(), af.getAggregateFunction());
 				return new AggregateFunctionDefinition(
 					af.getName(),
 					af.getAggregateFunction(),
 					af.getResultTypeInfo(),
 					af.getAccumulatorTypeInfo());
 			} else if (definition instanceof TableAggregateFunctionDefinition) {
 				final TableAggregateFunctionDefinition taf = (TableAggregateFunctionDefinition) definition;
-				UserDefinedFunctionHelper.prepareFunction(resolutionContext.configuration(), taf.getTableAggregateFunction());
+				UserDefinedFunctionHelper.prepareInstance(resolutionContext.configuration(), taf.getTableAggregateFunction());
 				return new TableAggregateFunctionDefinition(
 					taf.getName(),
 					taf.getTableAggregateFunction(),
 					taf.getResultTypeInfo(),
 					taf.getAccumulatorTypeInfo());
+			} else if (definition instanceof UserDefinedFunction) {
+				UserDefinedFunctionHelper.prepareInstance(
 
 Review comment:
   Just a question. Is it possible to end up here from the API?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942#issuecomment-578198793
 
 
   <!--
   Meta data
   Hash:67a8e490524c51740ea2b06b295189c3d19ca358 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/145962993 TriggerType:PUSH TriggerID:67a8e490524c51740ea2b06b295189c3d19ca358
   Hash:67a8e490524c51740ea2b06b295189c3d19ca358 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4610 TriggerType:PUSH TriggerID:67a8e490524c51740ea2b06b295189c3d19ca358
   -->
   ## CI report:
   
   * 67a8e490524c51740ea2b06b295189c3d19ca358 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/145962993) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4610) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] twalthr commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942#discussion_r372306128
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
 ##########
 @@ -81,8 +85,283 @@ public void setPlannerTypeInferenceUtil(PlannerTypeInferenceUtil plannerTypeInfe
 		this.plannerTypeInferenceUtil = plannerTypeInferenceUtil;
 	}
 
+	/**
+	 * Registers a temporary system function.
+	 */
+	public void registerTemporarySystemFunction(
+			String name,
+			FunctionDefinition definition,
+			boolean ignoreIfExists) {
+		final String normalizedName = FunctionIdentifier.normalizeName(name);
+
+		if (definition instanceof UserDefinedFunction) {
+			try {
+				UserDefinedFunctionHelper.prepareInstance(config, (UserDefinedFunction) definition);
+			} catch (Throwable t) {
+				throw new ValidationException(
+					String.format(
+						"Could not register temporary system function '%s' due to implementation errors.",
+						name),
+					t);
+			}
+		}
+
+		if (!tempSystemFunctions.containsKey(normalizedName)) {
+			tempSystemFunctions.put(normalizedName, definition);
+		} else if (!ignoreIfExists) {
+			throw new ValidationException(
+				String.format(
+					"Could not register temporary system function. A function named '%s' does already exist.",
+					name));
+		}
+	}
+
+	/**
+	 * Drops a temporary system function. Returns true if a function was dropped.
+	 */
+	public boolean dropTemporarySystemFunction(
+			String name,
+			boolean ignoreIfNotExist) {
+		final String normalizedName = FunctionIdentifier.normalizeName(name);
+		final FunctionDefinition definition = tempSystemFunctions.remove(normalizedName);
+
+		if (definition == null && !ignoreIfNotExist) {
+			throw new ValidationException(
+				String.format(
+					"Could not drop temporary system function. A function named '%s' doesn't exist.",
+					name));
+		}
+
+		return definition != null;
+	}
+
+	/**
+	 * Registers a temporary catalog function.
+	 */
+	public void registerTemporaryCatalogFunction(
+			UnresolvedIdentifier unresolvedIdentifier,
 
 Review comment:
   I also observed these inconsistencies. I think the catalog manager can resolve identifiers internally and thus can just expose `UnresolvedIdentifier`. That's why all my newly added methods take the unresolved one.
   
   We said that operation should be fully resolved but that was before the DDL. So things might have changed a bit. Maybe we should distinguish between resolved and unresolved operations. Let's discuss this in a follow up issue.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942#issuecomment-578198793
 
 
   <!--
   Meta data
   Hash:67a8e490524c51740ea2b06b295189c3d19ca358 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/145962993 TriggerType:PUSH TriggerID:67a8e490524c51740ea2b06b295189c3d19ca358
   Hash:67a8e490524c51740ea2b06b295189c3d19ca358 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4610 TriggerType:PUSH TriggerID:67a8e490524c51740ea2b06b295189c3d19ca358
   Hash:67a8e490524c51740ea2b06b295189c3d19ca358 Status:CANCELED URL:https://travis-ci.com/flink-ci/flink/builds/145962993 TriggerType:MANUAL TriggerID:579706265
   Hash:67a8e490524c51740ea2b06b295189c3d19ca358 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4610 TriggerType:MANUAL TriggerID:579706265
   Hash:d0b30c6c17d46b19475e93821369a3d96b2a17b9 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/146576629 TriggerType:PUSH TriggerID:d0b30c6c17d46b19475e93821369a3d96b2a17b9
   Hash:d0b30c6c17d46b19475e93821369a3d96b2a17b9 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4650 TriggerType:PUSH TriggerID:d0b30c6c17d46b19475e93821369a3d96b2a17b9
   -->
   ## CI report:
   
   * 67a8e490524c51740ea2b06b295189c3d19ca358 Travis: [CANCELED](https://travis-ci.com/flink-ci/flink/builds/145962993) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4610) 
   * d0b30c6c17d46b19475e93821369a3d96b2a17b9 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/146576629) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4650) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] twalthr commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942#discussion_r372283609
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
 ##########
 @@ -97,7 +97,13 @@
 
 		@Override
 		public List<ResolvedExpression> visit(UnresolvedCallExpression unresolvedCall) {
-			final FunctionDefinition definition = prepareUserDefinedFunction(unresolvedCall.getFunctionDefinition());
+			final FunctionDefinition definition;
+			// clean functions that were not registered in a catalog
+			if (!unresolvedCall.getFunctionIdentifier().isPresent()) {
 
 Review comment:
   Yes, this is a shortcoming. I will create a follow up issue for this.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] dawidwys commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942#discussion_r371781417
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 ##########
 @@ -273,6 +274,72 @@ public void registerFunction(String name, ScalarFunction function) {
 			function);
 	}
 
+	@Override
+	public void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> functionClass) {
+		final UserDefinedFunction functionInstance = UserDefinedFunctionHelper.instantiateFunction(functionClass);
 
 Review comment:
   Just as a future reference. I think we should change the `FunctionCatalog` so that it stores temporary functions as `CatalogFunctions` instead of instances of `FunctionDefinition` the same way we store `CatalogTables` for temporary tables.
   
   Shall we create a follow-up issue for it?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942#issuecomment-578198793
 
 
   <!--
   Meta data
   Hash:67a8e490524c51740ea2b06b295189c3d19ca358 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/145962993 TriggerType:PUSH TriggerID:67a8e490524c51740ea2b06b295189c3d19ca358
   Hash:67a8e490524c51740ea2b06b295189c3d19ca358 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4610 TriggerType:PUSH TriggerID:67a8e490524c51740ea2b06b295189c3d19ca358
   Hash:67a8e490524c51740ea2b06b295189c3d19ca358 Status:CANCELED URL:https://travis-ci.com/flink-ci/flink/builds/145962993 TriggerType:MANUAL TriggerID:579706265
   Hash:67a8e490524c51740ea2b06b295189c3d19ca358 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4610 TriggerType:MANUAL TriggerID:579706265
   Hash:d0b30c6c17d46b19475e93821369a3d96b2a17b9 Status:PENDING URL:https://travis-ci.com/flink-ci/flink/builds/146576629 TriggerType:PUSH TriggerID:d0b30c6c17d46b19475e93821369a3d96b2a17b9
   Hash:d0b30c6c17d46b19475e93821369a3d96b2a17b9 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4650 TriggerType:PUSH TriggerID:d0b30c6c17d46b19475e93821369a3d96b2a17b9
   -->
   ## CI report:
   
   * 67a8e490524c51740ea2b06b295189c3d19ca358 Travis: [CANCELED](https://travis-ci.com/flink-ci/flink/builds/145962993) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4610) 
   * d0b30c6c17d46b19475e93821369a3d96b2a17b9 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/146576629) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4650) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] twalthr commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942#discussion_r372289711
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java
 ##########
 @@ -147,39 +157,156 @@
 	}
 
 	/**
-	 * Prepares a {@link UserDefinedFunction} for usage in the API.
+	 * Instantiates a {@link UserDefinedFunction} assuming a default constructor.
 	 */
-	public static void prepareFunction(TableConfig config, UserDefinedFunction function) {
-		if (function instanceof TableFunction) {
-			UserDefinedFunctionHelper.validateNotSingleton(function.getClass());
+	public static UserDefinedFunction instantiateFunction(Class<? extends UserDefinedFunction> functionClass) {
+		validateClass(functionClass, true);
+		try {
+			return functionClass.newInstance();
+		} catch (Exception e) {
+			throw new ValidationException(
+				String.format("Cannot instantiate user-defined function class '%s'.", functionClass.getName()),
+				e);
 		}
-		UserDefinedFunctionHelper.validateInstantiation(function.getClass());
-		UserDefinedFunctionHelper.cleanFunction(config, function);
 	}
 
 	/**
-	 * Checks if a user-defined function can be easily instantiated.
+	 * Prepares a {@link UserDefinedFunction} instance for usage in the API.
 	 */
-	private static void validateInstantiation(Class<?> clazz) {
-		if (!InstantiationUtil.isPublic(clazz)) {
-			throw new ValidationException(String.format("Function class %s is not public.", clazz.getCanonicalName()));
-		} else if (!InstantiationUtil.isProperClass(clazz)) {
-			throw new ValidationException(String.format(
-				"Function class %s is no proper class," +
-					" it is either abstract, an interface, or a primitive type.", clazz.getCanonicalName()));
+	public static void prepareInstance(TableConfig config, UserDefinedFunction function) {
+		validateClass(function.getClass(), false);
+		cleanFunction(config, function);
+	}
+
+	/**
+	 * Validates a {@link UserDefinedFunction} class for usage in the API.
+	 *
+	 * <p>Note: This is an initial validation to indicate common errors early. The concrete signature
+	 * validation happens in the code generation when the actual {@link DataType}s for arguments and result
+	 * are known.
+	 */
+	public static void validateClass(Class<? extends UserDefinedFunction> functionClass) {
+		validateClass(functionClass, true);
+	}
+
+	/**
+	 * Validates a {@link UserDefinedFunction} class for usage in the API.
+	 */
+	private static void validateClass(
+			Class<? extends UserDefinedFunction> functionClass,
+			boolean requiresDefaultConstructor) {
+		if (TableFunction.class.isAssignableFrom(functionClass)) {
+			validateNotSingleton(functionClass);
 		}
+		validateInstantiation(functionClass, requiresDefaultConstructor);
+		validateImplementationMethods(functionClass);
 	}
 
 	/**
 	 * Check whether this is a Scala object. Using Scala objects can lead to concurrency issues,
 	 * e.g., due to a shared collector.
 	 */
 	private static void validateNotSingleton(Class<?> clazz) {
-		// TODO it is not a good way to check singleton. Maybe improve it further.
 		if (Arrays.stream(clazz.getFields()).anyMatch(f -> f.getName().equals("MODULE$"))) {
 			throw new ValidationException(String.format(
 				"Function implemented by class %s is a Scala object. This is forbidden because of concurrency" +
-					" problems when using them.", clazz.getCanonicalName()));
+					" problems when using them.", clazz.getName()));
+		}
+	}
+
+	/**
+	 * Validates the implementation methods such as {@code eval()} or {@code accumulate()} depending
+	 * on the {@link UserDefinedFunction} subclass.
+	 *
+	 * <p>This method must be kept in sync with the code generation requirements and the individual
+	 * docs of each function.
+	 */
+	private static void validateImplementationMethods(Class<? extends UserDefinedFunction> functionClass) {
+		if (ScalarFunction.class.isAssignableFrom(functionClass)) {
+			validateImplementationMethod(functionClass, false, false, "eval");
 
 Review comment:
   I think this helper class is a good location. Will update the PR.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] dawidwys commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942#discussion_r371863785
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/UserDefinedFunctionHelperTest.java
 ##########
 @@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.util.Collector;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link UserDefinedFunctionHelper}.
+ */
+@RunWith(Parameterized.class)
+@SuppressWarnings("unused")
+public class UserDefinedFunctionHelperTest {
+
+	@Parameterized.Parameters
+	public static List<TestSpec> testData() {
+		return Arrays.asList(
+			TestSpec
+				.forClass(ValidScalarFunction.class)
+				.expectSuccess(),
+
+			TestSpec
+				.forInstance(new ValidScalarFunction())
+				.expectSuccess(),
+
+			TestSpec
+				.forClass(PrivateScalarFunction.class)
+				.expectErrorMessage(
+					"Function class '" + PrivateScalarFunction.class.getName() + "' is not public."),
+
+			TestSpec
+				.forClass(MissingImplementationScalarFunction.class)
+				.expectErrorMessage(
+					"Function class '" + MissingImplementationScalarFunction.class.getName() +
+						"' does not implement a method named 'eval'."),
+
+			TestSpec
+				.forClass(PrivateMethodScalarFunction.class)
+				.expectErrorMessage(
+					"Method 'eval' of function class '" + PrivateMethodScalarFunction.class.getName() +
+						"' is not public."),
+
+			TestSpec
+				.forInstance(new ValidTableAggregateFunction())
+				.expectSuccess(),
+
+			TestSpec
+				.forInstance(new MissingEmitTableAggregateFunction())
+				.expectErrorMessage(
+					"Function class '" + MissingEmitTableAggregateFunction.class.getName() +
+						"' does not implement a method named 'emitUpdateWithRetract' or 'emitValue'."),
+
+			TestSpec
+				.forInstance(new ValidTableFunction())
+				.expectSuccess(),
+
+			TestSpec
+				.forInstance(new ParameterizedTableFunction(12))
+				.expectSuccess(),
+
+			TestSpec
+				.forClass(ParameterizedTableFunction.class)
+				.expectErrorMessage(
+					"Function class '" + ParameterizedTableFunction.class.getName() +
+						"' must have a public default constructor."),
+
+			TestSpec
+				.forClass(HierarchicalTableAggregateFunction.class)
+				.expectSuccess()
+		);
+	}
+
+	@Parameter
+	public TestSpec testSpec;
+
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
+	@Test
+	public void testInstantiation() {
+		if (testSpec.functionClass != null) {
+			if (testSpec.expectedErrorMessage != null) {
+				thrown.expect(ValidationException.class);
+				thrown.expectMessage(testSpec.expectedErrorMessage);
+			}
+			assertThat(
+				UserDefinedFunctionHelper.instantiateFunction(testSpec.functionClass),
+				notNullValue());
+		}
+	}
+
+	@Test
+	public void testValidation() {
+		if (testSpec.expectedErrorMessage != null) {
+			thrown.expect(ValidationException.class);
+			thrown.expectMessage(testSpec.expectedErrorMessage);
+		}
+		if (testSpec.functionClass != null) {
+			UserDefinedFunctionHelper.validateClass(testSpec.functionClass);
+		} else if (testSpec.functionInstance != null) {
+			UserDefinedFunctionHelper.prepareInstance(new TableConfig(), testSpec.functionInstance);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Test utilities
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Test specification shared with the Scala tests.
 
 Review comment:
   nit: Is this comment valid?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot commented on issue #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942#issuecomment-578198793
 
 
   <!--
   Meta data
   Hash:67a8e490524c51740ea2b06b295189c3d19ca358 Status:UNKNOWN URL:TBD TriggerType:PUSH TriggerID:67a8e490524c51740ea2b06b295189c3d19ca358
   -->
   ## CI report:
   
   * 67a8e490524c51740ea2b06b295189c3d19ca358 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] dawidwys commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942#discussion_r371793891
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java
 ##########
 @@ -147,39 +157,156 @@
 	}
 
 	/**
-	 * Prepares a {@link UserDefinedFunction} for usage in the API.
+	 * Instantiates a {@link UserDefinedFunction} assuming a default constructor.
 	 */
-	public static void prepareFunction(TableConfig config, UserDefinedFunction function) {
-		if (function instanceof TableFunction) {
-			UserDefinedFunctionHelper.validateNotSingleton(function.getClass());
+	public static UserDefinedFunction instantiateFunction(Class<? extends UserDefinedFunction> functionClass) {
+		validateClass(functionClass, true);
+		try {
+			return functionClass.newInstance();
+		} catch (Exception e) {
+			throw new ValidationException(
+				String.format("Cannot instantiate user-defined function class '%s'.", functionClass.getName()),
+				e);
 		}
-		UserDefinedFunctionHelper.validateInstantiation(function.getClass());
-		UserDefinedFunctionHelper.cleanFunction(config, function);
 	}
 
 	/**
-	 * Checks if a user-defined function can be easily instantiated.
+	 * Prepares a {@link UserDefinedFunction} instance for usage in the API.
 	 */
-	private static void validateInstantiation(Class<?> clazz) {
-		if (!InstantiationUtil.isPublic(clazz)) {
-			throw new ValidationException(String.format("Function class %s is not public.", clazz.getCanonicalName()));
-		} else if (!InstantiationUtil.isProperClass(clazz)) {
-			throw new ValidationException(String.format(
-				"Function class %s is no proper class," +
-					" it is either abstract, an interface, or a primitive type.", clazz.getCanonicalName()));
+	public static void prepareInstance(TableConfig config, UserDefinedFunction function) {
+		validateClass(function.getClass(), false);
+		cleanFunction(config, function);
+	}
+
+	/**
+	 * Validates a {@link UserDefinedFunction} class for usage in the API.
+	 *
+	 * <p>Note: This is an initial validation to indicate common errors early. The concrete signature
+	 * validation happens in the code generation when the actual {@link DataType}s for arguments and result
+	 * are known.
+	 */
+	public static void validateClass(Class<? extends UserDefinedFunction> functionClass) {
+		validateClass(functionClass, true);
+	}
+
+	/**
+	 * Validates a {@link UserDefinedFunction} class for usage in the API.
+	 */
+	private static void validateClass(
+			Class<? extends UserDefinedFunction> functionClass,
+			boolean requiresDefaultConstructor) {
+		if (TableFunction.class.isAssignableFrom(functionClass)) {
+			validateNotSingleton(functionClass);
 		}
+		validateInstantiation(functionClass, requiresDefaultConstructor);
+		validateImplementationMethods(functionClass);
 	}
 
 	/**
 	 * Check whether this is a Scala object. Using Scala objects can lead to concurrency issues,
 	 * e.g., due to a shared collector.
 	 */
 	private static void validateNotSingleton(Class<?> clazz) {
-		// TODO it is not a good way to check singleton. Maybe improve it further.
 		if (Arrays.stream(clazz.getFields()).anyMatch(f -> f.getName().equals("MODULE$"))) {
 			throw new ValidationException(String.format(
 				"Function implemented by class %s is a Scala object. This is forbidden because of concurrency" +
-					" problems when using them.", clazz.getCanonicalName()));
+					" problems when using them.", clazz.getName()));
+		}
+	}
+
+	/**
+	 * Validates the implementation methods such as {@code eval()} or {@code accumulate()} depending
+	 * on the {@link UserDefinedFunction} subclass.
+	 *
+	 * <p>This method must be kept in sync with the code generation requirements and the individual
+	 * docs of each function.
+	 */
+	private static void validateImplementationMethods(Class<? extends UserDefinedFunction> functionClass) {
+		if (ScalarFunction.class.isAssignableFrom(functionClass)) {
+			validateImplementationMethod(functionClass, false, false, "eval");
 
 Review comment:
   nit: Could we create String constants for those method names? We use those strings in multiple different places.
   
   I think we can create a separate JIRA for this.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] dawidwys commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942#discussion_r373962137
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
 ##########
 @@ -81,8 +85,283 @@ public void setPlannerTypeInferenceUtil(PlannerTypeInferenceUtil plannerTypeInfe
 		this.plannerTypeInferenceUtil = plannerTypeInferenceUtil;
 	}
 
+	/**
+	 * Registers a temporary system function.
+	 */
+	public void registerTemporarySystemFunction(
+			String name,
+			FunctionDefinition definition,
+			boolean ignoreIfExists) {
+		final String normalizedName = FunctionIdentifier.normalizeName(name);
+
+		if (definition instanceof UserDefinedFunction) {
+			try {
+				UserDefinedFunctionHelper.prepareInstance(config, (UserDefinedFunction) definition);
+			} catch (Throwable t) {
+				throw new ValidationException(
+					String.format(
+						"Could not register temporary system function '%s' due to implementation errors.",
+						name),
+					t);
+			}
+		}
+
+		if (!tempSystemFunctions.containsKey(normalizedName)) {
+			tempSystemFunctions.put(normalizedName, definition);
+		} else if (!ignoreIfExists) {
+			throw new ValidationException(
+				String.format(
+					"Could not register temporary system function. A function named '%s' does already exist.",
+					name));
+		}
+	}
+
+	/**
+	 * Drops a temporary system function. Returns true if a function was dropped.
+	 */
+	public boolean dropTemporarySystemFunction(
+			String name,
+			boolean ignoreIfNotExist) {
+		final String normalizedName = FunctionIdentifier.normalizeName(name);
+		final FunctionDefinition definition = tempSystemFunctions.remove(normalizedName);
+
+		if (definition == null && !ignoreIfNotExist) {
+			throw new ValidationException(
+				String.format(
+					"Could not drop temporary system function. A function named '%s' doesn't exist.",
+					name));
+		}
+
+		return definition != null;
+	}
+
+	/**
+	 * Registers a temporary catalog function.
+	 */
+	public void registerTemporaryCatalogFunction(
+			UnresolvedIdentifier unresolvedIdentifier,
 
 Review comment:
   Created https://issues.apache.org/jira/browse/FLINK-15859

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10942: [FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvironment
URL: https://github.com/apache/flink/pull/10942#issuecomment-578198793
 
 
   <!--
   Meta data
   Hash:67a8e490524c51740ea2b06b295189c3d19ca358 Status:PENDING URL:https://travis-ci.com/flink-ci/flink/builds/145962993 TriggerType:PUSH TriggerID:67a8e490524c51740ea2b06b295189c3d19ca358
   Hash:67a8e490524c51740ea2b06b295189c3d19ca358 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4610 TriggerType:PUSH TriggerID:67a8e490524c51740ea2b06b295189c3d19ca358
   -->
   ## CI report:
   
   * 67a8e490524c51740ea2b06b295189c3d19ca358 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/145962993) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4610) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services