You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/05/28 18:52:01 UTC

[flink] branch master updated: [FLINK-17880][table] Use new inference for table/scalar function in catalogs

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 40a0e71  [FLINK-17880][table] Use new inference for table/scalar function in catalogs
40a0e71 is described below

commit 40a0e7187067e2d56b3ac65958e010243af36fb7
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon May 25 11:06:06 2020 +0200

    [FLINK-17880][table] Use new inference for table/scalar function in catalogs
    
    This updates catalog related operations (e.g. function DDL, SQL Client) to
    the new type inference for scalar and table functions. Hive functions and
    legacy planner still work with the old inference. With this commit we can
    tell an easier story about how to implement functions, because all main
    entrypoints support the new inference.
    
    This closes #12336.
---
 .../client/config/entries/ExecutionEntry.java      |   9 +
 .../client/gateway/local/ExecutionContext.java     |  30 ++-
 .../api/bridge/java/StreamTableEnvironment.java    |   7 +
 .../flink/table/catalog/FunctionCatalog.java       |  10 +-
 .../table/functions/FunctionDefinitionUtil.java    |  17 +-
 .../flink/table/catalog/FunctionCatalogTest.java   | 203 +++++++++------------
 .../functions/FunctionDefinitionUtilTest.java      |   8 +-
 .../functions/python/PythonScalarFunction.java     |  20 ++
 .../functions/python/PythonTableFunction.java      |  20 ++
 .../plan/nodes/common/CommonPythonBase.scala       |  12 +-
 .../table/planner/plan/utils/PythonUtil.scala      |  10 +-
 .../utils/JavaUserDefinedScalarFunctions.java      |  11 +-
 .../catalog/FunctionCatalogOperatorTable.java      |  30 +++
 .../expressions/PlannerExpressionConverter.scala   |  58 +++++-
 .../table/runtime/stream/sql/FunctionITCase.java   |  19 --
 .../table/runtime/stream/table/FunctionITCase.java | 103 -----------
 .../flink/table/runtime/batch/sql/CalcITCase.scala |   9 -
 17 files changed, 286 insertions(+), 290 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java
index 78be7f5..fc94dd1 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java
@@ -221,6 +221,15 @@ public class ExecutionEntry extends ConfigEntry {
 		return false;
 	}
 
