Posted to commits@flink.apache.org by ku...@apache.org on 2019/08/07 15:51:16 UTC

[flink] branch master updated (f695a76 -> 1b44f96)

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from f695a76  [FLINK-12749] [docs] Add operations Docker playground.
     new e08117f  [FLINK-13225][table-planner-blink] Fix type inference for hive udf
     new b2c1102  [FLINK-13225][table-planner-blink] Fix type inference for hive udtf
     new 3d502e3  [FLINK-13225][table-planner-blink] Fix type inference for hive udaf
     new 81d0889  [FLINK-13225][hive] Fix getAccumulatorType of HiveGenericUDAF
     new 1b44f96  [FLINK-13225][hive] Add hive function it case using blink-planner

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../table/functions/hive/HiveGenericUDAF.java      |  15 +-
 .../catalog/hive/HiveCatalogUseBlinkITCase.java    | 203 ++++++++++++++++++
 ...enericUDFArray.java => TestHiveGenericUDF.java} |  33 ++-
 .../functions/hive/util/TestHiveSimpleUDF.java     |  23 +-
 ...GenericUDFStructSize.java => TestHiveUDTF.java} |  41 ++--
 .../catalog/FunctionCatalogOperatorTable.java      |  47 ++++-
 .../planner/expressions/SqlAggFunctionVisitor.java |   3 +-
 .../functions/utils/HiveAggSqlFunction.java        |  83 ++++++++
 .../planner/functions/utils/HiveFunctionUtils.java |  80 +++++++
 .../functions/utils/HiveScalarSqlFunction.java     |  85 ++++++++
 .../functions/utils/HiveTableSqlFunction.java      | 235 +++++++++++++++++++++
 .../planner/plan/QueryOperationConverter.java      |   3 +-
 .../planner/codegen/CorrelateCodeGenerator.scala   |   8 +-
 .../table/planner/codegen/ExprCodeGenerator.scala  |  22 +-
 .../planner/functions/utils/AggSqlFunction.scala   |  10 +-
 .../functions/utils/ScalarSqlFunction.scala        |   9 +-
 .../planner/functions/utils/TableSqlFunction.scala | 112 +++++-----
 .../functions/utils/UserDefinedFunctionUtils.scala |   6 +-
 .../logical/FlinkLogicalTableFunctionScan.scala    |   2 +-
 ...relateToJoinFromTemporalTableFunctionRule.scala |   4 +-
 .../schema/DeferredTypeFlinkTableFunction.scala    |   4 +-
 .../planner/plan/schema/FlinkTableFunction.scala   |   1 +
 .../plan/schema/TypedFlinkTableFunction.scala      |   2 +
 .../planner/plan/utils/AggFunctionFactory.scala    |  11 +-
 .../utils/UserDefinedFunctionTestUtils.scala       |   2 +-
 25 files changed, 919 insertions(+), 125 deletions(-)
 create mode 100644 flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
 copy flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/util/{TestGenericUDFArray.java => TestHiveGenericUDF.java} (60%)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/VoidMetricFetcher.java => flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/util/TestHiveSimpleUDF.java (68%)
 copy flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/util/{TestGenericUDFStructSize.java => TestHiveUDTF.java} (53%)
 create mode 100644 flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveAggSqlFunction.java
 create mode 100644 flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveFunctionUtils.java
 create mode 100644 flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveScalarSqlFunction.java
 create mode 100644 flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveTableSqlFunction.java
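
Taken together, the five commits above make Hive UDF, UDTF, and UDAF calls resolve
correctly through the blink planner's type inference. A minimal sketch of the
user-facing flow this enables (assuming an already-opened HiveCatalog and
illustrative table/function names; the real coverage is HiveCatalogUseBlinkITCase
in commit 05/05 below):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.catalog.hive.HiveCatalog;

    public class HiveUdfSketch {
        public static void run(HiveCatalog hiveCatalog) throws Exception {
            // Blink planner in batch mode, as in the IT case below.
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
            tEnv.registerCatalog("myhive", hiveCatalog);
            tEnv.useCatalog("myhive");
            // With these commits, Hive functions registered in the catalog are wrapped
            // in HiveScalarSqlFunction / HiveTableSqlFunction / HiveAggSqlFunction, so
            // this statement passes type inference ("src", "snk", "myudf" are assumed
            // to exist in the catalog):
            tEnv.sqlUpdate("insert into snk select myudf(name) from src");
            tEnv.execute("hive-udf-sketch");
        }
    }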


[flink] 04/05: [FLINK-13225][hive] Fix getAccumulatorType of HiveGenericUDAF

Posted by ku...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 81d0889c96fd1445425e5af7f1e8a0fefd59f4f0
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Sun Jul 28 20:24:50 2019 +0800

    [FLINK-13225][hive] Fix getAccumulatorType of HiveGenericUDAF
---
 .../flink/table/functions/hive/HiveGenericUDAF.java       | 15 +++------------
 1 file changed, 3 insertions(+), 12 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
index 6015ef6..b2f2fe5 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.functions.hive;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
@@ -211,17 +212,7 @@ public class HiveGenericUDAF
 	}
 
 	@Override
-	public TypeInformation getAccumulatorType() {
-		try {
-			if (!initialized) {
-				init();
-			}
-
-			return LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(
-				HiveTypeUtil.toFlinkType(partialResultObjectInspector));
-		} catch (Exception e) {
-			throw new FlinkHiveUDFException(
-				String.format("Failed to get Hive accumulator type from %s", hiveFunctionWrapper.getClassName()), e);
-		}
+	public TypeInformation<GenericUDAFEvaluator.AggregationBuffer> getAccumulatorType() {
+		return new GenericTypeInfo<>(GenericUDAFEvaluator.AggregationBuffer.class);
 	}
 }
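
A note on the change above: the old getAccumulatorType() had to run init() just to
answer a type query, and it derived a legacy type from the partial-result
ObjectInspector even though the accumulator Hive actually hands back is an opaque
GenericUDAFEvaluator.AggregationBuffer. Typing the accumulator as a generic,
Kryo-serialized type avoids both problems. A standalone sketch of the new behavior
(illustration only, not the committed class):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.GenericTypeInfo;

    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;

    class AccumulatorTypeSketch {
        // GenericTypeInfo carries no field information, so Flink serializes the
        // buffer with Kryo; that suits Hive aggregation buffers, whose concrete
        // class Flink cannot introspect.
        static TypeInformation<GenericUDAFEvaluator.AggregationBuffer> accumulatorType() {
            return new GenericTypeInfo<>(GenericUDAFEvaluator.AggregationBuffer.class);
        }
    }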


[flink] 05/05: [FLINK-13225][hive] Add hive function it case using blink-planner

Posted by ku...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1b44f967fdc9e186a72721e8dc24bc5c4f7b2f23
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Sun Jul 28 20:25:54 2019 +0800

    [FLINK-13225][hive] Add hive function it case using blink-planner
    
    This closes #9089
---
 .../catalog/hive/HiveCatalogUseBlinkITCase.java    | 203 +++++++++++++++++++++
 .../functions/hive/util/TestHiveGenericUDF.java    |  64 +++++++
 .../functions/hive/util/TestHiveSimpleUDF.java     |  37 ++++
 .../table/functions/hive/util/TestHiveUDTF.java    |  67 +++++++
 4 files changed, 371 insertions(+)

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
new file mode 100644
index 0000000..6f669d0
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableBuilder;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.descriptors.FileSystem;
+import org.apache.flink.table.descriptors.FormatDescriptor;
+import org.apache.flink.table.descriptors.OldCsv;
+import org.apache.flink.table.functions.hive.util.TestHiveGenericUDF;
+import org.apache.flink.table.functions.hive.util.TestHiveSimpleUDF;
+import org.apache.flink.table.functions.hive.util.TestHiveUDTF;
+import org.apache.flink.util.FileUtils;
+
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.lang.String.format;
+
+/**
+ * IT case for HiveCatalog.
+ * TODO: move to flink-connector-hive-test end-to-end test module once it's set up
+ */
+@RunWith(FlinkStandaloneHiveRunner.class)
+public class HiveCatalogUseBlinkITCase {
+
+	@HiveSQL(files = {})
+	private static HiveShell hiveShell;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	private static HiveCatalog hiveCatalog;
+
+	private String sourceTableName = "csv_source";
+	private String sinkTableName = "csv_sink";
+
+	@BeforeClass
+	public static void createCatalog() {
+		HiveConf hiveConf = hiveShell.getHiveConf();
+		hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf);
+		hiveCatalog.open();
+	}
+
+	@AfterClass
+	public static void closeCatalog() {
+		if (hiveCatalog != null) {
+			hiveCatalog.close();
+		}
+	}
+
+	@Test
+	public void testBlinkUdf() throws Exception {
+		TableEnvironment tEnv = TableEnvironment.create(
+				EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
+
+		tEnv.registerCatalog("myhive", hiveCatalog);
+		tEnv.useCatalog("myhive");
+
+		TableSchema schema = TableSchema.builder()
+				.field("name", DataTypes.STRING())
+				.field("age", DataTypes.INT())
+				.build();
+
+		FormatDescriptor format = new OldCsv()
+				.field("name", Types.STRING())
+				.field("age", Types.INT());
+
+		CatalogTable source =
+				new CatalogTableBuilder(
+						new FileSystem().path(this.getClass().getResource("/csv/test.csv").getPath()),
+						schema)
+						.withFormat(format)
+						.inAppendMode()
+						.withComment(null)
+						.build();
+
+		Path p = Paths.get(tempFolder.newFolder().getAbsolutePath(), "test.csv");
+
+		TableSchema sinkSchema = TableSchema.builder()
+				.field("name1", Types.STRING())
+				.field("name2", Types.STRING())
+				.field("sum1", Types.INT())
+				.field("sum2", Types.LONG())
+				.build();
+
+		FormatDescriptor sinkFormat = new OldCsv()
+				.field("name1", Types.STRING())
+				.field("name2", Types.STRING())
+				.field("sum1", Types.INT())
+				.field("sum2", Types.LONG());
+		CatalogTable sink =
+				new CatalogTableBuilder(
+						new FileSystem().path(p.toAbsolutePath().toString()),
+						sinkSchema)
+						.withFormat(sinkFormat)
+						.inAppendMode()
+						.withComment(null)
+						.build();
+
+		hiveCatalog.createTable(
+				new ObjectPath(HiveCatalog.DEFAULT_DB, sourceTableName),
+				source,
+				false
+		);
+
+		hiveCatalog.createTable(
+				new ObjectPath(HiveCatalog.DEFAULT_DB, sinkTableName),
+				sink,
+				false
+		);
+
+		hiveCatalog.createFunction(
+				new ObjectPath(HiveCatalog.DEFAULT_DB, "myudf"),
+				new CatalogFunctionImpl(TestHiveSimpleUDF.class.getCanonicalName(), new HashMap<>()),
+				false);
+		hiveCatalog.createFunction(
+				new ObjectPath(HiveCatalog.DEFAULT_DB, "mygenericudf"),
+				new CatalogFunctionImpl(TestHiveGenericUDF.class.getCanonicalName(), new HashMap<>()),
+				false);
+		hiveCatalog.createFunction(
+				new ObjectPath(HiveCatalog.DEFAULT_DB, "myudtf"),
+				new CatalogFunctionImpl(TestHiveUDTF.class.getCanonicalName(), new HashMap<>()),
+				false);
+		hiveCatalog.createFunction(
+				new ObjectPath(HiveCatalog.DEFAULT_DB, "myudaf"),
+				new CatalogFunctionImpl(GenericUDAFSum.class.getCanonicalName(), new HashMap<>()),
+				false);
+
+		String innerSql = format("select mygenericudf(myudf(name), 1) as a, mygenericudf(myudf(age), 1) as b," +
+				" s from %s, lateral table(myudtf(name, 1)) as T(s)", sourceTableName);
+
+		tEnv.sqlUpdate(
+				format("insert into %s select a, s, sum(b), myudaf(b) from (%s) group by a, s",
+						sinkTableName,
+						innerSql));
+		tEnv.execute("myjob");
+
+		// assert written result
+		StringBuilder builder = new StringBuilder();
+		try (Stream<Path> paths = Files.walk(Paths.get(p.toAbsolutePath().toString()))) {
+			paths.filter(Files::isRegularFile).forEach(path -> {
+				try {
+					String content = FileUtils.readFileUtf8(path.toFile());
+					if (content.isEmpty()) {
+						return;
+					}
+					builder.append(content);
+				} catch (IOException e) {
+					throw new RuntimeException(e);
+				}
+			});
+		}
+		List<String> results = Arrays.stream(builder.toString().split("\n"))
+				.filter(s -> !s.isEmpty())
+				.collect(Collectors.toList());
+		results.sort(String::compareTo);
+		Assert.assertEquals(Arrays.asList("1,1,2,2", "2,2,4,4", "3,3,6,6"), results);
+	}
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/util/TestHiveGenericUDF.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/util/TestHiveGenericUDF.java
new file mode 100644
index 0000000..ffb191a
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/util/TestHiveGenericUDF.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive.util;
+
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.io.IntWritable;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Test generic udf.
+ */
+public class TestHiveGenericUDF extends GenericUDF {
+
+	@Override
+	public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
+		checkArgument(arguments.length == 2);
+
+		// TEST for constant arguments
+		checkArgument(arguments[1] instanceof ConstantObjectInspector);
+		Object constant = ((ConstantObjectInspector) arguments[1]).getWritableConstantValue();
+		checkArgument(constant instanceof IntWritable);
+		checkArgument(((IntWritable) constant).get() == 1);
+
+		if (arguments[0] instanceof IntObjectInspector ||
+				arguments[0] instanceof StringObjectInspector) {
+			return arguments[0];
+		} else {
+			throw new RuntimeException("Not support argument: " + arguments[0]);
+		}
+	}
+
+	@Override
+	public Object evaluate(DeferredObject[] arguments) throws HiveException {
+		return arguments[0].get();
+	}
+
+	@Override
+	public String getDisplayString(String[] children) {
+		return "TestHiveGenericUDF";
+	}
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/util/TestHiveSimpleUDF.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/util/TestHiveSimpleUDF.java
new file mode 100644
index 0000000..0d01586
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/util/TestHiveSimpleUDF.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive.util;
+
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Test simple udf.
+ */
+public class TestHiveSimpleUDF extends UDF {
+
+	public IntWritable evaluate(IntWritable i) {
+		return new IntWritable(i.get());
+	}
+
+	public Text evaluate(Text text) {
+		return new Text(text.toString());
+	}
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/util/TestHiveUDTF.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/util/TestHiveUDTF.java
new file mode 100644
index 0000000..354451c
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/util/TestHiveUDTF.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive.util;
+
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.IntWritable;
+
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Test split udtf.
+ */
+public class TestHiveUDTF extends GenericUDTF {
+
+	@Override
+	public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
+		checkArgument(argOIs.length == 2);
+
+		// TEST for constant arguments
+		checkArgument(argOIs[1] instanceof ConstantObjectInspector);
+		Object constant = ((ConstantObjectInspector) argOIs[1]).getWritableConstantValue();
+		checkArgument(constant instanceof IntWritable);
+		checkArgument(((IntWritable) constant).get() == 1);
+
+		return ObjectInspectorFactory.getStandardStructObjectInspector(
+			Collections.singletonList("col1"),
+			Collections.singletonList(PrimitiveObjectInspectorFactory.javaStringObjectInspector));
+	}
+
+	@Override
+	public void process(Object[] args) throws HiveException {
+		String str = (String) args[0];
+		for (String s : str.split(",")) {
+			forward(s);
+			forward(s);
+		}
+	}
+
+	@Override
+	public void close() {
+	}
+}
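
For readers checking the assertion in testBlinkUdf above: TestHiveUDTF forwards each
comma-split token twice, so every source row contributes two identical rows to the
aggregation, and sum(b) and myudaf(b) each see the age twice. Assuming the bundled
test.csv holds the rows (1,1), (2,2), (3,3), an assumption consistent with the
asserted output, the expected lines can be derived like this:

    import java.util.Arrays;
    import java.util.List;

    class ExpectedResultSketch {
        public static void main(String[] args) {
            // Assumed source rows (name, age); each passes through myudtf, which
            // forwards the name twice, so each (a, s) group aggregates age twice.
            List<int[]> source = Arrays.asList(
                    new int[]{1, 1}, new int[]{2, 2}, new int[]{3, 3});
            for (int[] row : source) {
                int doubled = 2 * row[1]; // sum(b) and myudaf(b) over two copies
                System.out.println(row[0] + "," + row[0] + "," + doubled + "," + doubled);
            }
            // Prints 1,1,2,2 / 2,2,4,4 / 3,3,6,6, matching Assert.assertEquals above.
        }
    }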


[flink] 02/05: [FLINK-13225][table-planner-blink] Fix type inference for hive udtf

Posted by ku...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b2c110254b6a0b709ca41fe9c819298516dccbc5
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Sun Jul 28 20:21:20 2019 +0800

    [FLINK-13225][table-planner-blink] Fix type inference for hive udtf
---
 .../catalog/FunctionCatalogOperatorTable.java      |  21 +-
 .../functions/utils/HiveTableSqlFunction.java      | 235 +++++++++++++++++++++
 .../planner/plan/QueryOperationConverter.java      |   3 +-
 .../planner/codegen/CorrelateCodeGenerator.scala   |   8 +-
 .../table/planner/codegen/ExprCodeGenerator.scala  |   4 +-
 .../planner/functions/utils/TableSqlFunction.scala | 112 +++++-----
 .../functions/utils/UserDefinedFunctionUtils.scala |   2 +-
 .../logical/FlinkLogicalTableFunctionScan.scala    |   2 +-
 ...relateToJoinFromTemporalTableFunctionRule.scala |   4 +-
 .../schema/DeferredTypeFlinkTableFunction.scala    |   4 +-
 .../planner/plan/schema/FlinkTableFunction.scala   |   1 +
 .../plan/schema/TypedFlinkTableFunction.scala      |   2 +
 .../utils/UserDefinedFunctionTestUtils.scala       |   2 +-
 13 files changed, 336 insertions(+), 64 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
index ddf8f60..bc60f27 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.catalog;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.FunctionLookup;
 import org.apache.flink.table.functions.AggregateFunctionDefinition;
@@ -27,8 +28,12 @@ import org.apache.flink.table.functions.ScalarFunctionDefinition;
 import org.apache.flink.table.functions.TableFunctionDefinition;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.functions.utils.HiveScalarSqlFunction;
+import org.apache.flink.table.planner.functions.utils.HiveTableSqlFunction;
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils;
+import org.apache.flink.table.planner.plan.schema.DeferredTypeFlinkTableFunction;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
 
 import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlFunctionCategory;
@@ -42,6 +47,7 @@ import java.util.List;
 import java.util.Optional;
 
 import static org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.isHiveFunc;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
 
 /**
  * Thin adapter between {@link SqlOperatorTable} and {@link FunctionCatalog}.
@@ -108,7 +114,20 @@ public class FunctionCatalogOperatorTable implements SqlOperatorTable {
 		} else if (functionDefinition instanceof TableFunctionDefinition &&
 				category != null &&
 				category.isTableFunction()) {
-			return convertTableFunction(name, (TableFunctionDefinition) functionDefinition);
+			TableFunctionDefinition def = (TableFunctionDefinition) functionDefinition;
+			if (isHiveFunc(def.getTableFunction())) {
+				DataType returnType = fromLegacyInfoToDataType(new GenericTypeInfo<>(Row.class));
+				return Optional.of(new HiveTableSqlFunction(
+						name,
+						name,
+						def.getTableFunction(),
+						returnType,
+						typeFactory,
+						new DeferredTypeFlinkTableFunction(def.getTableFunction(), returnType),
+						HiveTableSqlFunction.operandTypeChecker(name, def.getTableFunction())));
+			} else {
+				return convertTableFunction(name, (TableFunctionDefinition) functionDefinition);
+			}
 		}
 
 		return Optional.empty();
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveTableSqlFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveTableSqlFunction.java
new file mode 100644
index 0000000..6800fb9
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveTableSqlFunction.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.utils;
+
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.schema.FlinkTableFunction;
+import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlUtil;
+import org.apache.calcite.util.BitString;
+import org.apache.calcite.util.DateString;
+import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.calcite.util.NlsString;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.TimeString;
+import org.apache.calcite.util.TimestampString;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Tuple3;
+
+import static org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType;
+import static org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeSetArgs;
+import static org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.buildRelDataType;
+import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType;
+import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType;
+
+/**
+ * Hive {@link TableSqlFunction}.
+ * Overrides getFunction to clone the function and invoke {@code HiveGenericUDTF#setArgumentTypesAndConstants}.
+ * Overrides SqlReturnTypeInference to invoke {@code HiveGenericUDTF#getHiveResultType} instead of
+ * {@code HiveGenericUDTF#getResultType(Object[], Class[])}.
+ *
+ * @deprecated TODO hack code; this logic should be integrated into TableSqlFunction
+ */
+@Deprecated
+public class HiveTableSqlFunction extends TableSqlFunction {
+
+	private final TableFunction hiveUdtf;
+	private final HiveOperandTypeChecker operandTypeChecker;
+
+	public HiveTableSqlFunction(String name, String displayName,
+			TableFunction hiveUdtf,
+			DataType implicitResultType,
+			FlinkTypeFactory typeFactory,
+			FlinkTableFunction functionImpl,
+			HiveOperandTypeChecker operandTypeChecker) {
+		super(name, displayName, hiveUdtf, implicitResultType, typeFactory, functionImpl, scala.Option.apply(operandTypeChecker));
+		this.hiveUdtf = hiveUdtf;
+		this.operandTypeChecker = operandTypeChecker;
+	}
+
+	@Override
+	public TableFunction makeFunction(Object[] constantArguments, LogicalType[] argTypes) {
+		TableFunction clone;
+		try {
+			clone = InstantiationUtil.clone(hiveUdtf);
+		} catch (IOException | ClassNotFoundException e) {
+			throw new RuntimeException(e);
+		}
+		return (TableFunction) invokeSetArgs(clone, constantArguments, argTypes);
+	}
+
+	@Override
+	public RelDataType getRowType(RelDataTypeFactory typeFactory, List<SqlNode> operandList) {
+		Preconditions.checkNotNull(operandTypeChecker.previousArgTypes);
+		FlinkTypeFactory factory = (FlinkTypeFactory) typeFactory;
+		Object[] arguments = convertArguments(
+				Arrays.stream(operandTypeChecker.previousArgTypes)
+						.map(factory::createFieldTypeFromLogicalType)
+						.collect(Collectors.toList()),
+				operandList);
+		DataType resultType = fromLogicalTypeToDataType(FlinkTypeFactory.toLogicalType(
+				invokeGetResultType(hiveUdtf, arguments, operandTypeChecker.previousArgTypes, (FlinkTypeFactory) typeFactory)));
+		Tuple3<String[], int[], LogicalType[]> fieldInfo = UserDefinedFunctionUtils.getFieldInfo(resultType);
+		return buildRelDataType(typeFactory, fromDataTypeToLogicalType(resultType), fieldInfo._1(), fieldInfo._2());
+	}
+
+	/**
+	 * This method is copied from Calcite and modified so it does not rely on Function.
+	 * TODO: FlinkTableFunction needs to implement getElementType.
+	 */
+	private static Object[] convertArguments(
+			List<RelDataType> operandTypes,
+			List<SqlNode> operandList) {
+		List<Object> arguments = new ArrayList<>(operandList.size());
+		// Construct a list of arguments, if they are all constants.
+		for (Pair<RelDataType, SqlNode> pair
+				: Pair.zip(operandTypes, operandList)) {
+			try {
+				final Object o = getValue(pair.right);
+				final Object o2 = coerce(o, pair.left);
+				arguments.add(o2);
+			} catch (NonLiteralException e) {
+				arguments.add(null);
+			}
+		}
+		return arguments.toArray();
+	}
+
+	private static Object coerce(Object value, RelDataType type) {
+		if (value == null) {
+			return null;
+		}
+		switch (type.getSqlTypeName()) {
+			case CHAR:
+				return ((NlsString) value).getValue();
+			case BINARY:
+				return ((BitString) value).getAsByteArray();
+			case DECIMAL:
+				return value;
+			case BIGINT:
+				return ((BigDecimal) value).longValue();
+			case INTEGER:
+				return ((BigDecimal) value).intValue();
+			case SMALLINT:
+				return ((BigDecimal) value).shortValue();
+			case TINYINT:
+				return ((BigDecimal) value).byteValue();
+			case DOUBLE:
+				return ((BigDecimal) value).doubleValue();
+			case REAL:
+				return ((BigDecimal) value).floatValue();
+			case FLOAT:
+				return ((BigDecimal) value).floatValue();
+			case DATE:
+				return LocalDate.ofEpochDay(((DateString) value).getDaysSinceEpoch());
+			case TIME:
+				return LocalTime.ofNanoOfDay(((TimeString) value).getMillisOfDay() * 1000_000);
+			case TIMESTAMP:
+				return SqlDateTimeUtils.unixTimestampToLocalDateTime(((TimestampString) value).getMillisSinceEpoch());
+			default:
+				throw new RuntimeException("Not support type: " + type);
+		}
+	}
+
+	private static Object getValue(SqlNode right) throws NonLiteralException {
+		switch (right.getKind()) {
+			case ARRAY_VALUE_CONSTRUCTOR:
+				final List<Object> list = new ArrayList<>();
+				for (SqlNode o : ((SqlCall) right).getOperandList()) {
+					list.add(getValue(o));
+				}
+				return ImmutableNullableList.copyOf(list).toArray();
+			case MAP_VALUE_CONSTRUCTOR:
+				final Map<Object, Object> map = new HashMap<>();
+				final List<SqlNode> operands = ((SqlCall) right).getOperandList();
+				for (int i = 0; i < operands.size(); i += 2) {
+					final SqlNode key = operands.get(i);
+					final SqlNode value = operands.get(i + 1);
+					map.put(getValue(key), getValue(value));
+				}
+				return map;
+			default:
+				if (SqlUtil.isNullLiteral(right, true)) {
+					return null;
+				}
+				if (SqlUtil.isLiteral(right)) {
+					return ((SqlLiteral) right).getValue();
+				}
+				if (right.getKind() == SqlKind.DEFAULT) {
+					return null; // currently NULL is the only default value
+				}
+				throw new NonLiteralException();
+		}
+	}
+
+	/** Thrown when a non-literal occurs in an argument to a user-defined
+	 * table macro. */
+	private static class NonLiteralException extends Exception {
+	}
+
+	public static HiveOperandTypeChecker operandTypeChecker(String name, TableFunction udtf) {
+		return new HiveOperandTypeChecker(name, udtf, UserDefinedFunctionUtils.checkAndExtractMethods(udtf, "eval"));
+	}
+
+	/**
+	 * Checker that remembers previousArgTypes.
+	 *
+	 * @deprecated TODO hack code; should modify Calcite's getRowType to pass operand types
+	 */
+	@Deprecated
+	public static class HiveOperandTypeChecker extends OperandTypeChecker {
+
+		private LogicalType[] previousArgTypes;
+
+		private HiveOperandTypeChecker(String name, TableFunction udtf, Method[] methods) {
+			super(name, udtf, methods);
+		}
+
+		@Override
+		public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
+			previousArgTypes = UserDefinedFunctionUtils.getOperandTypeArray(callBinding);
+			return super.checkOperandTypes(callBinding, throwOnFailure);
+		}
+	}
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
index 2a2119d..e0f4763 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
@@ -279,7 +279,8 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod
 					tableFunction,
 					resultType,
 					typeFactory,
-					function);
+					function,
+					scala.Option.empty());
 
 			List<RexNode> parameters = convertToRexNodes(calculatedTable.getParameters());
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
index cf861bf..c292c64 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
@@ -69,15 +69,19 @@ object CorrelateCodeGenerator {
     val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
     // we need result Type to do code generation
     val arguments = UserDefinedFunctionUtils.transformRexNodes(rexCall.operands)
+    val operandTypes = rexCall.operands
+        .map(_.getType)
+        .map(FlinkTypeFactory.toLogicalType).toArray
+    val func = sqlFunction.makeFunction(arguments, operandTypes)
     val argTypes = getEvalMethodSignature(
-      sqlFunction.getTableFunction,
+      func,
       rexCall.operands
         .map(_.getType)
         .map(FlinkTypeFactory.toLogicalType).toArray)
     val udtfExternalType = sqlFunction
         .getFunction
         .asInstanceOf[FlinkTableFunction]
-        .getExternalResultType(arguments, argTypes)
+        .getExternalResultType(func, arguments, argTypes)
     val pojoFieldMapping = Some(UserDefinedFunctionUtils.getFieldInfo(udtfExternalType)._2)
     val inputType = FlinkTypeFactory.toLogicalRowType(inputRelType)
     val (returnType, swallowInputOnly ) = if (projectProgram.isDefined) {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index 7c55d73..3a05fe5 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -737,7 +737,9 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
             .generate(ctx, operands, resultType)
 
       case tsf: TableSqlFunction =>
-        new TableFunctionCallGen(tsf.getTableFunction).generate(ctx, operands, resultType)
+        new TableFunctionCallGen(
+          tsf.makeFunction(getOperandLiterals(operands), operands.map(_.resultType).toArray))
+            .generate(ctx, operands, resultType)
 
       // advanced scalar functions
       case sqlOperator: SqlOperator =>
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala
index 4bf554a..c3f5ac3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.planner.plan.schema.FlinkTableFunction
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
 import org.apache.flink.table.types.DataType
+import org.apache.flink.table.types.logical.LogicalType
 
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
 import org.apache.calcite.sql._
@@ -34,6 +35,7 @@ import org.apache.calcite.sql.`type`._
 import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.calcite.sql.validate.{SqlUserDefinedTableFunction, SqlUserDefinedTableMacro}
 
+import java.lang.reflect.Method
 import java.util
 
 /**
@@ -49,22 +51,26 @@ import java.util
 class TableSqlFunction(
     name: String,
     displayName: String,
-    udtf: TableFunction[_],
+    val udtf: TableFunction[_],
     implicitResultType: DataType,
     typeFactory: FlinkTypeFactory,
-    functionImpl: FlinkTableFunction)
+    functionImpl: FlinkTableFunction,
+    operandTypeInfer: Option[SqlOperandTypeChecker] = None)
   extends SqlUserDefinedTableFunction(
     new SqlIdentifier(name, SqlParserPos.ZERO),
     ReturnTypes.CURSOR,
+    // type inference only sees UNKNOWN operand types.
     createOperandTypeInference(name, udtf, typeFactory),
-    createOperandTypeChecker(name, udtf),
+    // only the checker sees the real operand types.
+    operandTypeInfer.getOrElse(createOperandTypeChecker(name, udtf)),
     null,
     functionImpl) {
 
   /**
     * Get the user-defined table function.
     */
-  def getTableFunction = udtf
+  def makeFunction(constants: Array[AnyRef], argTypes: Array[LogicalType]): TableFunction[_] =
+    udtf
 
   /**
     * Get the type information of the table returned by the table function.
@@ -131,61 +137,61 @@ object TableSqlFunction {
   private[flink] def createOperandTypeChecker(
       name: String,
       udtf: TableFunction[_]): SqlOperandTypeChecker = {
+    new OperandTypeChecker(name, udtf, checkAndExtractMethods(udtf, "eval"))
+  }
+}
 
-    val methods = checkAndExtractMethods(udtf, "eval")
+/**
+  * Operand type checker based on the given [[TableFunction]] information.
+  */
+class OperandTypeChecker(
+    name: String, udtf: TableFunction[_], methods: Array[Method]) extends SqlOperandTypeChecker {
 
-    /**
-      * Operand type checker based on [[TableFunction]] given information.
-      */
-    new SqlOperandTypeChecker {
-      override def getAllowedSignatures(op: SqlOperator, opName: String): String = {
-        s"$opName[${signaturesToString(udtf, "eval")}]"
-      }
+  override def getAllowedSignatures(op: SqlOperator, opName: String): String = {
+    s"$opName[${signaturesToString(udtf, "eval")}]"
+  }
 
-      override def getOperandCountRange: SqlOperandCountRange = {
-        var min = 254
-        var max = -1
-        var isVarargs = false
-        methods.foreach(m => {
-          var len = m.getParameterTypes.length
-          if (len > 0 && m.isVarArgs && m.getParameterTypes()(len - 1).isArray) {
-            isVarargs = true
-            len = len - 1
-          }
-          max = Math.max(len, max)
-          min = Math.min(len, min)
-        })
-        if (isVarargs) {
-          // if eval method is varargs, set max to -1 to skip length check in Calcite
-          max = -1
-        }
-        SqlOperandCountRanges.between(min, max)
+  override def getOperandCountRange: SqlOperandCountRange = {
+    var min = 254
+    var max = -1
+    var isVarargs = false
+    methods.foreach(m => {
+      var len = m.getParameterTypes.length
+      if (len > 0 && m.isVarArgs && m.getParameterTypes()(len - 1).isArray) {
+        isVarargs = true
+        len = len - 1
       }
+      max = Math.max(len, max)
+      min = Math.min(len, min)
+    })
+    if (isVarargs) {
+      // if eval method is varargs, set max to -1 to skip length check in Calcite
+      max = -1
+    }
+    SqlOperandCountRanges.between(min, max)
+  }
 
-      override def checkOperandTypes(
-          callBinding: SqlCallBinding,
-          throwOnFailure: Boolean)
-        : Boolean = {
-        val operandTypes = getOperandType(callBinding)
-
-        if (getEvalUserDefinedMethod(udtf, operandTypes).isEmpty) {
-          if (throwOnFailure) {
-            throw new ValidationException(
-              s"Given parameters of function '$name' do not match any signature. \n" +
-                  s"Actual: ${signatureInternalToString(operandTypes)} \n" +
-                  s"Expected: ${signaturesToString(udtf, "eval")}")
-          } else {
-            false
-          }
-        } else {
-          true
-        }
+  override def checkOperandTypes(
+      callBinding: SqlCallBinding,
+      throwOnFailure: Boolean)
+  : Boolean = {
+    val operandTypes = getOperandType(callBinding)
+
+    if (getEvalUserDefinedMethod(udtf, operandTypes).isEmpty) {
+      if (throwOnFailure) {
+        throw new ValidationException(
+          s"Given parameters of function '$name' do not match any signature. \n" +
+              s"Actual: ${signatureInternalToString(operandTypes)} \n" +
+              s"Expected: ${signaturesToString(udtf, "eval")}")
+      } else {
+        false
       }
-
-      override def isOptional(i: Int): Boolean = false
-
-      override def getConsistency: Consistency = Consistency.NONE
-
+    } else {
+      true
     }
   }
+
+  override def isOptional(i: Int): Boolean = false
+
+  override def getConsistency: Consistency = Consistency.NONE
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
index e565a6e..3552a7f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
@@ -802,7 +802,7 @@ object UserDefinedFunctionUtils {
     }.toArray
   }
 
-  private[table] def buildRelDataType(
+  def buildRelDataType(
       typeFactory: RelDataTypeFactory,
       resultType: LogicalType,
       fieldNames: Array[String],
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
index e2ab608..38f4b03 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
@@ -100,7 +100,7 @@ class FlinkLogicalTableFunctionScanConverter
       return false
     }
     val tableFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
-    tableFunction.getTableFunction.isInstanceOf[TemporalTableFunction]
+    tableFunction.udtf.isInstanceOf[TemporalTableFunction]
   }
 
   def convert(rel: RelNode): RelNode = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
index a240b64..62a4872 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
@@ -182,11 +182,11 @@ class GetTemporalTableFunctionCall(
     }
     val tableFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
 
-    if (!tableFunction.getTableFunction.isInstanceOf[TemporalTableFunction]) {
+    if (!tableFunction.udtf.isInstanceOf[TemporalTableFunction]) {
       return null
     }
     val temporalTableFunction =
-      tableFunction.getTableFunction.asInstanceOf[TemporalTableFunctionImpl]
+      tableFunction.udtf.asInstanceOf[TemporalTableFunctionImpl]
 
     checkState(
       rexCall.getOperands.size().equals(1),
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/DeferredTypeFlinkTableFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/DeferredTypeFlinkTableFunction.scala
index 9bf8221..6f467d4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/DeferredTypeFlinkTableFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/DeferredTypeFlinkTableFunction.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.schema
 
+import org.apache.flink.table.functions
 import org.apache.flink.table.functions.TableFunction
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
@@ -39,6 +40,7 @@ class DeferredTypeFlinkTableFunction(
   extends FlinkTableFunction(tableFunction) {
 
   override def getExternalResultType(
+      tableFunction: functions.TableFunction[_],
       arguments: Array[AnyRef],
       argTypes: Array[Class[_]]): DataType = {
     // TODO
@@ -56,7 +58,7 @@ class DeferredTypeFlinkTableFunction(
       typeFactory: RelDataTypeFactory,
       arguments: Array[AnyRef],
       argTypes: Array[Class[_]]): RelDataType = {
-    val resultType = getExternalResultType(arguments, argTypes)
+    val resultType = getExternalResultType(tableFunction, arguments, argTypes)
     val (fieldNames, fieldIndexes, _) = UserDefinedFunctionUtils.getFieldInfo(resultType)
     UserDefinedFunctionUtils.buildRelDataType(
       typeFactory, fromDataTypeToLogicalType(resultType), fieldNames, fieldIndexes)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/FlinkTableFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/FlinkTableFunction.scala
index 73f5e63..6ecb8e7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/FlinkTableFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/FlinkTableFunction.scala
@@ -51,6 +51,7 @@ abstract class FlinkTableFunction(
     * Returns the Type for usage, i.e. code generation.
     */
   def getExternalResultType(
+      tableFunction: functions.TableFunction[_],
       arguments: Array[AnyRef],
       argTypes: Array[Class[_]]): DataType
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/TypedFlinkTableFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/TypedFlinkTableFunction.scala
index 773af90..828a4b6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/TypedFlinkTableFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/TypedFlinkTableFunction.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.schema
 
+import org.apache.flink.table.functions
 import org.apache.flink.table.functions.TableFunction
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
@@ -41,6 +42,7 @@ class TypedFlinkTableFunction(
   extends FlinkTableFunction(tableFunction) {
 
   override def getExternalResultType(
+      tableFunction: functions.TableFunction[_],
       arguments: Array[AnyRef],
       argTypes: Array[Class[_]]): DataType =
     externalResultType
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/UserDefinedFunctionTestUtils.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/UserDefinedFunctionTestUtils.scala
index 432f0a2..bd69126 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/UserDefinedFunctionTestUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/UserDefinedFunctionTestUtils.scala
@@ -334,7 +334,7 @@ object UserDefinedFunctionTestUtils {
       }
       a + b
     }
-    
+
     def eval(a: Long, b: Int): Long = {
       eval(a, b.asInstanceOf[Long])
     }


[flink] 03/05: [FLINK-13225][table-planner-blink] Fix type inference for hive udaf

Posted by ku...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3d502e3069faa0e898b9b0a1059622eac2b1c2f0
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Sun Jul 28 20:23:56 2019 +0800

    [FLINK-13225][table-planner-blink] Fix type inference for hive udaf
---
 .../catalog/FunctionCatalogOperatorTable.java      | 12 +++-
 .../planner/expressions/SqlAggFunctionVisitor.java |  3 +-
 .../functions/utils/HiveAggSqlFunction.java        | 83 ++++++++++++++++++++++
 .../planner/functions/utils/AggSqlFunction.scala   | 10 ++-
 .../planner/plan/utils/AggFunctionFactory.scala    | 11 ++-
 5 files changed, 113 insertions(+), 6 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
index bc60f27..3c875dd 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.ScalarFunctionDefinition;
 import org.apache.flink.table.functions.TableFunctionDefinition;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.functions.utils.HiveAggSqlFunction;
 import org.apache.flink.table.planner.functions.utils.HiveScalarSqlFunction;
 import org.apache.flink.table.planner.functions.utils.HiveTableSqlFunction;
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils;
@@ -99,7 +100,16 @@ public class FunctionCatalogOperatorTable implements SqlOperatorTable {
 			String name,
 			FunctionDefinition functionDefinition) {
 		if (functionDefinition instanceof AggregateFunctionDefinition) {
-			return convertAggregateFunction(name, (AggregateFunctionDefinition) functionDefinition);
+			AggregateFunctionDefinition def = (AggregateFunctionDefinition) functionDefinition;
+			if (isHiveFunc(def.getAggregateFunction())) {
+				return Optional.of(new HiveAggSqlFunction(
+						name,
+						name,
+						def.getAggregateFunction(),
+						typeFactory));
+			} else {
+				return convertAggregateFunction(name, (AggregateFunctionDefinition) functionDefinition);
+			}
 		} else if (functionDefinition instanceof ScalarFunctionDefinition) {
 			ScalarFunctionDefinition def = (ScalarFunctionDefinition) functionDefinition;
 			if (isHiveFunc(def.getScalarFunction())) {
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/SqlAggFunctionVisitor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/SqlAggFunctionVisitor.java
index 8221a78..4b3b9a7 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/SqlAggFunctionVisitor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/SqlAggFunctionVisitor.java
@@ -90,7 +90,8 @@ public class SqlAggFunctionVisitor extends ExpressionDefaultVisitor<SqlAggFuncti
 					fromLegacyInfoToDataType(aggDef.getResultTypeInfo()),
 					fromLegacyInfoToDataType(aggDef.getAccumulatorTypeInfo()),
 					typeFactory,
-					aggFunc.requiresOver());
+					aggFunc.requiresOver(),
+					scala.Option.empty());
 		} else {
 			throw new UnsupportedOperationException("TableAggregateFunction is not supported yet!");
 		}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveAggSqlFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveAggSqlFunction.java
new file mode 100644
index 0000000..54bde22
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveAggSqlFunction.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.utils;
+
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.type.SqlReturnTypeInference;
+
+import java.io.IOException;
+import java.util.List;
+
+import scala.Some;
+
+import static org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType;
+import static org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeSetArgs;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+
+/**
+ * Hive {@link AggSqlFunction}.
+ * Overrides {@code makeFunction} to clone the function and invoke {@code HiveUDAF#setArgumentTypesAndConstants}.
+ * Overrides the {@code SqlReturnTypeInference} to invoke {@code HiveUDAF#getHiveResultType} instead of
+ * {@code HiveUDAF#getResultType}.
+ *
+ * @deprecated TODO hack code; this logic should be integrated into AggSqlFunction
+ */
+@Deprecated
+public class HiveAggSqlFunction extends AggSqlFunction {
+
+	private final AggregateFunction aggregateFunction;
+
+	public HiveAggSqlFunction(String name, String displayName,
+			AggregateFunction aggregateFunction, FlinkTypeFactory typeFactory) {
+		super(name, displayName, aggregateFunction, fromLegacyInfoToDataType(new GenericTypeInfo<>(Object.class)),
+				fromLegacyInfoToDataType(new GenericTypeInfo<>(Object.class)), typeFactory, false,
+				new Some<>(createReturnTypeInference(aggregateFunction, typeFactory)));
+		this.aggregateFunction = aggregateFunction;
+	}
+
+	@Override
+	public AggregateFunction makeFunction(Object[] constantArguments, LogicalType[] argTypes) {
+		AggregateFunction clone;
+		try {
+			clone = InstantiationUtil.clone(aggregateFunction);
+		} catch (IOException | ClassNotFoundException e) {
+			throw new RuntimeException(e);
+		}
+		return (AggregateFunction) invokeSetArgs(clone, constantArguments, argTypes);
+	}
+
+	private static SqlReturnTypeInference createReturnTypeInference(
+			AggregateFunction function, FlinkTypeFactory typeFactory) {
+		return opBinding -> {
+			List<RelDataType> sqlTypes = opBinding.collectOperandTypes();
+			LogicalType[] parameters = UserDefinedFunctionUtils.getOperandTypeArray(opBinding);
+
+			Object[] constantArguments = new Object[sqlTypes.size()];
+			// We cannot access the literals here: Calcite has already folded them
+			// into a previous RelNode, so at this point all inputs are input refs.
+			return invokeGetResultType(function, constantArguments, parameters, typeFactory);
+		};
+	}
+}
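
The makeFunction override above clones the shared catalog instance before binding call-site argument types, so calls with different signatures never see each other's state. A minimal, self-contained sketch of that clone-then-configure pattern (CloneDemo and Udaf are hypothetical stand-ins; the serialization round-trip mirrors what InstantiationUtil.clone does):

    import java.io.*;

    public class CloneDemo {

        static class Udaf implements Serializable {
            String boundArgs = "unbound";
            @Override
            public String toString() { return "Udaf(" + boundArgs + ")"; }
        }

        // Deep copy via a serialization round-trip: the same idea as
        // InstantiationUtil.clone in the patch above.
        @SuppressWarnings("unchecked")
        static <T extends Serializable> T cloneViaSerialization(T obj) throws Exception {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                return (T) in.readObject();
            }
        }

        public static void main(String[] args) throws Exception {
            Udaf shared = new Udaf();
            Udaf perCall = cloneViaSerialization(shared);
            perCall.boundArgs = "INT, STRING"; // bind call-site argument types on the copy
            System.out.println(shared + " vs " + perCall); // Udaf(unbound) vs Udaf(INT, STRING)
        }
    }
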
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/AggSqlFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/AggSqlFunction.scala
index b2b14fc..5c5f744 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/AggSqlFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/AggSqlFunction.scala
@@ -54,10 +54,12 @@ class AggSqlFunction(
     val externalResultType: DataType,
     val externalAccType: DataType,
     typeFactory: FlinkTypeFactory,
-    requiresOver: Boolean)
+    requiresOver: Boolean,
+    returnTypeInfer: Option[SqlReturnTypeInference] = None)
   extends SqlUserDefinedAggFunction(
     new SqlIdentifier(name, SqlParserPos.ZERO),
-    createReturnTypeInference(fromDataTypeToLogicalType(externalResultType), typeFactory),
+    returnTypeInfer.getOrElse(createReturnTypeInference(
+      fromDataTypeToLogicalType(externalResultType), typeFactory)),
     createOperandTypeInference(name, aggregateFunction, typeFactory),
     createOperandTypeChecker(name, aggregateFunction),
     // Do not need to provide a calcite aggregateFunction here. Flink aggregation function
@@ -69,7 +71,9 @@ class AggSqlFunction(
     typeFactory
   ) {
 
-  def getFunction: AggregateFunction[_, _] = aggregateFunction
+  def makeFunction(
+      constants: Array[AnyRef], argTypes: Array[LogicalType]): AggregateFunction[_, _] =
+    aggregateFunction
 
   override def isDeterministic: Boolean = aggregateFunction.isDeterministic
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
index e8707f2..d5f9e56 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
@@ -43,6 +43,8 @@ import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.sql.fun._
 import org.apache.calcite.sql.{SqlAggFunction, SqlKind, SqlRankFunction}
 
+import java.util
+
 import scala.collection.JavaConversions._
 
 /**
@@ -122,7 +124,14 @@ class AggFunctionFactory(
       case a: SqlAggFunction if a.getKind == SqlKind.COLLECT =>
         createCollectAggFunction(argTypes)
 
-      case udagg: AggSqlFunction => udagg.getFunction
+      case udagg: AggSqlFunction =>
+        // We cannot access the literals here: Calcite has already folded them
+        // into a previous RelNode, so at this point all inputs are input refs.
+        val constants = new util.ArrayList[AnyRef]()
+        argTypes.foreach(t => constants.add(null))
+        udagg.makeFunction(
+          constants.toArray,
+          argTypes)
 
       case unSupported: SqlAggFunction =>
         throw new TableException(s"Unsupported Function: '${unSupported.getName}'")
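
Because every operand of an aggregate call is an input ref by the time this factory runs, the constants list above is deliberately one null per argument. A trivial Java illustration of that null-filled constants array (NullConstantsDemo is a hypothetical name):

    import java.util.Arrays;

    public class NullConstantsDemo {
        public static void main(String[] args) {
            String[] argTypes = {"INT", "STRING", "DOUBLE"};
            // One "constant" slot per argument; Java arrays default to null,
            // signalling that no literal value is known at plan time.
            Object[] constants = new Object[argTypes.length];
            System.out.println(Arrays.toString(constants)); // [null, null, null]
        }
    }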


[flink] 01/05: [FLINK-13225][table-planner-blink] Fix type inference for hive udf

Posted by ku...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e08117f89ba012c37f70c4aad99d569d8a9ba2b6
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Sun Jul 28 20:16:14 2019 +0800

    [FLINK-13225][table-planner-blink] Fix type inference for hive udf
---
 .../catalog/FunctionCatalogOperatorTable.java      | 14 +++-
 .../planner/functions/utils/HiveFunctionUtils.java | 80 ++++++++++++++++++++
 .../functions/utils/HiveScalarSqlFunction.java     | 85 ++++++++++++++++++++++
 .../table/planner/codegen/ExprCodeGenerator.scala  | 18 ++++-
 .../functions/utils/ScalarSqlFunction.scala        |  9 ++-
 .../functions/utils/UserDefinedFunctionUtils.scala |  4 +
 6 files changed, 205 insertions(+), 5 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
index 87a7bcb..ddf8f60 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.ScalarFunctionDefinition;
 import org.apache.flink.table.functions.TableFunctionDefinition;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.functions.utils.HiveScalarSqlFunction;
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils;
 import org.apache.flink.table.types.utils.TypeConversions;
 
@@ -40,6 +41,8 @@ import org.apache.calcite.sql.validate.SqlNameMatcher;
 import java.util.List;
 import java.util.Optional;
 
+import static org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.isHiveFunc;
+
 /**
  * Thin adapter between {@link SqlOperatorTable} and {@link FunctionCatalog}.
  */
@@ -92,7 +95,16 @@ public class FunctionCatalogOperatorTable implements SqlOperatorTable {
 		if (functionDefinition instanceof AggregateFunctionDefinition) {
 			return convertAggregateFunction(name, (AggregateFunctionDefinition) functionDefinition);
 		} else if (functionDefinition instanceof ScalarFunctionDefinition) {
-			return convertScalarFunction(name, (ScalarFunctionDefinition) functionDefinition);
+			ScalarFunctionDefinition def = (ScalarFunctionDefinition) functionDefinition;
+			if (isHiveFunc(def.getScalarFunction())) {
+				return Optional.of(new HiveScalarSqlFunction(
+						name,
+						name,
+						def.getScalarFunction(),
+						typeFactory));
+			} else {
+				return convertScalarFunction(name, def);
+			}
 		} else if (functionDefinition instanceof TableFunctionDefinition &&
 				category != null &&
 				category.isTableFunction()) {
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveFunctionUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveFunctionUtils.java
new file mode 100644
index 0000000..13a82cb
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveFunctionUtils.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.utils;
+
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.calcite.rel.type.RelDataType;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType;
+
+/**
+ * Hack utilities for Hive functions, accessed reflectively so the planner
+ * needs no compile-time dependency on the Hive connector.
+ */
+public class HiveFunctionUtils {
+
+	public static boolean isHiveFunc(Object function) {
+		try {
+			getSetArgsMethod(function);
+			return true;
+		} catch (NoSuchMethodException e) {
+			return false;
+		}
+	}
+
+	private static Method getSetArgsMethod(Object function) throws NoSuchMethodException {
+		return function.getClass().getMethod(
+				"setArgumentTypesAndConstants", Object[].class, DataType[].class);
+
+	}
+
+	static Serializable invokeSetArgs(
+			Serializable function, Object[] constantArguments, LogicalType[] argTypes) {
+		try {
+			// See the HiveFunction interface in the Hive connector
+			Method method = getSetArgsMethod(function);
+			method.invoke(function, constantArguments, TypeConversions.fromLogicalToDataType(argTypes));
+			return function;
+		} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	static RelDataType invokeGetResultType(
+			Object function, Object[] constantArguments, LogicalType[] argTypes,
+			FlinkTypeFactory typeFactory) {
+		try {
+			// See the HiveFunction interface in the Hive connector
+			Method method = function.getClass()
+					.getMethod("getHiveResultType", Object[].class, DataType[].class);
+			DataType resultType = (DataType) method.invoke(
+					function, constantArguments, TypeConversions.fromLogicalToDataType(argTypes));
+			return typeFactory.createFieldTypeFromLogicalType(fromDataTypeToLogicalType(resultType));
+		} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+			throw new RuntimeException(e);
+		}
+	}
+}
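
isHiveFunc above is duck typing: a function counts as a Hive function purely because it exposes setArgumentTypesAndConstants, with no shared interface between the planner and the Hive connector. A self-contained sketch of the same reflective detect-then-invoke trick (DuckTypingDemo, FakeHiveUdf and describe are hypothetical names, not Flink API):

    import java.lang.reflect.Method;

    public class DuckTypingDemo {

        // Plays the role of a Hive UDF wrapper: no marker interface, just a
        // method with an agreed-upon name and signature.
        static class FakeHiveUdf {
            public void describe(Object[] constants, String[] types) {
                System.out.println("bound " + types.length + " argument types");
            }
        }

        // Same shape as isHiveFunc: the presence of the method is the marker.
        static boolean hasDescribe(Object fn) {
            try {
                fn.getClass().getMethod("describe", Object[].class, String[].class);
                return true;
            } catch (NoSuchMethodException e) {
                return false;
            }
        }

        public static void main(String[] args) throws Exception {
            Object fn = new FakeHiveUdf();
            if (hasDescribe(fn)) {
                Method m = fn.getClass().getMethod("describe", Object[].class, String[].class);
                m.invoke(fn, new Object[] {null, null}, new String[] {"INT", "STRING"});
                // prints: bound 2 argument types
            }
        }
    }
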
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveScalarSqlFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveScalarSqlFunction.java
new file mode 100644
index 0000000..a44576a
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveScalarSqlFunction.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.utils;
+
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.type.SqlReturnTypeInference;
+
+import java.io.IOException;
+import java.util.List;
+
+import scala.Some;
+
+import static org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType;
+import static org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeSetArgs;
+import static org.apache.flink.table.runtime.types.ClassLogicalTypeConverter.getDefaultExternalClassForType;
+
+/**
+ * Hive {@link ScalarSqlFunction}.
+ * Overrides {@code makeFunction} to clone the function and invoke {@code HiveScalarFunction#setArgumentTypesAndConstants}.
+ * Overrides the {@code SqlReturnTypeInference} to invoke {@code HiveScalarFunction#getHiveResultType} instead of
+ * {@code HiveScalarFunction#getResultType(Class[])}.
+ *
+ * @deprecated TODO hack code; this logic should be integrated into ScalarSqlFunction
+ */
+@Deprecated
+public class HiveScalarSqlFunction extends ScalarSqlFunction {
+
+	private final ScalarFunction function;
+
+	public HiveScalarSqlFunction(
+			String name, String displayName,
+			ScalarFunction function, FlinkTypeFactory typeFactory) {
+		super(name, displayName, function, typeFactory, new Some<>(createReturnTypeInference(function, typeFactory)));
+		this.function = function;
+	}
+
+	@Override
+	public ScalarFunction makeFunction(Object[] constantArguments, LogicalType[] argTypes) {
+		ScalarFunction clone;
+		try {
+			clone = InstantiationUtil.clone(function);
+		} catch (IOException | ClassNotFoundException e) {
+			throw new RuntimeException(e);
+		}
+		return (ScalarFunction) invokeSetArgs(clone, constantArguments, argTypes);
+	}
+
+	private static SqlReturnTypeInference createReturnTypeInference(
+			ScalarFunction function, FlinkTypeFactory typeFactory) {
+		return opBinding -> {
+			List<RelDataType> sqlTypes = opBinding.collectOperandTypes();
+			LogicalType[] parameters = UserDefinedFunctionUtils.getOperandTypeArray(opBinding);
+
+			Object[] constantArguments = new Object[sqlTypes.size()];
+			for (int i = 0; i < sqlTypes.size(); i++) {
+				if (!opBinding.isOperandNull(i, false) && opBinding.isOperandLiteral(i, false)) {
+					constantArguments[i] = opBinding.getOperandLiteralValue(
+							i, getDefaultExternalClassForType(parameters[i]));
+				}
+			}
+			return invokeGetResultType(function, constantArguments, parameters, typeFactory);
+		};
+	}
+}
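
The inference above feeds literal operand values into getHiveResultType because some Hive UDFs can only determine their result type once constant arguments are visible. A toy model of such constant-dependent type inference (ConstantDependentTypeDemo and inferType are hypothetical, standing in for the real getHiveResultType):

    public class ConstantDependentTypeDemo {

        // Returns a type name: CHAR(n) when the length argument is a known
        // literal, a VARCHAR fallback otherwise.
        static String inferType(Object[] constantArguments) {
            Object len = constantArguments[1];
            return (len instanceof Integer) ? "CHAR(" + len + ")" : "VARCHAR";
        }

        public static void main(String[] args) {
            // Second operand is the literal 10 -> precise type.
            System.out.println(inferType(new Object[] {null, 10}));   // CHAR(10)
            // Second operand is a column ref (null constant) -> fallback.
            System.out.println(inferType(new Object[] {null, null})); // VARCHAR
        }
    }
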
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index e641708..7c55d73 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.codegen
 
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.table.api.TableException
+import org.apache.flink.table.dataformat.DataFormatConverters.{DataFormatConverter, getConverterForDataType}
 import org.apache.flink.table.dataformat._
 import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, RexAggLocalVariable, RexDistinctKeyVariable}
 import org.apache.flink.table.planner.codegen.CodeGenUtils.{requireTemporal, requireTimeInterval, _}
@@ -30,6 +31,7 @@ import org.apache.flink.table.planner.codegen.calls.{FunctionGenerator, ScalarFu
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable._
 import org.apache.flink.table.planner.functions.sql.SqlThrowExceptionFunction
 import org.apache.flink.table.planner.functions.utils.{ScalarSqlFunction, TableSqlFunction}
+import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
 import org.apache.flink.table.runtime.types.PlannerTypeUtils.isInteroperable
 import org.apache.flink.table.runtime.typeutils.TypeCheckUtils
 import org.apache.flink.table.runtime.typeutils.TypeCheckUtils.{isNumeric, isTemporal, isTimeInterval}
@@ -730,7 +732,9 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
         GeneratedExpression(nullValue.resultTerm, nullValue.nullTerm, code, resultType)
 
       case ssf: ScalarSqlFunction =>
-        new ScalarFunctionCallGen(ssf.getScalarFunction).generate(ctx, operands, resultType)
+        new ScalarFunctionCallGen(
+          ssf.makeFunction(getOperandLiterals(operands), operands.map(_.resultType).toArray))
+            .generate(ctx, operands, resultType)
 
       case tsf: TableSqlFunction =>
         new TableFunctionCallGen(tsf.getTableFunction).generate(ctx, operands, resultType)
@@ -757,4 +761,16 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
         throw new CodeGenException(s"Unsupported call: $explainCall")
     }
   }
+
+  def getOperandLiterals(operands: Seq[GeneratedExpression]): Array[AnyRef] = {
+    operands.map { expr =>
+      expr.literalValue match {
+        case None => null
+        case Some(literal) =>
+          getConverterForDataType(fromLogicalTypeToDataType(expr.resultType))
+              .asInstanceOf[DataFormatConverter[AnyRef, AnyRef]
+              ].toExternal(literal.asInstanceOf[AnyRef])
+      }
+    }.toArray
+  }
 }
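
getOperandLiterals above collects, for each operand, the literal value known at code-generation time, or null when the value only exists at runtime; that array is what makeFunction receives. A self-contained Java analogue (OperandLiteralsDemo and Expr are hypothetical; Expr stands in for GeneratedExpression):

    import java.util.Arrays;
    import java.util.List;
    import java.util.Optional;

    public class OperandLiteralsDemo {

        // At codegen time an operand either carries a known literal value
        // or it does not.
        static class Expr {
            final Optional<Object> literalValue;
            Expr(Object value) { this.literalValue = Optional.ofNullable(value); }
        }

        static Object[] operandLiterals(List<Expr> operands) {
            return operands.stream()
                    .map(e -> e.literalValue.orElse(null))
                    .toArray();
        }

        public static void main(String[] args) {
            List<Expr> ops = Arrays.asList(
                    new Expr("yyyy-MM-dd"), // a constant pattern argument
                    new Expr(null));        // e.g. a column reference
            System.out.println(Arrays.toString(operandLiterals(ops)));
            // prints: [yyyy-MM-dd, null]
        }
    }
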
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala
index 35b5b5d..159d4f1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.{
 import org.apache.flink.table.runtime.types.ClassLogicalTypeConverter.getDefaultExternalClassForType
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
+import org.apache.flink.table.types.logical.LogicalType
 
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.sql._
@@ -47,16 +48,18 @@ class ScalarSqlFunction(
     name: String,
     displayName: String,
     scalarFunction: ScalarFunction,
-    typeFactory: FlinkTypeFactory)
+    typeFactory: FlinkTypeFactory,
+    returnTypeInfer: Option[SqlReturnTypeInference] = None)
   extends SqlFunction(
     new SqlIdentifier(name, SqlParserPos.ZERO),
-    createReturnTypeInference(name, scalarFunction, typeFactory),
+    returnTypeInfer.getOrElse(createReturnTypeInference(name, scalarFunction, typeFactory)),
     createOperandTypeInference(name, scalarFunction, typeFactory),
     createOperandTypeChecker(name, scalarFunction),
     null,
     SqlFunctionCategory.USER_DEFINED_FUNCTION) {
 
-  def getScalarFunction: ScalarFunction = scalarFunction
+  def makeFunction(constants: Array[AnyRef], argTypes: Array[LogicalType]): ScalarFunction =
+    scalarFunction
 
   override def isDeterministic: Boolean = scalarFunction.isDeterministic
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
index 1de25dd..e565a6e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
@@ -754,6 +754,10 @@ object UserDefinedFunctionUtils {
     }
   }
 
+  def getOperandTypeArray(callBinding: SqlOperatorBinding): Array[LogicalType] = {
+    getOperandType(callBinding).toArray
+  }
+
   def getOperandType(callBinding: SqlOperatorBinding): Seq[LogicalType] = {
     val operandTypes = for (i <- 0 until callBinding.getOperandCount)
       yield callBinding.getOperandType(i)