You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/06/09 13:52:53 UTC
[flink] 01/04: [FLINK-17753] [table-planner-blink] Fix watermark
defined in ddl does not work in Table api
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 655545472d87e7c7d19bbc910cee45870d2b3888
Author: godfreyhe <go...@163.com>
AuthorDate: Tue May 26 17:08:56 2020 +0800
[FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api
This closes #12335
---
.../table/catalog/hive/HiveCatalogITCase.java | 32 +++++-
.../table/client/gateway/local/LocalExecutor.java | 6 ++
.../api/internal/CatalogTableSchemaResolver.java | 118 +++++++++++++++++++++
.../table/api/internal/TableEnvironmentImpl.java | 7 +-
.../apache/flink/table/catalog/CatalogManager.java | 34 +++++-
.../flink/table/catalog/CatalogTableImpl.java | 8 ++
.../org/apache/flink/table/delegation/Parser.java | 12 +++
.../org/apache/flink/table/utils/ParserMock.java | 7 ++
.../planner/catalog/CatalogCalciteSchema.java | 9 +-
.../catalog/CatalogManagerCalciteSchema.java | 8 +-
.../table/planner/catalog/CatalogSchemaTable.java | 11 +-
.../planner/catalog/DatabaseCalciteSchema.java | 7 --
.../flink/table/planner/delegation/ParserImpl.java | 22 +++-
.../table/planner/delegation/PlannerBase.scala | 32 +++---
.../table/planner/sources/TableSourceUtil.scala | 99 ++++++-----------
.../operations/SqlToOperationConverterTest.java | 26 +++--
.../plan/FlinkCalciteCatalogReaderTest.java | 17 +--
.../flink/table/planner/utils/PlannerMocks.java | 38 ++++---
.../planner/plan/stream/sql/TableSourceTest.xml | 104 +++++++++---------
.../plan/stream/table/LegacyTableSourceTest.xml | 72 ++++++++++---
.../planner/plan/stream/table/TableSourceTest.xml | 118 ++++++++++++++-------
.../flink/table/api/TableEnvironmentTest.scala | 2 +-
.../codegen/WatermarkGeneratorCodeGenTest.scala | 30 +++++-
.../planner/plan/stream/sql/TableSourceTest.scala | 48 ++-------
.../plan/stream/table/LegacyTableSourceTest.scala | 3 +
.../plan/stream/table/TableSourceTest.scala | 47 +++-----
.../org/apache/flink/table/planner/ParserImpl.java | 7 ++
.../flink/table/api/internal/TableEnvImpl.scala | 2 +
.../flink/table/catalog/CatalogManagerTest.java | 3 +
.../table/catalog/DatabaseCalciteSchemaTest.java | 11 +-
.../table/sqlexec/SqlToOperationConverterTest.java | 3 +
31 files changed, 601 insertions(+), 342 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
index f6fa581..ba12079 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
@@ -68,6 +68,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Optional;
+import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -281,6 +282,32 @@ public class HiveCatalogITCase {
}
private void testReadWriteCsvWithProctime(boolean isStreaming) {
+ TableEnvironment tableEnv = prepareTable(isStreaming);
+ ArrayList<Row> rows = Lists.newArrayList(
+ tableEnv.executeSql("SELECT * FROM proctime_src").collect());
+ Assert.assertEquals(5, rows.size());
+ tableEnv.executeSql("DROP TABLE proctime_src");
+ }
+
+ @Test
+ public void testTableApiWithProctimeForBatch() {
+ testTableApiWithProctime(false);
+ }
+
+ @Test
+ public void testTableApiWithProctimeForStreaming() {
+ testTableApiWithProctime(true);
+ }
+
+ private void testTableApiWithProctime(boolean isStreaming) {
+ TableEnvironment tableEnv = prepareTable(isStreaming);
+ ArrayList<Row> rows = Lists.newArrayList(
+ tableEnv.from("proctime_src").select($("price"), $("ts"), $("l_proctime")).execute().collect());
+ Assert.assertEquals(5, rows.size());
+ tableEnv.executeSql("DROP TABLE proctime_src");
+ }
+
+ private TableEnvironment prepareTable(boolean isStreaming) {
EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance().useBlinkPlanner();
if (isStreaming) {
builder = builder.inStreamingMode();
@@ -308,10 +335,7 @@ public class HiveCatalogITCase {
"'connector.path' = 'file://%s'," +
"'format.type' = 'csv')", srcPath));
- ArrayList<Row> rows = Lists.newArrayList(
- tableEnv.executeSql("SELECT * FROM proctime_src").collect());
- Assert.assertEquals(5, rows.size());
- tableEnv.executeSql("DROP TABLE proctime_src");
+ return tableEnv;
}
@Test
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 604734e..5caa2a9 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -57,6 +57,7 @@ import org.apache.flink.table.client.gateway.local.result.ChangelogResult;
import org.apache.flink.table.client.gateway.local.result.DynamicResult;
import org.apache.flink.table.client.gateway.local.result.MaterializedResult;
import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
@@ -473,6 +474,11 @@ public class LocalExecutor implements Executor {
public UnresolvedIdentifier parseIdentifier(String identifier) {
return context.wrapClassLoader(() -> parser.parseIdentifier(identifier));
}
+
+ @Override
+ public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) {
+ return context.wrapClassLoader(() -> parser.parseSqlExpression(sqlExpression, inputSchema));
+ }
};
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java
new file mode 100644
index 0000000..3e1e08a
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+/**
+ * The {@link CatalogTableSchemaResolver} is used to derive correct result type of computed column,
+ * because the date type of computed column from catalog table is not trusted.
+ *
+ * <p>Such as `proctime()` function, its type in given TableSchema is Timestamp(3),
+ * but its correct type is Timestamp(3) *PROCTIME*.
+ */
+@Internal
+public class CatalogTableSchemaResolver {
+ private final Parser parser;
+ // A flag to indicate the table environment should work in a batch or streaming
+ // TODO remove this once FLINK-18180 is finished
+ private final boolean isStreamingMode;
+
+ public CatalogTableSchemaResolver(Parser parser, boolean isStreamingMode) {
+ this.parser = parser;
+ this.isStreamingMode = isStreamingMode;
+ }
+
+ /**
+ * Resolve the computed column's type for the given schema.
+ *
+ * @param tableSchema Table schema to derive table field names and data types
+ * @return the resolved TableSchema
+ */
+ public TableSchema resolve(TableSchema tableSchema) {
+ final String rowtime;
+ if (!tableSchema.getWatermarkSpecs().isEmpty()) {
+ // TODO: [FLINK-14473] we only support top-level rowtime attribute right now
+ rowtime = tableSchema.getWatermarkSpecs().get(0).getRowtimeAttribute();
+ if (rowtime.contains(".")) {
+ throw new ValidationException(
+ String.format("Nested field '%s' as rowtime attribute is not supported right now.", rowtime));
+ }
+ } else {
+ rowtime = null;
+ }
+
+ String[] fieldNames = tableSchema.getFieldNames();
+ DataType[] fieldTypes = tableSchema.getFieldDataTypes();
+
+ TableSchema.Builder builder = TableSchema.builder();
+ for (int i = 0; i < tableSchema.getFieldCount(); ++i) {
+ TableColumn tableColumn = tableSchema.getTableColumns().get(i);
+ DataType fieldType = fieldTypes[i];
+ if (tableColumn.isGenerated() && isProctimeType(tableColumn.getExpr().get(), tableSchema)) {
+ if (fieldNames[i].equals(rowtime)) {
+ throw new TableException("Watermark can not be defined for a processing time attribute column.");
+ }
+ TimestampType originalType = (TimestampType) fieldType.getLogicalType();
+ LogicalType proctimeType = new TimestampType(
+ originalType.isNullable(),
+ TimestampKind.PROCTIME,
+ originalType.getPrecision());
+ fieldType = TypeConversions.fromLogicalToDataType(proctimeType);
+ } else if (isStreamingMode && fieldNames[i].equals(rowtime)) {
+ TimestampType originalType = (TimestampType) fieldType.getLogicalType();
+ LogicalType rowtimeType = new TimestampType(
+ originalType.isNullable(),
+ TimestampKind.ROWTIME,
+ originalType.getPrecision());
+ fieldType = TypeConversions.fromLogicalToDataType(rowtimeType);
+ }
+ if (tableColumn.isGenerated()) {
+ builder.field(fieldNames[i], fieldType, tableColumn.getExpr().get());
+ } else {
+ builder.field(fieldNames[i], fieldType);
+ }
+ }
+
+ tableSchema.getWatermarkSpecs().forEach(builder::watermark);
+ tableSchema.getPrimaryKey().ifPresent(
+ pk -> builder.primaryKey(pk.getName(), pk.getColumns().toArray(new String[0])));
+ return builder.build();
+ }
+
+ private boolean isProctimeType(String expr, TableSchema tableSchema) {
+ ResolvedExpression resolvedExpr = parser.parseSqlExpression(expr, tableSchema);
+ if (resolvedExpr == null) {
+ return false;
+ }
+ LogicalType type = resolvedExpr.getOutputDataType().getLogicalType();
+ return type instanceof TimestampType && ((TimestampType) type).getKind() == TimestampKind.PROCTIME;
+ }
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 3d48655..6edce4a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -199,6 +199,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
Planner planner,
boolean isStreamingMode) {
this.catalogManager = catalogManager;
+ this.catalogManager.setCatalogTableSchemaResolver(
+ new CatalogTableSchemaResolver(planner.getParser(), isStreamingMode));
this.moduleManager = moduleManager;
this.execEnv = executor;
@@ -490,8 +492,9 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
private Optional<CatalogQueryOperation> scanInternal(UnresolvedIdentifier identifier) {
ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(identifier);
- return catalogManager.getTable(tableIdentifier)
- .map(t -> new CatalogQueryOperation(tableIdentifier, t.getTable().getSchema()));
+ return catalogManager.getTable(tableIdentifier).map(t -> {
+ return new CatalogQueryOperation(tableIdentifier, t.getTable().getSchema());
+ });
}
@Override
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 44dc6c3..9c1e1d3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -23,12 +23,18 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.CatalogNotExistException;
import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.CatalogTableSchemaResolver;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
@@ -70,6 +76,8 @@ public final class CatalogManager {
private String currentDatabaseName;
+ private CatalogTableSchemaResolver schemaResolver;
+
// The name of the built-in catalog
private final String builtInCatalogName;
@@ -147,6 +155,16 @@ public final class CatalogManager {
}
/**
+ * We do not pass it in the ctor, because we need a {@link Parser} that is constructed in a
+ * {@link Planner}. At the same time {@link Planner} needs a {@link CatalogManager} to
+ * be constructed. Thus we can't get {@link Parser} instance when creating a
+ * {@link CatalogManager}. See {@link TableEnvironmentImpl#create}.
+ */
+ public void setCatalogTableSchemaResolver(CatalogTableSchemaResolver schemaResolver) {
+ this.schemaResolver = schemaResolver;
+ }
+
+ /**
* Returns a factory for creating fully resolved data types that can be used for planning.
*/
public DataTypeFactory getDataTypeFactory() {
@@ -336,12 +354,24 @@ public final class CatalogManager {
* @return table that the path points to.
*/
public Optional<TableLookupResult> getTable(ObjectIdentifier objectIdentifier) {
+ Preconditions.checkNotNull(schemaResolver, "schemaResolver should not be null");
CatalogBaseTable temporaryTable = temporaryTables.get(objectIdentifier);
if (temporaryTable != null) {
- return Optional.of(TableLookupResult.temporary(temporaryTable));
+ return Optional.of(TableLookupResult.temporary(resolveTableSchema(temporaryTable)));
} else {
- return getPermanentTable(objectIdentifier);
+ Optional<TableLookupResult> result = getPermanentTable(objectIdentifier);
+ return result.map(tableLookupResult ->
+ TableLookupResult.permanent(resolveTableSchema(tableLookupResult.getTable())));
+ }
+ }
+
+ private CatalogBaseTable resolveTableSchema(CatalogBaseTable table) {
+ if (!(table instanceof CatalogTableImpl)) {
+ return table;
}
+ CatalogTableImpl catalogTableImpl = (CatalogTableImpl) table;
+ TableSchema newTableSchema = schemaResolver.resolve(catalogTableImpl.getSchema());
+ return catalogTableImpl.copy(newTableSchema);
}
/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
index 9ac9c5c..dfe5d8b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
@@ -88,6 +88,14 @@ public class CatalogTableImpl extends AbstractCatalogTable {
return new CatalogTableImpl(getSchema(), getPartitionKeys(), options, getComment());
}
+ public CatalogTable copy(TableSchema tableSchema) {
+ return new CatalogTableImpl(
+ tableSchema.copy(),
+ new ArrayList<>(getPartitionKeys()),
+ new HashMap<>(getProperties()),
+ getComment());
+ }
+
/**
* Construct a {@link CatalogTableImpl} from complete properties that contains table schema.
*/
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java
index c1df477..a94c152 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java
@@ -19,7 +19,9 @@
package org.apache.flink.table.delegation;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
@@ -54,4 +56,14 @@ public interface Parser {
* @throws org.apache.flink.table.api.SqlParserException when failed to parse the identifier
*/
UnresolvedIdentifier parseIdentifier(String identifier);
+
+ /**
+ * Entry point for parsing SQL expressions expressed as a String.
+ *
+ * @param sqlExpression the SQL expression to parse
+ * @param inputSchema the schema of the fields in sql expression
+ * @return resolved expression
+ * @throws org.apache.flink.table.api.SqlParserException when failed to parse the sql expression
+ */
+ ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema);
}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java
index 7507b00..4ec00e8 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java
@@ -18,8 +18,10 @@
package org.apache.flink.table.utils;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.operations.Operation;
import java.util.List;
@@ -37,4 +39,9 @@ public class ParserMock implements Parser {
public UnresolvedIdentifier parseIdentifier(String identifier) {
return UnresolvedIdentifier.of(identifier);
}
+
+ @Override
+ public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) {
+ return null;
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogCalciteSchema.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogCalciteSchema.java
index a216097..7331326 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogCalciteSchema.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogCalciteSchema.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.catalog;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.schema.Schema;
@@ -43,18 +42,13 @@ public class CatalogCalciteSchema extends FlinkSchema {
private final CatalogManager catalogManager;
// Flag that tells if the current planner should work in a batch or streaming mode.
private final boolean isStreamingMode;
- // The SQL expression converter factory is used to derive correct result type of computed column,
- // because the date type of computed column from catalog table is not trusted.
- private final SqlExprToRexConverterFactory converterFactory;
public CatalogCalciteSchema(
String catalogName,
CatalogManager catalog,
- SqlExprToRexConverterFactory converterFactory,
boolean isStreamingMode) {
this.catalogName = catalogName;
this.catalogManager = catalog;
- this.converterFactory = converterFactory;
this.isStreamingMode = isStreamingMode;
}
@@ -67,8 +61,7 @@ public class CatalogCalciteSchema extends FlinkSchema {
@Override
public Schema getSubSchema(String schemaName) {
if (catalogManager.schemaExists(catalogName, schemaName)) {
- return new DatabaseCalciteSchema(
- schemaName, catalogName, catalogManager, converterFactory, isStreamingMode);
+ return new DatabaseCalciteSchema(schemaName, catalogName, catalogManager, isStreamingMode);
} else {
return null;
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogManagerCalciteSchema.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogManagerCalciteSchema.java
index 84e7671..a0dd7c7 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogManagerCalciteSchema.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogManagerCalciteSchema.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.planner.catalog;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.schema.Schema;
@@ -45,17 +44,12 @@ public class CatalogManagerCalciteSchema extends FlinkSchema {
private final CatalogManager catalogManager;
// Flag that tells if the current planner should work in a batch or streaming mode.
private final boolean isStreamingMode;
- // The SQL expression converter factory is used to derive correct result type of computed column,
- // because the date type of computed column from catalog table is not trusted.
- private final SqlExprToRexConverterFactory converterFactory;
public CatalogManagerCalciteSchema(
CatalogManager catalogManager,
- SqlExprToRexConverterFactory converterFactory,
boolean isStreamingMode) {
this.catalogManager = catalogManager;
this.isStreamingMode = isStreamingMode;
- this.converterFactory = converterFactory;
}
@Override
@@ -71,7 +65,7 @@ public class CatalogManagerCalciteSchema extends FlinkSchema {
@Override
public Schema getSubSchema(String name) {
if (catalogManager.schemaExists(name)) {
- return new CatalogCalciteSchema(name, catalogManager, converterFactory, isStreamingMode);
+ return new CatalogCalciteSchema(name, catalogManager, isStreamingMode);
} else {
return null;
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
index e0b2217..23a8d80 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
@@ -34,7 +34,6 @@ import org.apache.flink.table.factories.TableFactoryUtil;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.factories.TableSourceFactoryContextImpl;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
-import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;
import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
import org.apache.flink.table.planner.sources.TableSourceUtil;
import org.apache.flink.table.sources.StreamTableSource;
@@ -70,7 +69,6 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable {
private final ObjectIdentifier tableIdentifier;
private final CatalogBaseTable catalogBaseTable;
private final FlinkStatistic statistic;
- private final SqlExprToRexConverterFactory converterFactory;
private final boolean isStreamingMode;
private final boolean isTemporary;
private final Catalog catalog;
@@ -84,9 +82,6 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable {
* @param catalogBaseTable CatalogBaseTable instance which exists in the catalog
* @param statistic Table statistics
* @param catalog The catalog which the schema table belongs to
- * @param converterFactory The SQL expression converter factory is used to derive correct result
- * type of computed column, because the date type of computed column
- * from catalog table is not trusted.
* @param isStreaming If the table is for streaming mode
* @param isTemporary If the table is temporary
*/
@@ -95,14 +90,12 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable {
CatalogBaseTable catalogBaseTable,
FlinkStatistic statistic,
Catalog catalog,
- SqlExprToRexConverterFactory converterFactory,
boolean isStreaming,
boolean isTemporary) {
this.tableIdentifier = tableIdentifier;
this.catalogBaseTable = catalogBaseTable;
this.statistic = statistic;
this.catalog = catalog;
- this.converterFactory = converterFactory;
this.isStreamingMode = isStreaming;
this.isTemporary = isTemporary;
}
@@ -186,10 +179,10 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable {
}
}
- return TableSourceUtil.getSourceRowTypeFromSchema(
- converterFactory,
+ return TableSourceUtil.getSourceRowType(
flinkTypeFactory,
tableSchema,
+ scala.Option.empty(),
isStreamingMode);
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java
index 51102af..66ee213 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java
@@ -32,7 +32,6 @@ import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.plan.stats.TableStats;
-import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;
import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
import org.apache.calcite.linq4j.tree.Expression;
@@ -56,9 +55,6 @@ class DatabaseCalciteSchema extends FlinkSchema {
private final String databaseName;
private final String catalogName;
private final CatalogManager catalogManager;
- // The SQL expression converter factory is used to derive correct result type of computed column,
- // because the date type of computed column from catalog table is not trusted.
- private final SqlExprToRexConverterFactory converterFactory;
// Flag that tells if the current planner should work in a batch or streaming mode.
private final boolean isStreamingMode;
@@ -66,12 +62,10 @@ class DatabaseCalciteSchema extends FlinkSchema {
String databaseName,
String catalogName,
CatalogManager catalog,
- SqlExprToRexConverterFactory converterFactory,
boolean isStreamingMode) {
this.databaseName = databaseName;
this.catalogName = catalogName;
this.catalogManager = catalog;
- this.converterFactory = converterFactory;
this.isStreamingMode = isStreamingMode;
}
@@ -87,7 +81,6 @@ class DatabaseCalciteSchema extends FlinkSchema {
table,
statistic,
catalogManager.getCatalog(catalogName).orElseThrow(IllegalStateException::new),
- converterFactory,
isStreamingMode,
result.isTemporary());
})
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
index 466587e..91723f7 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
@@ -19,19 +19,28 @@
package org.apache.flink.table.planner.delegation;
import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.planner.calcite.CalciteParser;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.calcite.SqlExprToRexConverter;
+import org.apache.flink.table.planner.expressions.RexNodeExpression;
import org.apache.flink.table.planner.operations.SqlToOperationConverter;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import java.util.Collections;
import java.util.List;
+import java.util.function.Function;
import java.util.function.Supplier;
/**
@@ -46,14 +55,17 @@ public class ParserImpl implements Parser {
// multiple statements parsing
private final Supplier<FlinkPlannerImpl> validatorSupplier;
private final Supplier<CalciteParser> calciteParserSupplier;
+ private final Function<TableSchema, SqlExprToRexConverter> sqlExprToRexConverterCreator;
public ParserImpl(
CatalogManager catalogManager,
Supplier<FlinkPlannerImpl> validatorSupplier,
- Supplier<CalciteParser> calciteParserSupplier) {
+ Supplier<CalciteParser> calciteParserSupplier,
+ Function<TableSchema, SqlExprToRexConverter> sqlExprToRexConverterCreator) {
this.catalogManager = catalogManager;
this.validatorSupplier = validatorSupplier;
this.calciteParserSupplier = calciteParserSupplier;
+ this.sqlExprToRexConverterCreator = sqlExprToRexConverterCreator;
}
@Override
@@ -74,4 +86,12 @@ public class ParserImpl implements Parser {
SqlIdentifier sqlIdentifier = parser.parseIdentifier(identifier);
return UnresolvedIdentifier.of(sqlIdentifier.names);
}
+
+ @Override
+ public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) {
+ SqlExprToRexConverter sqlExprToRexConverter = sqlExprToRexConverterCreator.apply(inputSchema);
+ RexNode rexNode = sqlExprToRexConverter.convertToRexNode(sqlExpression);
+ LogicalType logicalType = FlinkTypeFactory.toLogicalType(rexNode.getType());
+ return new RexNodeExpression(rexNode, TypeConversions.fromLogicalToDataType(logicalType));
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 2fe0001..451f4d7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting
import org.apache.flink.api.dag.Transformation
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.config.ExecutionConfigOptions
-import org.apache.flink.table.api.{TableConfig, TableEnvironment, TableException}
+import org.apache.flink.table.api.{TableConfig, TableEnvironment, TableException, TableSchema}
import org.apache.flink.table.catalog._
import org.apache.flink.table.connector.sink.DynamicTableSink
import org.apache.flink.table.delegation.{Executor, Parser, Planner}
@@ -31,7 +31,7 @@ import org.apache.flink.table.factories.{FactoryUtil, TableFactoryUtil}
import org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode
import org.apache.flink.table.operations._
import org.apache.flink.table.planner.JMap
-import org.apache.flink.table.planner.calcite.{CalciteParser, FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, SqlExprToRexConverter, SqlExprToRexConverterFactory}
+import org.apache.flink.table.planner.calcite._
import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema
import org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl
import org.apache.flink.table.planner.hint.FlinkHints
@@ -55,7 +55,7 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.tools.FrameworkConfig
import java.util
-import java.util.function.{Supplier => JSupplier}
+import java.util.function.{Function => JFunction, Supplier => JSupplier}
import _root_.scala.collection.JavaConversions._
@@ -89,17 +89,6 @@ abstract class PlannerBase(
plannerContext.createSqlExprToRexConverter(tableRowType)
}
- @VisibleForTesting
- private[flink] val plannerContext: PlannerContext =
- new PlannerContext(
- config,
- functionCatalog,
- catalogManager,
- asRootSchema(new CatalogManagerCalciteSchema(
- catalogManager, sqlExprToRexConverterFactory, isStreamingMode)),
- getTraitDefs.toList
- )
-
private val parser: Parser = new ParserImpl(
catalogManager,
new JSupplier[FlinkPlannerImpl] {
@@ -110,9 +99,24 @@ abstract class PlannerBase(
// parsing statements
new JSupplier[CalciteParser] {
override def get(): CalciteParser = plannerContext.createCalciteParser()
+ },
+ new JFunction[TableSchema, SqlExprToRexConverter] {
+ override def apply(t: TableSchema): SqlExprToRexConverter = {
+ sqlExprToRexConverterFactory.create(plannerContext.getTypeFactory.buildRelNodeRowType(t))
+ }
}
)
+ @VisibleForTesting
+ private[flink] val plannerContext: PlannerContext =
+ new PlannerContext(
+ config,
+ functionCatalog,
+ catalogManager,
+ asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)),
+ getTraitDefs.toList
+ )
+
/** Returns the [[FlinkRelBuilder]] of this TableEnvironment. */
private[flink] def getRelBuilder: FlinkRelBuilder = {
val currentCatalogName = catalogManager.getCurrentCatalog
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
index 9d8880e..39eb5b6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
@@ -18,14 +18,12 @@
package org.apache.flink.table.planner.sources
-import org.apache.flink.table.api.{DataTypes, TableColumn, TableSchema, ValidationException, WatermarkSpec}
+import org.apache.flink.table.api.{DataTypes, TableSchema, ValidationException, WatermarkSpec}
import org.apache.flink.table.expressions.ApiExpressionUtils.{typeLiteral, valueLiteral}
import org.apache.flink.table.expressions.{CallExpression, Expression, ResolvedExpression, ResolvedFieldReference}
import org.apache.flink.table.functions.BuiltInFunctionDefinitions
-import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, SqlExprToRexConverterFactory}
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.expressions.converter.ExpressionConverter
-import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
import org.apache.flink.table.runtime.types.DataTypePrecisionFixer
import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
@@ -39,7 +37,7 @@ import org.apache.calcite.plan.RelOptCluster
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.logical.LogicalValues
-import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.calcite.rex.RexNode
import org.apache.calcite.tools.RelBuilder
import _root_.java.sql.Timestamp
@@ -119,67 +117,6 @@ object TableSourceUtil {
}
/**
- * Returns schema of the selected fields of the given [[TableSchema]].
- *
- * @param converterFactory converter to convert computed columns.
- * @param typeFactory Type factory to create the type
- * @param tableSchema Table schema to derive table field names and data types
- * @param streaming Flag to determine whether the schema of a stream or batch table is created
- * @return The row type for the selected fields of the given [[TableSchema]], this type would
- * also be patched with time attributes defined in the give [[WatermarkSpec]]
- */
- def getSourceRowTypeFromSchema(
- converterFactory: SqlExprToRexConverterFactory,
- typeFactory: FlinkTypeFactory,
- tableSchema: TableSchema,
- streaming: Boolean): RelDataType = {
- val fieldNames = tableSchema.getFieldNames
- var fieldTypes = tableSchema.getFieldDataTypes.map(fromDataTypeToLogicalType)
-
- // TODO: [FLINK-14473] we only support top-level rowtime attribute right now
- val rowTimeIdx = if (tableSchema.getWatermarkSpecs.nonEmpty) {
- val rowtime = tableSchema.getWatermarkSpecs.head.getRowtimeAttribute
- if (rowtime.contains(".")) {
- throw new ValidationException(
- s"Nested field '$rowtime' as rowtime attribute is not supported right now.")
- }
- Some(fieldNames.indexOf(rowtime))
- } else {
- None
- }
-
- val converter = converterFactory.create(
- typeFactory.buildRelNodeRowType(fieldNames, fieldTypes))
- def isProctime(column: TableColumn): Boolean = {
- toScala(column.getExpr).exists { expr =>
- converter.convertToRexNode(expr) match {
- case call: RexCall => call.getOperator == FlinkSqlOperatorTable.PROCTIME
- case _ => false
- }
- }
- }
-
- fieldTypes = fieldTypes.zipWithIndex.map {
- case (originalType, i) =>
- if (streaming && rowTimeIdx.exists(_.equals(i))) {
- new TimestampType(
- originalType.isNullable,
- TimestampKind.ROWTIME,
- originalType.asInstanceOf[TimestampType].getPrecision)
- } else if (isProctime(tableSchema.getTableColumn(i).get())) {
- new TimestampType(
- originalType.isNullable,
- TimestampKind.PROCTIME,
- originalType.asInstanceOf[TimestampType].getPrecision)
- } else {
- originalType
- }
- }
-
- typeFactory.buildRelNodeRowType(fieldNames, fieldTypes)
- }
-
- /**
* Returns schema of the selected fields of the given [[TableSource]].
*
* <p> The watermark strategy specifications should either come from the [[TableSchema]]
@@ -224,6 +161,36 @@ object TableSourceUtil {
}
/**
+ * Returns schema of the selected fields of the given [[TableSource]].
+ *
+ * <p> The watermark strategy specifications should either come from the [[TableSchema]]
+ * or [[TableSource]].
+ *
+ * @param typeFactory Type factory to create the type
+ * @param tableSchema Table schema to derive table field names and data types
+ * @param tableSource Table source to derive watermark strategies
+ * @param streaming Flag to determine whether the schema of a stream or batch table is created
+ * @return The row type for the selected fields of the given [[TableSource]], this type would
+ * also be patched with time attributes defined in the give [[WatermarkSpec]]
+ */
+ def getSourceRowType(
+ typeFactory: FlinkTypeFactory,
+ tableSchema: TableSchema,
+ tableSource: Option[TableSource[_]],
+ streaming: Boolean): RelDataType = {
+
+ val fieldNames = tableSchema.getFieldNames
+ val fieldDataTypes = tableSchema.getFieldDataTypes
+
+ if (tableSource.isDefined) {
+ getSourceRowTypeFromSource(typeFactory, tableSource.get, streaming)
+ } else {
+ val fieldTypes = fieldDataTypes.map(fromDataTypeToLogicalType)
+ typeFactory.buildRelNodeRowType(fieldNames, fieldTypes)
+ }
+ }
+
+ /**
* Returns the [[RowtimeAttributeDescriptor]] of a [[TableSource]].
*
* @param tableSource The [[TableSource]] for which the [[RowtimeAttributeDescriptor]] is
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index e9244ff..1c582a3 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.api.internal.CatalogTableSchemaResolver;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogFunction;
@@ -41,6 +42,7 @@ import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.Operation;
@@ -56,9 +58,8 @@ import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
import org.apache.flink.table.planner.calcite.CalciteParser;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
-import org.apache.flink.table.planner.calcite.SqlExprToRexConverter;
-import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;
import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema;
+import org.apache.flink.table.planner.delegation.ParserImpl;
import org.apache.flink.table.planner.delegation.PlannerContext;
import org.apache.flink.table.planner.expressions.utils.Func0$;
import org.apache.flink.table.planner.expressions.utils.Func1$;
@@ -67,7 +68,6 @@ import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctio
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.CatalogManagerMocks;
-import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlNode;
import org.junit.After;
import org.junit.Before;
@@ -84,6 +84,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
@@ -102,6 +103,7 @@ import static org.junit.Assert.assertThat;
* Test cases for {@link SqlToOperationConverter}.
*/
public class SqlToOperationConverterTest {
+ private final boolean isStreamingMode = false;
private final TableConfig tableConfig = new TableConfig();
private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog",
"default");
@@ -113,16 +115,25 @@ public class SqlToOperationConverterTest {
tableConfig,
catalogManager,
moduleManager);
- private SqlExprToRexConverterFactory sqlExprToRexConverterFactory = this::createSqlExprToRexConverter;
+ private final Supplier<FlinkPlannerImpl> plannerSupplier =
+ () -> getPlannerContext().createFlinkPlanner(
+ catalogManager.getCurrentCatalog(),
+ catalogManager.getCurrentDatabase());
+ private final Parser parser = new ParserImpl(
+ catalogManager,
+ plannerSupplier,
+ () -> plannerSupplier.get().parser(),
+ t -> getPlannerContext().createSqlExprToRexConverter(
+ getPlannerContext().getTypeFactory().buildRelNodeRowType(t)));
private final PlannerContext plannerContext =
new PlannerContext(tableConfig,
functionCatalog,
catalogManager,
- asRootSchema(new CatalogManagerCalciteSchema(catalogManager, sqlExprToRexConverterFactory, false)),
+ asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)),
new ArrayList<>());
- private SqlExprToRexConverter createSqlExprToRexConverter(RelDataType t) {
- return plannerContext.createSqlExprToRexConverter(t);
+ private PlannerContext getPlannerContext() {
+ return plannerContext;
}
@Rule
@@ -130,6 +141,7 @@ public class SqlToOperationConverterTest {
@Before
public void before() throws TableAlreadyExistException, DatabaseNotExistException {
+ catalogManager.setCatalogTableSchemaResolver(new CatalogTableSchemaResolver(parser, isStreamingMode));
final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1");
final ObjectPath path2 = new ObjectPath(catalogManager.getCurrentDatabase(), "t2");
final TableSchema tableSchema = TableSchema.builder()
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java
index 296e7ce..3ec52507 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.catalog.ConnectorCatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.calcite.FlinkTypeSystem;
-import org.apache.flink.table.planner.calcite.SqlExprToRexConverter;
import org.apache.flink.table.planner.catalog.CatalogSchemaTable;
import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
@@ -34,7 +33,6 @@ import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.prepare.Prepare;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.junit.Before;
@@ -76,22 +74,9 @@ public class FlinkCalciteCatalogReaderTest {
// Mock CatalogSchemaTable.
CatalogSchemaTable mockTable = new CatalogSchemaTable(
ObjectIdentifier.of("a", "b", "c"),
- ConnectorCatalogTable.source(
- new TestTableSource(true, TableSchema.builder().build()),
- true),
+ ConnectorCatalogTable.source(new TestTableSource(true, TableSchema.builder().build()), true),
FlinkStatistic.UNKNOWN(),
null,
- tableRowType -> new SqlExprToRexConverter() {
- @Override
- public RexNode convertToRexNode(String expr) {
- return null;
- }
-
- @Override
- public RexNode[] convertToRexNodes(String[] exprs) {
- return new RexNode[0];
- }
- },
true,
false);
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
index e3f15b1..7252ff4 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
@@ -19,16 +19,18 @@
package org.apache.flink.table.planner.utils;
import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.internal.CatalogTableSchemaResolver;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema;
+import org.apache.flink.table.planner.delegation.ParserImpl;
import org.apache.flink.table.planner.delegation.PlannerContext;
import org.apache.flink.table.utils.CatalogManagerMocks;
import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicReference;
import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
@@ -37,25 +39,31 @@ import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
*/
public class PlannerMocks {
public static FlinkPlannerImpl createDefaultPlanner() {
+ final boolean isStreamingMode = false;
TableConfig tableConfig = new TableConfig();
CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager();
ModuleManager moduleManager = new ModuleManager();
FunctionCatalog functionCatalog = new FunctionCatalog(
- tableConfig,
- catalogManager,
- moduleManager);
- AtomicReference<PlannerContext> reference = new AtomicReference<>();
+ tableConfig,
+ catalogManager,
+ moduleManager);
PlannerContext plannerContext = new PlannerContext(
- tableConfig,
- functionCatalog,
- catalogManager,
- asRootSchema(new CatalogManagerCalciteSchema(
- catalogManager, t -> reference.get().createSqlExprToRexConverter(t), false)),
- new ArrayList<>());
- reference.set(plannerContext);
- return plannerContext.createFlinkPlanner(
- catalogManager.getCurrentCatalog(),
- catalogManager.getCurrentDatabase());
+ tableConfig,
+ functionCatalog,
+ catalogManager,
+ asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)),
+ new ArrayList<>());
+ FlinkPlannerImpl planner = plannerContext.createFlinkPlanner(
+ catalogManager.getCurrentCatalog(),
+ catalogManager.getCurrentDatabase());
+ Parser parser = new ParserImpl(
+ catalogManager,
+ () -> planner,
+ planner::parser,
+ t -> plannerContext.createSqlExprToRexConverter(plannerContext.getTypeFactory().buildRelNodeRowType(t))
+ );
+ catalogManager.setCatalogTableSchemaResolver(new CatalogTableSchemaResolver(parser, isStreamingMode));
+ return planner;
}
private PlannerMocks() {
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
index 965fc3e..efe7f47 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
@@ -40,24 +40,24 @@ Calc(select=[id, deepNested.nested1.name AS nestedName, nested.value AS nestedVa
]]>
</Resource>
</TestCase>
- <TestCase name="testProcTimeTableSourceSimple">
+ <TestCase name="testProjectOnlyRowtime">
<Resource name="sql">
- <![CDATA[SELECT pTime, id, name, val FROM procTimeT]]>
+ <![CDATA[SELECT rtime FROM T]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
-LogicalProject(pTime=[$3], id=[$0], name=[$2], val=[$1])
-+- LogicalWatermarkAssigner(rowtime=[pTime], watermark=[$3])
- +- LogicalProject(id=[$0], val=[$1], name=[$2], pTime=[PROCTIME()])
- +- LogicalTableScan(table=[[default_catalog, default_database, procTimeT]])
+LogicalProject(rtime=[$1])
++- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[$1])
+ +- LogicalProject(id=[$0], rtime=[$1], val=[$2], name=[$3], ptime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[pTime, id, name, val])
-+- WatermarkAssigner(rowtime=[pTime], watermark=[pTime])
- +- Calc(select=[id, val, name, PROCTIME() AS pTime])
- +- TableSourceScan(table=[[default_catalog, default_database, procTimeT]], fields=[id, val, name])
+Calc(select=[rtime])
++- WatermarkAssigner(rowtime=[rtime], watermark=[rtime])
+ +- Calc(select=[id, rtime, val, name, PROCTIME() AS ptime])
+ +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, rtime, val, name])
]]>
</Resource>
</TestCase>
@@ -80,48 +80,6 @@ GroupAggregate(select=[COUNT(*) AS EXPR$0])
]]>
</Resource>
</TestCase>
- <TestCase name="testProjectWithoutRowtime">
- <Resource name="sql">
- <![CDATA[SELECT ptime, name, val, id FROM T]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalProject(ptime=[$4], name=[$3], val=[$2], id=[$0])
-+- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[$1])
- +- LogicalProject(id=[$0], rtime=[$1], val=[$2], name=[$3], ptime=[PROCTIME()])
- +- LogicalTableScan(table=[[default_catalog, default_database, T]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[PROCTIME_MATERIALIZE(ptime) AS ptime, name, val, id])
-+- WatermarkAssigner(rowtime=[rtime], watermark=[rtime])
- +- Calc(select=[id, rtime, val, name, PROCTIME() AS ptime])
- +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, rtime, val, name])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testProjectWithRowtimeProctime">
- <Resource name="sql">
- <![CDATA[SELECT name, val, id FROM T]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalProject(name=[$3], val=[$2], id=[$0])
-+- LogicalWatermarkAssigner(rowtime=[ptime], watermark=[$4])
- +- LogicalProject(id=[$0], rtime=[$1], val=[$2], name=[$3], ptime=[PROCTIME()])
- +- LogicalTableScan(table=[[default_catalog, default_database, T]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[name, val, id])
-+- WatermarkAssigner(rowtime=[ptime], watermark=[ptime])
- +- Calc(select=[id, rtime, val, name, PROCTIME() AS ptime])
- +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, rtime, val, name])
-]]>
- </Resource>
- </TestCase>
<TestCase name="testRowTimeTableSourceGroupWindow">
<Resource name="sql">
<![CDATA[
@@ -153,6 +111,27 @@ Calc(select=[name, w$end AS EXPR$1, EXPR$2])
]]>
</Resource>
</TestCase>
+ <TestCase name="testProjectWithoutProctime">
+ <Resource name="sql">
+ <![CDATA[select name, val, rtime, id from T]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(name=[$3], val=[$2], rtime=[$1], id=[$0])
++- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[$1])
+ +- LogicalProject(id=[$0], rtime=[$1], val=[$2], name=[$3], ptime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[name, val, rtime, id])
++- WatermarkAssigner(rowtime=[rtime], watermark=[rtime])
+ +- Calc(select=[id, rtime, val, name, PROCTIME() AS ptime])
+ +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, rtime, val, name])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testTableSourceWithTimestampRowTimeField">
<Resource name="sql">
<![CDATA[SELECT rowtime, id, name, val FROM rowTimeT]]>
@@ -172,4 +151,25 @@ Calc(select=[rowtime, id, name, val])
]]>
</Resource>
</TestCase>
+ <TestCase name="testProjectWithoutRowtime">
+ <Resource name="sql">
+ <![CDATA[SELECT ptime, name, val, id FROM T]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(ptime=[$4], name=[$3], val=[$2], id=[$0])
++- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[$1])
+ +- LogicalProject(id=[$0], rtime=[$1], val=[$2], name=[$3], ptime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[PROCTIME_MATERIALIZE(ptime) AS ptime, name, val, id])
++- WatermarkAssigner(rowtime=[rtime], watermark=[rtime])
+ +- Calc(select=[id, rtime, val, name, PROCTIME() AS ptime])
+ +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, rtime, val, name])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.xml
index 8090ffb..a0b9a14 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.xml
@@ -61,45 +61,45 @@ Calc(select=[PROCTIME_MATERIALIZE(proctime) AS proctime, id, name, val])
]]>
</Resource>
</TestCase>
- <TestCase name="testProjectWithMapping">
+ <TestCase name="testProjectOnlyProctime">
<Resource name="planBefore">
<![CDATA[
-LogicalProject(name=[$4], rtime=[$1], val=[$2])
-+- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: p-rtime, p-id, p-name, p-val)]]])
+LogicalProject(ptime=[$3])
++- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, rtime, val, name)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[name, rtime, val])
-+- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: p-rtime, p-id, p-name, p-val)]]], fields=[id, rtime, val, ptime, name])
+Calc(select=[PROCTIME_MATERIALIZE(ptime) AS ptime])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, rtime, val, name)]]], fields=[id, rtime, val, ptime, name])
]]>
</Resource>
</TestCase>
- <TestCase name="testProjectWithoutRowtime">
+ <TestCase name="testProjectOnlyRowtime">
<Resource name="planBefore">
<![CDATA[
-LogicalProject(ptime=[$3], name=[$4], val=[$2], id=[$0])
-+- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, name, val, rtime)]]])
+LogicalProject(rtime=[$1])
++- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, rtime, val, name)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[PROCTIME_MATERIALIZE(ptime) AS ptime, name, val, id])
-+- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, name, val, rtime)]]], fields=[id, rtime, val, ptime, name])
+Calc(select=[rtime])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, rtime, val, name)]]], fields=[id, rtime, val, ptime, name])
]]>
</Resource>
</TestCase>
- <TestCase name="testProjectWithRowtimeProctime">
+ <TestCase name="testProjectWithMapping">
<Resource name="planBefore">
<![CDATA[
-LogicalProject(name=[$4], val=[$2], id=[$0])
-+- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, name, val, rtime)]]])
+LogicalProject(name=[$4], rtime=[$1], val=[$2])
++- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: p-rtime, p-id, p-name, p-val)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[name, val, id])
-+- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, name, val, rtime)]]], fields=[id, rtime, val, ptime, name])
+Calc(select=[name, rtime, val])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: p-rtime, p-id, p-name, p-val)]]], fields=[id, rtime, val, ptime, name])
]]>
</Resource>
</TestCase>
@@ -122,6 +122,34 @@ Calc(select=[name, EXPR$0, EXPR$1])
]]>
</Resource>
</TestCase>
+ <TestCase name="testProjectWithoutProctime">
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(name=[$4], val=[$2], rtime=[$1], id=[$0])
++- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, rtime, val, name)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[name, val, rtime, id])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, rtime, val, name)]]], fields=[id, rtime, val, ptime, name])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testProjectWithRowtimeProctime">
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(name=[$4], val=[$2], id=[$0])
++- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, name, val, rtime)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[name, val, id])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, name, val, rtime)]]], fields=[id, rtime, val, ptime, name])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testTableSourceWithTimestampRowTimeField">
<Resource name="planBefore">
<![CDATA[
@@ -136,6 +164,20 @@ Calc(select=[rowtime, id, name, val])
]]>
</Resource>
</TestCase>
+ <TestCase name="testProjectWithoutRowtime">
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(ptime=[$3], name=[$4], val=[$2], id=[$0])
++- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, name, val, rtime)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[PROCTIME_MATERIALIZE(ptime) AS ptime, name, val, id])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, name, val, rtime)]]], fields=[id, rtime, val, ptime, name])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testTableSourceWithLongRowTimeField">
<Resource name="planBefore">
<![CDATA[
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml
index b6dea7e..36c09f5 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml
@@ -16,58 +16,81 @@ See the License for the specific language governing permissions and
limitations under the License.
-->
<Root>
- <TestCase name="testTableSourceWithTimestampRowTimeField">
+ <TestCase name="testNestedProject">
<Resource name="planBefore">
<![CDATA[
-LogicalProject(rowtime=[$1], id=[$0], name=[$3], val=[$2])
-+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1])
- +- LogicalTableScan(table=[[default_catalog, default_database, rowTimeT]])
+LogicalProject(id=[$0], nestedName=[$1.nested1.name], nestedValue=[$2.value], nestedFlag=[$1.nested2.flag], nestedNum=[$1.nested2.num])
++- LogicalTableScan(table=[[default_catalog, default_database, T]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[rowtime, id, name, val])
-+- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
- +- TableSourceScan(table=[[default_catalog, default_database, rowTimeT]], fields=[id, rowtime, val, name])
+Calc(select=[id, deepNested.nested1.name AS nestedName, nested.value AS nestedValue, deepNested.nested2.flag AS nestedFlag, deepNested.nested2.num AS nestedNum])
++- TableSourceScan(table=[[default_catalog, default_database, T, project=[id, deepNested, nested]]], fields=[id, deepNested, nested])
]]>
</Resource>
</TestCase>
- <TestCase name="testNestedProject">
+ <TestCase name="testProcTimeTableSourceOverWindow">
<Resource name="planBefore">
<![CDATA[
-LogicalProject(id=[$0], nestedName=[$1.nested1.name], nestedValue=[$2.value], nestedFlag=[$1.nested2.flag], nestedNum=[$1.nested2.num])
-+- LogicalTableScan(table=[[default_catalog, default_database, T]])
+LogicalFilter(condition=[>($2, 100)])
++- LogicalProject(id=[$0], name=[$2], valSum=[AS(SUM($1) OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST RANGE BETWEEN 7200000 PRECEDING AND CURRENT ROW), _UTF-16LE'valSum')])
+ +- LogicalTableScan(table=[[default_catalog, default_database, procTimeT]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[id, deepNested.nested1.name AS nestedName, nested.value AS nestedValue, deepNested.nested2.flag AS nestedFlag, deepNested.nested2.num AS nestedNum])
-+- TableSourceScan(table=[[default_catalog, default_database, T, project=[id, deepNested, nested]]], fields=[id, deepNested, nested])
+Calc(select=[id, name, w0$o0 AS valSum], where=[>(w0$o0, 100)])
++- OverAggregate(partitionBy=[id], orderBy=[$3 ASC], window=[ RANG BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[id, val, name, $3, SUM(val) AS w0$o0])
+ +- Exchange(distribution=[hash[id]])
+ +- Calc(select=[id, val, name, PROCTIME() AS $3])
+ +- TableSourceScan(table=[[default_catalog, default_database, procTimeT]], fields=[id, val, name])
]]>
</Resource>
</TestCase>
- <TestCase name="testProcTimeTableSourceSimple">
+ <TestCase name="testProjectOnlyRowtime">
<Resource name="planBefore">
<![CDATA[
-LogicalProject(proctime=[$3], id=[$0], name=[$2], val=[$1])
-+- LogicalWatermarkAssigner(rowtime=[proctime], watermark=[$3])
- +- LogicalProject(id=[$0], val=[$1], name=[$2], proctime=[PROCTIME()])
- +- LogicalTableScan(table=[[default_catalog, default_database, procTimeT]])
+LogicalProject(rtime=[$1])
++- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[$1])
+ +- LogicalProject(id=[$0], rtime=[$1], val=[$2], name=[$3], ptime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[proctime, id, name, val])
-+- WatermarkAssigner(rowtime=[proctime], watermark=[proctime])
- +- Calc(select=[id, val, name, PROCTIME() AS proctime])
- +- TableSourceScan(table=[[default_catalog, default_database, procTimeT]], fields=[id, val, name])
+Calc(select=[rtime])
++- WatermarkAssigner(rowtime=[rtime], watermark=[rtime])
+ +- Calc(select=[id, rtime, val, name, PROCTIME() AS ptime])
+ +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, rtime, val, name])
]]>
</Resource>
</TestCase>
- <TestCase name="testProjectWithoutRowtime">
+ <TestCase name="testRowTimeTableSourceGroupWindow">
<Resource name="planBefore">
<![CDATA[
-LogicalProject(ptime=[$4], name=[$3], val=[$2], id=[$0])
+LogicalProject(name=[$0], EXPR$0=[$2], EXPR$1=[$1])
++- LogicalWindowAggregate(group=[{3}], EXPR$1=[AVG($2)], window=[TumblingGroupWindow('w, rowtime, 600000)], properties=[EXPR$0])
+ +- LogicalFilter(condition=[>($2, 100)])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, rowTimeT]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[name, EXPR$0, EXPR$1])
++- GroupWindowAggregate(groupBy=[name], window=[TumblingGroupWindow('w, rowtime, 600000)], properties=[EXPR$0], select=[name, AVG(val) AS EXPR$1, end('w) AS EXPR$0])
+ +- Exchange(distribution=[hash[name]])
+ +- Calc(select=[id, rowtime, val, name], where=[>(val, 100)])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+ +- TableSourceScan(table=[[default_catalog, default_database, rowTimeT]], fields=[id, rowtime, val, name])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testProjectWithoutProctime">
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(name=[$3], val=[$2], rtime=[$1], id=[$0])
+- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[$1])
+- LogicalProject(id=[$0], rtime=[$1], val=[$2], name=[$3], ptime=[PROCTIME()])
+- LogicalTableScan(table=[[default_catalog, default_database, T]])
@@ -75,47 +98,66 @@ LogicalProject(ptime=[$4], name=[$3], val=[$2], id=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[PROCTIME_MATERIALIZE(ptime) AS ptime, name, val, id])
+Calc(select=[name, val, rtime, id])
+- WatermarkAssigner(rowtime=[rtime], watermark=[rtime])
+- Calc(select=[id, rtime, val, name, PROCTIME() AS ptime])
+- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, rtime, val, name])
]]>
</Resource>
</TestCase>
- <TestCase name="testProcTimeTableSourceOverWindow">
+ <TestCase name="testTableSourceWithTimestampRowTimeField">
<Resource name="planBefore">
<![CDATA[
-LogicalFilter(condition=[>($2, 100)])
-+- LogicalProject(id=[$0], name=[$2], valSum=[AS(SUM($1) OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST RANGE BETWEEN 7200000 PRECEDING AND CURRENT ROW), _UTF-16LE'valSum')])
- +- LogicalTableScan(table=[[default_catalog, default_database, procTimeT]])
+LogicalProject(rowtime=[$1], id=[$0], name=[$3], val=[$2])
++- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, rowTimeT]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[id, name, w0$o0 AS valSum], where=[>(w0$o0, 100)])
-+- OverAggregate(partitionBy=[id], orderBy=[$3 ASC], window=[ RANG BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[id, val, name, $3, SUM(val) AS w0$o0])
- +- Exchange(distribution=[hash[id]])
- +- Calc(select=[id, val, name, PROCTIME() AS $3])
- +- TableSourceScan(table=[[default_catalog, default_database, procTimeT]], fields=[id, val, name])
+Calc(select=[rowtime, id, name, val])
++- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+ +- TableSourceScan(table=[[default_catalog, default_database, rowTimeT]], fields=[id, rowtime, val, name])
]]>
</Resource>
</TestCase>
- <TestCase name="testProjectWithRowtimeProctime">
+ <TestCase name="testProjectWithoutRowtime">
<Resource name="planBefore">
<![CDATA[
-LogicalProject(name=[$3], val=[$2], id=[$0])
-+- LogicalWatermarkAssigner(rowtime=[ptime], watermark=[$4])
+LogicalProject(ptime=[$4], name=[$3], val=[$2], id=[$0])
++- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[$1])
+- LogicalProject(id=[$0], rtime=[$1], val=[$2], name=[$3], ptime=[PROCTIME()])
+- LogicalTableScan(table=[[default_catalog, default_database, T]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[name, val, id])
-+- WatermarkAssigner(rowtime=[ptime], watermark=[ptime])
+Calc(select=[PROCTIME_MATERIALIZE(ptime) AS ptime, name, val, id])
++- WatermarkAssigner(rowtime=[rtime], watermark=[rtime])
+- Calc(select=[id, rtime, val, name, PROCTIME() AS ptime])
+- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, rtime, val, name])
]]>
</Resource>
</TestCase>
+ <TestCase name="testRowTimeTableSourceGroupWindowWithNotNullRowTimeType">
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(name=[$0], EXPR$0=[$2], EXPR$1=[$1])
++- LogicalWindowAggregate(group=[{3}], EXPR$1=[AVG($2)], window=[TumblingGroupWindow('w, rowtime, 600000)], properties=[EXPR$0])
+ +- LogicalFilter(condition=[>($2, 100)])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 5000:INTERVAL SECOND)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, rowTimeT]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[name, EXPR$0, EXPR$1])
++- GroupWindowAggregate(groupBy=[name], window=[TumblingGroupWindow('w, rowtime, 600000)], properties=[EXPR$0], select=[name, AVG(val) AS EXPR$1, end('w) AS EXPR$0])
+ +- Exchange(distribution=[hash[name]])
+ +- Calc(select=[id, rowtime, val, name], where=[>(val, 100)])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 5000:INTERVAL SECOND)])
+ +- TableSourceScan(table=[[default_catalog, default_database, rowTimeT]], fields=[id, rowtime, val, name])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index ccc2b55..80cfff0 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -1060,7 +1060,7 @@ class TableEnvironmentTest {
Row.of("f25", "STRING", Boolean.box(false), null, null, null),
Row.of("f26", "ROW<`f0` INT NOT NULL, `f1` INT>", Boolean.box(false),
"PRI(f24, f26)", null, null),
- Row.of("ts", "TIMESTAMP(3)", Boolean.box(true), null, "TO_TIMESTAMP(`f25`)",
+ Row.of("ts", "TIMESTAMP(3) *ROWTIME*", Boolean.box(true), null, "TO_TIMESTAMP(`f25`)",
"`ts` - INTERVAL '1' SECOND")
).iterator(),
tableResult1.collect())
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
index 95c6f34..6347fd8 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
@@ -20,13 +20,14 @@ package org.apache.flink.table.planner.codegen
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.util.MockStreamingRuntimeContext
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, TableSchema}
import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier}
import org.apache.flink.table.data.{GenericRowData, TimestampData}
+import org.apache.flink.table.delegation.Parser
import org.apache.flink.table.module.ModuleManager
-import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkPlannerImpl, FlinkTypeFactory, SqlExprToRexConverter, SqlExprToRexConverterFactory}
+import org.apache.flink.table.planner.calcite.{CalciteParser, FlinkContext, FlinkPlannerImpl, FlinkTypeFactory, SqlExprToRexConverter, SqlExprToRexConverterFactory}
import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema
-import org.apache.flink.table.planner.delegation.PlannerContext
+import org.apache.flink.table.planner.delegation.{ParserImpl, PlannerContext}
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc5
import org.apache.flink.table.runtime.generated.WatermarkGenerator
import org.apache.flink.table.types.logical.{IntType, TimestampType}
@@ -40,6 +41,7 @@ import org.junit.Test
import java.lang.{Integer => JInt, Long => JLong}
import java.util.Collections
+import java.util.function.{Function => JFunction, Supplier => JSupplier}
/**
* Tests the generated [[WatermarkGenerator]] from [[WatermarkGeneratorCodeGenerator]].
@@ -54,17 +56,35 @@ class WatermarkGeneratorCodeGenTest {
override def create(tableRowType: RelDataType): SqlExprToRexConverter =
createSqlExprToRexConverter(tableRowType)
}
+ private val parser: Parser = new ParserImpl(
+ catalogManager,
+ new JSupplier[FlinkPlannerImpl] {
+ override def get(): FlinkPlannerImpl = getPlanner
+ },
+ // we do not cache the parser in order to use the most up to
+ // date configuration. Users might change parser configuration in TableConfig in between
+ // parsing statements
+ new JSupplier[CalciteParser] {
+ override def get(): CalciteParser = plannerContext.createCalciteParser()
+ },
+ new JFunction[TableSchema, SqlExprToRexConverter] {
+ override def apply(t: TableSchema): SqlExprToRexConverter = {
+ sqlExprToRexConverterFactory.create(plannerContext.getTypeFactory.buildRelNodeRowType(t))
+ }
+ }
+ )
val plannerContext = new PlannerContext(
config,
functionCatalog,
catalogManager,
- asRootSchema(new CatalogManagerCalciteSchema(
- catalogManager, sqlExprToRexConverterFactory, false)),
+ asRootSchema(new CatalogManagerCalciteSchema(catalogManager, false)),
Collections.singletonList(ConventionTraitDef.INSTANCE))
val planner: FlinkPlannerImpl = plannerContext.createFlinkPlanner(
catalogManager.getCurrentCatalog,
catalogManager.getCurrentDatabase)
+ def getPlanner: FlinkPlannerImpl = planner
+
val data = List(
GenericRowData.of(TimestampData.fromEpochMillis(1000L), JInt.valueOf(5)),
GenericRowData.of(null, JInt.valueOf(4)),
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
index b5f252b..434adb5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.stream.sql
+import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.planner.utils._
import org.junit.Test
@@ -76,7 +77,9 @@ class TableSourceTest extends TableTestBase {
}
@Test
- def testProcTimeTableSourceSimple(): Unit = {
+ def testProctimeOnWatermarkSpec(): Unit = {
+ thrown.expect(classOf[ValidationException])
+ thrown.expectMessage("Watermark can not be defined for a processing time attribute column.")
val ddl =
s"""
|CREATE TABLE procTimeT (
@@ -96,27 +99,6 @@ class TableSourceTest extends TableTestBase {
}
@Test
- def testProjectWithRowtimeProctime(): Unit = {
- val ddl =
- s"""
- |CREATE TABLE T (
- | id int,
- | rtime timestamp(3),
- | val bigint,
- | name varchar(32),
- | ptime as PROCTIME(),
- | watermark for ptime as ptime
- |) WITH (
- | 'connector' = 'values',
- | 'bounded' = 'false'
- |)
- """.stripMargin
- util.tableEnv.executeSql(ddl)
-
- util.verifyPlan("SELECT name, val, id FROM T")
- }
-
- @Test
def testProjectWithoutRowtime(): Unit = {
val ddl =
s"""
@@ -137,6 +119,7 @@ class TableSourceTest extends TableTestBase {
util.verifyPlan("SELECT ptime, name, val, id FROM T")
}
+ @Test
def testProjectWithoutProctime(): Unit = {
val ddl =
s"""
@@ -157,26 +140,7 @@ class TableSourceTest extends TableTestBase {
util.verifyPlan("select name, val, rtime, id from T")
}
- def testProjectOnlyProctime(): Unit = {
- val ddl =
- s"""
- |CREATE TABLE T (
- | id int,
- | rtime timestamp(3),
- | val bigint,
- | name varchar(32),
- | ptime as PROCTIME(),
- | watermark for ptime as ptime
- |) WITH (
- | 'connector' = 'values',
- | 'bounded' = 'false'
- |)
- """.stripMargin
- util.tableEnv.executeSql(ddl)
-
- util.verifyPlan("SELECT ptime FROM T")
- }
-
+ @Test
def testProjectOnlyRowtime(): Unit = {
val ddl =
s"""
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.scala
index 62112cf..72d2c11 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.scala
@@ -179,6 +179,7 @@ class LegacyTableSourceTest extends TableTestBase {
util.verifyPlan(t)
}
+ @Test
def testProjectWithoutProctime(): Unit = {
val tableSchema = new TableSchema(
Array("id", "rtime", "val", "ptime", "name"),
@@ -198,6 +199,7 @@ class LegacyTableSourceTest extends TableTestBase {
util.verifyPlan(t)
}
+ @Test
def testProjectOnlyProctime(): Unit = {
val tableSchema = new TableSchema(
Array("id", "rtime", "val", "ptime", "name"),
@@ -217,6 +219,7 @@ class LegacyTableSourceTest extends TableTestBase {
util.verifyPlan(t)
}
+ @Test
def testProjectOnlyRowtime(): Unit = {
val tableSchema = new TableSchema(
Array("id", "rtime", "val", "ptime", "name"),
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala
index 36b273d..c97c863 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.planner.plan.stream.table
import org.apache.flink.table.api._
import org.apache.flink.table.planner.utils.TableTestBase
-import org.junit.{Ignore, Test}
+import org.junit.Test
class TableSourceTest extends TableTestBase {
@@ -48,7 +48,6 @@ class TableSourceTest extends TableTestBase {
util.verifyPlan(t)
}
- @Ignore("remove ignore once FLINK-17753 is fixed")
@Test
def testRowTimeTableSourceGroupWindow(): Unit = {
val ddl =
@@ -75,15 +74,15 @@ class TableSourceTest extends TableTestBase {
}
@Test
- def testProcTimeTableSourceSimple(): Unit = {
+ def testRowTimeTableSourceGroupWindowWithNotNullRowTimeType(): Unit = {
val ddl =
s"""
- |CREATE TABLE procTimeT (
+ |CREATE TABLE rowTimeT (
| id int,
+ | rowtime timestamp(3) not null,
| val bigint,
| name varchar(32),
- | proctime as PROCTIME(),
- | watermark for proctime as proctime
+ | watermark for rowtime as rowtime - INTERVAL '5' SECONDS
|) WITH (
| 'connector' = 'values',
| 'bounded' = 'false'
@@ -91,7 +90,11 @@ class TableSourceTest extends TableTestBase {
""".stripMargin
util.tableEnv.executeSql(ddl)
- val t = util.tableEnv.from("procTimeT").select($"proctime", $"id", $"name", $"val")
+ val t = util.tableEnv.from("rowTimeT")
+ .where($"val" > 100)
+ .window(Tumble over 10.minutes on 'rowtime as 'w)
+ .groupBy('name, 'w)
+ .select('name, 'w.end, 'val.avg)
util.verifyPlan(t)
}
@@ -119,28 +122,6 @@ class TableSourceTest extends TableTestBase {
}
@Test
- def testProjectWithRowtimeProctime(): Unit = {
- val ddl =
- s"""
- |CREATE TABLE T (
- | id int,
- | rtime timestamp(3),
- | val bigint,
- | name varchar(32),
- | ptime as PROCTIME(),
- | watermark for ptime as ptime
- |) WITH (
- | 'connector' = 'values',
- | 'bounded' = 'false'
- |)
- """.stripMargin
- util.tableEnv.executeSql(ddl)
-
- val t = util.tableEnv.from("T").select('name, 'val, 'id)
- util.verifyPlan(t)
- }
-
- @Test
def testProjectWithoutRowtime(): Unit = {
val ddl =
s"""
@@ -162,6 +143,7 @@ class TableSourceTest extends TableTestBase {
util.verifyPlan(t)
}
+ @Test
def testProjectWithoutProctime(): Unit = {
val ddl =
s"""
@@ -183,7 +165,11 @@ class TableSourceTest extends TableTestBase {
util.verifyPlan(t)
}
- def testProjectOnlyProctime(): Unit = {
+ @Test
+ def testProctimeOnWatermarkSpec(): Unit = {
+ thrown.expect(classOf[TableException])
+ thrown.expectMessage("Watermark can not be defined for a processing time attribute column.")
+
val ddl =
s"""
|CREATE TABLE T (
@@ -204,6 +190,7 @@ class TableSourceTest extends TableTestBase {
util.verifyPlan(t)
}
+ @Test
def testProjectOnlyRowtime(): Unit = {
val ddl =
s"""
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java
index 402a3f4..17fd3e0 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java
@@ -19,11 +19,13 @@
package org.apache.flink.table.planner;
import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.calcite.CalciteParser;
import org.apache.flink.table.calcite.FlinkPlannerImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.sqlexec.SqlToOperationConverter;
@@ -78,4 +80,9 @@ public class ParserImpl implements Parser {
SqlIdentifier sqlIdentifier = parser.parseIdentifier(identifier);
return UnresolvedIdentifier.of(sqlIdentifier.names);
}
+
+ @Override
+ public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 5d5fc12..905a031 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -127,6 +127,8 @@ abstract class TableEnvImpl(
}
)
+ catalogManager.setCatalogTableSchemaResolver(new CatalogTableSchemaResolver(parser, false))
+
def getConfig: TableConfig = config
private val UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG =
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
index 23bcd94..27f6aeb 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
@@ -19,7 +19,9 @@
package org.apache.flink.table.catalog;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.CatalogTableSchemaResolver;
import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.utils.ParserMock;
import org.apache.flink.util.TestLogger;
import org.junit.Rule;
@@ -110,6 +112,7 @@ public class CatalogManagerTest extends TestLogger {
.builtin(
database(BUILTIN_DEFAULT_DATABASE_NAME))
.build();
+ manager.setCatalogTableSchemaResolver(new CatalogTableSchemaResolver(new ParserMock(), true));
CatalogTest.TestTable table = new CatalogTest.TestTable();
manager.createTemporaryTable(table, tempIdentifier, true);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java
index a850f48..a7481c7 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java
@@ -20,11 +20,13 @@ package org.apache.flink.table.catalog;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.internal.CatalogTableSchemaResolver;
import org.apache.flink.table.catalog.TestExternalTableSourceFactory.TestExternalTableSource;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.plan.schema.TableSourceTable;
import org.apache.flink.table.utils.CatalogManagerMocks;
+import org.apache.flink.table.utils.ParserMock;
import org.apache.calcite.schema.Table;
import org.junit.Test;
@@ -53,7 +55,10 @@ public class DatabaseCalciteSchemaTest {
CatalogManager catalogManager = CatalogManagerMocks.preparedCatalogManager()
.defaultCatalog(catalogName, catalog)
.build();
- DatabaseCalciteSchema calciteSchema = new DatabaseCalciteSchema(true,
+ catalogManager.setCatalogTableSchemaResolver(new CatalogTableSchemaResolver(new ParserMock(), true));
+
+ DatabaseCalciteSchema calciteSchema = new DatabaseCalciteSchema(
+ true,
databaseName,
catalogName,
catalogManager,
@@ -78,6 +83,10 @@ public class DatabaseCalciteSchemaTest {
return this;
}
+ public CatalogTable copy(TableSchema tableSchema) {
+ return this;
+ }
+
@Override
public Map<String, String> toProperties() {
Map<String, String> properties = new HashMap<>();
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index cd981cb..2bd9347 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
+import org.apache.flink.table.api.internal.CatalogTableSchemaResolver;
import org.apache.flink.table.calcite.CalciteParser;
import org.apache.flink.table.calcite.FlinkPlannerImpl;
import org.apache.flink.table.catalog.Catalog;
@@ -58,6 +59,7 @@ import org.apache.flink.table.planner.PlanningConfigurationBuilder;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.table.utils.CatalogManagerMocks;
+import org.apache.flink.table.utils.ParserMock;
import org.apache.calcite.sql.SqlNode;
import org.junit.After;
@@ -103,6 +105,7 @@ public class SqlToOperationConverterTest {
@Before
public void before() throws TableAlreadyExistException, DatabaseNotExistException {
+ catalogManager.setCatalogTableSchemaResolver(new CatalogTableSchemaResolver(new ParserMock(), true));
final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1");
final ObjectPath path2 = new ObjectPath(catalogManager.getCurrentDatabase(), "t2");
final TableSchema tableSchema = TableSchema.builder()