+	public boolean isBlinkPlanner() {
+		final String planner = properties.getOptionalString(EXECUTION_PLANNER)
+			.orElse(EXECUTION_PLANNER_VALUE_BLINK);
+		if (planner.equals(EXECUTION_PLANNER_VALUE_OLD)) {
+			return false;
+		}
+		return true;
+	}
+
 	public TimeCharacteristic getTimeCharacteristic() {
 		return properties.getOptionalString(EXECUTION_TIME_CHARACTERISTIC)
 			.flatMap((v) -> {
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 4f5e418..a13eab7 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -655,14 +655,28 @@ public class ExecutionContext<ClusterID> {
 		if (tableEnv instanceof StreamTableEnvironment) {
 			StreamTableEnvironment streamTableEnvironment = (StreamTableEnvironment) tableEnv;
 			functions.forEach((k, v) -> {
-				if (v instanceof ScalarFunction) {
-					streamTableEnvironment.registerFunction(k, (ScalarFunction) v);
-				} else if (v instanceof AggregateFunction) {
-					streamTableEnvironment.registerFunction(k, (AggregateFunction<?, ?>) v);
-				} else if (v instanceof TableFunction) {
-					streamTableEnvironment.registerFunction(k, (TableFunction<?>) v);
-				} else {
-					throw new SqlExecutionException("Unsupported function type: " + v.getClass().getName());
+				// Blink planner uses FLIP-65 functions for scalar and table functions
+				// aggregate functions still use the old type inference
+				if (environment.getExecution().isBlinkPlanner()) {
+					if (v instanceof ScalarFunction || v instanceof TableFunction) {
+						streamTableEnvironment.createTemporarySystemFunction(k, (UserDefinedFunction) v);
+					} else if (v instanceof AggregateFunction) {
+						streamTableEnvironment.registerFunction(k, (AggregateFunction<?, ?>) v);
+					} else {
+						throw new SqlExecutionException("Unsupported function type: " + v.getClass().getName());
+					}
+				}
+				// legacy
+				else {
+					if (v instanceof ScalarFunction) {
+						streamTableEnvironment.registerFunction(k, (ScalarFunction) v);
+					} else if (v instanceof AggregateFunction) {
+						streamTableEnvironment.registerFunction(k, (AggregateFunction<?, ?>) v);
+					} else if (v instanceof TableFunction) {
+						streamTableEnvironment.registerFunction(k, (TableFunction<?>) v);
+					} else {
+						throw new SqlExecutionException("Unsupported function type: " + v.getClass().getName());
+					}
 				}
 			});
 		} else {
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
index b9d611e..6f80220 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.UserDefinedFunction;
 
 /**
  * This table environment is the entry point and central context for creating Table and SQL
@@ -152,7 +153,13 @@ public interface StreamTableEnvironment extends TableEnvironment {
 	 * @param name The name under which the function is registered.
 	 * @param tableFunction The TableFunction to register.
 	 * @param <T> The type of the output row.
+	 *
+	 * @deprecated Use {@link #createTemporarySystemFunction(String, UserDefinedFunction)} instead. Please
+	 *             note that the new method also uses the new type system and reflective extraction logic. It
+	 *             might be necessary to update the function implementation as well. See the documentation of
+	 *             {@link TableFunction} for more information on the new function design.
 	 */
+	@Deprecated
 	<T> void registerFunction(String name, TableFunction<T> tableFunction);
 
 	/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index a6ab333..38c4094 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -388,6 +388,10 @@ public final class FunctionCatalog {
 		);
 	}
 
+	/**
+	 * @deprecated Use {@link #registerTemporarySystemFunction(String, FunctionDefinition, boolean)} instead.
+	 */
+	@Deprecated
 	public <T> void registerTempSystemTableFunction(
 			String name,
 			TableFunction<T> function,
@@ -639,9 +643,9 @@ public final class FunctionCatalog {
 			// directly.
 			return ((InlineCatalogFunction) function).getDefinition();
 		}
-		// Currently the uninstantiated functions are all from sql and catalog that use the old type inference,
-		// so using FunctionDefinitionUtil to instantiate them and wrap them with `ScalarFunctionDefinition`,
-		// `TableFunctionDefinition`, etc. If the new type inference is fully functional, this should be
+		// Until all functions support the new type inference, uninstantiated functions from sql and
+		// catalog use the FunctionDefinitionUtil to instantiate them and wrap them with `AggregateFunctionDefinition`,
+		// `TableAggregateFunctionDefinition`. If the new type inference is fully functional, this should be
 		// changed to use `UserDefinedFunctionHelper#instantiateFunction`.
 		return FunctionDefinitionUtil.createFunctionDefinition(
 			name, function.getClassName(), function.getFunctionLanguage(), config);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java
index 1df8830..de5d91d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java
@@ -60,18 +60,11 @@ public class FunctionDefinitionUtil {
 	}
 
 	private static FunctionDefinition createFunctionDefinitionInternal(String name, UserDefinedFunction udf) {
-		if (udf instanceof ScalarFunction) {
-			return new ScalarFunctionDefinition(
-				name,
-				(ScalarFunction) udf
-			);
-		} else if (udf instanceof TableFunction) {
-			TableFunction t = (TableFunction) udf;
-			return new TableFunctionDefinition(
-				name,
-				t,
-				UserDefinedFunctionHelper.getReturnTypeOfTableFunction(t)
-			);
+		if (udf instanceof ScalarFunction || udf instanceof TableFunction) {
+			// table and scalar function use the new type inference
+			// once the other functions have been updated, this entire class will not be necessary
+			// anymore and can be replaced with UserDefinedFunctionHelper.instantiateFunction
+			return udf;
 		} else if (udf instanceof AggregateFunction) {
 			AggregateFunction a = (AggregateFunction) udf;
 
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
index 4f68d4c..d1654db 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
@@ -26,9 +26,7 @@ import org.apache.flink.table.functions.AggregateFunctionDefinition;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.FunctionIdentifier;
 import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.ScalarFunctionDefinition;
 import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.functions.TableFunctionDefinition;
 import org.apache.flink.table.module.Module;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.utils.CatalogManagerMocks;
@@ -37,7 +35,6 @@ import org.hamcrest.Matcher;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Optional;
@@ -47,7 +44,6 @@ import static org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_CATALOG;
 import static org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_DATABASE;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -59,24 +55,31 @@ import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
  */
 public class FunctionCatalogTest {
 
-	// TODO for now the resolution still returns the marker interfaces for functions
-	//  until we drop the old function stack in DDL
-
 	private static final ScalarFunction FUNCTION_1 = new TestFunction1();
 
 	private static final ScalarFunction FUNCTION_2 = new TestFunction2();
 
+	private static final ScalarFunction FUNCTION_3 = new TestFunction3();
+
+	private static final ScalarFunction FUNCTION_4 = new TestFunction4();
+
 	private static final ScalarFunction FUNCTION_INVALID = new InvalidTestFunction();
 
+	private static final TableFunction<?> TABLE_FUNCTION = new TestTableFunction();
+
+	private static final AggregateFunction<?, ?> AGGREGATE_FUNCTION = new TestAggregateFunction();
+
 	private static final String NAME = "test_function";
 
 	private static final ObjectIdentifier IDENTIFIER = ObjectIdentifier.of(
 		DEFAULT_CATALOG,
-		DEFAULT_DATABASE, NAME);
+		DEFAULT_DATABASE,
+		NAME);
 
 	private static final UnresolvedIdentifier FULL_UNRESOLVED_IDENTIFIER = UnresolvedIdentifier.of(
 		DEFAULT_CATALOG,
-		DEFAULT_DATABASE, NAME);
+		DEFAULT_DATABASE,
+		NAME);
 
 	private static final UnresolvedIdentifier PARTIAL_UNRESOLVED_IDENTIFIER = UnresolvedIdentifier.of(NAME);
 
@@ -120,24 +123,23 @@ public class FunctionCatalogTest {
 		// test catalog function is found
 		catalog.createFunction(
 			IDENTIFIER.toObjectPath(),
-			new CatalogFunctionImpl(TestFunction1.class.getName()),
+			new CatalogFunctionImpl(FUNCTION_1.getClass().getName()),
 			false);
 
-		FunctionLookup.Result result = functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER).get();
-
-		assertEquals(Optional.of(IDENTIFIER), result.getFunctionIdentifier().getIdentifier());
-		assertTrue(((ScalarFunctionDefinition) result.getFunctionDefinition()).getScalarFunction() instanceof TestFunction1);
+		assertThat(
+			functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER),
+			returnsFunction(FunctionIdentifier.of(IDENTIFIER), FUNCTION_1));
 
 		// test temp catalog function is found
-		functionCatalog.registerTempCatalogScalarFunction(
-			IDENTIFIER,
-			new TestFunction2()
+		functionCatalog.registerTemporaryCatalogFunction(
+			PARTIAL_UNRESOLVED_IDENTIFIER,
+			FUNCTION_2,
+			false
 		);
 
-		result = functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER).get();
-
-		assertEquals(Optional.of(IDENTIFIER), result.getFunctionIdentifier().getIdentifier());
-		assertTrue(((ScalarFunctionDefinition) result.getFunctionDefinition()).getScalarFunction() instanceof TestFunction2);
+		assertThat(
+			functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER),
+			returnsFunction(FunctionIdentifier.of(IDENTIFIER), FUNCTION_2));
 	}
 
 	@Test
@@ -148,40 +150,37 @@ public class FunctionCatalogTest {
 		// test catalog function is found
 		catalog.createFunction(
 			IDENTIFIER.toObjectPath(),
-			new CatalogFunctionImpl(TestFunction1.class.getName()),
+			new CatalogFunctionImpl(FUNCTION_1.getClass().getName()),
 			false);
 
-		FunctionLookup.Result result = functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER).get();
-
-		assertEquals(Optional.of(IDENTIFIER), result.getFunctionIdentifier().getIdentifier());
-		assertTrue(((ScalarFunctionDefinition) result.getFunctionDefinition()).getScalarFunction() instanceof TestFunction1);
+		assertThat(
+			functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER),
+			returnsFunction(FunctionIdentifier.of(IDENTIFIER), FUNCTION_1));
 
-		// test temp catalog function is found
-		functionCatalog.registerTempCatalogScalarFunction(
-			IDENTIFIER,
-			new TestFunction2()
+		// test temporary catalog function is found
+		functionCatalog.registerTemporaryCatalogFunction(
+			PARTIAL_UNRESOLVED_IDENTIFIER,
+			FUNCTION_2,
+			false
 		);
 
-		result = functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER).get();
-
-		assertEquals(Optional.of(IDENTIFIER), result.getFunctionIdentifier().getIdentifier());
-		assertTrue(((ScalarFunctionDefinition) result.getFunctionDefinition()).getScalarFunction() instanceof TestFunction2);
+		assertThat(
+			functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER),
+			returnsFunction(FunctionIdentifier.of(IDENTIFIER), FUNCTION_2));
 
 		// test system function is found
 		moduleManager.loadModule("test_module", new TestModule());
 
-		result = functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER).get();
-
-		assertEquals(Optional.of(NAME), result.getFunctionIdentifier().getSimpleName());
-		assertTrue(((ScalarFunctionDefinition) result.getFunctionDefinition()).getScalarFunction() instanceof TestFunction3);
-
-		// test temp system function is found
-		functionCatalog.registerTempSystemScalarFunction(NAME, new TestFunction4());
+		assertThat(
+			functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER),
+			returnsFunction(FunctionIdentifier.of(NAME), FUNCTION_3));
 
-		result = functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER).get();
+		// test temporary system function is found
+		functionCatalog.registerTemporarySystemFunction(NAME, FUNCTION_4, false);
 
