Posted to commits@flink.apache.org by ja...@apache.org on 2019/10/30 12:12:01 UTC

[flink] branch master updated: [FLINK-13869][table-planner-blink][hive] Fix Hive functions can not work in blink planner streaming mode (#10013)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1da3055  [FLINK-13869][table-planner-blink][hive] Fix Hive functions can not work in blink planner streaming mode (#10013)
1da3055 is described below

commit 1da3055338e5f23a3ec4a54152dbf6b4e35e3862
Author: Jingsong Lee <lz...@aliyun.com>
AuthorDate: Wed Oct 30 20:11:52 2019 +0800

    [FLINK-13869][table-planner-blink][hive] Fix Hive functions can not work in blink planner streaming mode (#10013)
---
 .../catalog/hive/HiveCatalogUseBlinkITCase.java    | 163 +++++++++++++--------
 .../planner/functions/utils/AggSqlFunction.scala   |   6 +-
 .../functions/utils/ScalarSqlFunction.scala        |   4 +
 .../planner/functions/utils/TableSqlFunction.scala |   3 +-
 .../table/planner/plan/utils/AggregateUtil.scala   |   2 +-
 5 files changed, 116 insertions(+), 62 deletions(-)
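
For context, here is a minimal sketch (not part of this commit) of the two environment setups the reworked test toggles between: the blink planner in batch mode versus streaming mode. The "myhive" catalog name and the hiveCatalog instance mentioned in the comments mirror the test; constructing the HiveCatalog itself is out of scope here.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    public class BlinkPlannerModesSketch {
        public static void main(String[] args) {
            // Batch mode: a plain TableEnvironment backed by the blink planner.
            TableEnvironment batchEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

            // Streaming mode: a StreamTableEnvironment on top of a StreamExecutionEnvironment.
            // This is the combination that previously failed for Hive functions.
            StreamTableEnvironment streamEnv = StreamTableEnvironment.create(
                    StreamExecutionEnvironment.getExecutionEnvironment(),
                    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

            // In either mode the Hive catalog is registered the same way, so Hive UDFs,
            // UDTFs and UDAFs resolve by name in SQL (catalog construction omitted):
            //   tEnv.registerCatalog("myhive", hiveCatalog);
            //   tEnv.useCatalog("myhive");
        }
    }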

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
index d610125..343c8c9 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
@@ -18,12 +18,17 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.CatalogFunctionImpl;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableBuilder;
@@ -35,7 +40,10 @@ import org.apache.flink.table.functions.hive.util.TestHiveGenericUDF;
 import org.apache.flink.table.functions.hive.util.TestHiveSimpleUDF;
 import org.apache.flink.table.functions.hive.util.TestHiveUDTF;
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.table.planner.runtime.utils.TestingRetractSink;
+import org.apache.flink.table.util.JavaScalaConversionUtil;
 import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
 import org.apache.flink.util.FileUtils;
 
 import com.klarna.hiverunner.HiveShell;
@@ -54,6 +62,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -96,14 +105,6 @@ public class HiveCatalogUseBlinkITCase extends AbstractTestBase {
 
 	@Test
 	public void testBlinkUdf() throws Exception {
-		TableEnvironment tEnv = TableEnvironment.create(
-				EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
-
-		BatchTestBase.configForMiniCluster(tEnv.getConfig());
-
-		tEnv.registerCatalog("myhive", hiveCatalog);
-		tEnv.useCatalog("myhive");
-
 		TableSchema schema = TableSchema.builder()
 				.field("name", DataTypes.STRING())
 				.field("age", DataTypes.INT())
@@ -122,41 +123,12 @@ public class HiveCatalogUseBlinkITCase extends AbstractTestBase {
 						.withComment("Comment.")
 						.build();
 
-		Path p = Paths.get(tempFolder.newFolder().getAbsolutePath(), "test.csv");
-
-		TableSchema sinkSchema = TableSchema.builder()
-				.field("name1", Types.STRING())
-				.field("name2", Types.STRING())
-				.field("sum1", Types.INT())
-				.field("sum2", Types.LONG())
-				.build();
-
-		FormatDescriptor sinkFormat = new OldCsv()
-				.field("name1", Types.STRING())
-				.field("name2", Types.STRING())
-				.field("sum1", Types.INT())
-				.field("sum2", Types.LONG());
-		CatalogTable sink =
-				new CatalogTableBuilder(
-						new FileSystem().path(p.toAbsolutePath().toString()),
-						sinkSchema)
-						.withFormat(sinkFormat)
-						.inAppendMode()
-						.withComment("Comment.")
-						.build();
-
 		hiveCatalog.createTable(
 				new ObjectPath(HiveCatalog.DEFAULT_DB, sourceTableName),
 				source,
 				false
 		);
 
-		hiveCatalog.createTable(
-				new ObjectPath(HiveCatalog.DEFAULT_DB, sinkTableName),
-				sink,
-				false
-		);
-
 		hiveCatalog.createFunction(
 				new ObjectPath(HiveCatalog.DEFAULT_DB, "myudf"),
 				new CatalogFunctionImpl(TestHiveSimpleUDF.class.getCanonicalName(), new HashMap<>()),
@@ -174,34 +146,107 @@ public class HiveCatalogUseBlinkITCase extends AbstractTestBase {
 				new CatalogFunctionImpl(GenericUDAFSum.class.getCanonicalName(), new HashMap<>()),
 				false);
 
+		testUdf(true);
+		testUdf(false);
+	}
+
+	private void testUdf(boolean batch) throws Exception {
+		TableEnvironment tEnv;
+		EnvironmentSettings.Builder envBuilder = EnvironmentSettings.newInstance().useBlinkPlanner();
+		if (batch) {
+			envBuilder.inBatchMode();
+		} else {
+			envBuilder.inStreamingMode();
+		}
+		if (batch) {
+			tEnv = TableEnvironment.create(envBuilder.build());
+		} else {
+			tEnv = StreamTableEnvironment.create(
+					StreamExecutionEnvironment.getExecutionEnvironment(), envBuilder.build());
+		}
+
+		BatchTestBase.configForMiniCluster(tEnv.getConfig());
+
+		tEnv.registerCatalog("myhive", hiveCatalog);
+		tEnv.useCatalog("myhive");
+
 		String innerSql = format("select mygenericudf(myudf(name), 1) as a, mygenericudf(myudf(age), 1) as b," +
 				" s from %s, lateral table(myudtf(name, 1)) as T(s)", sourceTableName);
 
-		tEnv.sqlUpdate(
-				format("insert into %s select a, s, sum(b), myudaf(b) from (%s) group by a, s",
-						sinkTableName,
-						innerSql));
-		tEnv.execute("myjob");
-
-		// assert written result
-		StringBuilder builder = new StringBuilder();
-		try (Stream<Path> paths = Files.walk(Paths.get(p.toAbsolutePath().toString()))) {
-			paths.filter(Files::isRegularFile).forEach(path -> {
-				try {
-					String content = FileUtils.readFileUtf8(path.toFile());
-					if (content.isEmpty()) {
-						return;
+		String selectSql = format("select a, s, sum(b), myudaf(b) from (%s) group by a, s", innerSql);
+
+		List<String> results;
+		if (batch) {
+			Path p = Paths.get(tempFolder.newFolder().getAbsolutePath(), "test.csv");
+
+			TableSchema sinkSchema = TableSchema.builder()
+					.field("name1", Types.STRING())
+					.field("name2", Types.STRING())
+					.field("sum1", Types.INT())
+					.field("sum2", Types.LONG())
+					.build();
+
+			FormatDescriptor sinkFormat = new OldCsv()
+					.field("name1", Types.STRING())
+					.field("name2", Types.STRING())
+					.field("sum1", Types.INT())
+					.field("sum2", Types.LONG());
+			CatalogTable sink =
+					new CatalogTableBuilder(
+							new FileSystem().path(p.toAbsolutePath().toString()),
+							sinkSchema)
+							.withFormat(sinkFormat)
+							.inAppendMode()
+							.withComment("Comment.")
+							.build();
+
+			hiveCatalog.createTable(
+					new ObjectPath(HiveCatalog.DEFAULT_DB, sinkTableName),
+					sink,
+					false
+			);
+
+			tEnv.sqlUpdate(format("insert into %s " + selectSql, sinkTableName));
+			tEnv.execute("myjob");
+
+			// assert written result
+			StringBuilder builder = new StringBuilder();
+			try (Stream<Path> paths = Files.walk(Paths.get(p.toAbsolutePath().toString()))) {
+				paths.filter(Files::isRegularFile).forEach(path -> {
+					try {
+						String content = FileUtils.readFileUtf8(path.toFile());
+						if (content.isEmpty()) {
+							return;
+						}
+						builder.append(content);
+					} catch (IOException e) {
+						throw new RuntimeException(e);
 					}
-					builder.append(content);
-				} catch (IOException e) {
-					throw new RuntimeException(e);
-				}
-			});
+				});
+			}
+			results = Arrays.stream(builder.toString().split("\n"))
+					.filter(s -> !s.isEmpty())
+					.collect(Collectors.toList());
+		} else {
+			StreamTableEnvironment streamTEnv = (StreamTableEnvironment) tEnv;
+			TestingRetractSink sink = new TestingRetractSink();
+			streamTEnv.toRetractStream(tEnv.sqlQuery(selectSql), Row.class)
+					.map(new JavaToScala())
+					.addSink((SinkFunction) sink);
+			streamTEnv.execute("");
+			results = JavaScalaConversionUtil.toJava(sink.getRetractResults());
 		}
-		List<String> results = Arrays.stream(builder.toString().split("\n"))
-				.filter(s -> !s.isEmpty())
-				.collect(Collectors.toList());
+
+		results = new ArrayList<>(results);
 		results.sort(String::compareTo);
 		Assert.assertEquals(Arrays.asList("1,1,2,2", "2,2,4,4", "3,3,6,6"), results);
 	}
+
+	private static class JavaToScala implements MapFunction<Tuple2<Boolean, Row>, scala.Tuple2<Boolean, Row>> {
+
+		@Override
+		public scala.Tuple2<Boolean, Row> map(Tuple2<Boolean, Row> value) throws Exception {
+			return new scala.Tuple2<>(value.f0, value.f1);
+		}
+	}
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/AggSqlFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/AggSqlFunction.scala
index 7effbba..5213929 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/AggSqlFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/AggSqlFunction.scala
@@ -51,7 +51,7 @@ import java.util
 class AggSqlFunction(
     name: String,
     displayName: String,
-    aggregateFunction: UserDefinedAggregateFunction[_, _],
+    val aggregateFunction: UserDefinedAggregateFunction[_, _],
     val externalResultType: DataType,
     val externalAccType: DataType,
     typeFactory: FlinkTypeFactory,
@@ -72,6 +72,10 @@ class AggSqlFunction(
     typeFactory
   ) {
 
+  /**
+    * This is a temporary solution for Hive UDFs and should be removed once FLIP-65 is finished;
+    * callers should pass non-null input arguments.
+    */
   def makeFunction(
       constants: Array[AnyRef],
       argTypes: Array[LogicalType]): UserDefinedAggregateFunction[_, _] = aggregateFunction
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala
index 8e5609c..f036cbe 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala
@@ -58,6 +58,10 @@ class ScalarSqlFunction(
     null,
     SqlFunctionCategory.USER_DEFINED_FUNCTION) {
 
+  /**
+    * This is a temporary solution for Hive UDFs and should be removed once FLIP-65 is finished;
+    * callers should pass non-null input arguments.
+    */
   def makeFunction(constants: Array[AnyRef], argTypes: Array[LogicalType]): ScalarFunction =
     scalarFunction
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala
index c3f5ac3..4cb1d53 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala
@@ -67,7 +67,8 @@ class TableSqlFunction(
     functionImpl) {
 
   /**
-    * Get the user-defined table function.
+    * This is a temporary solution for Hive UDFs and should be removed once FLIP-65 is finished;
+    * callers should pass non-null input arguments.
     */
   def makeFunction(constants: Array[AnyRef], argTypes: Array[LogicalType]): TableFunction[_] =
     udtf
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
index 92a5d3d..973a6da 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
@@ -749,7 +749,7 @@ object AggregateUtil extends Enumeration {
   private[flink] def isTableAggregate(aggCalls: util.List[AggregateCall]): Boolean = {
     aggCalls
       .filter(e => e.getAggregation.isInstanceOf[AggSqlFunction])
-      .map(e => e.getAggregation.asInstanceOf[AggSqlFunction].makeFunction(null, null))
+      .map(e => e.getAggregation.asInstanceOf[AggSqlFunction].aggregateFunction)
       .exists(_.isInstanceOf[TableAggregateFunction[_, _]])
   }
 }