You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@flink.apache.org by ja...@apache.org on 2019/10/30 12:12:01 UTC
[flink] branch master updated:
[FLINK-13869][table-planner-blink][hive] Fix Hive functions can not work in
blink planner streaming mode (#10013)
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1da3055 [FLINK-13869][table-planner-blink][hive] Fix Hive functions can not work in blink planner streaming mode (#10013)
1da3055 is described below
commit 1da3055338e5f23a3ec4a54152dbf6b4e35e3862
Author: Jingsong Lee <lz...@aliyun.com>
AuthorDate: Wed Oct 30 20:11:52 2019 +0800
[FLINK-13869][table-planner-blink][hive] Fix Hive functions can not work in blink planner streaming mode (#10013)
---
.../catalog/hive/HiveCatalogUseBlinkITCase.java | 163 +++++++++++++--------
.../planner/functions/utils/AggSqlFunction.scala | 6 +-
.../functions/utils/ScalarSqlFunction.scala | 4 +
.../planner/functions/utils/TableSqlFunction.scala | 3 +-
.../table/planner/plan/utils/AggregateUtil.scala | 2 +-
5 files changed, 116 insertions(+), 62 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
index d610125..343c8c9 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
@@ -18,12 +18,17 @@
package org.apache.flink.table.catalog.hive;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableBuilder;
@@ -35,7 +40,10 @@ import org.apache.flink.table.functions.hive.util.TestHiveGenericUDF;
import org.apache.flink.table.functions.hive.util.TestHiveSimpleUDF;
import org.apache.flink.table.functions.hive.util.TestHiveUDTF;
import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.table.planner.runtime.utils.TestingRetractSink;
+import org.apache.flink.table.util.JavaScalaConversionUtil;
import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
import org.apache.flink.util.FileUtils;
import com.klarna.hiverunner.HiveShell;
@@ -54,6 +62,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -96,14 +105,6 @@ public class HiveCatalogUseBlinkITCase extends AbstractTestBase {
@Test
public void testBlinkUdf() throws Exception {
- TableEnvironment tEnv = TableEnvironment.create(
- EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
-
- BatchTestBase.configForMiniCluster(tEnv.getConfig());
-
- tEnv.registerCatalog("myhive", hiveCatalog);
- tEnv.useCatalog("myhive");
-
TableSchema schema = TableSchema.builder()
.field("name", DataTypes.STRING())
.field("age", DataTypes.INT())
@@ -122,41 +123,12 @@ public class HiveCatalogUseBlinkITCase extends AbstractTestBase {
.withComment("Comment.")
.build();
- Path p = Paths.get(tempFolder.newFolder().getAbsolutePath(), "test.csv");
-
- TableSchema sinkSchema = TableSchema.builder()
- .field("name1", Types.STRING())
- .field("name2", Types.STRING())
- .field("sum1", Types.INT())
- .field("sum2", Types.LONG())
- .build();
-
- FormatDescriptor sinkFormat = new OldCsv()
- .field("name1", Types.STRING())
- .field("name2", Types.STRING())
- .field("sum1", Types.INT())
- .field("sum2", Types.LONG());
- CatalogTable sink =
- new CatalogTableBuilder(
- new FileSystem().path(p.toAbsolutePath().toString()),
- sinkSchema)
- .withFormat(sinkFormat)
- .inAppendMode()
- .withComment("Comment.")
- .build();
-
hiveCatalog.createTable(
new ObjectPath(HiveCatalog.DEFAULT_DB, sourceTableName),
source,
false
);
- hiveCatalog.createTable(
- new ObjectPath(HiveCatalog.DEFAULT_DB, sinkTableName),
- sink,
- false
- );
-
hiveCatalog.createFunction(
new ObjectPath(HiveCatalog.DEFAULT_DB, "myudf"),
new CatalogFunctionImpl(TestHiveSimpleUDF.class.getCanonicalName(), new HashMap<>()),
@@ -174,34 +146,107 @@ public class HiveCatalogUseBlinkITCase extends AbstractTestBase {
new CatalogFunctionImpl(GenericUDAFSum.class.getCanonicalName(), new HashMap<>()),
false);
+ testUdf(true);
+ testUdf(false);
+ }
+
+ private void testUdf(boolean batch) throws Exception {
+ TableEnvironment tEnv;
+ EnvironmentSettings.Builder envBuilder = EnvironmentSettings.newInstance().useBlinkPlanner();
+ if (batch) {
+ envBuilder.inBatchMode();
+ } else {
+ envBuilder.inStreamingMode();
+ }
+ if (batch) {
+ tEnv = TableEnvironment.create(envBuilder.build());
+ } else {
+ tEnv = StreamTableEnvironment.create(
+ StreamExecutionEnvironment.getExecutionEnvironment(), envBuilder.build());
+ }
+
+ BatchTestBase.configForMiniCluster(tEnv.getConfig());
+
+ tEnv.registerCatalog("myhive", hiveCatalog);
+ tEnv.useCatalog("myhive");
+
String innerSql = format("select mygenericudf(myudf(name), 1) as a, mygenericudf(myudf(age), 1) as b," +
" s from %s, lateral table(myudtf(name, 1)) as T(s)", sourceTableName);
- tEnv.sqlUpdate(
- format("insert into %s select a, s, sum(b), myudaf(b) from (%s) group by a, s",
- sinkTableName,
- innerSql));
- tEnv.execute("myjob");
-
- // assert written result
- StringBuilder builder = new StringBuilder();
- try (Stream<Path> paths = Files.walk(Paths.get(p.toAbsolutePath().toString()))) {
- paths.filter(Files::isRegularFile).forEach(path -> {
- try {
- String content = FileUtils.readFileUtf8(path.toFile());
- if (content.isEmpty()) {
- return;
+ String selectSql = format("select a, s, sum(b), myudaf(b) from (%s) group by a, s", innerSql);
+
+ List<String> results;
+ if (batch) {
+ Path p = Paths.get(tempFolder.newFolder().getAbsolutePath(), "test.csv");
+
+ TableSchema sinkSchema = TableSchema.builder()
+ .field("name1", Types.STRING())
+ .field("name2", Types.STRING())
+ .field("sum1", Types.INT())
+ .field("sum2", Types.LONG())
+ .build();
+
+ FormatDescriptor sinkFormat = new OldCsv()
+ .field("name1", Types.STRING())
+ .field("name2", Types.STRING())
+ .field("sum1", Types.INT())
+ .field("sum2", Types.LONG());
+ CatalogTable sink =
+ new CatalogTableBuilder(
+ new FileSystem().path(p.toAbsolutePath().toString()),
+ sinkSchema)
+ .withFormat(sinkFormat)
+ .inAppendMode()
+ .withComment("Comment.")
+ .build();
+
+ hiveCatalog.createTable(
+ new ObjectPath(HiveCatalog.DEFAULT_DB, sinkTableName),
+ sink,
+ false
+ );
+
+ tEnv.sqlUpdate(format("insert into %s " + selectSql, sinkTableName));
+ tEnv.execute("myjob");
+
+ // assert written result
+ StringBuilder builder = new StringBuilder();
+ try (Stream<Path> paths = Files.walk(Paths.get(p.toAbsolutePath().toString()))) {
+ paths.filter(Files::isRegularFile).forEach(path -> {
+ try {
+ String content = FileUtils.readFileUtf8(path.toFile());
+ if (content.isEmpty()) {
+ return;
+ }
+ builder.append(content);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
- builder.append(content);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
+ });
+ }
+ results = Arrays.stream(builder.toString().split("\n"))
+ .filter(s -> !s.isEmpty())
+ .collect(Collectors.toList());
+ } else {
+ StreamTableEnvironment streamTEnv = (StreamTableEnvironment) tEnv;
+ TestingRetractSink sink = new TestingRetractSink();
+ streamTEnv.toRetractStream(tEnv.sqlQuery(selectSql), Row.class)
+ .map(new JavaToScala())
+ .addSink((SinkFunction) sink);
+ streamTEnv.execute("");
+ results = JavaScalaConversionUtil.toJava(sink.getRetractResults());
}
- List<String> results = Arrays.stream(builder.toString().split("\n"))
- .filter(s -> !s.isEmpty())
- .collect(Collectors.toList());
+
+ results = new ArrayList<>(results);
results.sort(String::compareTo);
Assert.assertEquals(Arrays.asList("1,1,2,2", "2,2,4,4", "3,3,6,6"), results);
}
+
+ private static class JavaToScala implements MapFunction<Tuple2<Boolean, Row>, scala.Tuple2<Boolean, Row>> {
+
+ @Override
+ public scala.Tuple2<Boolean, Row> map(Tuple2<Boolean, Row> value) throws Exception {
+ return new scala.Tuple2<>(value.f0, value.f1);
+ }
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/AggSqlFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/AggSqlFunction.scala
index 7effbba..5213929 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/AggSqlFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/AggSqlFunction.scala
@@ -51,7 +51,7 @@ import java.util
class AggSqlFunction(
name: String,
displayName: String,
- aggregateFunction: UserDefinedAggregateFunction[_, _],
+ val aggregateFunction: UserDefinedAggregateFunction[_, _],
val externalResultType: DataType,
val externalAccType: DataType,
typeFactory: FlinkTypeFactory,
@@ -72,6 +72,10 @@ class AggSqlFunction(
typeFactory
) {
+ /**
+ * This is temporary solution for hive udf and should be removed once FLIP-65 is finished,
+ * please pass the non-null input arguments.
+ */
def makeFunction(
constants: Array[AnyRef],
argTypes: Array[LogicalType]): UserDefinedAggregateFunction[_, _] = aggregateFunction
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala
index 8e5609c..f036cbe 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala
@@ -58,6 +58,10 @@ class ScalarSqlFunction(
null,
SqlFunctionCategory.USER_DEFINED_FUNCTION) {
+ /**
+ * This is temporary solution for hive udf and should be removed once FLIP-65 is finished,
+ * please pass the non-null input arguments.
+ */
def makeFunction(constants: Array[AnyRef], argTypes: Array[LogicalType]): ScalarFunction =
scalarFunction
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala
index c3f5ac3..4cb1d53 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala
@@ -67,7 +67,8 @@ class TableSqlFunction(
functionImpl) {
/**
- * Get the user-defined table function.
+ * This is temporary solution for hive udf and should be removed once FLIP-65 is finished,
+ * please pass the non-null input arguments.
*/
def makeFunction(constants: Array[AnyRef], argTypes: Array[LogicalType]): TableFunction[_] =
udtf
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
index 92a5d3d..973a6da 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
@@ -749,7 +749,7 @@ object AggregateUtil extends Enumeration {
private[flink] def isTableAggregate(aggCalls: util.List[AggregateCall]): Boolean = {
aggCalls
.filter(e => e.getAggregation.isInstanceOf[AggSqlFunction])
- .map(e => e.getAggregation.asInstanceOf[AggSqlFunction].makeFunction(null, null))
+ .map(e => e.getAggregation.asInstanceOf[AggSqlFunction].aggregateFunction)
.exists(_.isInstanceOf[TableAggregateFunction[_, _]])
}
}