-		assertEquals(Optional.of(NAME), result.getFunctionIdentifier().getSimpleName());
-		assertTrue(((ScalarFunctionDefinition) result.getFunctionDefinition()).getScalarFunction() instanceof TestFunction4);
+		assertThat(
+			functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER),
+			returnsFunction(FunctionIdentifier.of(NAME), FUNCTION_4));
 	}
 
 	@Test
@@ -196,7 +195,8 @@ public class FunctionCatalogTest {
 			returnsFunction(FunctionIdentifier.of(NAME), FUNCTION_1));
 
 		// register second time lenient
-		functionCatalog.registerTemporarySystemFunction(NAME,
+		functionCatalog.registerTemporarySystemFunction(
+			NAME,
 			FUNCTION_2,
 			true);
 		assertThat(
@@ -254,27 +254,27 @@ public class FunctionCatalogTest {
 		// register first time
 		functionCatalog.registerTemporarySystemFunction(
 			NAME,
-			TestFunction1.class.getName(),
+			FUNCTION_1.getClass().getName(),
 			FunctionLanguage.JAVA,
 			false);
 		assertThat(
 			functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER),
-			returnsFunction(FunctionIdentifier.of(NAME), new ScalarFunctionDefinition(NAME, FUNCTION_1)));
+			returnsFunction(FunctionIdentifier.of(NAME), FUNCTION_1));
 
 		// register second time lenient
 		functionCatalog.registerTemporarySystemFunction(NAME,
-			TestFunction2.class.getName(),
+			FUNCTION_2.getClass().getName(),
 			FunctionLanguage.JAVA,
 			true);
 		assertThat(
 			functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER),
-			returnsFunction(FunctionIdentifier.of(NAME), new ScalarFunctionDefinition(NAME, FUNCTION_1)));
+			returnsFunction(FunctionIdentifier.of(NAME), FUNCTION_1));
 
 		// register second time not lenient
 		try {
 			functionCatalog.registerTemporarySystemFunction(
 				NAME,
-				TestFunction2.class.getName(),
+				FUNCTION_2.getClass().getName(),
 				FunctionLanguage.JAVA,
 				false);
 			fail();
@@ -288,7 +288,7 @@ public class FunctionCatalogTest {
 		try {
 			functionCatalog.registerTemporarySystemFunction(
 				NAME,
-				InvalidTestFunction.class.getName(),
+				FUNCTION_INVALID.getClass().getName(),
 				FunctionLanguage.JAVA,
 				false);
 			fail();
@@ -305,28 +305,29 @@ public class FunctionCatalogTest {
 		// test register uninstantiated table function
 		functionCatalog.registerTemporarySystemFunction(
 			NAME,
-			TestTableFunction1.class.getName(),
+			TABLE_FUNCTION.getClass().getName(),
 			FunctionLanguage.JAVA,
 			false);
 		assertThat(
 			functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER),
 			returnsFunction(
 				FunctionIdentifier.of(NAME),
-				new TableFunctionDefinition(NAME, new TestTableFunction1(), Types.STRING)));
+				TABLE_FUNCTION));
 
 		functionCatalog.dropTemporarySystemFunction(NAME, true);
 
 		// test register uninstantiated aggregate function
 		functionCatalog.registerTemporarySystemFunction(
 			NAME,
-			TestAggregateFunction1.class.getName(),
+			AGGREGATE_FUNCTION.getClass().getName(),
 			FunctionLanguage.JAVA,
 			false);
 		assertThat(
 			functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER),
 			returnsFunction(
 				FunctionIdentifier.of(NAME),
-				new AggregateFunctionDefinition(NAME, new TestAggregateFunction1(), Types.STRING, Types.STRING)));
+				// TODO aggregate functions still use marker interface
+				new AggregateFunctionDefinition(NAME, AGGREGATE_FUNCTION, Types.STRING, Types.STRING)));
 	}
 
 	@Test
