Posted to commits@flink.apache.org by tw...@apache.org on 2018/12/03 12:43:40 UTC

[flink] 02/02: [FLINK-10689] [table] Improve docs and fix bugs of ported classes

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 98437c765c8957fd5ea392502de393d4ec695ea2
Author: Timo Walther <tw...@apache.org>
AuthorDate: Fri Nov 30 17:05:58 2018 +0100

    [FLINK-10689] [table] Improve docs and fix bugs of ported classes
---
 .../flink-sql-client-test/pom.xml                  |   5 -
 .../flink/table/functions/AggregateFunction.java   | 102 ++++++++++++++-------
 .../flink/table/functions/FunctionContext.java     |  18 ++--
 .../flink/table/functions/ScalarFunction.java      |  37 ++++----
 .../flink/table/functions/TableFunction.java       |  81 ++++++++++------
 .../flink/table/functions/UserDefinedFunction.java |  28 +++---
 .../utils/userDefinedScalarFunctions.scala         |   1 -
 7 files changed, 170 insertions(+), 102 deletions(-)

diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
index 6a0534c..d564668 100644
--- a/flink-end-to-end-tests/flink-sql-client-test/pom.xml
+++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
@@ -37,11 +37,6 @@ under the License.
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
-		<dependency>
-			<groupId>org.scala-lang</groupId>
-			<artifactId>scala-compiler</artifactId>
-			<scope>provided</scope>
-		</dependency>
 
 		<!-- The following dependencies are for connector/format sql-jars that
 			we copy using the maven-dependency-plugin. When extending the test
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
index 63a4d3f..70066a2 100644
--- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
@@ -22,51 +22,89 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 /**
- * Base class for User-Defined Aggregates.
+ * Base class for user-defined aggregates.
  *
  * <p>The behavior of an {@link AggregateFunction} can be defined by implementing a series of custom
  * methods. An {@link AggregateFunction} needs at least three methods:
- *  - createAccumulator,
- *  - accumulate, and
- *  - getValue.
+ *  - <code>createAccumulator</code>,
+ *  - <code>accumulate</code>, and
+ *  - <code>getValue</code>.
  *
 * <p>There are a few other methods that are optional to implement:
- *  - retract,
- *  - merge, and
- *  - resetAccumulator
+ *  - <code>retract</code>,
+ *  - <code>merge</code>, and
+ *  - <code>resetAccumulator</code>.
  *
- * <p>All these methods must be declared publicly, not static and named exactly as the names
- * mentioned above. The methods createAccumulator and getValue are defined in the
- * {@link AggregateFunction} functions, while other methods are explained below.
+ * <p>All these methods must be declared publicly, not static, and named exactly as
+ * mentioned above. The methods {@link #createAccumulator()} and {@link #getValue} are defined in
+ * the {@link AggregateFunction} class, while the other methods are explained below.
  *
- * <p>Processes the input values and update the provided accumulator instance. The method
+ * <pre>
+ * {@code
+ * Processes the input values and updates the provided accumulator instance. The method
  * accumulate can be overloaded with different custom types and arguments. An AggregateFunction
  * requires at least one accumulate() method.
  *
+ * param: accumulator           the accumulator which contains the current aggregated results
+ * param: [user defined inputs] the input value (usually obtained from newly arrived data).
  *
- * <p>Retracts the input values from the accumulator instance. The current design assumes the
+ * public void accumulate(ACC accumulator, [user defined inputs])
+ * }
+ * </pre>
+ *
+ * <pre>
+ * {@code
+ * Retracts the input values from the accumulator instance. The current design assumes the
  * inputs are the values that have been previously accumulated. The method retract can be
  * overloaded with different custom types and arguments. This function must be implemented for
- * datastream bounded over aggregate.
+ * data stream bounded OVER aggregates.
+ *
+ * param: accumulator           the accumulator which contains the current aggregated results
+ * param: [user defined inputs] the input value (usually obtained from newly arrived data).
+ *
+ * public void retract(ACC accumulator, [user defined inputs])
+ * }
+ * </pre>
+ *
+ * <pre>
+ * {@code
+ * Merges a group of accumulator instances into one accumulator instance. This function must be
+ * implemented for data stream session window grouping aggregates and data set grouping aggregates.
+ *
+ * param: accumulator the accumulator which will keep the merged aggregate results. It should
+ *                    be noted that the accumulator may contain the previous aggregated
+ *                    results. Therefore, the user should not replace or clean this instance
+ *                    in the custom merge method.
+ * param: iterable    a java.lang.Iterable pointing to a group of accumulators that will be
+ *                    merged.
+ *
+ * public void merge(ACC accumulator, java.lang.Iterable<ACC> iterable)
+ * }
+ * </pre>
  *
- * <p>Merges a group of accumulator instances into one accumulator instance. This function must be
- * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
+ * <pre>
+ * {@code
+ * Resets the accumulator for this AggregateFunction. This function must be implemented for
+ * data set grouping aggregates.
  *
- * <p>Resets the accumulator for this {@link AggregateFunction}. This function must be implemented for
- * dataset grouping aggregate.
+ * param: accumulator the accumulator which needs to be reset
  *
+ * public void resetAccumulator(ACC accumulator)
+ * }
+ * </pre>
  *
- * @param T   the type of the aggregation result
- * @param ACC the type of the aggregation accumulator. The accumulator is used to keep the
- *             aggregated values which are needed to compute an aggregation result.
- *             AggregateFunction represents its state using accumulator, thereby the state of the
- *             AggregateFunction must be put into the accumulator.
+ * @param <T>   the type of the aggregation result
+ * @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the
+ *              aggregated values which are needed to compute an aggregation result.
+ *              An AggregateFunction represents its state using the accumulator; therefore, the
+ *              state of the AggregateFunction must be put into the accumulator.
  */
 @PublicEvolving
 public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
 
 	/**
-	 * Creates and init the Accumulator for this {@link AggregateFunction}.
+	 * Creates and initializes the accumulator for this {@link AggregateFunction}. The accumulator
+	 * is used to keep the aggregated values which are needed to compute an aggregation result.
 	 *
 	 * @return the accumulator with the initial value
 	 */
@@ -85,29 +123,31 @@ public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
 	public abstract T getValue(ACC accumulator);
 
 	/**
-	 * Returns true if this AggregateFunction can only be applied in an OVER window.
+	 * Returns <code>true</code> if this {@link AggregateFunction} can only be applied in an
+	 * OVER window.
 	 *
-	 * @return true if the AggregateFunction requires an OVER window, false otherwise.
+	 * @return <code>true</code> if the {@link AggregateFunction} requires an OVER window,
+	 *         <code>false</code> otherwise.
 	 */
 	public boolean requiresOver() {
 		return false;
 	}
 
 	/**
-	 * Returns the TypeInformation of the AggregateFunction's result.
+	 * Returns the {@link TypeInformation} of the {@link AggregateFunction}'s result.
 	 *
-	 * @return The TypeInformation of the AggregateFunction's result or null if the result type
-	 *         should be automatically inferred.
+	 * @return The {@link TypeInformation} of the {@link AggregateFunction}'s result or
+	 *         <code>null</code> if the result type should be automatically inferred.
 	 */
 	public TypeInformation<T> getResultType() {
 		return null;
 	}
 
 	/**
-	 * Returns the TypeInformation of the AggregateFunction's accumulator.
+	 * Returns the {@link TypeInformation} of the {@link AggregateFunction}'s accumulator.
 	 *
-	 * @return The TypeInformation of the AggregateFunction's accumulator or null if the
-	 *         accumulator type should be automatically inferred.
+	 * @return The {@link TypeInformation} of the {@link AggregateFunction}'s accumulator or
+	 *         <code>null</code> if the accumulator type should be automatically inferred.
 	 */
 	public TypeInformation<ACC> getAccumulatorType() {
 		return null;
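
To make the documented contract concrete, a minimal average aggregate could look
as follows. This is an editorial sketch, not part of the patch; the class name
LongAvg and its accumulator layout are hypothetical.

    import org.apache.flink.table.functions.AggregateFunction;

    public class LongAvg extends AggregateFunction<Double, LongAvg.Accumulator> {

        // the accumulator holds the entire state of the function
        public static class Accumulator {
            public long sum = 0L;
            public long count = 0L;
        }

        @Override
        public Accumulator createAccumulator() {
            return new Accumulator();
        }

        // mandatory: processes an input value and updates the accumulator
        public void accumulate(Accumulator acc, Long value) {
            if (value != null) {
                acc.sum += value;
                acc.count++;
            }
        }

        // optional: required for bounded OVER aggregates
        public void retract(Accumulator acc, Long value) {
            if (value != null) {
                acc.sum -= value;
                acc.count--;
            }
        }

        // optional: required for session window and data set grouping aggregates;
        // the target accumulator may already contain previous results, so it is
        // updated in place and never replaced
        public void merge(Accumulator acc, Iterable<Accumulator> iterable) {
            for (Accumulator other : iterable) {
                acc.sum += other.sum;
                acc.count += other.count;
            }
        }

        @Override
        public Double getValue(Accumulator acc) {
            return acc.count == 0L ? null : ((double) acc.sum) / acc.count;
        }
    }
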
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java
index 93b2229..ad3fbd5 100644
--- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java
@@ -20,24 +20,28 @@ package org.apache.flink.table.functions;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
+import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.metrics.MetricGroup;
 
 import java.io.File;
 
 /**
- * A FunctionContext allows to obtain global runtime information about the context in which the
- * user-defined function is executed. The information include the metric group,
- * the distributed cache files, and the global job parameters.
+ * A {@link FunctionContext} allows obtaining global runtime information about the context in
+ * which the user-defined function is executed.
+ *
+ * <p>The information includes the metric group, distributed cache files, and global job parameters.
  */
 @PublicEvolving
 public class FunctionContext {
 
-	/**
-	 * @param context the runtime context in which the Flink Function is executed
-	 */
 	private RuntimeContext context;
 
+	/**
+	 * Wraps the underlying {@link RuntimeContext}.
+	 *
+	 * @param context the runtime context in which Flink's {@link Function} is executed.
+	 */
 	public FunctionContext(RuntimeContext context) {
 		this.context = context;
 	}
@@ -70,7 +74,7 @@ public class FunctionContext {
 	 * @return (default) value associated with the given key
 	 */
 	public String getJobParameter(String key, String defaultValue) {
-		GlobalJobParameters conf = context.getExecutionConfig().getGlobalJobParameters();
+		final GlobalJobParameters conf = context.getExecutionConfig().getGlobalJobParameters();
 		if (conf != null && conf.toMap().containsKey(key)) {
 			return conf.toMap().get(key);
 		} else {
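
As a usage sketch (editorial, not part of the patch; the function name and the
"threshold" key are hypothetical), a user-defined function could read such a
global job parameter once in its open() method:

    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.ScalarFunction;

    public class ThresholdFilter extends ScalarFunction {

        private int threshold;

        @Override
        public void open(FunctionContext context) throws Exception {
            // getJobParameter(key, defaultValue) falls back to "0" if the key is unset
            threshold = Integer.parseInt(context.getJobParameter("threshold", "0"));
        }

        public boolean eval(int value) {
            return value >= threshold;
        }
    }
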
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java
index 091258e..9f7249c 100644
--- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java
@@ -24,17 +24,13 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.table.api.ValidationException;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
 /**
 * Base class for a user-defined scalar function. A user-defined scalar function maps zero, one,
  * or multiple scalar values to a new scalar value.
  *
  * <p>The behavior of a {@link ScalarFunction} can be defined by implementing a custom evaluation
- * method. An evaluation method must be declared publicly and named "eval". Evaluation methods
- * can also be overloaded by implementing multiple methods named "eval".
+ * method. An evaluation method must be declared publicly and named <code>eval</code>. Evaluation
+ * methods can also be overloaded by implementing multiple methods named <code>eval</code>.
  *
  * <p>User-defined functions must have a default constructor and must be instantiable during runtime.
  *
@@ -46,7 +42,8 @@ import java.util.stream.Collectors;
  * <p>Internally, the Table/SQL API code generation works with primitive values as much as possible.
  * If a user-defined scalar function should not introduce much overhead during runtime, it is
  * recommended to declare parameters and result types as primitive types instead of their boxed
- * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
+ * classes. <code>DATE/TIME</code> is equal to <code>int</code>, <code>TIMESTAMP</code> is equal
+ * to <code>long</code>.
  */
 @PublicEvolving
 public abstract class ScalarFunction extends UserDefinedFunction {
@@ -60,7 +57,8 @@ public abstract class ScalarFunction extends UserDefinedFunction {
 	 * simple POJOs but might be wrong for more complex, custom, or composite types.
 	 *
 	 * @param signature signature of the method the return type needs to be determined
-	 * @return {@link TypeInformation} of result type or null if Flink should determine the type
+	 * @return {@link TypeInformation} of result type or <code>null</code> if Flink should
+	 *         determine the type
 	 */
 	public TypeInformation<?> getResultType(Class<?>[] signature) {
 		return null;
@@ -70,26 +68,25 @@ public abstract class ScalarFunction extends UserDefinedFunction {
 	 * Returns {@link TypeInformation} about the operands of the evaluation method with a given
 	 * signature.
 	 *
-	 * <p>In order to perform operand type inference in SQL (especially when NULL is used) it might be
-	 * necessary to determine the parameter {@link TypeInformation} of an evaluation method.
-	 * By default Flink's type extraction facilities are used for this but might be wrong for
-	 * more complex, custom, or composite types.
+	 * <p>In order to perform operand type inference in SQL (especially when <code>NULL</code> is
+	 * used) it might be necessary to determine the parameter {@link TypeInformation} of an
+ * evaluation method. By default, Flink's type extraction facilities are used for this, but they
+ * might be wrong for more complex, custom, or composite types.
 	 *
 	 * @param signature signature of the method the operand types need to be determined
 	 * @return {@link TypeInformation} of operand types
 	 */
 	public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
-
-		List<TypeInformation<?>> typeList = Arrays.asList(signature).stream().map(c -> {
+		final TypeInformation<?>[] types = new TypeInformation<?>[signature.length];
+		for (int i = 0; i < signature.length; i++) {
 			try {
-				return TypeExtractor.getForClass(c);
+				types[i] = TypeExtractor.getForClass(signature[i]);
 			} catch (InvalidTypesException e) {
 				throw new ValidationException(
-						"Parameter types of table function " + this.getClass().getCanonicalName() + " cannot be " +
-						"automatically determined. Please provide type information manually.");
+					"Parameter types of scalar function " + this.getClass().getCanonicalName() +
+					" cannot be automatically determined. Please provide type information manually.");
 			}
-		}).collect(Collectors.toList());
-
-		return typeList.toArray(new TypeInformation<?>[0]);
+		}
+		return types;
 	}
 }
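
For illustration, a scalar function with overloaded eval() methods and an
explicit result type might look like the sketch below (editorial example, not
part of the patch; the class HashCode is hypothetical):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.functions.ScalarFunction;

    public class HashCode extends ScalarFunction {

        // primitive parameter and result types avoid boxing overhead at runtime
        public int eval(String s) {
            return s == null ? 0 : s.hashCode();
        }

        public int eval(int i) {
            return Integer.hashCode(i);
        }

        @Override
        public TypeInformation<?> getResultType(Class<?>[] signature) {
            // both overloads return an int, so no automatic inference is needed
            return Types.INT;
        }
    }
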
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java
index ae6b03a..03cd96f 100644
--- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java
@@ -25,17 +25,13 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.util.Collector;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
 /**
 * Base class for a user-defined table function (UDTF). A user-defined table function works on
  * zero, one, or multiple scalar values as input and returns multiple rows as output.
  *
  * <p>The behavior of a {@link TableFunction} can be defined by implementing a custom evaluation
- * method. An evaluation method must be declared publicly, not static and named "eval".
- * Evaluation methods can also be overloaded by implementing multiple methods named "eval".
+ * method. An evaluation method must be declared publicly, not static, and named <code>eval</code>.
+ * Evaluation methods can also be overloaded by implementing multiple methods named <code>eval</code>.
  *
  * <p>User-defined functions must have a default constructor and must be instantiable during runtime.
  *
@@ -47,25 +43,49 @@ import java.util.stream.Collectors;
  * <p>Internally, the Table/SQL API code generation works with primitive values as much as possible.
  * If a user-defined table function should not introduce much overhead during runtime, it is
  * recommended to declare parameters and result types as primitive types instead of their boxed
- * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
+ * classes. <code>DATE/TIME</code> is equal to <code>int</code>, <code>TIMESTAMP</code> is equal
+ * to <code>long</code>.
+ *
+ * <p>For example:
+ *
+ * <pre>
+ * {@code
+ *   public class Split extends TableFunction<String> {
+ *
+ *     // implement an "eval" method with as many parameters as you want
+ *     public void eval(String str) {
+ *       for (String s : str.split(" ")) {
+ *         collect(s);   // use collect(...) to emit an output row
+ *       }
+ *     }
+ *
+ *     // you can overload the eval method here ...
+ *   }
+ *
+ *   TableEnvironment tEnv = ...
+ *   Table table = ...    // schema: ROW(a VARCHAR)
  *
+ *   // for Scala users
+ *   val split = new Split()
+ *   table.join(split('a) as ('s)).select('a, 's)
  *
- * @param T The type of the output row
+ *   // for Java users
+ *   tEnv.registerFunction("split", new Split());   // register table function first
+ *   table.join(new Table(tEnv, "split(a) as (s)")).select("a, s");
+ *
+ *   // for SQL users
+ *   tEnv.registerFunction("split", new Split());   // register table function first
+ *   tEnv.sqlQuery("SELECT a, s FROM MyTable, LATERAL TABLE(split(a)) as T(s)");
+ * }
+ * </pre>
+ *
+ * @param <T> The type of the output row
  */
 @PublicEvolving
 public abstract class TableFunction<T> extends UserDefinedFunction {
 
 	/**
-	 * Emit an output row.
-	 *
-	 * @param row the output row
-	 */
-	protected void collect(T row) {
-		collector.collect(row);
-	}
-
-	/**
-	 * The code generated collector used to emit row.
+	 * The code generated collector used to emit rows.
 	 */
 	protected Collector<T> collector;
 
@@ -84,7 +104,7 @@ public abstract class TableFunction<T> extends UserDefinedFunction {
 	 * method. Flink's type extraction facilities can handle basic types or
 	 * simple POJOs but might be wrong for more complex, custom, or composite types.
 	 *
-	 * @return {@link TypeInformation} of result type or null if Flink should determine the type
+	 * @return {@link TypeInformation} of result type or <code>null</code> if Flink should determine the type
 	 */
 	public TypeInformation<T> getResultType() {
 		return null;
@@ -103,18 +123,25 @@ public abstract class TableFunction<T> extends UserDefinedFunction {
 	 * @return {@link TypeInformation} of operand types
 	 */
 	public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
-
-		List<TypeInformation<?>> typeList = Arrays.asList(signature).stream().map(c -> {
+		final TypeInformation<?>[] types = new TypeInformation<?>[signature.length];
+		for (int i = 0; i < signature.length; i++) {
 			try {
-				return TypeExtractor.getForClass(c);
+				types[i] = TypeExtractor.getForClass(signature[i]);
 			} catch (InvalidTypesException e) {
 				throw new ValidationException(
-						"Parameter types of table function " + this.getClass().getCanonicalName() + " cannot be " +
-						"automatically determined. Please provide type information manually.");
+					"Parameter types of table function " + this.getClass().getCanonicalName() +
+					" cannot be automatically determined. Please provide type information manually.");
 			}
-		}).collect(Collectors.toList());
-
-		return typeList.toArray(new TypeInformation<?>[0]);
+		}
+		return types;
 	}
 
+	/**
+	 * Emits an output row.
+	 *
+	 * @param row the output row
+	 */
+	protected final void collect(T row) {
+		collector.collect(row);
+	}
 }
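
Complementing the Split example in the Javadoc above, a table function that
emits Rows must declare its result type explicitly, since Row fields cannot be
extracted automatically. An editorial sketch (the class SplitWithLength is
hypothetical, not part of the patch):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.types.Row;

    public class SplitWithLength extends TableFunction<Row> {

        public void eval(String str) {
            for (String s : str.split(" ")) {
                // the now-final collect(...) emits one output row per call
                collect(Row.of(s, s.length()));
            }
        }

        @Override
        public TypeInformation<Row> getResultType() {
            // Row types cannot be inferred, so declare the fields explicitly
            return Types.ROW(Types.STRING, Types.INT);
        }
    }
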
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java
index 6c91c10..0a4e600 100644
--- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java
@@ -29,12 +29,21 @@ import java.io.Serializable;
  */
 @PublicEvolving
 public abstract class UserDefinedFunction implements Serializable {
+
+	/**
+	 * Returns a unique, serialized representation for this function.
+	 */
+	public final String functionIdentifier() {
+		final String md5 = EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this)));
+		return getClass().getCanonicalName().replace('.', '$').concat("$").concat(md5);
+	}
+
 	/**
 	 * Setup method for user-defined function. It can be used for initialization work.
 	 * By default, this method does nothing.
 	 */
 	public void open(FunctionContext context) throws Exception {
-
+		// do nothing
 	}
 
 	/**
@@ -42,24 +51,21 @@ public abstract class UserDefinedFunction implements Serializable {
 	 * By default, this method does nothing.
 	 */
 	public void close() throws Exception {
-
+		// do nothing
 	}
 
 	/**
-	 * @return true if and only if a call to this function is guaranteed to always return
-	 * the same result given the same parameters; true is assumed by default
-	 * if user's function is not pure functional, like random(), date(), now()...
-	 * isDeterministic must return false
+	 * Returns information about the determinism of the function's results.
+	 *
+	 * @return <code>true</code> if and only if a call to this function is guaranteed to
+	 *         always return the same result given the same parameters. <code>true</code> is
+	 *         assumed by default. If the function is not purely functional (like
+	 *         <code>random(), date(), now(), ...</code>), this method must return <code>false</code>.
 	 */
 	public boolean isDeterministic() {
 		return true;
 	}
 
-	public String functionIdentifier() {
-		String md5 = EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this)));
-		return getClass().getCanonicalName().replace('.', '$').concat("$").concat(md5);
-	}
-
 	/**
 	 * Returns the name of the UDF that is used for plan explanation and logging.
 	 */
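
As a sketch of the determinism contract (editorial example, not part of the
patch; the class RandomInt is hypothetical), a function wrapping a random
generator must override isDeterministic():

    import java.util.Random;

    import org.apache.flink.table.functions.ScalarFunction;

    public class RandomInt extends ScalarFunction {

        private final Random random = new Random();

        public int eval(int bound) {
            return random.nextInt(bound);
        }

        @Override
        public boolean isDeterministic() {
            // calls with the same argument may return different results,
            // so the planner must not pre-evaluate this function
            return false;
        }
    }
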
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
index b70e582..29de7e0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
@@ -29,7 +29,6 @@ import org.junit.Assert
 
 import scala.annotation.varargs
 import scala.collection.mutable
-import scala.collection.JavaConversions._
 import scala.io.Source
 
 case class SimplePojo(name: String, age: Int)