Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/12/03 12:46:00 UTC

[jira] [Commented] (FLINK-10689) Port UDFs in Table API extension points to flink-table-common

    [ https://issues.apache.org/jira/browse/FLINK-10689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16707142#comment-16707142 ] 

ASF GitHub Bot commented on FLINK-10689:
----------------------------------------

asfgit closed pull request #7059: [FLINK-10689][table] Port UDFs in Table API extension points to flink-table-common
URL: https://github.com/apache/flink/pull/7059

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
index 1a6a80e9c8f..6a0534c58ed 100644
--- a/flink-end-to-end-tests/flink-sql-client-test/pom.xml
+++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
@@ -33,7 +33,7 @@ under the License.
 	<dependencies>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table_${scala.binary.version}</artifactId>
+			<artifactId>flink-table-common</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
new file mode 100644
index 00000000000..63a4d3f7a2f
--- /dev/null
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+/**
+ * Base class for User-Defined Aggregates.
+ *
+ * <p>The behavior of an {@link AggregateFunction} can be defined by implementing a series of custom
+ * methods. An {@link AggregateFunction} needs at least three methods:
+ *  - createAccumulator,
+ *  - accumulate, and
+ *  - getValue.
+ *
+ * <p>There are a few other methods that can be optional to have:
+ *  - retract,
+ *  - merge, and
+ *  - resetAccumulator
+ *
+ * <p>All these methods must be declared public, non-static, and named exactly as listed
+ * above. The methods createAccumulator and getValue are defined in the
+ * {@link AggregateFunction} class, while the other methods are explained below.
+ *
+ * <p>accumulate: Processes the input values and updates the provided accumulator instance. The
+ * method can be overloaded with different custom types and arguments. An AggregateFunction
+ * requires at least one accumulate() method.
+ *
+ *
+ * <p>retract: Retracts the input values from the accumulator instance. The current design assumes
+ * the inputs are values that have been previously accumulated. The method can be overloaded
+ * with different custom types and arguments. It must be implemented for bounded OVER
+ * aggregates on DataStream.
+ *
+ * <p>merge: Merges a group of accumulator instances into one accumulator instance. It must be
+ * implemented for DataStream session window grouping aggregates and DataSet grouping aggregates.
+ *
+ * <p>resetAccumulator: Resets the accumulator for this {@link AggregateFunction}. It must be
+ * implemented for DataSet grouping aggregates.
+ *
+ *
+ * @param <T>   the type of the aggregation result
+ * @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the
+ *             aggregated values which are needed to compute an aggregation result.
+ *             An AggregateFunction represents its state using the accumulator, so all state
+ *             of the AggregateFunction must be put into the accumulator.
+ */
+@PublicEvolving
+public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
+
+	/**
+	 * Creates and initializes the accumulator for this {@link AggregateFunction}.
+	 *
+	 * @return the accumulator with the initial value
+	 */
+	public abstract ACC createAccumulator();
+
+	/**
+	 * Called every time when an aggregation result should be materialized.
+	 * The returned value could be either an early and incomplete result
+	 * (periodically emitted as data arrive) or the final result of the
+	 * aggregation.
+	 *
+	 * @param accumulator the accumulator which contains the current
+	 *                    aggregated results
+	 * @return the aggregation result
+	 */
+	public abstract T getValue(ACC accumulator);
+
+	/**
+	 * Returns true if this AggregateFunction can only be applied in an OVER window.
+	 *
+	 * @return true if the AggregateFunction requires an OVER window, false otherwise.
+	 */
+	public boolean requiresOver() {
+		return false;
+	}
+
+	/**
+	 * Returns the TypeInformation of the AggregateFunction's result.
+	 *
+	 * @return The TypeInformation of the AggregateFunction's result or null if the result type
+	 *         should be automatically inferred.
+	 */
+	public TypeInformation<T> getResultType() {
+		return null;
+	}
+
+	/**
+	 * Returns the TypeInformation of the AggregateFunction's accumulator.
+	 *
+	 * @return The TypeInformation of the AggregateFunction's accumulator or null if the
+	 *         accumulator type should be automatically inferred.
+	 */
+	public TypeInformation<ACC> getAccumulatorType() {
+		return null;
+	}
+}
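
For illustration only, here is a minimal sketch of the method contract described in the Javadoc above. It is not code from this pull request. `AggregateFunctionSketch` is a hypothetical local stand-in for the ported base class so that the example compiles without Flink on the classpath, and `WeightedAvg` / `WeightedAvgAccum` are made-up names. The `merge` signature follows the Scaladoc of the removed Scala class.

```java
// Hypothetical stand-in for org.apache.flink.table.functions.AggregateFunction;
// it only mirrors the two abstract methods shown in the diff.
abstract class AggregateFunctionSketch<T, ACC> {
    public abstract ACC createAccumulator();

    public abstract T getValue(ACC accumulator);
}

// All state of the aggregate lives in the accumulator, not in the function.
class WeightedAvgAccum {
    long sum = 0;
    long count = 0;
}

class WeightedAvg extends AggregateFunctionSketch<Long, WeightedAvgAccum> {

    @Override
    public WeightedAvgAccum createAccumulator() {
        return new WeightedAvgAccum();
    }

    @Override
    public Long getValue(WeightedAvgAccum acc) {
        // May be called for early results as well as for the final result.
        return acc.count == 0 ? null : acc.sum / acc.count;
    }

    // Required. Public, non-static, and named exactly "accumulate".
    public void accumulate(WeightedAvgAccum acc, long value, int weight) {
        acc.sum += value * weight;
        acc.count += weight;
    }

    // Optional. Undoes a previous accumulate call with the same inputs.
    public void retract(WeightedAvgAccum acc, long value, int weight) {
        acc.sum -= value * weight;
        acc.count -= weight;
    }

    // Optional. Folds other accumulators into acc without discarding what acc already holds.
    public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> others) {
        for (WeightedAvgAccum other : others) {
            acc.sum += other.sum;
            acc.count += other.count;
        }
    }

    // Optional. Returns the accumulator to its initial state.
    public void resetAccumulator(WeightedAvgAccum acc) {
        acc.sum = 0;
        acc.count = 0;
    }
}
```

Because accumulate, retract, merge, and resetAccumulator are not declared on the base class, a misspelled or static method would not be caught by the compiler in this sketch either; that is the reason the Javadoc insists on exact names.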
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java
new file mode 100644
index 00000000000..93b2229a7c6
--- /dev/null
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.io.File;
+
+/**
+ * A FunctionContext provides global runtime information about the context in which the
+ * user-defined function is executed. The information includes the metric group,
+ * the distributed cache files, and the global job parameters.
+ */
+@PublicEvolving
+public class FunctionContext {
+
+	private RuntimeContext context;
+
+	/**
+	 * @param context the runtime context in which the Flink Function is executed
+	 */
+	public FunctionContext(RuntimeContext context) {
+		this.context = context;
+	}
+
+	/**
+	 * Returns the metric group for this parallel subtask.
+	 *
+	 * @return metric group for this parallel subtask.
+	 */
+	public MetricGroup getMetricGroup() {
+		return context.getMetricGroup();
+	}
+
+	/**
+	 * Gets the local temporary file copy of a distributed cache file.
+	 *
+	 * @param name distributed cache file name
+	 * @return local temporary file copy of a distributed cache file.
+	 */
+	public File getCachedFile(String name) {
+		return context.getDistributedCache().getFile(name);
+	}
+
+	/**
+	 * Gets the global job parameter value associated with the given key as a string.
+	 *
+	 * @param key          key pointing to the associated value
+	 * @param defaultValue default value which is returned in case global job parameter is null
+	 *                     or there is no value associated with the given key
+	 * @return (default) value associated with the given key
+	 */
+	public String getJobParameter(String key, String defaultValue) {
+		GlobalJobParameters conf = context.getExecutionConfig().getGlobalJobParameters();
+		if (conf != null && conf.toMap().containsKey(key)) {
+			return conf.toMap().get(key);
+		} else {
+			return defaultValue;
+		}
+	}
+}
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java
new file mode 100644
index 00000000000..091258ececf
--- /dev/null
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.table.api.ValidationException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Base class for a user-defined scalar function. A user-defined scalar function maps zero, one,
+ * or multiple scalar values to a new scalar value.
+ *
+ * <p>The behavior of a {@link ScalarFunction} can be defined by implementing a custom evaluation
+ * method. An evaluation method must be declared publicly and named "eval". Evaluation methods
+ * can also be overloaded by implementing multiple methods named "eval".
+ *
+ * <p>User-defined functions must have a default constructor and must be instantiable during runtime.
+ *
+ * <p>By default the result type of an evaluation method is determined by Flink's type extraction
+ * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more
+ * complex, custom, or composite types. In these cases {@link TypeInformation} of the result type
+ * can be manually defined by overriding {@link ScalarFunction#getResultType}.
+ *
+ * <p>Internally, the Table/SQL API code generation works with primitive values as much as possible.
+ * If a user-defined scalar function should not introduce much overhead during runtime, it is
+ * recommended to declare parameters and result types as primitive types instead of their boxed
+ * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
+ */
+@PublicEvolving
+public abstract class ScalarFunction extends UserDefinedFunction {
+
+	/**
+	 * Returns the result type of the evaluation method with a given signature.
+	 *
+	 * <p>This method needs to be overridden in case Flink's type extraction facilities are not
+	 * sufficient to extract the {@link TypeInformation} based on the return type of the evaluation
+	 * method. Flink's type extraction facilities can handle basic types or
+	 * simple POJOs but might be wrong for more complex, custom, or composite types.
+	 *
+	 * @param signature signature of the method whose return type needs to be determined
+	 * @return {@link TypeInformation} of result type or null if Flink should determine the type
+	 */
+	public TypeInformation<?> getResultType(Class<?>[] signature) {
+		return null;
+	}
+
+	/**
+	 * Returns {@link TypeInformation} about the operands of the evaluation method with a given
+	 * signature.
+	 *
+	 * <p>In order to perform operand type inference in SQL (especially when NULL is used) it might be
+	 * necessary to determine the parameter {@link TypeInformation} of an evaluation method.
+	 * By default Flink's type extraction facilities are used for this but might be wrong for
+	 * more complex, custom, or composite types.
+	 *
+	 * @param signature signature of the method whose operand types need to be determined
+	 * @return {@link TypeInformation} of operand types
+	 */
+	public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
+
+		List<TypeInformation<?>> typeList = Arrays.asList(signature).stream().map(c -> {
+			try {
+				return TypeExtractor.getForClass(c);
+			} catch (InvalidTypesException e) {
+				throw new ValidationException(
+						"Parameter types of scalar function " + this.getClass().getCanonicalName() + " cannot be " +
+						"automatically determined. Please provide type information manually.");
+			}
+		}).collect(Collectors.toList());
+
+		return typeList.toArray(new TypeInformation<?>[0]);
+	}
+}
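
As the Javadoc above notes, evaluation methods are found by the name "eval" rather than through an abstract method. The following is a rough sketch of that idea using plain Java reflection; it is not taken from this pull request. `HashCodeSketch` and `findEval` are hypothetical names, the class does not extend the real `ScalarFunction` so that it compiles without Flink, and the exact-match lookup is a simplification: the actual resolution logic in flink-table is not part of this diff and may well be more lenient.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;

// Hypothetical function class with two overloaded evaluation methods.
// It relies on the implicit default constructor, as the Javadoc requires.
class HashCodeSketch {

    // Primitive parameter and result types avoid boxing overhead at runtime.
    public int eval(int a, int b) {
        return a + b;
    }

    public int eval(String s) {
        return s.length();
    }
}

class EvalLookupSketch {

    // Returns the public, non-static method named "eval" whose parameter types
    // exactly match the given signature, or null if there is none.
    static Method findEval(Class<?> clazz, Class<?>... signature) {
        for (Method m : clazz.getMethods()) {
            if (m.getName().equals("eval")
                    && !Modifier.isStatic(m.getModifiers())
                    && Arrays.equals(m.getParameterTypes(), signature)) {
                return m;
            }
        }
        return null;
    }
}
```

A `Class<?>[]` such as `{int.class, int.class}` plays the same role here as the `signature` argument of `getResultType` and `getParameterTypes` above: it identifies which overload is being asked about.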
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java
new file mode 100644
index 00000000000..ae6b03a7d4f
--- /dev/null
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.util.Collector;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Base class for a user-defined table function (UDTF). A user-defined table function works on
+ * zero, one, or multiple scalar values as input and returns multiple rows as output.
+ *
+ * <p>The behavior of a {@link TableFunction} can be defined by implementing a custom evaluation
+ * method. An evaluation method must be declared public, non-static, and named "eval".
+ * Evaluation methods can also be overloaded by implementing multiple methods named "eval".
+ *
+ * <p>User-defined functions must have a default constructor and must be instantiable during runtime.
+ *
+ * <p>By default the result type of an evaluation method is determined by Flink's type extraction
+ * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more
+ * complex, custom, or composite types. In these cases {@link TypeInformation} of the result type
+ * can be manually defined by overriding {@link TableFunction#getResultType}.
+ *
+ * <p>Internally, the Table/SQL API code generation works with primitive values as much as possible.
+ * If a user-defined table function should not introduce much overhead during runtime, it is
+ * recommended to declare parameters and result types as primitive types instead of their boxed
+ * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
+ *
+ *
+ * @param <T> The type of the output row
+ */
+@PublicEvolving
+public abstract class TableFunction<T> extends UserDefinedFunction {
+
+	/**
+	 * Emit an output row.
+	 *
+	 * @param row the output row
+	 */
+	protected void collect(T row) {
+		collector.collect(row);
+	}
+
+	/**
+	 * The code-generated collector used to emit rows.
+	 */
+	protected Collector<T> collector;
+
+	/**
+	 * Internal use. Sets the current collector.
+	 */
+	public final void setCollector(Collector<T> collector) {
+		this.collector = collector;
+	}
+
+	/**
+	 * Returns the result type of the evaluation method with a given signature.
+	 *
+	 * <p>This method needs to be overridden in case Flink's type extraction facilities are not
+	 * sufficient to extract the {@link TypeInformation} based on the return type of the evaluation
+	 * method. Flink's type extraction facilities can handle basic types or
+	 * simple POJOs but might be wrong for more complex, custom, or composite types.
+	 *
+	 * @return {@link TypeInformation} of result type or null if Flink should determine the type
+	 */
+	public TypeInformation<T> getResultType() {
+		return null;
+	}
+
+	/**
+	 * Returns {@link TypeInformation} about the operands of the evaluation method with a given
+	 * signature.
+	 *
+	 * <p>In order to perform operand type inference in SQL (especially when NULL is used) it might be
+	 * necessary to determine the parameter {@link TypeInformation} of an evaluation method.
+	 * By default Flink's type extraction facilities are used for this but might be wrong for
+	 * more complex, custom, or composite types.
+	 *
+	 * @param signature signature of the method whose operand types need to be determined
+	 * @return {@link TypeInformation} of operand types
+	 */
+	public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
+
+		List<TypeInformation<?>> typeList = Arrays.asList(signature).stream().map(c -> {
+			try {
+				return TypeExtractor.getForClass(c);
+			} catch (InvalidTypesException e) {
+				throw new ValidationException(
+						"Parameter types of table function " + this.getClass().getCanonicalName() + " cannot be " +
+						"automatically determined. Please provide type information manually.");
+			}
+		}).collect(Collectors.toList());
+
+		return typeList.toArray(new TypeInformation<?>[0]);
+	}
+
+}
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java
new file mode 100644
index 00000000000..6c91c104907
--- /dev/null
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import java.io.Serializable;
+
+/**
+ * Base class for all user-defined functions such as scalar functions, table functions,
+ * or aggregation functions.
+ */
+@PublicEvolving
+public abstract class UserDefinedFunction implements Serializable {
+	/**
+	 * Setup method for user-defined function. It can be used for initialization work.
+	 * By default, this method does nothing.
+	 */
+	public void open(FunctionContext context) throws Exception {
+
+	}
+
+	/**
+	 * Tear-down method for user-defined function. It can be used for clean up work.
+	 * By default, this method does nothing.
+	 */
+	public void close() throws Exception {
+
+	}
+
+	/**
+	 * @return true if and only if a call to this function is guaranteed to always return
+	 * the same result given the same parameters; true is assumed by default.
+	 * If the function is not purely functional, like random(), date(), or now(),
+	 * isDeterministic must return false.
+	 */
+	public boolean isDeterministic() {
+		return true;
+	}
+
+	public String functionIdentifier() {
+		String md5 = EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this)));
+		return getClass().getCanonicalName().replace('.', '$').concat("$").concat(md5);
+	}
+
+	/**
+	 * Returns the name of the UDF that is used for plan explain and logging.
+	 */
+	@Override
+	public String toString() {
+		return getClass().getSimpleName();
+	}
+}
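
The `functionIdentifier()` method above derives a name from the class and an MD5 digest of the encoded function instance. The sketch below approximates that with the JDK only, to show why two instances with different field values get different identifiers. It rests on an assumption: `EncodingUtils` is not shown in this diff, so the sketch guesses that `encodeObjectToString` Java-serializes the object and Base64-encodes the bytes. `FunctionIdentifierSketch` and `ScaleSketch` are hypothetical names, and the digests produced here are not guaranteed to match Flink's.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

class FunctionIdentifierSketch {

    // Builds "<canonical name with '.' replaced by '$'>$<md5 hex of the encoded instance>".
    static String identifierOf(Serializable function) {
        try {
            // Assumed encoding step: Java serialization followed by Base64.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(function);
            }
            String encoded = Base64.getEncoder().encodeToString(bytes.toByteArray());

            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(encoded.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return function.getClass().getCanonicalName().replace('.', '$') + "$" + hex;
        } catch (IOException | NoSuchAlgorithmException e) {
            throw new IllegalStateException("Could not compute function identifier", e);
        }
    }
}

// Hypothetical parameterized function; the field value is part of the serialized state.
class ScaleSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    final int factor;

    ScaleSketch(int factor) {
        this.factor = factor;
    }
}
```

Under these assumptions, `identifierOf(new ScaleSketch(2))` and `identifierOf(new ScaleSketch(3))` share the class-name prefix but end in different digests, while two instances built with the same factor yield the same identifier.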
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index f9fb93cb7fa..390960ca855 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -28,7 +28,7 @@ import org.apache.flink.table.api.Table
 import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit
 import org.apache.flink.table.expressions.TimePointUnit.TimePointUnit
 import org.apache.flink.table.expressions._
-import org.apache.flink.table.functions.{AggregateFunction, DistinctAggregateFunction}
+import org.apache.flink.table.functions.{AggregateFunction, DistinctAggregateFunction, ScalarFunction}
 
 import scala.language.implicitConversions
 
@@ -1021,6 +1021,12 @@ trait ImplicitExpressionConversions {
     def expr = Literal(sqlTimestamp)
   }
 
+  implicit class ScalarFunctionCallExpression(val s: ScalarFunction) {
+    def apply(params: Expression*): Expression = {
+      ScalarFunctionCall(s, params)
+    }
+  }
+
   implicit def symbol2FieldExpression(sym: Symbol): Expression = UnresolvedFieldReference(sym.name)
   implicit def byte2Literal(b: Byte): Expression = Literal(b)
   implicit def short2Literal(s: Short): Expression = Literal(s)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
deleted file mode 100644
index d3f9497e1bb..00000000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.functions
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-/**
-  * Base class for User-Defined Aggregates.
-  *
-  * The behavior of an [[AggregateFunction]] can be defined by implementing a series of custom
-  * methods. An [[AggregateFunction]] needs at least three methods:
-  *  - createAccumulator,
-  *  - accumulate, and
-  *  - getValue.
-  *
-  *  There are a few other methods that can be optional to have:
-  *  - retract,
-  *  - merge, and
-  *  - resetAccumulator
-  *
-  * All these methods must be declared publicly, not static and named exactly as the names
-  * mentioned above. The methods createAccumulator and getValue are defined in the
-  * [[AggregateFunction]] functions, while other methods are explained below.
-  *
-  *
-  * {{{
-  * Processes the input values and update the provided accumulator instance. The method
-  * accumulate can be overloaded with different custom types and arguments. An AggregateFunction
-  * requires at least one accumulate() method.
-  *
-  * @param accumulator           the accumulator which contains the current aggregated results
-  * @param [user defined inputs] the input value (usually obtained from a new arrived data).
-  *
-  * def accumulate(accumulator: ACC, [user defined inputs]): Unit
-  * }}}
-  *
-  *
-  * {{{
-  * Retracts the input values from the accumulator instance. The current design assumes the
-  * inputs are the values that have been previously accumulated. The method retract can be
-  * overloaded with different custom types and arguments. This function must be implemented for
-  * datastream bounded over aggregate.
-  *
-  * @param accumulator           the accumulator which contains the current aggregated results
-  * @param [user defined inputs] the input value (usually obtained from a new arrived data).
-  *
-  * def retract(accumulator: ACC, [user defined inputs]): Unit
-  * }}}
-  *
-  *
-  * {{{
-  * Merges a group of accumulator instances into one accumulator instance. This function must be
-  * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
-  *
-  * @param accumulator  the accumulator which will keep the merged aggregate results. It should
-  *                     be noted that the accumulator may contain the previous aggregated
-  *                     results. Therefore user should not replace or clean this instance in the
-  *                     custom merge method.
-  * @param its          an [[java.lang.Iterable]] pointed to a group of accumulators that will be
-  *                     merged.
-  *
-  * def merge(accumulator: ACC, its: java.lang.Iterable[ACC]): Unit
-  * }}}
-  *
-  *
-  * {{{
-  * Resets the accumulator for this [[AggregateFunction]]. This function must be implemented for
-  * dataset grouping aggregate.
-  *
-  * @param accumulator  the accumulator which needs to be reset
-  *
-  * def resetAccumulator(accumulator: ACC): Unit
-  * }}}
-  *
-  *
-  * @tparam T   the type of the aggregation result
-  * @tparam ACC the type of the aggregation accumulator. The accumulator is used to keep the
-  *             aggregated values which are needed to compute an aggregation result.
-  *             AggregateFunction represents its state using accumulator, thereby the state of the
-  *             AggregateFunction must be put into the accumulator.
-  */
-abstract class AggregateFunction[T, ACC] extends UserDefinedFunction {
-  /**
-    * Creates and init the Accumulator for this [[AggregateFunction]].
-    *
-    * @return the accumulator with the initial value
-    */
-  def createAccumulator(): ACC
-
-  /**
-    * Called every time when an aggregation result should be materialized.
-    * The returned value could be either an early and incomplete result
-    * (periodically emitted as data arrive) or the final result of the
-    * aggregation.
-    *
-    * @param accumulator the accumulator which contains the current
-    *                    aggregated results
-    * @return the aggregation result
-    */
-  def getValue(accumulator: ACC): T
-
-    /**
-    * Returns true if this AggregateFunction can only be applied in an OVER window.
-    *
-    * @return true if the AggregateFunction requires an OVER window, false otherwise.
-    */
-  def requiresOver: Boolean = false
-
-  /**
-    * Returns the TypeInformation of the AggregateFunction's result.
-    *
-    * @return The TypeInformation of the AggregateFunction's result or null if the result type
-    *         should be automatically inferred.
-    */
-  def getResultType: TypeInformation[T] = null
-
-  /**
-    * Returns the TypeInformation of the AggregateFunction's accumulator.
-    *
-    * @return The TypeInformation of the AggregateFunction's accumulator or null if the
-    *         accumulator type should be automatically inferred.
-    */
-  def getAccumulatorType: TypeInformation[ACC] = null
-}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/FunctionContext.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/FunctionContext.scala
deleted file mode 100644
index beeb686f035..00000000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/FunctionContext.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.functions
-
-import java.io.File
-
-import org.apache.flink.api.common.functions.RuntimeContext
-import org.apache.flink.metrics.MetricGroup
-
-/**
-  * A FunctionContext allows to obtain global runtime information about the context in which the
-  * user-defined function is executed. The information include the metric group,
-  * the distributed cache files, and the global job parameters.
-  *
-  * @param context the runtime context in which the Flink Function is executed
-  */
-class FunctionContext(context: RuntimeContext) {
-
-  /**
-    * Returns the metric group for this parallel subtask.
-    *
-    * @return metric group for this parallel subtask.
-    */
-  def getMetricGroup: MetricGroup = context.getMetricGroup
-
-  /**
-    * Gets the local temporary file copy of a distributed cache files.
-    *
-    * @param name distributed cache file name
-    * @return local temporary file copy of a distributed cache file.
-    */
-  def getCachedFile(name: String): File = context.getDistributedCache.getFile(name)
-
-  /**
-    * Gets the global job parameter value associated with the given key as a string.
-    *
-    * @param key          key pointing to the associated value
-    * @param defaultValue default value which is returned in case global job parameter is null
-    *                     or there is no value associated with the given key
-    * @return (default) value associated with the given key
-    */
-  def getJobParameter(key: String, defaultValue: String): String = {
-    val conf = context.getExecutionConfig.getGlobalJobParameters
-    if (conf != null && conf.toMap.containsKey(key)) {
-      conf.toMap.get(key)
-    } else {
-      defaultValue
-    }
-  }
-}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala
deleted file mode 100644
index 4c01c1c7d9a..00000000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.functions
-
-import org.apache.flink.api.common.functions.InvalidTypesException
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.expressions.{Expression, ScalarFunctionCall}
-
-/**
-  * Base class for a user-defined scalar function. A user-defined scalar functions maps zero, one,
-  * or multiple scalar values to a new scalar value.
-  *
-  * The behavior of a [[ScalarFunction]] can be defined by implementing a custom evaluation
-  * method. An evaluation method must be declared publicly and named "eval". Evaluation methods
-  * can also be overloaded by implementing multiple methods named "eval".
-  *
-  * User-defined functions must have a default constructor and must be instantiable during runtime.
-  *
-  * By default the result type of an evaluation method is determined by Flink's type extraction
-  * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more
-  * complex, custom, or composite types. In these cases [[TypeInformation]] of the result type
-  * can be manually defined by overriding [[getResultType()]].
-  *
-  * Internally, the Table/SQL API code generation works with primitive values as much as possible.
-  * If a user-defined scalar function should not introduce much overhead during runtime, it is
-  * recommended to declare parameters and result types as primitive types instead of their boxed
-  * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
-  */
-abstract class ScalarFunction extends UserDefinedFunction {
-
-  /**
-    * Creates a call to a [[ScalarFunction]] in Scala Table API.
-    *
-    * @param params actual parameters of function
-    * @return [[Expression]] in form of a [[ScalarFunctionCall]]
-    */
-  final def apply(params: Expression*): Expression = {
-    ScalarFunctionCall(this, params)
-  }
-
-  // ----------------------------------------------------------------------------------------------
-
-  /**
-    * Returns the result type of the evaluation method with a given signature.
-    *
-    * This method needs to be overridden in case Flink's type extraction facilities are not
-    * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
-    * method. Flink's type extraction facilities can handle basic types or
-    * simple POJOs but might be wrong for more complex, custom, or composite types.
-    *
-    * @param signature signature of the method the return type needs to be determined
-    * @return [[TypeInformation]] of result type or null if Flink should determine the type
-    */
-  def getResultType(signature: Array[Class[_]]): TypeInformation[_] = null
-
-  /**
-    * Returns [[TypeInformation]] about the operands of the evaluation method with a given
-    * signature.
-    *
-    * In order to perform operand type inference in SQL (especially when NULL is used) it might be
-    * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
-    * By default Flink's type extraction facilities are used for this but might be wrong for
-    * more complex, custom, or composite types.
-    *
-    * @param signature signature of the method the operand types need to be determined
-    * @return [[TypeInformation]] of operand types
-    */
-  def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
-    signature.map { c =>
-      try {
-        TypeExtractor.getForClass(c)
-      } catch {
-        case ite: InvalidTypesException =>
-          throw new ValidationException(
-            s"Parameter types of scalar function '${this.getClass.getCanonicalName}' cannot be " +
-            s"automatically determined. Please provide type information manually.")
-      }
-    }
-  }
-}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala
deleted file mode 100644
index e892a4cddcc..00000000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.functions
-
-import org.apache.flink.api.common.functions.InvalidTypesException
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.util.Collector
-
-/**
-  * Base class for a user-defined table function (UDTF). A user-defined table functions works on
-  * zero, one, or multiple scalar values as input and returns multiple rows as output.
-  *
-  * The behavior of a [[TableFunction]] can be defined by implementing a custom evaluation
-  * method. An evaluation method must be declared publicly, not static and named "eval".
-  * Evaluation methods can also be overloaded by implementing multiple methods named "eval".
-  *
-  * User-defined functions must have a default constructor and must be instantiable during runtime.
-  *
-  * By default the result type of an evaluation method is determined by Flink's type extraction
-  * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more
-  * complex, custom, or composite types. In these cases [[TypeInformation]] of the result type
-  * can be manually defined by overriding [[getResultType()]].
-  *
-  * Internally, the Table/SQL API code generation works with primitive values as much as possible.
-  * If a user-defined table function should not introduce much overhead during runtime, it is
-  * recommended to declare parameters and result types as primitive types instead of their boxed
-  * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
-  *
-  * Example:
-  *
-  * {{{
-  *
-  *   public class Split extends TableFunction<String> {
-  *
-  *     // implement an "eval" method with as many parameters as you want
-  *     public void eval(String str) {
-  *       for (String s : str.split(" ")) {
-  *         collect(s);   // use collect(...) to emit an output row
-  *       }
-  *     }
-  *
-  *     // you can overload the eval method here ...
-  *   }
-  *
-  *   val tEnv: TableEnvironment = ...
-  *   val table: Table = ...    // schema: [a: String]
-  *
-  *   // for Scala users
-  *   val split = new Split()
-  *   table.join(split('c) as ('s)).select('a, 's)
-  *
-  *   // for Java users
-  *   tEnv.registerFunction("split", new Split())   // register table function first
-  *   table.join(new Table(tEnv, "split(a) as (s)")).select("a, s")
-  *
-  *   // for SQL users
-  *   tEnv.registerFunction("split", new Split())   // register table function first
-  *   tEnv.sqlQuery("SELECT a, s FROM MyTable, LATERAL TABLE(split(a)) as T(s)")
-  *
-  * }}}
-  *
-  * @tparam T The type of the output row
-  */
-abstract class TableFunction[T] extends UserDefinedFunction {
-
-  // ----------------------------------------------------------------------------------------------
-
-  /**
-    * Emit an output row.
-    *
-    * @param row the output row
-    */
-  protected def collect(row: T): Unit = {
-    collector.collect(row)
-  }
-
-  // ----------------------------------------------------------------------------------------------
-
-  /**
-    * The code generated collector used to emit row.
-    */
-  private var collector: Collector[T] = _
-
-  /**
-    * Internal use. Sets the current collector.
-    */
-  private[flink] final def setCollector(collector: Collector[T]): Unit = {
-    this.collector = collector
-  }
-
-  // ----------------------------------------------------------------------------------------------
-
-  /**
-    * Returns the result type of the evaluation method with a given signature.
-    *
-    * This method needs to be overridden in case Flink's type extraction facilities are not
-    * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
-    * method. Flink's type extraction facilities can handle basic types or
-    * simple POJOs but might be wrong for more complex, custom, or composite types.
-    *
-    * @return [[TypeInformation]] of result type or null if Flink should determine the type
-    */
-  def getResultType: TypeInformation[T] = null
-
-  /**
-    * Returns [[TypeInformation]] about the operands of the evaluation method with a given
-    * signature.
-    *
-    * In order to perform operand type inference in SQL (especially when NULL is used) it might be
-    * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
-    * By default Flink's type extraction facilities are used for this but might be wrong for
-    * more complex, custom, or composite types.
-    *
-    * @param signature signature of the method the operand types need to be determined
-    * @return [[TypeInformation]] of operand types
-    */
-  def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
-    signature.map { c =>
-      try {
-        TypeExtractor.getForClass(c)
-      } catch {
-        case ite: InvalidTypesException =>
-          throw new ValidationException(
-            s"Parameter types of table function '${this.getClass.getCanonicalName}' cannot be " +
-            s"automatically determined. Please provide type information manually.")
-      }
-    }
-  }
-
-}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
deleted file mode 100644
index 89ba0d4f364..00000000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.functions
-
-import org.apache.flink.table.utils.EncodingUtils
-
-/**
-  * Base class for all user-defined functions such as scalar functions, table functions,
-  * or aggregation functions.
-  */
-abstract class UserDefinedFunction extends Serializable {
-  /**
-    * Setup method for user-defined function. It can be used for initialization work.
-    *
-    * By default, this method does nothing.
-    */
-  @throws(classOf[Exception])
-  def open(context: FunctionContext): Unit = {}
-
-  /**
-    * Tear-down method for user-defined function. It can be used for clean up work.
-    *
-    * By default, this method does nothing.
-    */
-  @throws(classOf[Exception])
-  def close(): Unit = {}
-
-  /**
-    * @return true if and only if a call to this function is guaranteed to always return
-    *         the same result given the same parameters; true is assumed by default
-    *         if user's function is not pure functional, like random(), date(), now()...
-    *         isDeterministic must return false
-    */
-  def isDeterministic: Boolean = true
-
-  final def functionIdentifier: String = {
-    val md5 = EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this)))
-    getClass.getCanonicalName.replace('.', '$').concat("$").concat(md5)
-  }
-
-  /**
-    * Returns the name of the UDF that is used for plan explain and logging.
-    */
-  override def toString: String = getClass.getSimpleName
-
-}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
index 912bb047cba..7511e024341 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
@@ -29,6 +29,7 @@ import org.junit.Assert
 
 import scala.annotation.varargs
 import scala.collection.mutable
+import scala.collection.JavaConversions._
 import scala.io.Source
 
 case class SimplePojo(name: String, age: Int)
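
For reference, the fallback behaviour of the removed Scala `FunctionContext.getJobParameter` shown in the diff above (return the stored value when the global job parameters exist and contain the key, otherwise return the default) can be sketched in plain Java. This is only an illustrative sketch, not the code added by the pull request: the class name `JobParameterLookupSketch` is hypothetical, and a plain `Map<String, String>` is assumed as a stand-in for the map view of the global job parameters, so that the example runs without Flink on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; not part of Flink.
public class JobParameterLookupSketch {

    /**
     * Mirrors the lookup logic of the removed Scala method:
     * use the stored value only if the parameter map is present
     * and contains the key, otherwise fall back to the default.
     *
     * @param params assumed stand-in for the global job parameters; may be null
     */
    static String getJobParameter(Map<String, String> params, String key, String defaultValue) {
        if (params != null && params.containsKey(key)) {
            return params.get(key);
        }
        return defaultValue;
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("delimiter", ",");

        // Key is present: the stored value is returned.
        System.out.println(getJobParameter(params, "delimiter", ";"));
        // Key is absent: the default is returned.
        System.out.println(getJobParameter(params, "missing", ";"));
        // No job parameters at all: the default is returned.
        System.out.println(getJobParameter(null, "delimiter", ";"));
    }
}
```

As in the Scala original, a key that is present but mapped to null yields null rather than the default, because the check is on key presence and not on the value.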


 



> Port UDFs in Table API extension points to flink-table-common
> -------------------------------------------------------------
>
>                 Key: FLINK-10689
>                 URL: https://issues.apache.org/jira/browse/FLINK-10689
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: xueyu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
>
> After FLINK-10687 and FLINK-10688 have been resolved, we should also port the remaining extension points of the Table API to flink-table-common. This includes interfaces for UDFs and the external catalog interface.
> This ticket is for porting UDFs. This issue does NOT depend on FLINK-16088, so it can be started at any time.