@@ -338,9 +339,7 @@ public class FunctionCatalogTest {
 			false);
 		assertThat(
 			functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER),
-			returnsFunction(
-				FunctionIdentifier.of(IDENTIFIER),
-				new ScalarFunctionDefinition(NAME, FUNCTION_1)));
+			returnsFunction(FunctionIdentifier.of(IDENTIFIER), FUNCTION_1));
 
 		// register second time lenient
 		functionCatalog.registerCatalogFunction(
@@ -349,9 +348,7 @@ public class FunctionCatalogTest {
 			true);
 		assertThat(
 			functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER),
-			returnsFunction(
-				FunctionIdentifier.of(IDENTIFIER),
-				new ScalarFunctionDefinition(NAME, FUNCTION_1)));
+			returnsFunction(FunctionIdentifier.of(IDENTIFIER), FUNCTION_1));
 
 		// register second time not lenient
 		try {
@@ -414,9 +411,7 @@ public class FunctionCatalogTest {
 			false);
 		assertThat(
 			functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER),
-			returnsFunction(
-				FunctionIdentifier.of(IDENTIFIER),
-				new ScalarFunctionDefinition(NAME, FUNCTION_2)));
+			returnsFunction(FunctionIdentifier.of(IDENTIFIER), FUNCTION_2));
 
 		// register temporary first time
 		functionCatalog.registerTemporaryCatalogFunction(
@@ -488,7 +483,7 @@ public class FunctionCatalogTest {
 			functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER),
 			returnsFunction(
 				FunctionIdentifier.of(IDENTIFIER),
-				new ScalarFunctionDefinition(NAME, FUNCTION_2))); // permanent function is visible again
+				FUNCTION_2)); // permanent function is visible again
 
 		// drop temporary second time lenient
 		assertThat(
@@ -530,9 +525,7 @@ public class FunctionCatalogTest {
 			false);
 		assertThat(
 			functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER),
-			returnsFunction(
-				FunctionIdentifier.of(IDENTIFIER),
-				new ScalarFunctionDefinition(NAME, FUNCTION_2)));
+			returnsFunction(FunctionIdentifier.of(IDENTIFIER), FUNCTION_2));
 
 		// register temporary first time
 		functionCatalog.registerTemporaryCatalogFunction(
@@ -542,9 +535,7 @@ public class FunctionCatalogTest {
 		// temporary function hides catalog function
 		assertThat(
 			functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER),
-			returnsFunction(
-				FunctionIdentifier.of(IDENTIFIER),
-				new ScalarFunctionDefinition(IDENTIFIER.getObjectName(), FUNCTION_1)));
+			returnsFunction(FunctionIdentifier.of(IDENTIFIER), FUNCTION_1));
 
 		// register temporary second time lenient
 		functionCatalog.registerTemporaryCatalogFunction(
@@ -553,9 +544,7 @@ public class FunctionCatalogTest {
 			true);
 		assertThat(
 			functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER),
-			returnsFunction(
-				FunctionIdentifier.of(IDENTIFIER),
-				new ScalarFunctionDefinition(IDENTIFIER.getObjectName(), FUNCTION_1)));
+			returnsFunction(FunctionIdentifier.of(IDENTIFIER), FUNCTION_1));
 
 		// register temporary second time not lenient
 		try {
@@ -590,58 +579,25 @@ public class FunctionCatalogTest {
 		// test register uninstantiated table function
 		functionCatalog.registerTemporaryCatalogFunction(
 			PARTIAL_UNRESOLVED_IDENTIFIER,
-			new CatalogFunctionImpl(TestTableFunction1.class.getName()),
+			new CatalogFunctionImpl(TABLE_FUNCTION.getClass().getName()),
 			false);
 		assertThat(
 			functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER),
-			returnsFunction(
-				FunctionIdentifier.of(IDENTIFIER),
-				new TableFunctionDefinition(NAME, new TestTableFunction1(), Types.STRING)));
+			returnsFunction(FunctionIdentifier.of(IDENTIFIER), TABLE_FUNCTION));
 
 		functionCatalog.dropTemporaryCatalogFunction(PARTIAL_UNRESOLVED_IDENTIFIER, true);
 
 		// test register uninstantiated aggregate function
 		functionCatalog.registerTemporaryCatalogFunction(
 			PARTIAL_UNRESOLVED_IDENTIFIER,
-			new CatalogFunctionImpl(TestAggregateFunction1.class.getName()),
+			new CatalogFunctionImpl(AGGREGATE_FUNCTION.getClass().getName()),
 			false);
 		assertThat(
 			functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER),
 			returnsFunction(
 				FunctionIdentifier.of(IDENTIFIER),
-				new AggregateFunctionDefinition(NAME, new TestAggregateFunction1(), Types.STRING, Types.STRING)));
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// Legacy function handling before FLIP-65
-	// --------------------------------------------------------------------------------------------
-
-	@Test
-	public void testRegisterAndDropTempSystemFunction() {
-		assertFalse(Arrays.asList(functionCatalog.getUserDefinedFunctions()).contains(NAME));
-
-		functionCatalog.registerTempSystemScalarFunction(NAME, new TestFunction1());
-		assertTrue(Arrays.asList(functionCatalog.getUserDefinedFunctions()).contains(NAME));
-
-		functionCatalog.dropTemporarySystemFunction(NAME, false);
-		assertFalse(Arrays.asList(functionCatalog.getUserDefinedFunctions()).contains(NAME));
-
-		functionCatalog.dropTemporarySystemFunction(NAME, true);
-		assertFalse(Arrays.asList(functionCatalog.getUserDefinedFunctions()).contains(NAME));
-	}
-
-	@Test
-	public void testRegisterAndDropTempCatalogFunction() {
-		assertFalse(Arrays.asList(functionCatalog.getUserDefinedFunctions()).contains(NAME));
-
-		functionCatalog.registerTempCatalogScalarFunction(IDENTIFIER, new TestFunction1());
-		assertTrue(Arrays.asList(functionCatalog.getUserDefinedFunctions()).contains(IDENTIFIER.getObjectName()));
-
-		functionCatalog.dropTempCatalogFunction(IDENTIFIER, false);
-		assertFalse(Arrays.asList(functionCatalog.getUserDefinedFunctions()).contains(IDENTIFIER.getObjectName()));
-
-		functionCatalog.dropTempCatalogFunction(IDENTIFIER, true);
-		assertFalse(Arrays.asList(functionCatalog.getUserDefinedFunctions()).contains(IDENTIFIER.getObjectName()));
+				// TODO aggregate functions still use marker interface
+				new AggregateFunctionDefinition(NAME, AGGREGATE_FUNCTION, Types.STRING, Types.STRING)));
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -673,7 +629,7 @@ public class FunctionCatalogTest {
 
 		@Override
 		public Optional<FunctionDefinition> getFunctionDefinition(String name) {
-			return Optional.of(new ScalarFunctionDefinition(NAME, new TestFunction3()));
+			return Optional.of(FUNCTION_3);
 		}
 	}
 
