You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/12/03 12:43:40 UTC
[flink] 02/02: [FLINK-10689] [table] Improve docs and fix bugs of
ported classes
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 98437c765c8957fd5ea392502de393d4ec695ea2
Author: Timo Walther <tw...@apache.org>
AuthorDate: Fri Nov 30 17:05:58 2018 +0100
[FLINK-10689] [table] Improve docs and fix bugs of ported classes
---
.../flink-sql-client-test/pom.xml | 5 -
.../flink/table/functions/AggregateFunction.java | 102 ++++++++++++++-------
.../flink/table/functions/FunctionContext.java | 18 ++--
.../flink/table/functions/ScalarFunction.java | 37 ++++----
.../flink/table/functions/TableFunction.java | 81 ++++++++++------
.../flink/table/functions/UserDefinedFunction.java | 28 +++---
.../utils/userDefinedScalarFunctions.scala | 1 -
7 files changed, 170 insertions(+), 102 deletions(-)
diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
index 6a0534c..d564668 100644
--- a/flink-end-to-end-tests/flink-sql-client-test/pom.xml
+++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
@@ -37,11 +37,6 @@ under the License.
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-compiler</artifactId>
- <scope>provided</scope>
- </dependency>
<!-- The following dependencies are for connector/format sql-jars that
we copy using the maven-dependency-plugin. When extending the test
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
index 63a4d3f..70066a2 100644
--- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
@@ -22,51 +22,89 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
/**
- * Base class for User-Defined Aggregates.
+ * Base class for user-defined aggregates.
*
* <p>The behavior of an {@link AggregateFunction} can be defined by implementing a series of custom
* methods. An {@link AggregateFunction} needs at least three methods:
- * - createAccumulator,
- * - accumulate, and
- * - getValue.
+ * - <code>createAccumulator</code>,
+ * - <code>accumulate</code>, and
+ * - <code>getValue</code>.
*
* <p>There are a few other methods that can be optional to have:
- * - retract,
- * - merge, and
- * - resetAccumulator
+ * - <code>retract</code>,
+ * - <code>merge</code>, and
+ * - <code>resetAccumulator</code>.
*
- * <p>All these methods must be declared publicly, not static and named exactly as the names
- * mentioned above. The methods createAccumulator and getValue are defined in the
- * {@link AggregateFunction} functions, while other methods are explained below.
+ * <p>All these methods must be declared publicly, not static, and named exactly as the names
+ * mentioned above. The methods {@link #createAccumulator()} and {@link #getValue} are defined in
+ * the {@link AggregateFunction} functions, while other methods are explained below.
*
- * <p>Processes the input values and update the provided accumulator instance. The method
+ * <pre>
+ * {@code
+ * Processes the input values and updates the provided accumulator instance. The method
* accumulate can be overloaded with different custom types and arguments. An AggregateFunction
* requires at least one accumulate() method.
*
+ * param: accumulator the accumulator which contains the current aggregated results
+ * param: [user defined inputs] the input value (usually obtained from newly arrived data).
*
- * <p>Retracts the input values from the accumulator instance. The current design assumes the
+ * public void accumulate(ACC accumulator, [user defined inputs])
+ * }
+ * </pre>
+ *
+ * <pre>
+ * {@code
+ * Retracts the input values from the accumulator instance. The current design assumes the
* inputs are the values that have been previously accumulated. The method retract can be
* overloaded with different custom types and arguments. This function must be implemented for
- * datastream bounded over aggregate.
+ * data stream bounded OVER aggregates.
+ *
+ * param: accumulator the accumulator which contains the current aggregated results
+ * param: [user defined inputs] the input value (usually obtained from newly arrived data).
+ *
+ * public void retract(ACC accumulator, [user defined inputs])
+ * }
+ * </pre>
+ *
+ * <pre>
+ * {@code
+ * Merges a group of accumulator instances into one accumulator instance. This function must be
+ * implemented for data stream session window grouping aggregates and data set grouping aggregates.
+ *
+ * param: accumulator the accumulator which will keep the merged aggregate results. It should
+ * be noted that the accumulator may contain the previous aggregated
+ * results. Therefore user should not replace or clean this instance in the
+ * custom merge method.
+ * param: iterable a java.lang.Iterable pointing to a group of accumulators that will be
+ * merged.
+ *
+ * public void merge(ACC accumulator, java.lang.Iterable<ACC> iterable)
+ * }
+ * </pre>
*
- * <p>Merges a group of accumulator instances into one accumulator instance. This function must be
- * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
+ * <pre>
+ * {@code
+ * Resets the accumulator for this AggregateFunction. This function must be implemented for
+ * data set grouping aggregates.
*
- * <p>Resets the accumulator for this {@link AggregateFunction}. This function must be implemented for
- * dataset grouping aggregate.
+ * param: accumulator the accumulator which needs to be reset
*
+ * public void resetAccumulator(ACC accumulator)
+ * }
+ * </pre>
*
- * @param T the type of the aggregation result
- * @param ACC the type of the aggregation accumulator. The accumulator is used to keep the
- * aggregated values which are needed to compute an aggregation result.
- * AggregateFunction represents its state using accumulator, thereby the state of the
- * AggregateFunction must be put into the accumulator.
+ * @param <T> the type of the aggregation result
+ * @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the
+ * aggregated values which are needed to compute an aggregation result.
+ * AggregateFunction represents its state using accumulator, thereby the state of the
+ * AggregateFunction must be put into the accumulator.
*/
@PublicEvolving
public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
/**
- * Creates and init the Accumulator for this {@link AggregateFunction}.
+ * Creates and initializes the accumulator for this {@link AggregateFunction}. The accumulator
+ * is used to keep the aggregated values which are needed to compute an aggregation result.
*
* @return the accumulator with the initial value
*/
@@ -85,29 +123,31 @@ public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
public abstract T getValue(ACC accumulator);
/**
- * Returns true if this AggregateFunction can only be applied in an OVER window.
+ * Returns <code>true</code> if this {@link AggregateFunction} can only be applied in an
+ * OVER window.
*
- * @return true if the AggregateFunction requires an OVER window, false otherwise.
+ * @return <code>true</code> if the {@link AggregateFunction} requires an OVER window,
+ * <code>false</code> otherwise.
*/
public boolean requiresOver() {
return false;
}
/**
- * Returns the TypeInformation of the AggregateFunction's result.
+ * Returns the {@link TypeInformation} of the {@link AggregateFunction}'s result.
*
- * @return The TypeInformation of the AggregateFunction's result or null if the result type
- * should be automatically inferred.
+ * @return The {@link TypeInformation} of the {@link AggregateFunction}'s result or
+ * <code>null</code> if the result type should be automatically inferred.
*/
public TypeInformation<T> getResultType() {
return null;
}
/**
- * Returns the TypeInformation of the AggregateFunction's accumulator.
+ * Returns the {@link TypeInformation} of the {@link AggregateFunction}'s accumulator.
*
- * @return The TypeInformation of the AggregateFunction's accumulator or null if the
- * accumulator type should be automatically inferred.
+ * @return The {@link TypeInformation} of the {@link AggregateFunction}'s accumulator or
+ * <code>null</code> if the accumulator type should be automatically inferred.
*/
public TypeInformation<ACC> getAccumulatorType() {
return null;
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java
index 93b2229..ad3fbd5 100644
--- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java
@@ -20,24 +20,28 @@ package org.apache.flink.table.functions;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
+import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.MetricGroup;
import java.io.File;
/**
- * A FunctionContext allows to obtain global runtime information about the context in which the
- * user-defined function is executed. The information include the metric group,
- * the distributed cache files, and the global job parameters.
+ * A {@link FunctionContext} provides global runtime information about the context in which the
+ * user-defined function is executed.
+ *
+ * <p>The information includes the metric group, distributed cache files, and global job parameters.
*/
@PublicEvolving
public class FunctionContext {
- /**
- * @param context the runtime context in which the Flink Function is executed
- */
private RuntimeContext context;
+ /**
+ * Wraps the underlying {@link RuntimeContext}.
+ *
+ * @param context the runtime context in which Flink's {@link Function} is executed.
+ */
public FunctionContext(RuntimeContext context) {
this.context = context;
}
@@ -70,7 +74,7 @@ public class FunctionContext {
* @return (default) value associated with the given key
*/
public String getJobParameter(String key, String defaultValue) {
- GlobalJobParameters conf = context.getExecutionConfig().getGlobalJobParameters();
+ final GlobalJobParameters conf = context.getExecutionConfig().getGlobalJobParameters();
if (conf != null && conf.toMap().containsKey(key)) {
return conf.toMap().get(key);
} else {
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java
index 091258e..9f7249c 100644
--- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java
@@ -24,17 +24,13 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.table.api.ValidationException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
/**
* Base class for a user-defined scalar function. A user-defined scalar function maps zero, one,
* or multiple scalar values to a new scalar value.
*
* <p>The behavior of a {@link ScalarFunction} can be defined by implementing a custom evaluation
- * method. An evaluation method must be declared publicly and named "eval". Evaluation methods
- * can also be overloaded by implementing multiple methods named "eval".
+ * method. An evaluation method must be declared publicly and named <code>eval</code>. Evaluation
+ * methods can also be overloaded by implementing multiple methods named <code>eval</code>.
*
* <p>User-defined functions must have a default constructor and must be instantiable during runtime.
*
@@ -46,7 +42,8 @@ import java.util.stream.Collectors;
* <p>Internally, the Table/SQL API code generation works with primitive values as much as possible.
* If a user-defined scalar function should not introduce much overhead during runtime, it is
* recommended to declare parameters and result types as primitive types instead of their boxed
- * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
+ * classes. <code>DATE/TIME</code> is equal to <code>int</code>, <code>TIMESTAMP</code> is equal
+ * to <code>long</code>.
*/
@PublicEvolving
public abstract class ScalarFunction extends UserDefinedFunction {
@@ -60,7 +57,8 @@ public abstract class ScalarFunction extends UserDefinedFunction {
* simple POJOs but might be wrong for more complex, custom, or composite types.
*
* @param signature signature of the method the return type needs to be determined
- * @return {@link TypeInformation} of result type or null if Flink should determine the type
+ * @return {@link TypeInformation} of result type or <code>null</code> if Flink should
+ * determine the type
*/
public TypeInformation<?> getResultType(Class<?>[] signature) {
return null;
@@ -70,26 +68,25 @@ public abstract class ScalarFunction extends UserDefinedFunction {
* Returns {@link TypeInformation} about the operands of the evaluation method with a given
* signature.
*
- * <p>In order to perform operand type inference in SQL (especially when NULL is used) it might be
- * necessary to determine the parameter {@link TypeInformation} of an evaluation method.
- * By default Flink's type extraction facilities are used for this but might be wrong for
- * more complex, custom, or composite types.
+ * <p>In order to perform operand type inference in SQL (especially when <code>NULL</code> is
+ * used) it might be necessary to determine the parameter {@link TypeInformation} of an
+ * evaluation method. By default Flink's type extraction facilities are used for this but might
+ * be wrong for more complex, custom, or composite types.
*
* @param signature signature of the method the operand types need to be determined
* @return {@link TypeInformation} of operand types
*/
public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
-
- List<TypeInformation<?>> typeList = Arrays.asList(signature).stream().map(c -> {
+ final TypeInformation<?>[] types = new TypeInformation<?>[signature.length];
+ for (int i = 0; i < signature.length; i++) {
try {
- return TypeExtractor.getForClass(c);
+ types[i] = TypeExtractor.getForClass(signature[i]);
} catch (InvalidTypesException e) {
throw new ValidationException(
- "Parameter types of table function " + this.getClass().getCanonicalName() + " cannot be " +
- "automatically determined. Please provide type information manually.");
+ "Parameter types of scalar function " + this.getClass().getCanonicalName() +
+ " cannot be automatically determined. Please provide type information manually.");
}
- }).collect(Collectors.toList());
-
- return typeList.toArray(new TypeInformation<?>[0]);
+ }
+ return types;
}
}
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java
index ae6b03a..03cd96f 100644
--- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java
@@ -25,17 +25,13 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.util.Collector;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
/**
* Base class for a user-defined table function (UDTF). A user-defined table function works on
* zero, one, or multiple scalar values as input and returns multiple rows as output.
*
* <p>The behavior of a {@link TableFunction} can be defined by implementing a custom evaluation
- * method. An evaluation method must be declared publicly, not static and named "eval".
- * Evaluation methods can also be overloaded by implementing multiple methods named "eval".
+ * method. An evaluation method must be declared publicly, not static, and named <code>eval</code>.
+ * Evaluation methods can also be overloaded by implementing multiple methods named <code>eval</code>.
*
* <p>User-defined functions must have a default constructor and must be instantiable during runtime.
*
@@ -47,25 +43,49 @@ import java.util.stream.Collectors;
* <p>Internally, the Table/SQL API code generation works with primitive values as much as possible.
* If a user-defined table function should not introduce much overhead during runtime, it is
* recommended to declare parameters and result types as primitive types instead of their boxed
- * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
+ * classes. <code>DATE/TIME</code> is equal to <code>int</code>, <code>TIMESTAMP</code> is equal
+ * to <code>long</code>.
+ *
+ * <p>For example:
+ *
+ * <pre>
+ * {@code
+ * public class Split extends TableFunction<String> {
+ *
+ * // implement an "eval" method with as many parameters as you want
+ * public void eval(String str) {
+ * for (String s : str.split(" ")) {
+ * collect(s); // use collect(...) to emit an output row
+ * }
+ * }
+ *
+ * // you can overload the eval method here ...
+ * }
+ *
+ * TableEnvironment tEnv = ...
+ * Table table = ... // schema: ROW(a VARCHAR)
*
+ * // for Scala users
+ * val split = new Split()
+ * table.join(split('a) as ('s)).select('a, 's)
*
- * @param T The type of the output row
+ * // for Java users
+ * tEnv.registerFunction("split", new Split()); // register table function first
+ * table.join(new Table(tEnv, "split(a) as (s)")).select("a, s");
+ *
+ * // for SQL users
+ * tEnv.registerFunction("split", new Split()); // register table function first
+ * tEnv.sqlQuery("SELECT a, s FROM MyTable, LATERAL TABLE(split(a)) as T(s)");
+ * }
+ * </pre>
+ *
+ * @param <T> The type of the output row
*/
@PublicEvolving
public abstract class TableFunction<T> extends UserDefinedFunction {
/**
- * Emit an output row.
- *
- * @param row the output row
- */
- protected void collect(T row) {
- collector.collect(row);
- }
-
- /**
- * The code generated collector used to emit row.
+ * The code generated collector used to emit rows.
*/
protected Collector<T> collector;
@@ -84,7 +104,7 @@ public abstract class TableFunction<T> extends UserDefinedFunction {
* method. Flink's type extraction facilities can handle basic types or
* simple POJOs but might be wrong for more complex, custom, or composite types.
*
- * @return {@link TypeInformation} of result type or null if Flink should determine the type
+ * @return {@link TypeInformation} of result type or <code>null</code> if Flink should determine the type
*/
public TypeInformation<T> getResultType() {
return null;
@@ -103,18 +123,25 @@ public abstract class TableFunction<T> extends UserDefinedFunction {
* @return {@link TypeInformation} of operand types
*/
public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
-
- List<TypeInformation<?>> typeList = Arrays.asList(signature).stream().map(c -> {
+ final TypeInformation<?>[] types = new TypeInformation<?>[signature.length];
+ for (int i = 0; i < signature.length; i++) {
try {
- return TypeExtractor.getForClass(c);
+ types[i] = TypeExtractor.getForClass(signature[i]);
} catch (InvalidTypesException e) {
throw new ValidationException(
- "Parameter types of table function " + this.getClass().getCanonicalName() + " cannot be " +
- "automatically determined. Please provide type information manually.");
+ "Parameter types of table function " + this.getClass().getCanonicalName() +
+ " cannot be automatically determined. Please provide type information manually.");
}
- }).collect(Collectors.toList());
-
- return typeList.toArray(new TypeInformation<?>[0]);
+ }
+ return types;
}
+ /**
+ * Emits an output row.
+ *
+ * @param row the output row
+ */
+ protected final void collect(T row) {
+ collector.collect(row);
+ }
}
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java
index 6c91c10..0a4e600 100644
--- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java
@@ -29,12 +29,21 @@ import java.io.Serializable;
*/
@PublicEvolving
public abstract class UserDefinedFunction implements Serializable {
+
+ /**
+ * Returns a unique, serialized representation for this function.
+ */
+ public final String functionIdentifier() {
+ final String md5 = EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this)));
+ return getClass().getCanonicalName().replace('.', '$').concat("$").concat(md5);
+ }
+
/**
* Setup method for user-defined function. It can be used for initialization work.
* By default, this method does nothing.
*/
public void open(FunctionContext context) throws Exception {
-
+ // do nothing
}
/**
@@ -42,24 +51,21 @@ public abstract class UserDefinedFunction implements Serializable {
* By default, this method does nothing.
*/
public void close() throws Exception {
-
+ // do nothing
}
/**
- * @return true if and only if a call to this function is guaranteed to always return
- * the same result given the same parameters; true is assumed by default
- * if user's function is not pure functional, like random(), date(), now()...
- * isDeterministic must return false
+ * Returns information about the determinism of the function's results.
+ *
+ * @return <code>true</code> if and only if a call to this function is guaranteed to
+ * always return the same result given the same parameters. <code>true</code> is
+ * assumed by default. If the function is not purely functional, like
+ * <code>random(), date(), now(), ...</code>, this method must return <code>false</code>.
*/
public boolean isDeterministic() {
return true;
}
- public String functionIdentifier() {
- String md5 = EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this)));
- return getClass().getCanonicalName().replace('.', '$').concat("$").concat(md5);
- }
-
/**
* Returns the name of the UDF that is used for plan explain and logging.
*/
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
index b70e582..29de7e0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
@@ -29,7 +29,6 @@ import org.junit.Assert
import scala.annotation.varargs
import scala.collection.mutable
-import scala.collection.JavaConversions._
import scala.io.Source
case class SimplePojo(name: String, age: Int)