You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/01/22 20:07:04 UTC
[flink] branch master updated: [FLINK-21050][examples-table] Add
some advanced function examples
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c696f03 [FLINK-21050][examples-table] Add some advanced function examples
c696f03 is described below
commit c696f0361c8196e4015922ae5228aeb93ec74e4d
Author: Timo Walther <tw...@apache.org>
AuthorDate: Thu Jan 21 10:52:22 2021 +0100
[FLINK-21050][examples-table] Add some advanced function examples
This closes #14714.
---
docs/dev/table/functions/udfs.md | 3 +
docs/dev/table/functions/udfs.zh.md | 3 +
flink-examples/flink-examples-table/pom.xml | 24 ++++
.../java/functions/AdvancedFunctionsExample.java | 125 +++++++++++++++++++
.../java/functions/InternalRowMergerFunction.java | 134 +++++++++++++++++++++
.../java/functions/LastDatedValueFunction.java | 132 ++++++++++++++++++++
.../functions/AdvancedFunctionsExampleITCase.java | 81 +++++++++++++
7 files changed, 502 insertions(+)
diff --git a/docs/dev/table/functions/udfs.md b/docs/dev/table/functions/udfs.md
index 7455233..cbbb87d 100644
--- a/docs/dev/table/functions/udfs.md
+++ b/docs/dev/table/functions/udfs.md
@@ -551,6 +551,9 @@ public static class LiteralFunction extends ScalarFunction {
</div>
+For more examples of custom type inference, see the
+{% gh_link flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExample.java "advanced function implementation" %} in the `flink-examples-table` module.
+
### Determinism
Every user-defined function class can declare whether it produces deterministic results or not by overriding
diff --git a/docs/dev/table/functions/udfs.zh.md b/docs/dev/table/functions/udfs.zh.md
index c470468..cb1a416 100644
--- a/docs/dev/table/functions/udfs.zh.md
+++ b/docs/dev/table/functions/udfs.zh.md
@@ -537,6 +537,9 @@ public static class LiteralFunction extends ScalarFunction {
</div>
+For more examples of custom type inference, see the
+{% gh_link flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExample.java "advanced function implementation" %} in the `flink-examples-table` module.
+
### 运行时集成
-------------------
diff --git a/flink-examples/flink-examples-table/pom.xml b/flink-examples/flink-examples-table/pom.xml
index 52b2ab3..b9e6586 100644
--- a/flink-examples/flink-examples-table/pom.xml
+++ b/flink-examples/flink-examples-table/pom.xml
@@ -283,6 +283,30 @@ under the License.
</execution>
<execution>
+ <id>AdvancedFunctionsExample</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+
+ <configuration>
+ <classifier>AdvancedFunctionsExample</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.table.examples.java.functions.AdvancedFunctionsExample</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <include>org/apache/flink/table/examples/java/functions/*</include>
+ <include>META-INF/LICENSE</include>
+ <include>META-INF/NOTICE</include>
+ </includes>
+ </configuration>
+ </execution>
+
+ <execution>
<id>StreamTableExample</id>
<phase>package</phase>
<goals>
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExample.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExample.java
new file mode 100644
index 0000000..4a734a5
--- /dev/null
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExample.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.java.functions;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.types.Row;
+
+import java.time.LocalDate;
+
+/**
+ * Showcases {@link UserDefinedFunction}s whose signatures need more than plain reflection.
+ *
+ * <p>Most function signatures can be extracted reflectively from the UDF class. When that
+ * information is insufficient, the {@link DataTypeHint} and {@link FunctionHint} annotations enrich
+ * it with further logical details. See the website documentation and the Javadocs of {@link
+ * ScalarFunction}, {@link TableFunction}, and {@link AggregateFunction} for more information.
+ *
+ * <p>Reflection and annotations work well for signatures with fixed input and output types.
+ * Advanced use cases, however, may need to derive the output type from one of the argument types
+ * or to apply stricter validation.
+ *
+ * <p>Each example below runs as a separate Flink job and prints its result to stdout.
+ */
+public class AdvancedFunctionsExample {
+
+    public static void main(String[] args) throws Exception {
+        // the example data is bounded, so a batch-mode environment is sufficient
+        final TableEnvironment env =
+                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+
+        // run one job per kind of function
+        executeLastDatedValueFunction(env);
+        executeInternalRowMergerFunction(env);
+    }
+
+    /**
+     * Groups customers by name and reports, per group, the most recent non-null {@code
+     * item_count} together with its {@code order_date}.
+     */
+    private static void executeLastDatedValueFunction(TableEnvironment env) {
+        final String rowType = "ROW<name STRING, order_date DATE, item_count INT>";
+
+        // example orders; one row deliberately has a null item_count
+        final Table customers =
+                env.fromValues(
+                        DataTypes.of(rowType),
+                        Row.of("Guillermo Smith", LocalDate.parse("2020-12-01"), 3),
+                        Row.of("Guillermo Smith", LocalDate.parse("2020-12-05"), 5),
+                        Row.of("Valeria Mendoza", LocalDate.parse("2020-03-23"), 4),
+                        Row.of("Valeria Mendoza", LocalDate.parse("2020-06-02"), 10),
+                        Row.of("Leann Holloway", LocalDate.parse("2020-05-26"), 9),
+                        Row.of("Leann Holloway", LocalDate.parse("2020-05-27"), null),
+                        Row.of("Brandy Sanders", LocalDate.parse("2020-10-14"), 1),
+                        Row.of("John Turner", LocalDate.parse("2020-10-02"), 12),
+                        Row.of("Ellen Ortega", LocalDate.parse("2020-06-18"), 100));
+        env.createTemporaryView("customers", customers);
+
+        // make the aggregate function callable from SQL, then run the query
+        env.createTemporarySystemFunction("LastDatedValueFunction", LastDatedValueFunction.class);
+        final String query =
+                "SELECT name, LastDatedValueFunction(item_count, order_date) "
+                        + "FROM customers GROUP BY name";
+        env.executeSql(query).print();
+
+        // free the view name for the next example
+        env.dropTemporaryView("customers");
+    }
+
+    /** Merges two row columns as efficiently as possible using internal data structures. */
+    private static void executeInternalRowMergerFunction(TableEnvironment env) {
+        final String rowType =
+                "ROW<name STRING, data1 ROW<birth_date DATE>, data2 ROW<city STRING, phone STRING>>";
+
+        // example customers with two nested row columns to be merged
+        final Table customers =
+                env.fromValues(
+                        DataTypes.of(rowType),
+                        Row.of(
+                                "Guillermo Smith",
+                                Row.of(LocalDate.parse("1992-12-12")),
+                                Row.of("New Jersey", "816-443-8010")),
+                        Row.of(
+                                "Valeria Mendoza",
+                                Row.of(LocalDate.parse("1970-03-28")),
+                                Row.of("Los Angeles", "928-264-9662")),
+                        Row.of(
+                                "Leann Holloway",
+                                Row.of(LocalDate.parse("1989-05-21")),
+                                Row.of("Eugene", "614-889-6038")));
+        env.createTemporaryView("customers", customers);
+
+        // make the scalar function callable from SQL, then run the query
+        env.createTemporarySystemFunction(
+                "InternalRowMergerFunction", InternalRowMergerFunction.class);
+        final String query = "SELECT name, InternalRowMergerFunction(data1, data2) FROM customers";
+        env.executeSql(query).print();
+
+        // free the view name again
+        env.dropTemporaryView("customers");
+    }
+}
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/InternalRowMergerFunction.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/InternalRowMergerFunction.java
new file mode 100644
index 0000000..f0cfe4f
--- /dev/null
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/InternalRowMergerFunction.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.java.functions;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.Signature.Argument;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+/**
+ * Merges two rows into a single row with unique field names.
+ *
+ * <p>The function uses a custom {@link TypeInference} and thus disables any of the default
+ * reflection-based logic. It returns internal data structures for both input and output types of
+ * the {@code eval} method.
+ *
+ * <p>The merged row contains all fields of the first argument followed by all fields of the second
+ * argument, renamed to {@code f0, f1, ...}. If either argument is {@code NULL}, the result is
+ * {@code NULL}.
+ *
+ * <p>For code readability, we might use some internal utility methods that should rarely change.
+ * Implementers can copy those if they don't want to rely on non-official API.
+ */
+public final class InternalRowMergerFunction extends ScalarFunction {
+
+    // --------------------------------------------------------------------------------------------
+    // Planning
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
+        return TypeInference.newBuilder()
+                // accept a signature (ROW, ROW) with arbitrary field types but
+                // with internal conversion classes
+                .inputTypeStrategy(
+                        new InputTypeStrategy() {
+                            @Override
+                            public ArgumentCount getArgumentCount() {
+                                // the argument count is checked before input types are inferred
+                                return ConstantArgumentCount.of(2);
+                            }
+
+                            @Override
+                            public Optional<List<DataType>> inferInputTypes(
+                                    CallContext callContext, boolean throwOnFailure) {
+                                final List<DataType> args = callContext.getArgumentDataTypes();
+                                final DataType arg0 = args.get(0);
+                                final DataType arg1 = args.get(1);
+                                // perform some basic validation based on the logical type
+                                if (arg0.getLogicalType().getTypeRoot() != LogicalTypeRoot.ROW
+                                        || arg1.getLogicalType().getTypeRoot()
+                                                != LogicalTypeRoot.ROW) {
+                                    if (throwOnFailure) {
+                                        throw callContext.newValidationError(
+                                                "Two row arguments expected.");
+                                    }
+                                    return Optional.empty();
+                                }
+                                // keep the original logical type but express that both arguments
+                                // should use internal data structures
+                                return Optional.of(
+                                        Arrays.asList(
+                                                arg0.bridgedTo(RowData.class),
+                                                arg1.bridgedTo(RowData.class)));
+                            }
+
+                            @Override
+                            public List<Signature> getExpectedSignatures(
+                                    FunctionDefinition definition) {
+                                // this helps in printing nice error messages
+                                return Collections.singletonList(
+                                        Signature.of(Argument.of("ROW"), Argument.of("ROW")));
+                            }
+                        })
+                .outputTypeStrategy(
+                        callContext -> {
+                            // merge fields and give them a unique name
+                            final List<DataType> args = callContext.getArgumentDataTypes();
+                            final List<DataType> allFieldDataTypes = new ArrayList<>();
+                            allFieldDataTypes.addAll(args.get(0).getChildren());
+                            allFieldDataTypes.addAll(args.get(1).getChildren());
+                            final DataTypes.Field[] fields =
+                                    IntStream.range(0, allFieldDataTypes.size())
+                                            .mapToObj(
+                                                    i ->
+                                                            DataTypes.FIELD(
+                                                                    "f" + i,
+                                                                    allFieldDataTypes.get(i)))
+                                            .toArray(DataTypes.Field[]::new);
+                            // create a new (nullable) row with the merged fields and express that
+                            // the return type will use an internal data structure
+                            return Optional.of(DataTypes.ROW(fields).bridgedTo(RowData.class));
+                        })
+                .build();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Runtime
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Concatenates the fields of both rows into a single row view.
+     *
+     * @param r1 first row, may be {@code null} for a SQL {@code NULL} argument
+     * @param r2 second row, may be {@code null} for a SQL {@code NULL} argument
+     * @return the joined row, or {@code null} if either argument is {@code null}
+     */
+    public RowData eval(RowData r1, RowData r2) {
+        // JoinedRowData delegates to both wrapped rows when it is read, so wrapping a null row
+        // would only fail later with a NullPointerException; propagate NULL as SQL does instead
+        if (r1 == null || r2 == null) {
+            return null;
+        }
+        return new JoinedRowData(r1, r2);
+    }
+}
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/LastDatedValueFunction.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/LastDatedValueFunction.java
new file mode 100644
index 0000000..641fecb
--- /dev/null
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/LastDatedValueFunction.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.java.functions;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.InputTypeStrategies;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.types.Row;
+
+import java.time.LocalDate;
+import java.util.Optional;
+
+/**
+ * Implementation of an {@link AggregateFunction} that returns a row containing the latest non-null
+ * value with its corresponding date.
+ *
+ * <p>Input rows with a {@code null} value or a {@code null} date are ignored: the former carry
+ * nothing to report and the latter cannot be ordered in time. If several rows share the latest
+ * date, the one accumulated first is kept. If no qualifying row was accumulated, both fields of the
+ * result row are {@code null}.
+ *
+ * <p>The function uses a custom {@link TypeInference} and thus disables any of the default
+ * reflection-based logic. It has a generic parameter {@code T} which will result in {@link Object}
+ * (due to type erasure) during runtime. The {@link TypeInference} will provide the necessary
+ * information how to call {@code accumulate(...)} for the given call in the query.
+ *
+ * <p>For code readability, we might use some internal utility methods that should rarely change.
+ * Implementers can copy those if they don't want to rely on non-official API.
+ *
+ * @param <T> input value
+ */
+public final class LastDatedValueFunction<T>
+        extends AggregateFunction<Row, LastDatedValueFunction.Accumulator<T>> {
+
+    // --------------------------------------------------------------------------------------------
+    // Planning
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Declares the {@link TypeInference} of this function. It specifies:
+     *
+     * <ul>
+     *   <li>which argument types are supported when calling this function,
+     *   <li>which {@link DataType#getConversionClass()} should be used when calling the JVM method
+     *       {@link #accumulate(Accumulator, Object, LocalDate)} during runtime,
+     *   <li>a similar strategy how to derive an accumulator type,
+     *   <li>and a similar strategy how to derive the output type.
+     * </ul>
+     */
+    @Override
+    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
+        return TypeInference.newBuilder()
+                // accept a signature (ANY, DATE) both with default conversion classes,
+                // the input type strategy is mostly used to produce nicer validation exceptions
+                // during planning, implementers can decide to skip it if they are fine with failing
+                // at a later stage during code generation when the runtime method is checked
+                .inputTypeStrategy(
+                        InputTypeStrategies.sequence(
+                                InputTypeStrategies.ANY,
+                                InputTypeStrategies.explicit(DataTypes.DATE())))
+                // let the accumulator data type depend on the first input argument
+                .accumulatorTypeStrategy(
+                        callContext -> {
+                            final DataType argDataType = callContext.getArgumentDataTypes().get(0);
+                            final DataType accDataType =
+                                    DataTypes.STRUCTURED(
+                                            Accumulator.class,
+                                            DataTypes.FIELD("value", argDataType),
+                                            DataTypes.FIELD("date", DataTypes.DATE()));
+                            return Optional.of(accDataType);
+                        })
+                // let the output data type depend on the first input argument
+                .outputTypeStrategy(
+                        callContext -> {
+                            final DataType argDataType = callContext.getArgumentDataTypes().get(0);
+                            final DataType outputDataType =
+                                    DataTypes.ROW(
+                                            DataTypes.FIELD("value", argDataType),
+                                            DataTypes.FIELD("date", DataTypes.DATE()));
+                            return Optional.of(outputDataType);
+                        })
+                .build();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Runtime
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Generic accumulator for representing state. It will contain different kind of instances for
+     * {@code value} depending on actual call in the query.
+     */
+    public static class Accumulator<T> {
+        public T value;
+        public LocalDate date;
+    }
+
+    @Override
+    public Accumulator<T> createAccumulator() {
+        return new Accumulator<>();
+    }
+
+    /**
+     * Generic runtime function that will be called with different kind of instances for {@code
+     * input} depending on actual call in the query.
+     *
+     * <p>Replaces the stored value if {@code input} is non-null and {@code date} is strictly later
+     * than the stored date (or nothing is stored yet). Rows with a {@code null} input or a {@code
+     * null} date are skipped.
+     */
+    public void accumulate(Accumulator<T> acc, T input, LocalDate date) {
+        // DATE arguments are nullable; without this guard date.isAfter(...) would throw a
+        // NullPointerException as soon as a dated value has been accumulated
+        if (input == null || date == null) {
+            return;
+        }
+        if (acc.date == null || date.isAfter(acc.date)) {
+            acc.value = input;
+            acc.date = date;
+        }
+    }
+
+    /** Returns {@code (value, date)}; both fields are {@code null} if nothing was accumulated. */
+    @Override
+    public Row getValue(Accumulator<T> acc) {
+        return Row.of(acc.value, acc.date);
+    }
+}
diff --git a/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExampleITCase.java b/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExampleITCase.java
new file mode 100644
index 0000000..35fe1ed
--- /dev/null
+++ b/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExampleITCase.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.java.functions;
+
+import org.apache.flink.table.examples.utils.ExampleOutputTestBase;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertThat;
+
+/** Test for Java {@link AdvancedFunctionsExample}. */
+public class AdvancedFunctionsExampleITCase extends ExampleOutputTestBase {
+
+    @Test
+    public void testExample() throws Exception {
+        AdvancedFunctionsExample.main(new String[0]);
+
+        // both example jobs print into the same captured console output
+        final String output = getOutputString();
+        testExecuteLastDatedValueFunction(output);
+        testExecuteInternalRowMergerFunction(output);
+    }
+
+    /** Checks one result row per customer name of the aggregate example. */
+    private void testExecuteLastDatedValueFunction(String consoleOutput) {
+        assertOutputContains(
+                consoleOutput,
+                "| Guillermo Smith | +I[5, 2020-12-05] |",
+                "| John Turner | +I[12, 2020-10-02] |",
+                "| Brandy Sanders | +I[1, 2020-10-14] |",
+                "| Valeria Mendoza | +I[10, 2020-06-02] |",
+                "| Ellen Ortega | +I[100, 2020-06-18] |",
+                "| Leann Holloway | +I[9, 2020-05-26] |");
+    }
+
+    /** Checks the (truncated) merged rows printed by the scalar example. */
+    private void testExecuteInternalRowMergerFunction(String consoleOutput) {
+        assertOutputContains(
+                consoleOutput,
+                "| Guillermo Smith | +I[1992-12-12, New Jersey, ... |",
+                "| Valeria Mendoza | +I[1970-03-28, Los Angeles,... |",
+                "| Leann Holloway | +I[1989-05-21, Eugene, 614-... |");
+    }
+
+    /** Asserts, in the given order, that every expected row occurs in the console output. */
+    private static void assertOutputContains(String consoleOutput, String... expectedRows) {
+        for (String expectedRow : expectedRows) {
+            assertThat(consoleOutput, containsString(expectedRow));
+        }
+    }
+}