@@ -698,6 +654,11 @@ public class FunctionCatalogTest {
 		public String eval(){
 			return null;
 		}
+
+		@Override
+		public boolean equals(Object o) {
+			return o != null && o.getClass() == this.getClass();
+		}
 	}
 
 	/**
@@ -707,6 +668,11 @@ public class FunctionCatalogTest {
 		public String eval(){
 			return null;
 		}
+
+		@Override
+		public boolean equals(Object o) {
+			return o != null && o.getClass() == this.getClass();
+		}
 	}
 
 	/**
@@ -716,6 +682,11 @@ public class FunctionCatalogTest {
 		public String eval(){
 			return null;
 		}
+
+		@Override
+		public boolean equals(Object o) {
+			return o != null && o.getClass() == this.getClass();
+		}
 	}
 
 	/**
@@ -728,7 +699,8 @@ public class FunctionCatalogTest {
 	/**
 	 * Testing table function.
 	 */
-	public static class TestTableFunction1 extends TableFunction<String> {
+	@SuppressWarnings("unused")
+	public static class TestTableFunction extends TableFunction<String> {
 		public void eval(String in) {}
 
 		@Override
@@ -740,7 +712,8 @@ public class FunctionCatalogTest {
 	/**
 	 * Testing aggregate function.
 	 */
-	public static class TestAggregateFunction1 extends AggregateFunction<String, String> {
+	@SuppressWarnings("unused")
+	public static class TestAggregateFunction extends AggregateFunction<String, String> {
 
 		@Override
 		public String getValue(String accumulator) {
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java
index 83497a8..8fc6e40 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java
@@ -37,7 +37,7 @@ public class FunctionDefinitionUtilTest {
 				TestScalarFunction.class.getName()
 		);
 
-		assertTrue(((ScalarFunctionDefinition) fd).getScalarFunction() instanceof TestScalarFunction);
+		assertTrue(fd instanceof TestScalarFunction);
 	}
 
 	@Test
@@ -47,14 +47,14 @@ public class FunctionDefinitionUtilTest {
 			TestTableFunction.class.getName()
 		);
 
-		assertTrue(((TableFunctionDefinition) fd1).getTableFunction() instanceof TestTableFunction);
+		assertTrue(fd1 instanceof TestTableFunction);
 
 		FunctionDefinition fd2 = FunctionDefinitionUtil.createFunctionDefinition(
 				"test",
 				TestTableFunctionWithoutResultType.class.getName()
 		);
 
-		assertTrue(((TableFunctionDefinition) fd2).getTableFunction() instanceof TestTableFunctionWithoutResultType);
+		assertTrue(fd2 instanceof TestTableFunctionWithoutResultType);
 	}
 
 	@Test
@@ -64,6 +64,7 @@ public class FunctionDefinitionUtilTest {
 			TestAggFunction.class.getName()
 		);
 
+		// TODO aggregate functions still use marker interface
 		assertTrue(((AggregateFunctionDefinition) fd1).getAggregateFunction() instanceof TestAggFunction);
 
 		FunctionDefinition fd2 = FunctionDefinitionUtil.createFunctionDefinition(
@@ -85,6 +86,7 @@ public class FunctionDefinitionUtilTest {
 			TestTableAggFunction.class.getName()
 		);
 
+		// TODO table aggregate functions still use marker interface
 		assertTrue(((TableAggregateFunctionDefinition) fd1).getTableAggregateFunction() instanceof TestTableAggFunction);
 
 		FunctionDefinition fd2 = FunctionDefinitionUtil.createFunctionDefinition(
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java
index db2480c..72df5e3 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java
@@ -20,7 +20,16 @@ package org.apache.flink.table.functions.python;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.inference.TypeStrategies;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * The wrapper of user defined python scalar function.
@@ -91,6 +100,17 @@ public class PythonScalarFunction extends ScalarFunction implements PythonFuncti
 	}
 
 	@Override
+	public TypeInference getTypeInference(DataTypeFactory typeFactory) {
+		final List<DataType> argumentDataTypes = Stream.of(inputTypes)
+			.map(TypeConversions::fromLegacyInfoToDataType)
+			.collect(Collectors.toList());
+		return TypeInference.newBuilder()
+			.typedArguments(argumentDataTypes)
+			.outputTypeStrategy(TypeStrategies.explicit(TypeConversions.fromLegacyInfoToDataType(resultType)))
+			.build();
+	}
+
+	@Override
 	public String toString() {
 		return name;
 	}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java
index 2244375..3560170 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java
@@ -21,9 +21,18 @@ package org.apache.flink.table.functions.python;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.inference.TypeStrategies;
+import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
 /**
  * The wrapper of user defined python table function.
  */
@@ -93,6 +102,17 @@ public class PythonTableFunction extends TableFunction<Row> implements PythonFun
 	}
 
 	@Override
+	public TypeInference getTypeInference(DataTypeFactory typeFactory) {
+		final List<DataType> argumentDataTypes = Stream.of(inputTypes)
+			.map(TypeConversions::fromLegacyInfoToDataType)
+			.collect(Collectors.toList());
+		return TypeInference.newBuilder()
+			.typedArguments(argumentDataTypes)
+			.outputTypeStrategy(TypeStrategies.explicit(TypeConversions.fromLegacyInfoToDataType(resultType)))
+			.build();
+	}
+
+	@Override
 	public String toString() {
 		return name;
 	}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonBase.scala
index eba8eab1..fc96c57 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonBase.scala
@@ -23,13 +23,14 @@ import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.{TableConfig, TableException}
-import org.apache.flink.table.functions.UserDefinedFunction
+import org.apache.flink.table.functions.FunctionDefinition
 import org.apache.flink.table.functions.python.{PythonFunction, PythonFunctionInfo}
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
 import org.apache.flink.table.planner.functions.utils.{ScalarSqlFunction, TableSqlFunction}
 import org.apache.flink.table.planner.utils.DummyStreamExecutionEnvironment
 
-import scala.collection.mutable
 import scala.collection.JavaConversions._
+import scala.collection.mutable
 
 trait CommonPythonBase {
 
@@ -50,7 +51,8 @@ trait CommonPythonBase {
   private def createPythonFunctionInfo(
       pythonRexCall: RexCall,
       inputNodes: mutable.Map[RexNode, Integer],
-      func: UserDefinedFunction): PythonFunctionInfo = {
+      functionDefinition: FunctionDefinition)
+    : PythonFunctionInfo = {
     val inputs = new mutable.ArrayBuffer[AnyRef]()
     pythonRexCall.getOperands.foreach {
       case pythonRexCall: RexCall =>
@@ -73,7 +75,7 @@ trait CommonPythonBase {
         }
     }
 
-    new PythonFunctionInfo(func.asInstanceOf[PythonFunction], inputs.toArray)
+    new PythonFunctionInfo(functionDefinition.asInstanceOf[PythonFunction], inputs.toArray)
   }
 
   protected def createPythonFunctionInfo(
@@ -84,6 +86,8 @@ trait CommonPythonBase {
         createPythonFunctionInfo(pythonRexCall, inputNodes, sfc.scalarFunction)
       case tfc: TableSqlFunction =>
         createPythonFunctionInfo(pythonRexCall, inputNodes, tfc.udtf)
+      case bsf: BridgingSqlFunction =>
+        createPythonFunctionInfo(pythonRexCall, inputNodes, bsf.getDefinition)
     }
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/PythonUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/PythonUtil.scala
index d924e0e..b89410b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/PythonUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/PythonUtil.scala
@@ -19,8 +19,9 @@
 package org.apache.flink.table.planner.plan.utils
 
 import org.apache.calcite.rex.{RexCall, RexNode}
-import org.apache.flink.table.functions.UserDefinedFunction
+import org.apache.flink.table.functions.FunctionDefinition
 import org.apache.flink.table.functions.python.{PythonFunction, PythonFunctionKind}
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
 import org.apache.flink.table.planner.functions.utils.{ScalarSqlFunction, TableSqlFunction}
 
 import scala.collection.JavaConversions._
@@ -91,13 +92,14 @@ object PythonUtil {
       rexCall.getOperator match {
         case sfc: ScalarSqlFunction => isPythonFunction(sfc.scalarFunction)
         case tfc: TableSqlFunction => isPythonFunction(tfc.udtf)
+        case bsf: BridgingSqlFunction => isPythonFunction(bsf.getDefinition)
         case _ => false
     }
 
-    private def isPythonFunction(userDefinedFunction: UserDefinedFunction): Boolean = {
-      userDefinedFunction.isInstanceOf[PythonFunction] &&
+    private def isPythonFunction(functionDefinition: FunctionDefinition): Boolean = {
+      functionDefinition.isInstanceOf[PythonFunction] &&
         (pythonFunctionKind.isEmpty ||
-          userDefinedFunction.asInstanceOf[PythonFunction].getPythonFunctionKind ==
+          functionDefinition.asInstanceOf[PythonFunction].getPythonFunctionKind ==
             pythonFunctionKind.get)
     }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java
index 0c9bdef..1c86fc4 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java
@@ -20,8 +20,8 @@ package org.apache.flink.table.planner.runtime.utils;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.FunctionContext;
@@ -140,7 +140,9 @@ public class JavaUserDefinedScalarFunctions {
 			openCalled = true;
 		}
 
-		public Timestamp eval(TimestampData timestampData, Integer offset) {
+		public @DataTypeHint("TIMESTAMP(3)") Timestamp eval(
+				@DataTypeHint("TIMESTAMP(3)") TimestampData timestampData,
+				Integer offset) {
 			if (!openCalled) {
 				fail("Open was not called before run.");
 			}
@@ -154,11 +156,6 @@ public class JavaUserDefinedScalarFunctions {
 		}
 
 		@Override
-		public TypeInformation<?> getResultType(Class<?>[] signature) {
-			return Types.SQL_TIMESTAMP;
-		}
-
-		@Override
 		public void close() {
 			closeCalled = true;
 		}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/FunctionCatalogOperatorTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/FunctionCatalogOperatorTable.java
index 7bd7a31..2d0a063 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/FunctionCatalogOperatorTable.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/FunctionCatalogOperatorTable.java
@@ -24,8 +24,11 @@ import org.apache.flink.table.calcite.FlinkTypeFactory;
 import org.apache.flink.table.functions.AggregateFunctionDefinition;
 import org.apache.flink.table.functions.BuiltInFunctionDefinition;
 import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.ScalarFunctionDefinition;
+import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.functions.TableFunctionDefinition;
+import org.apache.flink.table.functions.UserDefinedFunctionHelper;
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils;
 
 import org.apache.calcite.sql.SqlFunction;
@@ -35,6 +38,8 @@ import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlOperatorTable;
 import org.apache.calcite.sql.SqlSyntax;
 import org.apache.calcite.sql.validate.SqlNameMatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Optional;
@@ -45,6 +50,8 @@ import java.util.Optional;
 @Internal
 public class FunctionCatalogOperatorTable implements SqlOperatorTable {
 
+	private static final Logger LOG = LoggerFactory.getLogger(FunctionCatalogOperatorTable.class);
+
 	private final FunctionCatalog functionCatalog;
 	private final FlinkTypeFactory typeFactory;
 
@@ -100,6 +107,29 @@ public class FunctionCatalogOperatorTable implements SqlOperatorTable {
 		} else if (functionDefinition instanceof BuiltInFunctionDefinition) {
 			return Optional.empty();
 		}
+		LOG.warn(
+			"The new type inference for functions is only supported in the Blink planner. " +
+				"Falling back to legacy type inference for function '{}'.",
+			functionDefinition.getClass().toString());
+		if (functionDefinition instanceof ScalarFunction) {
+			return convertToSqlFunction(
+				category,
+				name,
+				new ScalarFunctionDefinition(
+					name,
+					(ScalarFunction) functionDefinition)
+			);
+		} else if (functionDefinition instanceof TableFunction) {
+			final TableFunction<?> t = (TableFunction<?>) functionDefinition;
+			return convertToSqlFunction(
+				category,
+				name,
+				new TableFunctionDefinition(
+					name,
+					t,
+					UserDefinedFunctionHelper.getReturnTypeOfTableFunction(t))
+			);
+		}
 		throw new TableException(
 			"The new type inference for functions is only supported in the Blink planner.");
 	}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
index 7921c74..86fc0eb 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
@@ -26,15 +26,18 @@ import org.apache.flink.table.functions._
 import org.apache.flink.table.types.logical.LogicalTypeRoot.SYMBOL
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks._
 import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
-
 import java.time.{LocalDate, LocalDateTime}
 
+import org.apache.flink.table.util.Logging
+
 import _root_.scala.collection.JavaConverters._
 
 /**
   * Visitor implementation for converting [[Expression]]s to [[PlannerExpression]]s.
   */
-class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExpression] {
+class PlannerExpressionConverter private
+  extends ApiExpressionVisitor[PlannerExpression]
+  with Logging {
 
   override def visit(call: CallExpression): PlannerExpression = {
     translateCall(call.getFunctionDefinition, call.getChildren.asScala)
@@ -84,6 +87,7 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
     val args = children.map(_.accept(this))
 
     func match {
+      // explicit legacy
       case sfd: ScalarFunctionDefinition =>
         val call = PlannerScalarFunctionCall(
           sfd.getScalarFunction,
@@ -92,6 +96,7 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
         call.validateInput()
         call
 
+      // explicit legacy
       case tfd: TableFunctionDefinition =>
         PlannerTableFunctionCall(
           tfd.toString,
@@ -99,6 +104,7 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
           args,
           tfd.getResultType)
 
+      // explicit legacy
       case afd: AggregateFunctionDefinition =>
         AggFunctionCall(
           afd.getAggregateFunction,
@@ -106,6 +112,7 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
           afd.getAccumulatorTypeInfo,
           args)
 
+      // explicit legacy
       case tafd: TableAggregateFunctionDefinition =>
         AggFunctionCall(
           tafd.getTableAggregateFunction,
@@ -113,6 +120,51 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
           tafd.getAccumulatorTypeInfo,
           args)
 
+      // best-effort support for new type inference
+      case sf: ScalarFunction =>
+        LOG.warn(
+          "The new type inference for functions is only supported in the Blink planner. " +
+           "Falling back to legacy type inference for function '{}'.", sf.getClass)
+        val call = PlannerScalarFunctionCall(
+          sf,
+          args)
+        // it configures underlying state
+        call.validateInput()
+        call
+
+      // best-effort support for new type inference
+      case tf: TableFunction[_] =>
+        LOG.warn(
+          "The new type inference for functions is only supported in the Blink planner. " +
+           "Falling back to legacy type inference for function '{}'.", tf.getClass)
+        PlannerTableFunctionCall(
+          tf.toString,
+          tf,
+          args,
+          UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tf))
+
+      // best-effort support for new type inference
+      case af: AggregateFunction[_, _] =>
+        LOG.warn(
+          "The new type inference for functions is only supported in the Blink planner. " +
+           "Falling back to legacy type inference for function '{}'.", af.getClass)
+        AggFunctionCall(
+          af,
+          UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(af),
+          UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(af),
+          args)
+
+      // best-effort support for new type inference
+      case taf: TableAggregateFunction[_, _] =>
+        LOG.warn(
+          "The new type inference for functions is only supported in the Blink planner. " +
+           "Falling back to legacy type inference for function '{}'.", taf.getClass)
+        AggFunctionCall(
+          taf,
+          UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(taf),
+          UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(taf),
+          args)
+
       case _ : UserDefinedFunction =>
         throw new ValidationException(
           "The new type inference for functions is only supported in the Blink planner.")
@@ -442,7 +494,7 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
             assert(args.size == 1)
             Ln(args.head)
 
-          case LOG =>
+          case BuiltInFunctionDefinitions.LOG =>
             assert(args.size == 1 || args.size == 2)
             if (args.size == 1) {
               Log(args.head)
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java
index 2ca2135..f466472 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.runtime.stream.sql;
 
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -451,24 +450,6 @@ public class FunctionITCase extends AbstractTestBase {
 		tableEnv.sqlUpdate("drop table t2");
 	}
 
-	@Test
-	public void testDataTypeBasedTypeInferenceNotSupported() throws Exception {
-		thrown.expect(ValidationException.class);
-		thrown.expectMessage("The new type inference for functions is only supported in the Blink planner.");
-
-		StreamExecutionEnvironment streamExecEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
-		EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().build();
-		StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(
-				streamExecEnvironment, settings);
-
-		tableEnvironment.createTemporarySystemFunction("func", SimpleScalarFunction.class);
-		Table table = tableEnvironment
-			.sqlQuery("SELECT func(1)");
-		tableEnvironment.toAppendStream(table, Row.class).print();
-
-		streamExecEnvironment.execute();
-	}
-
 	/**
 	 * Simple scalar function.
 	 */
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/table/FunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/table/FunctionITCase.java
deleted file mode 100644
index 509005d..0000000
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/table/FunctionITCase.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.stream.table;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.annotation.DataTypeHint;
-import org.apache.flink.table.annotation.FunctionHint;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.types.Row;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import static org.apache.flink.table.api.Expressions.$;
-import static org.apache.flink.table.api.Expressions.call;
-
-/**
- * Tests for user defined functions in the Table API.
- */
-public class FunctionITCase extends AbstractTestBase {
-
-	@Rule
-	public ExpectedException thrown = ExpectedException.none();
-
-	@Test
-	public void testDataTypeBasedTypeInferenceNotSupported() throws Exception {
-		thrown.expect(ValidationException.class);
-		thrown.expectMessage("The new type inference for functions is only supported in the Blink planner.");
-
-		StreamExecutionEnvironment streamExecEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
-		EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().build();
-		StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(
-				streamExecEnvironment, settings);
-
-		Table table = tableEnvironment
-			.sqlQuery("SELECT * FROM (VALUES (1)) AS TableName(f0)")
-			.select(call(new SimpleScalarFunction(), $("f0")));
-		tableEnvironment.toAppendStream(table, Row.class).print();
-
-		streamExecEnvironment.execute();
-	}
-
-	@Test
-	public void testDataTypeBasedTypeInferenceNotSupportedInLateralJoin() throws Exception {
-		thrown.expect(ValidationException.class);
-		thrown.expectMessage("The new type inference for functions is only supported in the Blink planner.");
-
-		StreamExecutionEnvironment streamExecEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
-		EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().build();
-		StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(
-				streamExecEnvironment, settings);
-
-		Table table = tableEnvironment
-			.sqlQuery("SELECT * FROM (VALUES ('A,B,C')) AS TableName(f0)")
-			.joinLateral(call(new SimpleTableFunction(), $("f0")).as("a", "b"))
-			.select($("a"), $("b"));
-		tableEnvironment.toAppendStream(table, Row.class).print();
-
-		streamExecEnvironment.execute();
-	}
-
-	/**
-	 * Simple scalar function.
-	 */
-	public static class SimpleScalarFunction extends ScalarFunction {
-		public long eval(Integer i) {
-			return i;
-		}
-	}
-
-	/**
-	 * Simple table function.
-	 */
-	@FunctionHint(output = @DataTypeHint("ROW<s STRING, sa ARRAY<STRING>>"))
-	public static class SimpleTableFunction extends TableFunction<Row> {
-		public void eval(String s) {
-			// no-op
-		}
-	}
-}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
index 3bcaeac..10bb2f4 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
@@ -393,15 +393,6 @@ class CalcITCase(
     val expected = List("a,a,d,d,e,e", "x,x,z,z,z,z").mkString("\n")
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
-
-  // new type inference for functions is only supported in the Blink planner
-  @Test(expected = classOf[ValidationException])
-  def testUnsupportedNewFunctionTypeInference(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = BatchTableEnvironment.create(env)
-    tEnv.createTemporarySystemFunction("testFunc", new Func13(">>"))
-    tEnv.sqlQuery("SELECT testFunc('fail')").toDataSet[Row]
-  }
 }
 
 object MyHashCode extends ScalarFunction {