Posted to commits@flink.apache.org by tw...@apache.org on 2021/02/26 11:56:51 UTC
[flink] 01/02: [FLINK-21435][table] Implement Schema, ResolvedSchema, SchemaResolver
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 011a34982facc817fef40de74c4b95497830a037
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Feb 3 08:39:21 2021 +0100
[FLINK-21435][table] Implement Schema, ResolvedSchema, SchemaResolver
This closes #14996.
---
flink-python/pyflink/table/descriptors.py | 2 +-
.../flink/table/catalog/DefaultSchemaResolver.java | 353 +++++++++
.../expressions/resolver/ExpressionResolver.java | 7 +-
.../resolver/rules/ResolveSqlCallRule.java | 5 +
.../expressions/resolver/rules/ResolverRule.java | 3 +
.../flink/table/catalog/SchemaResolutionTest.java | 384 +++++++++
.../java/org/apache/flink/table/api/Schema.java | 857 +++++++++++++++++++++
.../flink/table/catalog/AbstractConstraint.java | 70 ++
.../org/apache/flink/table/catalog/Column.java | 330 ++++++++
.../org/apache/flink/table/catalog/Constraint.java | 63 ++
.../apache/flink/table/catalog/ResolvedSchema.java | 216 ++++++
.../apache/flink/table/catalog/SchemaResolver.java | 37 +
.../flink/table/catalog/UniqueConstraint.java | 116 +++
.../apache/flink/table/catalog/WatermarkSpec.java | 95 +++
.../expressions/utils/ResolvedExpressionMock.java | 76 ++
.../apache/flink/table/api/TableSourceTest.scala | 4 +-
16 files changed, 2614 insertions(+), 4 deletions(-)
diff --git a/flink-python/pyflink/table/descriptors.py b/flink-python/pyflink/table/descriptors.py
index 73bae1b..f44c872 100644
--- a/flink-python/pyflink/table/descriptors.py
+++ b/flink-python/pyflink/table/descriptors.py
@@ -193,7 +193,7 @@ class Schema(Descriptor):
event-time attribute.
"""
gateway = get_gateway()
- self._j_schema = gateway.jvm.Schema()
+ self._j_schema = gateway.jvm.org.apache.flink.table.descriptors.Schema()
super(Schema, self).__init__(self._j_schema)
if schema is not None:
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java
new file mode 100644
index 0000000..e53fc4b
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Schema.UnresolvedComputedColumn;
+import org.apache.flink.table.api.Schema.UnresolvedMetadataColumn;
+import org.apache.flink.table.api.Schema.UnresolvedPhysicalColumn;
+import org.apache.flink.table.api.Schema.UnresolvedPrimaryKey;
+import org.apache.flink.table.api.Schema.UnresolvedWatermarkSpec;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Column.ComputedColumn;
+import org.apache.flink.table.catalog.Column.MetadataColumn;
+import org.apache.flink.table.catalog.Column.PhysicalColumn;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.LocalReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver.ExpressionResolverBuilder;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.expressions.ApiExpressionUtils.localRef;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isProctimeAttribute;
+import static org.apache.flink.table.types.utils.DataTypeUtils.replaceLogicalType;
+
+/** Default implementation of {@link SchemaResolver}. */
+@Internal
+class DefaultSchemaResolver implements SchemaResolver {
+
+ private final boolean isStreamingMode;
+ private final boolean supportsMetadata;
+ private final DataTypeFactory dataTypeFactory;
+ private final ExpressionResolverBuilder resolverBuilder;
+
+ DefaultSchemaResolver(
+ boolean isStreamingMode,
+ boolean supportsMetadata,
+ DataTypeFactory dataTypeFactory,
+ ExpressionResolverBuilder resolverBuilder) {
+ this.isStreamingMode = isStreamingMode;
+ this.supportsMetadata = supportsMetadata;
+ this.dataTypeFactory = dataTypeFactory;
+ this.resolverBuilder = resolverBuilder;
+ }
+
+ public SchemaResolver withMetadata(boolean supportsMetadata) {
+ return new DefaultSchemaResolver(
+ isStreamingMode, supportsMetadata, dataTypeFactory, resolverBuilder);
+ }
+
+ @Override
+ public ResolvedSchema resolve(Schema schema) {
+ final List<Column> columns = resolveColumns(schema.getColumns());
+
+ final List<WatermarkSpec> watermarkSpecs =
+ resolveWatermarkSpecs(schema.getWatermarkSpecs(), columns);
+
+ final List<Column> columnsWithRowtime = adjustRowtimeAttributes(watermarkSpecs, columns);
+
+ final UniqueConstraint primaryKey =
+ resolvePrimaryKey(schema.getPrimaryKey().orElse(null), columnsWithRowtime);
+
+ return new ResolvedSchema(columnsWithRowtime, watermarkSpecs, primaryKey);
+ }
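
The pipeline above runs in a fixed order: columns first, then watermark specs against the resolved columns, then rowtime re-typing, and finally the primary key against the adjusted columns. As a rough usage sketch (the `resolver` instance is assumed here; the `resolveSchema(...)` helper in SchemaResolutionTest below constructs one the same way):

    // Sketch only; `resolver` is an assumed, pre-configured SchemaResolver.
    Schema schema =
            Schema.newBuilder()
                    .column("id", DataTypes.INT().notNull())
                    .column("ts", DataTypes.TIMESTAMP(3))
                    .watermark("ts", "ts - INTERVAL '5' SECOND")
                    .primaryKey("id")
                    .build();
    ResolvedSchema resolved = schema.resolve(resolver);
    // In streaming mode, `ts` now carries the rowtime attribute kind.
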
+
+ @Override
+ public boolean isStreamingMode() {
+ return isStreamingMode;
+ }
+
+ @Override
+ public boolean supportsMetadata() {
+ return supportsMetadata;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private List<Column> resolveColumns(List<Schema.UnresolvedColumn> unresolvedColumns) {
+
+ validateDuplicateColumns(unresolvedColumns);
+
+ final Column[] resolvedColumns = new Column[unresolvedColumns.size()];
+ // process source columns first before computed columns
+ for (int pos = 0; pos < unresolvedColumns.size(); pos++) {
+ final Schema.UnresolvedColumn unresolvedColumn = unresolvedColumns.get(pos);
+ if (unresolvedColumn instanceof UnresolvedPhysicalColumn) {
+ resolvedColumns[pos] =
+ resolvePhysicalColumn((UnresolvedPhysicalColumn) unresolvedColumn);
+ } else if (unresolvedColumn instanceof UnresolvedMetadataColumn) {
+ resolvedColumns[pos] =
+ resolveMetadataColumn((UnresolvedMetadataColumn) unresolvedColumn);
+ } else if (!(unresolvedColumn instanceof UnresolvedComputedColumn)) {
+ throw new IllegalArgumentException(
+ "Unknown unresolved column type: " + unresolvedColumn.getClass().getName());
+ }
+ }
+ // fill in computed columns
+ final List<Column> sourceColumns =
+ Stream.of(resolvedColumns).filter(Objects::nonNull).collect(Collectors.toList());
+ for (int pos = 0; pos < unresolvedColumns.size(); pos++) {
+ final Schema.UnresolvedColumn unresolvedColumn = unresolvedColumns.get(pos);
+ if (unresolvedColumn instanceof UnresolvedComputedColumn) {
+ resolvedColumns[pos] =
+ resolveComputedColumn(
+ (UnresolvedComputedColumn) unresolvedColumn, sourceColumns);
+ }
+ }
+
+ return Arrays.asList(resolvedColumns);
+ }
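
Because physical and metadata columns are resolved in the first pass and computed columns only in the second, a computed column may reference a source column that is declared after it. A minimal sketch of such an out-of-order declaration (mirroring the SCHEMA constant in the test below):

    Schema outOfOrder =
            Schema.newBuilder()
                    // resolved second, so the reference to `orig_ts` is valid
                    .columnByExpression("ts", "orig_ts - INTERVAL '60' MINUTE")
                    // resolved first, together with all other source columns
                    .columnByMetadata("orig_ts", DataTypes.TIMESTAMP(3), "timestamp")
                    .build();
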
+
+ private PhysicalColumn resolvePhysicalColumn(UnresolvedPhysicalColumn unresolvedColumn) {
+ return Column.physical(
+ unresolvedColumn.getName(),
+ dataTypeFactory.createDataType(unresolvedColumn.getDataType()));
+ }
+
+ private MetadataColumn resolveMetadataColumn(UnresolvedMetadataColumn unresolvedColumn) {
+ if (!supportsMetadata) {
+ throw new ValidationException(
+ "Metadata columns are not supported in a schema at the current location.");
+ }
+ return Column.metadata(
+ unresolvedColumn.getName(),
+ dataTypeFactory.createDataType(unresolvedColumn.getDataType()),
+ unresolvedColumn.getMetadataKey(),
+ unresolvedColumn.isVirtual());
+ }
+
+ private ComputedColumn resolveComputedColumn(
+ UnresolvedComputedColumn unresolvedColumn, List<Column> inputColumns) {
+ final ResolvedExpression resolvedExpression;
+ try {
+ resolvedExpression = resolveExpression(inputColumns, unresolvedColumn.getExpression());
+ } catch (Exception e) {
+ throw new ValidationException(
+ String.format(
+ "Invalid expression for computed column '%s'.",
+ unresolvedColumn.getName()),
+ e);
+ }
+ return Column.computed(unresolvedColumn.getName(), resolvedExpression);
+ }
+
+ private void validateDuplicateColumns(List<Schema.UnresolvedColumn> columns) {
+ final List<String> names =
+ columns.stream().map(Schema.UnresolvedColumn::getName).collect(Collectors.toList());
+ final List<String> duplicates =
+ names.stream()
+ .filter(name -> Collections.frequency(names, name) > 1)
+ .distinct()
+ .collect(Collectors.toList());
+ if (duplicates.size() > 0) {
+ throw new ValidationException(
+ String.format(
+ "Schema must not contain duplicate column names. Found duplicates: %s",
+ duplicates));
+ }
+ }
+
+ private List<WatermarkSpec> resolveWatermarkSpecs(
+ List<UnresolvedWatermarkSpec> unresolvedWatermarkSpecs, List<Column> inputColumns) {
+ if (unresolvedWatermarkSpecs.size() == 0) {
+ return Collections.emptyList();
+ }
+ if (unresolvedWatermarkSpecs.size() > 1) {
+ throw new ValidationException("Multiple watermark definitions are not supported yet.");
+ }
+ final UnresolvedWatermarkSpec watermarkSpec = unresolvedWatermarkSpecs.get(0);
+
+ // validate time attribute
+ final String timeColumn = watermarkSpec.getColumnName();
+ validateTimeColumn(timeColumn, inputColumns);
+
+ // resolve watermark expression
+ final ResolvedExpression watermarkExpression;
+ try {
+ watermarkExpression =
+ resolveExpression(inputColumns, watermarkSpec.getWatermarkExpression());
+ } catch (Exception e) {
+ throw new ValidationException(
+ String.format(
+ "Invalid expression for watermark '%s'.", watermarkSpec.toString()),
+ e);
+ }
+ validateWatermarkExpression(watermarkExpression.getOutputDataType().getLogicalType());
+
+ return Collections.singletonList(
+ new WatermarkSpec(watermarkSpec.getColumnName(), watermarkExpression));
+ }
+
+ private void validateTimeColumn(String columnName, List<Column> columns) {
+ final Optional<Column> timeColumn =
+ columns.stream().filter(c -> c.getName().equals(columnName)).findFirst();
+ if (!timeColumn.isPresent()) {
+ throw new ValidationException(
+ String.format(
+ "Invalid column name '%s' for rowtime attribute in watermark declaration. Available columns are: %s",
+ columnName,
+ columns.stream().map(Column::getName).collect(Collectors.toList())));
+ }
+ final LogicalType timeFieldType = timeColumn.get().getDataType().getLogicalType();
+ if (!hasRoot(timeFieldType, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
+ || getPrecision(timeFieldType) != 3) {
+ throw new ValidationException(
+ "Invalid data type of time field for watermark definition. "
+ + "The field must be of type TIMESTAMP(3) WITHOUT TIME ZONE.");
+ }
+ if (isProctimeAttribute(timeFieldType)) {
+ throw new ValidationException(
+ "A watermark can not be defined for a processing-time attribute.");
+ }
+ }
+
+ private void validateWatermarkExpression(LogicalType watermarkType) {
+ if (!hasRoot(watermarkType, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
+ || getPrecision(watermarkType) != 3) {
+ throw new ValidationException(
+ "Invalid data type of expression for watermark definition. "
+ + "The field must be of type TIMESTAMP(3) WITHOUT TIME ZONE.");
+ }
+ }
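
Both validations reduce to the same shape check: the logical type root must be TIMESTAMP_WITHOUT_TIME_ZONE and the precision must be exactly 3. A small sketch using the same utilities:

    LogicalType t = DataTypes.TIMESTAMP(9).getLogicalType();
    boolean accepted =
            hasRoot(t, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) && getPrecision(t) == 3;
    // accepted == false: correct root, but precision 9 instead of 3
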
+
+ /** Updates the data type of columns that are referenced by {@link WatermarkSpec}. */
+ private List<Column> adjustRowtimeAttributes(
+ List<WatermarkSpec> watermarkSpecs, List<Column> columns) {
+ return columns.stream()
+ .map(column -> adjustRowtimeAttribute(watermarkSpecs, column))
+ .collect(Collectors.toList());
+ }
+
+ private Column adjustRowtimeAttribute(List<WatermarkSpec> watermarkSpecs, Column column) {
+ final String name = column.getName();
+ final DataType dataType = column.getDataType();
+ final boolean hasWatermarkSpec =
+ watermarkSpecs.stream().anyMatch(s -> s.getRowtimeAttribute().equals(name));
+ if (hasWatermarkSpec && isStreamingMode) {
+ final TimestampType originalType = (TimestampType) dataType.getLogicalType();
+ final LogicalType rowtimeType =
+ new TimestampType(
+ originalType.isNullable(),
+ TimestampKind.ROWTIME,
+ originalType.getPrecision());
+ return column.copy(replaceLogicalType(dataType, rowtimeType));
+ }
+ return column;
+ }
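
In streaming mode the adjustment only swaps the timestamp kind; nullability and precision are preserved. In terms of types:

    // before: TIMESTAMP(3)             after: TIMESTAMP(3) *ROWTIME*
    LogicalType before = new TimestampType(true, TimestampKind.REGULAR, 3);
    LogicalType after = new TimestampType(true, TimestampKind.ROWTIME, 3);
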
+
+ private @Nullable UniqueConstraint resolvePrimaryKey(
+ @Nullable UnresolvedPrimaryKey unresolvedPrimaryKey, List<Column> columns) {
+ if (unresolvedPrimaryKey == null) {
+ return null;
+ }
+
+ final UniqueConstraint primaryKey =
+ UniqueConstraint.primaryKey(
+ unresolvedPrimaryKey.getConstraintName(),
+ unresolvedPrimaryKey.getColumnNames());
+
+ validatePrimaryKey(primaryKey, columns);
+
+ return primaryKey;
+ }
+
+ private void validatePrimaryKey(UniqueConstraint primaryKey, List<Column> columns) {
+ final Map<String, Column> columnsByNameLookup =
+ columns.stream().collect(Collectors.toMap(Column::getName, Function.identity()));
+
+ final Set<String> duplicateColumns =
+ primaryKey.getColumns().stream()
+ .filter(name -> Collections.frequency(primaryKey.getColumns(), name) > 1)
+ .collect(Collectors.toSet());
+
+ if (!duplicateColumns.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "Invalid primary key '%s'. A primary key must not contain duplicate columns. Found: %s",
+ primaryKey.getName(), duplicateColumns));
+ }
+
+ for (String columnName : primaryKey.getColumns()) {
+ Column column = columnsByNameLookup.get(columnName);
+ if (column == null) {
+ throw new ValidationException(
+ String.format(
+ "Invalid primary key '%s'. Column '%s' does not exist.",
+ primaryKey.getName(), columnName));
+ }
+
+ if (!column.isPhysical()) {
+ throw new ValidationException(
+ String.format(
+ "Invalid primary key '%s'. Column '%s' is not a physical column.",
+ primaryKey.getName(), columnName));
+ }
+
+ final LogicalType columnType = column.getDataType().getLogicalType();
+ if (columnType.isNullable()) {
+ throw new ValidationException(
+ String.format(
+ "Invalid primary key '%s'. Column '%s' is nullable.",
+ primaryKey.getName(), columnName));
+ }
+ }
+ }
+
+ private ResolvedExpression resolveExpression(List<Column> columns, Expression expression) {
+ final LocalReferenceExpression[] localRefs =
+ columns.stream()
+ .map(c -> localRef(c.getName(), c.getDataType()))
+ .toArray(LocalReferenceExpression[]::new);
+ return resolverBuilder
+ .withLocalReferences(localRefs)
+ .build()
+ .resolve(Collections.singletonList(expression))
+ .get(0);
+ }
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
index 39d4a4a..5b9f830 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
@@ -308,6 +308,11 @@ public class ExpressionResolver {
}
@Override
+ public List<LocalReferenceExpression> getLocalReferences() {
+ return new ArrayList<>(localReferences.values());
+ }
+
+ @Override
public Optional<LocalOverWindow> getOverWindow(Expression alias) {
return Optional.ofNullable(localOverWindows.get(alias));
}
@@ -445,7 +450,7 @@ public class ExpressionResolver {
public ExpressionResolverBuilder withLocalReferences(
LocalReferenceExpression... localReferences) {
- this.localReferences.addAll(Arrays.asList(localReferences));
+ this.localReferences = Arrays.asList(localReferences);
return this;
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java
index f8bb3b3..cf2d74b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java
@@ -53,10 +53,15 @@ final class ResolveSqlCallRule implements ResolverRule {
final SqlExpressionResolver resolver = resolutionContext.sqlExpressionResolver();
final TableSchema.Builder builder = TableSchema.builder();
+ // input references
resolutionContext
.referenceLookup()
.getAllInputFields()
.forEach(f -> builder.field(f.getName(), f.getOutputDataType()));
+ // local references
+ resolutionContext
+ .getLocalReferences()
+ .forEach(refs -> builder.field(refs.getName(), refs.getOutputDataType()));
return resolver.resolveExpression(sqlCall.getSqlExpression(), builder.build());
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java
index c82cf67..1a8c7cf 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java
@@ -84,6 +84,9 @@ public interface ResolverRule {
/** Access to available local references. */
Optional<LocalReferenceExpression> getLocalReference(String alias);
+ /** Access to available local references. */
+ List<LocalReferenceExpression> getLocalReferences();
+
/** Access to available local over windows. */
Optional<LocalOverWindow> getOverWindow(Expression alias);
}
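
The new getLocalReferences() accessor complements getLocalReference(String): DefaultSchemaResolver registers every already-resolved column as a local reference (see resolveExpression above), and ResolveSqlCallRule now copies those references into the TableSchema handed to the SQL expression resolver. That way a SQL watermark expression such as "ts - INTERVAL '5' SECOND" can refer to a computed column `ts` that is not an input field. A condensed sketch of that call path (with `resolverBuilder` assumed to be an ExpressionResolverBuilder):

    ExpressionResolver resolver =
            resolverBuilder
                    .withLocalReferences(localRef("ts", DataTypes.TIMESTAMP(3)))
                    .build();
    // `ts` is visible to the SQL expression as a local reference,
    // even though it is not among the input fields.
    resolver.resolve(
            Collections.singletonList(new SqlCallExpression("ts - INTERVAL '5' SECOND")));
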
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
new file mode 100644
index 0000000..3e7c029
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.core.testutils.FlinkMatchers;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.utils.DataTypeFactoryMock;
+import org.apache.flink.table.utils.FunctionLookupMock;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.apache.flink.table.api.Expressions.callSql;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isProctimeAttribute;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isRowtimeAttribute;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@link Schema}, {@link DefaultSchemaResolver}, and {@link ResolvedSchema}. */
+public class SchemaResolutionTest {
+
+ private static final String COMPUTED_SQL = "orig_ts - INTERVAL '60' MINUTE";
+
+ private static final ResolvedExpression COMPUTED_COLUMN_RESOLVED =
+ new ResolvedExpressionMock(DataTypes.TIMESTAMP(3), () -> COMPUTED_SQL);
+
+ private static final String WATERMARK_SQL = "ts - INTERVAL '5' SECOND";
+
+ private static final ResolvedExpression WATERMARK_RESOLVED =
+ new ResolvedExpressionMock(DataTypes.TIMESTAMP(3), () -> WATERMARK_SQL);
+
+ private static final String PROCTIME_SQL = "PROCTIME()";
+
+ private static final ResolvedExpression PROCTIME_RESOLVED =
+ new ResolvedExpressionMock(
+ fromLogicalToDataType(new TimestampType(false, TimestampKind.PROCTIME, 3)),
+ () -> PROCTIME_SQL);
+
+ private static final Schema SCHEMA =
+ Schema.newBuilder()
+ .primaryKeyNamed("primary_constraint", "id") // out of order
+ .column("id", DataTypes.INT().notNull())
+ .column("counter", DataTypes.INT().notNull())
+ .column("payload", "ROW<name STRING, age INT, flag BOOLEAN>")
+ .columnByMetadata("topic", DataTypes.STRING(), true)
+ .columnByExpression("ts", callSql(COMPUTED_SQL)) // out of order API expression
+ .columnByMetadata("orig_ts", DataTypes.TIMESTAMP(3), "timestamp")
+ .watermark("ts", WATERMARK_SQL)
+ .columnByExpression("proctime", PROCTIME_SQL)
+ .build();
+
+ @Test
+ public void testSchemaResolution() {
+ final ResolvedSchema expectedSchema =
+ new ResolvedSchema(
+ Arrays.asList(
+ Column.physical("id", DataTypes.INT().notNull()),
+ Column.physical("counter", DataTypes.INT().notNull()),
+ Column.physical(
+ "payload",
+ DataTypes.ROW(
+ DataTypes.FIELD("name", DataTypes.STRING()),
+ DataTypes.FIELD("age", DataTypes.INT()),
+ DataTypes.FIELD("flag", DataTypes.BOOLEAN()))),
+ Column.metadata("topic", DataTypes.STRING(), true),
+ Column.computed("ts", COMPUTED_COLUMN_RESOLVED),
+ Column.metadata("orig_ts", DataTypes.TIMESTAMP(3), "timestamp"),
+ Column.computed("proctime", PROCTIME_RESOLVED)),
+ Collections.singletonList(new WatermarkSpec("ts", WATERMARK_RESOLVED)),
+ UniqueConstraint.primaryKey(
+ "primary_constraint", Collections.singletonList("id")));
+
+ final ResolvedSchema actualStreamSchema = resolveSchema(SCHEMA, true, true);
+ {
+ assertThat(actualStreamSchema, equalTo(expectedSchema));
+ assertTrue(isRowtimeAttribute(getType(actualStreamSchema, "ts")));
+ assertTrue(isProctimeAttribute(getType(actualStreamSchema, "proctime")));
+ }
+
+ final ResolvedSchema actualBatchSchema = resolveSchema(SCHEMA, false, true);
+ {
+ assertThat(actualBatchSchema, equalTo(expectedSchema));
+ assertFalse(isRowtimeAttribute(getType(actualBatchSchema, "ts")));
+ assertTrue(isProctimeAttribute(getType(actualBatchSchema, "proctime")));
+ }
+ }
+
+ @Test
+ public void testSchemaResolutionErrors() {
+
+ // columns
+
+ testError(
+ Schema.newBuilder().fromSchema(SCHEMA).column("id", DataTypes.STRING()).build(),
+ "Schema must not contain duplicate column names.");
+
+ testError(
+ SCHEMA,
+ "Metadata columns are not supported in a schema at the current location.",
+ true,
+ false);
+
+ testError(
+ Schema.newBuilder().columnByExpression("invalid", callSql("INVALID")).build(),
+ "Invalid expression for computed column 'invalid'.");
+
+ // time attributes and watermarks
+
+ testError(
+ Schema.newBuilder()
+ .column("ts", DataTypes.BOOLEAN())
+ .watermark("ts", callSql(WATERMARK_SQL))
+ .build(),
+ "Invalid data type of time field for watermark definition. The field must be of type TIMESTAMP(3) WITHOUT TIME ZONE.");
+
+ testError(
+ Schema.newBuilder()
+ .column("ts", DataTypes.TIMESTAMP(3))
+ .watermark("ts", callSql("INVALID"))
+ .build(),
+ "Invalid expression for watermark 'WATERMARK FOR `ts` AS [INVALID]'.");
+
+ testError(
+ Schema.newBuilder()
+ .column("ts", DataTypes.TIMESTAMP(3))
+ .watermark("other_ts", callSql(WATERMARK_SQL))
+ .build(),
+ "Invalid column name 'other_ts' for rowtime attribute in watermark declaration. Available columns are: [ts]");
+
+ testError(
+ Schema.newBuilder().fromSchema(SCHEMA).watermark("orig_ts", WATERMARK_SQL).build(),
+ "Multiple watermark definitions are not supported yet.");
+
+ testError(
+ Schema.newBuilder()
+ .columnByExpression("ts", PROCTIME_SQL)
+ .watermark("ts", WATERMARK_SQL)
+ .build(),
+ "A watermark can not be defined for a processing-time attribute.");
+
+ // primary keys
+
+ testError(
+ Schema.newBuilder().column("id", DataTypes.INT()).primaryKey("INVALID").build(),
+ "Column 'INVALID' does not exist.");
+
+ testError(
+ Schema.newBuilder()
+ .column("nullable_col", DataTypes.INT())
+ .primaryKey("nullable_col")
+ .build(),
+ "Column 'nullable_col' is nullable.");
+
+ testError(
+ Schema.newBuilder()
+ .column("orig_ts", DataTypes.TIMESTAMP(3))
+ .columnByExpression("ts", COMPUTED_SQL)
+ .primaryKey("ts")
+ .build(),
+ "Column 'ts' is not a physical column.");
+
+ testError(
+ Schema.newBuilder().column("id", DataTypes.INT()).primaryKey("id", "id").build(),
+ "Invalid primary key 'PK_id_id'. A primary key must not contain duplicate columns. Found: [id]");
+ }
+
+ @Test
+ public void testUnresolvedSchemaString() {
+ assertThat(
+ SCHEMA.toString(),
+ equalTo(
+ "(\n"
+ + " `id` INT NOT NULL, \n"
+ + " `counter` INT NOT NULL, \n"
+ + " `payload` [ROW<name STRING, age INT, flag BOOLEAN>], \n"
+ + " `topic` METADATA VIRTUAL, \n"
+ + " `ts` AS [orig_ts - INTERVAL '60' MINUTE], \n"
+ + " `orig_ts` METADATA FROM 'timestamp', \n"
+ + " `proctime` AS [PROCTIME()], \n"
+ + " WATERMARK FOR `ts` AS [ts - INTERVAL '5' SECOND], \n"
+ + " CONSTRAINT `primary_constraint` PRIMARY KEY (`id`) NOT ENFORCED\n"
+ + ")"));
+ }
+
+ @Test
+ public void testResolvedSchemaString() {
+ final ResolvedSchema resolvedSchema = resolveSchema(SCHEMA);
+ assertThat(
+ resolvedSchema.toString(),
+ equalTo(
+ "(\n"
+ + " `id` INT NOT NULL, \n"
+ + " `counter` INT NOT NULL, \n"
+ + " `payload` ROW<`name` STRING, `age` INT, `flag` BOOLEAN>, \n"
+ + " `topic` STRING METADATA VIRTUAL, \n"
+ + " `ts` TIMESTAMP(3) *ROWTIME* AS orig_ts - INTERVAL '60' MINUTE, \n"
+ + " `orig_ts` TIMESTAMP(3) METADATA FROM 'timestamp', \n"
+ + " `proctime` TIMESTAMP(3) NOT NULL *PROCTIME* AS PROCTIME(), \n"
+ + " WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - INTERVAL '5' SECOND, \n"
+ + " CONSTRAINT `primary_constraint` PRIMARY KEY (`id`) NOT ENFORCED\n"
+ + ")"));
+ }
+
+ @Test
+ public void testGeneratedConstraintName() {
+ final Schema schema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("b", DataTypes.STRING())
+ .column("c", DataTypes.STRING())
+ .primaryKey("b", "a")
+ .build();
+ assertThat(
+ schema.getPrimaryKey().orElseThrow(IllegalStateException::new).getConstraintName(),
+ equalTo("PK_b_a"));
+ }
+
+ @Test
+ public void testSinkRowDataType() {
+ final ResolvedSchema resolvedSchema = resolveSchema(SCHEMA);
+ final DataType expectedDataType =
+ DataTypes.ROW(
+ DataTypes.FIELD("id", DataTypes.INT().notNull()),
+ DataTypes.FIELD("counter", DataTypes.INT().notNull()),
+ DataTypes.FIELD(
+ "payload",
+ DataTypes.ROW(
+ DataTypes.FIELD("name", DataTypes.STRING()),
+ DataTypes.FIELD("age", DataTypes.INT()),
+ DataTypes.FIELD("flag", DataTypes.BOOLEAN()))),
+ DataTypes.FIELD("orig_ts", DataTypes.TIMESTAMP(3)))
+ .notNull();
+ assertThat(resolvedSchema.toSinkRowDataType(), equalTo(expectedDataType));
+ }
+
+ @Test
+ public void testPhysicalRowDataType() {
+ final ResolvedSchema resolvedSchema1 = resolveSchema(SCHEMA);
+ final DataType expectedDataType =
+ DataTypes.ROW(
+ DataTypes.FIELD("id", DataTypes.INT().notNull()),
+ DataTypes.FIELD("counter", DataTypes.INT().notNull()),
+ DataTypes.FIELD(
+ "payload",
+ DataTypes.ROW(
+ DataTypes.FIELD("name", DataTypes.STRING()),
+ DataTypes.FIELD("age", DataTypes.INT()),
+ DataTypes.FIELD("flag", DataTypes.BOOLEAN()))))
+ .notNull();
+
+ final DataType physicalDataType1 = resolvedSchema1.toPhysicalRowDataType();
+ assertThat(physicalDataType1, equalTo(expectedDataType));
+
+ final ResolvedSchema resolvedSchema2 =
+ resolveSchema(Schema.newBuilder().fromRowDataType(physicalDataType1).build());
+ assertThat(resolvedSchema2.toPhysicalRowDataType(), equalTo(physicalDataType1));
+ }
+
+ @Test
+ public void testSourceRowDataType() {
+ final ResolvedSchema resolvedSchema = resolveSchema(SCHEMA, true, true);
+ final DataType expectedDataType =
+ DataTypes.ROW(
+ DataTypes.FIELD("id", DataTypes.INT().notNull()),
+ DataTypes.FIELD("counter", DataTypes.INT().notNull()),
+ DataTypes.FIELD(
+ "payload",
+ DataTypes.ROW(
+ DataTypes.FIELD("name", DataTypes.STRING()),
+ DataTypes.FIELD("age", DataTypes.INT()),
+ DataTypes.FIELD("flag", DataTypes.BOOLEAN()))),
+ DataTypes.FIELD("topic", DataTypes.STRING()),
+ DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)),
+ DataTypes.FIELD("orig_ts", DataTypes.TIMESTAMP(3)),
+ DataTypes.FIELD("proctime", DataTypes.TIMESTAMP(3).notNull()))
+ .notNull();
+ assertThat(resolvedSchema.toSourceRowDataType(), equalTo(expectedDataType));
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private static void testError(Schema schema, String errorMessage) {
+ testError(schema, errorMessage, true, true);
+ }
+
+ private static void testError(
+ Schema schema, String errorMessage, boolean isStreaming, boolean supportsMetadata) {
+ try {
+ resolveSchema(schema, isStreaming, supportsMetadata);
+ fail("Error message expected: " + errorMessage);
+ } catch (Throwable t) {
+ assertThat(t, FlinkMatchers.containsMessage(errorMessage));
+ }
+ }
+
+ private static ResolvedSchema resolveSchema(Schema schema) {
+ return resolveSchema(schema, true, true);
+ }
+
+ private static ResolvedSchema resolveSchema(
+ Schema schema, boolean isStreamingMode, boolean supportsMetadata) {
+ final SchemaResolver resolver =
+ new DefaultSchemaResolver(
+ isStreamingMode,
+ supportsMetadata,
+ dataTypeFactory(),
+ expressionResolverBuilder());
+ return resolver.resolve(schema);
+ }
+
+ private static ExpressionResolver.ExpressionResolverBuilder expressionResolverBuilder() {
+ return ExpressionResolver.resolverFor(
+ new TableConfig(),
+ name -> Optional.empty(),
+ new FunctionLookupMock(Collections.emptyMap()),
+ dataTypeFactory(),
+ SchemaResolutionTest::resolveSqlExpression);
+ }
+
+ private static DataTypeFactory dataTypeFactory() {
+ return new DataTypeFactoryMock();
+ }
+
+ private static ResolvedExpression resolveSqlExpression(
+ String sqlExpression, TableSchema inputSchema) {
+ switch (sqlExpression) {
+ case COMPUTED_SQL:
+ assertThat(
+ inputSchema.getFieldDataType("orig_ts").orElse(null),
+ equalTo(DataTypes.TIMESTAMP(3)));
+ return COMPUTED_COLUMN_RESOLVED;
+ case WATERMARK_SQL:
+ assertThat(
+ inputSchema.getFieldDataType("ts").orElse(null),
+ equalTo(DataTypes.TIMESTAMP(3)));
+ return WATERMARK_RESOLVED;
+ case PROCTIME_SQL:
+ return PROCTIME_RESOLVED;
+ default:
+ throw new UnsupportedOperationException("Unknown SQL expression.");
+ }
+ }
+
+ private static LogicalType getType(ResolvedSchema resolvedSchema, String column) {
+ return resolvedSchema
+ .getColumn(column)
+ .orElseThrow(IllegalStateException::new)
+ .getDataType()
+ .getLogicalType();
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java
new file mode 100644
index 0000000..d749a81
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java
@@ -0,0 +1,857 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.Column.ComputedColumn;
+import org.apache.flink.table.catalog.Column.MetadataColumn;
+import org.apache.flink.table.catalog.Column.PhysicalColumn;
+import org.apache.flink.table.catalog.Constraint;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.SchemaResolver;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.WatermarkSpec;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.SqlCallExpression;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
+
+/**
+ * Schema of a table or view.
+ *
+ * <p>A schema represents the schema part of a {@code CREATE TABLE (schema) WITH (options)} DDL
+ * statement in SQL. It defines columns of different kinds, constraints, time attributes, and
+ * watermark strategies. It is possible to reference objects (such as functions or types) across
+ * different catalogs.
+ *
+ * <p>This class is used in the API and catalogs to define an unresolved schema that will be
+ * translated to {@link ResolvedSchema}. Some methods of this class perform basic validation;
+ * however, the main validation happens during resolution.
+ *
+ * <p>Since an instance of this class is unresolved, it should not be directly persisted. The {@link
+ * #toString()} shows only a summary of the contained objects.
+ */
+@PublicEvolving
+public final class Schema {
+
+ private final List<UnresolvedColumn> columns;
+
+ private final List<UnresolvedWatermarkSpec> watermarkSpecs;
+
+ private final @Nullable UnresolvedPrimaryKey primaryKey;
+
+ private Schema(
+ List<UnresolvedColumn> columns,
+ List<UnresolvedWatermarkSpec> watermarkSpecs,
+ @Nullable UnresolvedPrimaryKey primaryKey) {
+ this.columns = columns;
+ this.watermarkSpecs = watermarkSpecs;
+ this.primaryKey = primaryKey;
+ }
+
+ /** Builder for configuring and creating instances of {@link Schema}. */
+ public static Schema.Builder newBuilder() {
+ return new Builder();
+ }
+
+ public List<UnresolvedColumn> getColumns() {
+ return columns;
+ }
+
+ public List<UnresolvedWatermarkSpec> getWatermarkSpecs() {
+ return watermarkSpecs;
+ }
+
+ public Optional<UnresolvedPrimaryKey> getPrimaryKey() {
+ return Optional.ofNullable(primaryKey);
+ }
+
+ /** Resolves the given {@link Schema} to a validated {@link ResolvedSchema}. */
+ public ResolvedSchema resolve(SchemaResolver resolver) {
+ return resolver.resolve(this);
+ }
+
+ @Override
+ public String toString() {
+ final List<Object> components = new ArrayList<>();
+ components.addAll(columns);
+ components.addAll(watermarkSpecs);
+ if (primaryKey != null) {
+ components.add(primaryKey);
+ }
+ return components.stream()
+ .map(Objects::toString)
+ .map(s -> " " + s)
+ .collect(Collectors.joining(", \n", "(\n", "\n)"));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Schema schema = (Schema) o;
+ return columns.equals(schema.columns)
+ && watermarkSpecs.equals(schema.watermarkSpecs)
+ && Objects.equals(primaryKey, schema.primaryKey);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(columns, watermarkSpecs, primaryKey);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /** A builder for constructing an immutable but still unresolved {@link Schema}. */
+ public static final class Builder {
+
+ private final List<UnresolvedColumn> columns;
+
+ private final List<UnresolvedWatermarkSpec> watermarkSpecs;
+
+ private @Nullable UnresolvedPrimaryKey primaryKey;
+
+ private Builder() {
+ columns = new ArrayList<>();
+ watermarkSpecs = new ArrayList<>();
+ }
+
+ /** Adopts all members from the given unresolved schema. */
+ public Builder fromSchema(Schema unresolvedSchema) {
+ columns.addAll(unresolvedSchema.columns);
+ watermarkSpecs.addAll(unresolvedSchema.watermarkSpecs);
+ if (unresolvedSchema.primaryKey != null) {
+ primaryKeyNamed(
+ unresolvedSchema.primaryKey.getConstraintName(),
+ unresolvedSchema.primaryKey.getColumnNames());
+ }
+ return this;
+ }
+
+ /** Adopts all members from the given resolved schema. */
+ public Builder fromResolvedSchema(ResolvedSchema resolvedSchema) {
+ addResolvedColumns(resolvedSchema.getColumns());
+ addResolvedWatermarkSpec(resolvedSchema.getWatermarkSpecs());
+ resolvedSchema.getPrimaryKey().ifPresent(this::addResolvedConstraint);
+ return this;
+ }
+
+ /** Adopts all fields of the given row as physical columns of the schema. */
+ public Builder fromRowDataType(DataType dataType) {
+ Preconditions.checkNotNull(dataType, "Data type must not be null.");
+ Preconditions.checkArgument(
+ hasRoot(dataType.getLogicalType(), LogicalTypeRoot.ROW),
+ "Data type of ROW expected.");
+ final List<DataType> fieldDataTypes = dataType.getChildren();
+ final List<String> fieldNames = ((RowType) dataType.getLogicalType()).getFieldNames();
+ IntStream.range(0, fieldDataTypes.size())
+ .forEach(i -> column(fieldNames.get(i), fieldDataTypes.get(i)));
+ return this;
+ }
+
+ /**
+ * Declares a physical column that is appended to this schema.
+ *
+ * <p>Physical columns are regular columns known from databases. They define the names, the
+ * types, and the order of fields in the physical data. Thus, physical columns represent the
+ * payload that is read from and written to an external system. Connectors and formats use
+ * these columns (in the defined order) to configure themselves. Other kinds of columns can
+ * be declared between physical columns but will not influence the final physical schema.
+ *
+ * @param columnName column name
+ * @param dataType data type of the column
+ */
+ public Builder column(String columnName, AbstractDataType<?> dataType) {
+ Preconditions.checkNotNull(columnName, "Column name must not be null.");
+ Preconditions.checkNotNull(dataType, "Data type must not be null.");
+ columns.add(new UnresolvedPhysicalColumn(columnName, dataType));
+ return this;
+ }
+
+ /**
+ * Declares a physical column that is appended to this schema.
+ *
+ * <p>See {@link #column(String, AbstractDataType)} for a detailed explanation.
+ *
+ * <p>This method uses a type string that can be easily persisted in a durable catalog.
+ *
+ * @param columnName column name
+ * @param serializableTypeString data type of the column as a serializable string
+ * @see LogicalType#asSerializableString()
+ */
+ public Builder column(String columnName, String serializableTypeString) {
+ return column(columnName, DataTypes.of(serializableTypeString));
+ }
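
Taken together, the two variants allow a purely physical schema to be declared either with DataTypes or with serializable type strings, e.g. (sketch):

    Schema schema =
            Schema.newBuilder()
                    .column("user_id", DataTypes.BIGINT().notNull())
                    .column("name", "VARCHAR(200)")
                    .build();
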
+
+ /**
+ * Declares a computed column that is appended to this schema.
+ *
+ * <p>Computed columns are virtual columns that are generated by evaluating an expression
+ * that can reference other columns declared in the same table. Both physical columns and
+ * metadata columns can be accessed. The column itself is not physically stored within the
+ * table. The column’s data type is derived automatically from the given expression and does
+ * not have to be declared manually.
+ *
+ * <p>Computed columns are commonly used for defining time attributes. For example, the
+ * computed column can be used if the original field is not of type TIMESTAMP(3) or is nested
+ * in a JSON string.
+ *
+ * <p>Any scalar expression can be used for in-memory/temporary tables. However, currently,
+ * only SQL expressions can be persisted in a catalog. User-defined functions (also defined
+ * in different catalogs) are supported.
+ *
+ * <p>Example: {@code .columnByExpression("ts", $("json_obj").get("ts").cast(TIMESTAMP(3)))}
+ *
+ * @param columnName column name
+ * @param expression computation of the column
+ */
+ public Builder columnByExpression(String columnName, Expression expression) {
+ Preconditions.checkNotNull(columnName, "Column name must not be null.");
+ Preconditions.checkNotNull(expression, "Expression must not be null.");
+ columns.add(new UnresolvedComputedColumn(columnName, expression));
+ return this;
+ }
+
+ /**
+ * Declares a computed column that is appended to this schema.
+ *
+ * <p>See {@link #columnByExpression(String, Expression)} for a detailed explanation.
+ *
+ * <p>This method uses a SQL expression that can be easily persisted in a durable catalog.
+ *
+ * <p>Example: {@code .columnByExpression("ts", "CAST(json_obj.ts AS TIMESTAMP(3))")}
+ *
+ * @param columnName column name
+ * @param sqlExpression computation of the column using SQL
+ */
+ public Builder columnByExpression(String columnName, String sqlExpression) {
+ return columnByExpression(columnName, new SqlCallExpression(sqlExpression));
+ }
+
+ /**
+ * Declares a metadata column that is appended to this schema.
+ *
+ * <p>Metadata columns allow access to connector and/or format specific fields for every row
+ * of a table. For example, a metadata column can be used to read and write the timestamp
+ * from and to Kafka records for time-based operations. The connector and format
+ * documentation lists the available metadata fields for every component.
+ *
+ * <p>Every metadata field is identified by a string-based key and has a documented data
+ * type. For convenience, the runtime will perform an explicit cast if the data type of the
+ * column differs from the data type of the metadata field. Of course, this requires that
+ * the two data types are compatible.
+ *
+ * <p>By default, a metadata column can be used for both reading and writing. However, in
+ * many cases an external system provides more read-only metadata fields than writable
+ * fields. Therefore, it is possible to exclude metadata columns from persisting by setting
+ * the {@code isVirtual} flag to {@code true}.
+ *
+ * <p>Note: This method assumes that the metadata key is equal to the column name.
+ *
+ * @param columnName column name
+ * @param dataType data type of the column
+ * @param isVirtual whether the column is virtual, i.e. excluded from persisting
+ */
+ public Builder columnByMetadata(
+ String columnName, AbstractDataType<?> dataType, boolean isVirtual) {
+ Preconditions.checkNotNull(columnName, "Column name must not be null.");
+ Preconditions.checkNotNull(dataType, "Data type must not be null.");
+ columns.add(new UnresolvedMetadataColumn(columnName, dataType, null, isVirtual));
+ return this;
+ }
+
+ /**
+ * Declares a metadata column that is appended to this schema.
+ *
+ * <p>Metadata columns allow access to connector and/or format specific fields for every row
+ * of a table. For example, a metadata column can be used to read and write the timestamp
+ * from and to Kafka records for time-based operations. The connector and format
+ * documentation lists the available metadata fields for every component.
+ *
+ * <p>Every metadata field is identified by a string-based key and has a documented data
+ * type. The metadata key can be omitted if the column name should be used as the
+ * identifying metadata key. For convenience, the runtime will perform an explicit cast if
+ * the data type of the column differs from the data type of the metadata field. Of course,
+ * this requires that the two data types are compatible.
+ *
+ * <p>Note: This method assumes that a metadata column can be used for both reading and
+ * writing.
+ *
+ * @param columnName column name
+ * @param dataType data type of the column
+ * @param metadataKey identifying metadata key; if null, the column name is used as the
+ * metadata key
+ */
+ public Builder columnByMetadata(
+ String columnName, AbstractDataType<?> dataType, @Nullable String metadataKey) {
+ Preconditions.checkNotNull(columnName, "Column name must not be null.");
+ Preconditions.checkNotNull(dataType, "Data type must not be null.");
+ columns.add(new UnresolvedMetadataColumn(columnName, dataType, metadataKey, false));
+ return this;
+ }
+
+ /**
+ * Declares a metadata column that is appended to this schema.
+ *
+ * <p>Metadata columns allow access to connector and/or format specific fields for every row
+ * of a table. For example, a metadata column can be used to read and write the timestamp
+ * from and to Kafka records for time-based operations. The connector and format
+ * documentation lists the available metadata fields for every component.
+ *
+ * <p>Every metadata field is identified by a string-based key and has a documented data
+ * type. The metadata key can be omitted if the column name should be used as the
+ * identifying metadata key. For convenience, the runtime will perform an explicit cast if
+ * the data type of the column differs from the data type of the metadata field. Of course,
+ * this requires that the two data types are compatible.
+ *
+ * <p>By default, a metadata column can be used for both reading and writing. However, in
+ * many cases an external system provides more read-only metadata fields than writable
+ * fields. Therefore, it is possible to exclude metadata columns from persisting by setting
+ * the {@code isVirtual} flag to {@code true}.
+ *
+ * @param columnName column name
+ * @param dataType data type of the column
+ * @param metadataKey identifying metadata key; if null, the column name is used as the
+ * metadata key
+ * @param isVirtual whether the column is virtual, i.e. excluded from persisting
+ */
+ public Builder columnByMetadata(
+ String columnName,
+ AbstractDataType<?> dataType,
+ @Nullable String metadataKey,
+ boolean isVirtual) {
+ Preconditions.checkNotNull(columnName, "Column name must not be null.");
+ columns.add(new UnresolvedMetadataColumn(columnName, dataType, metadataKey, isVirtual));
+ return this;
+ }
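
The three overloads cover the common combinations of metadata key and virtuality. A sketch using metadata keys as exposed by, e.g., the Kafka connector (the key names are assumed for illustration):

    Schema schema =
            Schema.newBuilder()
                    .column("payload", DataTypes.STRING())
                    // key defaults to the column name; readable and writable
                    .columnByMetadata("topic", DataTypes.STRING(), false)
                    // explicit key; readable and writable
                    .columnByMetadata("event_time", DataTypes.TIMESTAMP(3), "timestamp")
                    // explicit key; virtual, i.e. excluded when persisting
                    .columnByMetadata("offset", DataTypes.BIGINT(), "offset", true)
                    .build();
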
+
+ /**
+ * Declares that the given column should serve as an event-time (i.e. rowtime) attribute and
+ * specifies a corresponding watermark strategy as an expression.
+ *
+ * <p>The column must be of type {@code TIMESTAMP(3)} and be a top-level column in the
+ * schema. It may be a computed column.
+ *
+ * <p>The watermark generation expression is evaluated by the framework for every record
+ * during runtime. The framework will periodically emit the largest generated watermark. If
+ * the current watermark is still identical to the previous one, or is null, or the value of
+ * the returned watermark is smaller than that of the last emitted one, then no new
+ * watermark will be emitted. A watermark is emitted in an interval defined by the
+ * configuration.
+ *
+ * <p>Any scalar expression can be used for declaring a watermark strategy for
+ * in-memory/temporary tables. However, currently, only SQL expressions can be persisted in
+ * a catalog. The expression's return data type must be {@code TIMESTAMP(3)}. User-defined
+ * functions (also defined in different catalogs) are supported.
+ *
+ * <p>Example: {@code .watermark("ts", $("ts").minus(lit(5).seconds()))}
+ *
+ * @param columnName the column name used as a rowtime attribute
+ * @param watermarkExpression the expression used for watermark generation
+ */
+ public Builder watermark(String columnName, Expression watermarkExpression) {
+ Preconditions.checkNotNull(columnName, "Column name must not be null.");
+ Preconditions.checkNotNull(
+ watermarkExpression, "Watermark expression must not be null.");
+ this.watermarkSpecs.add(new UnresolvedWatermarkSpec(columnName, watermarkExpression));
+ return this;
+ }
+
+ /**
+ * Declares that the given column should serve as an event-time (i.e. rowtime) attribute and
+ * specifies a corresponding watermark strategy as an expression.
+ *
+ * <p>See {@link #watermark(String, Expression)} for a detailed explanation.
+ *
+ * <p>This method uses a SQL expression that can be easily persisted in a durable catalog.
+ *
+ * <p>Example: {@code .watermark("ts", "ts - INTERVAL '5' SECOND")}
+ */
+ public Builder watermark(String columnName, String sqlExpression) {
+ return watermark(columnName, new SqlCallExpression(sqlExpression));
+ }
+
+ /**
+ * Declares a primary key constraint for a set of given columns. A primary key uniquely
+ * identifies a row in a table. None of the columns in a primary key can be nullable. The
+ * primary key is informational only. It will not be enforced. It can be used for
+ * optimizations. It is the data owner's responsibility to ensure uniqueness of the data.
+ *
+ * <p>The primary key will be assigned a generated name in the format {@code PK_col1_col2}.
+ *
+ * @param columnNames columns that form a unique primary key
+ */
+ public Builder primaryKey(String... columnNames) {
+ Preconditions.checkNotNull(columnNames, "Primary key column names must not be null.");
+ return primaryKey(Arrays.asList(columnNames));
+ }
+
+ /**
+ * Declares a primary key constraint for a set of given columns. A primary key uniquely
+ * identifies a row in a table. None of the columns in a primary key can be nullable. The
+ * primary key is informational only. It will not be enforced. It can be used for
+ * optimizations. It is the data owner's responsibility to ensure uniqueness of the data.
+ *
+ * <p>The primary key will be assigned a generated name in the format {@code PK_col1_col2}.
+ *
+ * @param columnNames columns that form a unique primary key
+ */
+ public Builder primaryKey(List<String> columnNames) {
+ Preconditions.checkNotNull(columnNames, "Primary key column names must not be null.");
+ final String generatedConstraintName =
+ columnNames.stream().collect(Collectors.joining("_", "PK_", ""));
+ return primaryKeyNamed(generatedConstraintName, columnNames);
+ }
+
+ /**
+ * Declares a primary key constraint for a set of given columns. A primary key uniquely
+ * identifies a row in a table. None of the columns in a primary key can be nullable. The
+ * primary key is informational only. It will not be enforced. It can be used for
+ * optimizations. It is the data owner's responsibility to ensure uniqueness of the data.
+ *
+ * @param constraintName name for the primary key, can be used to reference the constraint
+ * @param columnNames columns that form a unique primary key
+ */
+ public Builder primaryKeyNamed(String constraintName, String... columnNames) {
+ Preconditions.checkNotNull(columnNames, "Primary key column names must not be null.");
+ return primaryKeyNamed(constraintName, Arrays.asList(columnNames));
+ }
+
+ /**
+ * Declares a primary key constraint for a set of given columns. A primary key uniquely
+ * identifies a row in a table. None of the columns in a primary key can be nullable. The
+ * primary key is informational only. It will not be enforced. It can be used for
+ * optimizations. It is the data owner's responsibility to ensure uniqueness of the data.
+ *
+ * @param constraintName name for the primary key, can be used to reference the constraint
+ * @param columnNames columns that form a unique primary key
+ */
+ public Builder primaryKeyNamed(String constraintName, List<String> columnNames) {
+ Preconditions.checkState(
+ primaryKey == null, "Multiple primary keys are not supported.");
+ Preconditions.checkNotNull(
+ constraintName, "Primary key constraint name must not be null.");
+ Preconditions.checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(constraintName),
+ "Primary key constraint name must not be empty.");
+ Preconditions.checkArgument(
+ columnNames != null && columnNames.size() > 0,
+ "Primary key constraint must be defined for at least a single column.");
+ primaryKey = new UnresolvedPrimaryKey(constraintName, columnNames);
+ return this;
+ }
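
For illustration, both naming variants produce the same NOT ENFORCED constraint; only the constraint name differs. The generated name preserves column order (cf. testGeneratedConstraintName in SchemaResolutionTest above):

    // generated constraint name "PK_b_a"
    Schema.newBuilder()
            .column("a", DataTypes.INT().notNull())
            .column("b", DataTypes.STRING().notNull())
            .primaryKey("b", "a")
            .build();

    // explicit constraint name
    Schema.newBuilder()
            .column("id", DataTypes.INT().notNull())
            .primaryKeyNamed("pk_id", "id")
            .build();
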
+
+ /** Returns an instance of an unresolved {@link Schema}. */
+ public Schema build() {
+ return new Schema(columns, watermarkSpecs, primaryKey);
+ }
+
+ // ----------------------------------------------------------------------------------------
+
+ private void addResolvedColumns(List<Column> columns) {
+ columns.forEach(
+ c -> {
+ if (c instanceof PhysicalColumn) {
+ final PhysicalColumn physicalColumn = (PhysicalColumn) c;
+ column(physicalColumn.getName(), physicalColumn.getDataType());
+ } else if (c instanceof ComputedColumn) {
+ final ComputedColumn computedColumn = (ComputedColumn) c;
+ columnByExpression(
+ computedColumn.getName(), computedColumn.getExpression());
+ } else if (c instanceof MetadataColumn) {
+ final MetadataColumn metadataColumn = (MetadataColumn) c;
+ columnByMetadata(
+ metadataColumn.getName(),
+ metadataColumn.getDataType(),
+ metadataColumn.getMetadataAlias().orElse(null),
+ metadataColumn.isVirtual());
+ }
+ });
+ }
+
+ private void addResolvedWatermarkSpec(List<WatermarkSpec> specs) {
+ specs.forEach(
+ s ->
+ watermarkSpecs.add(
+ new UnresolvedWatermarkSpec(
+ s.getRowtimeAttribute(), s.getWatermarkExpression())));
+ }
+
+ private void addResolvedConstraint(UniqueConstraint constraint) {
+ if (constraint.getType() == Constraint.ConstraintType.PRIMARY_KEY) {
+ primaryKeyNamed(constraint.getName(), constraint.getColumns());
+ } else {
+ throw new IllegalArgumentException("Unsupported constraint type.");
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Helper classes for representing the schema
+ // --------------------------------------------------------------------------------------------
+
+ /** Super class for all kinds of columns in an unresolved schema. */
+ public abstract static class UnresolvedColumn {
+ final String columnName;
+
+ UnresolvedColumn(String columnName) {
+ this.columnName = columnName;
+ }
+
+ public String getName() {
+ return columnName;
+ }
+
+ @Override
+ public String toString() {
+ return EncodingUtils.escapeIdentifier(columnName);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ UnresolvedColumn that = (UnresolvedColumn) o;
+ return columnName.equals(that.columnName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(columnName);
+ }
+ }
+
+ /**
+ * Declaration of a physical column that will be resolved to {@link PhysicalColumn} during
+ * schema resolution.
+ */
+ public static final class UnresolvedPhysicalColumn extends UnresolvedColumn {
+
+ private final AbstractDataType<?> dataType;
+
+ UnresolvedPhysicalColumn(String columnName, AbstractDataType<?> dataType) {
+ super(columnName);
+ this.dataType = dataType;
+ }
+
+ public AbstractDataType<?> getDataType() {
+ return dataType;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s %s", super.toString(), dataType.toString());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ UnresolvedPhysicalColumn that = (UnresolvedPhysicalColumn) o;
+ return dataType.equals(that.dataType);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), dataType);
+ }
+ }
+
+ /**
+ * Declaration of a computed column that will be resolved to {@link ComputedColumn} during
+ * schema resolution.
+ */
+ public static final class UnresolvedComputedColumn extends UnresolvedColumn {
+
+ private final Expression expression;
+
+ UnresolvedComputedColumn(String columnName, Expression expression) {
+ super(columnName);
+ this.expression = expression;
+ }
+
+ public Expression getExpression() {
+ return expression;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s AS %s", super.toString(), expression.asSummaryString());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ UnresolvedComputedColumn that = (UnresolvedComputedColumn) o;
+ return expression.equals(that.expression);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), expression);
+ }
+ }
+
+ /**
+ * Declaration of a metadata column that will be resolved to {@link MetadataColumn} during
+ * schema resolution.
+ */
+ public static final class UnresolvedMetadataColumn extends UnresolvedColumn {
+
+ private final AbstractDataType<?> dataType;
+ private final @Nullable String metadataKey;
+ private final boolean isVirtual;
+
+ UnresolvedMetadataColumn(
+ String columnName,
+ AbstractDataType<?> dataType,
+ @Nullable String metadataKey,
+ boolean isVirtual) {
+ super(columnName);
+ this.dataType = dataType;
+ this.metadataKey = metadataKey;
+ this.isVirtual = isVirtual;
+ }
+
+ public AbstractDataType<?> getDataType() {
+ return dataType;
+ }
+
+ public @Nullable String getMetadataKey() {
+ return metadataKey;
+ }
+
+ public boolean isVirtual() {
+ return isVirtual;
+ }
+
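+ // Example rendering for a hypothetical column: `offset` METADATA FROM 'offset' VIRTUAL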
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append(super.toString());
+ sb.append(" METADATA");
+ if (metadataKey != null) {
+ sb.append(" FROM '");
+ sb.append(EncodingUtils.escapeSingleQuotes(metadataKey));
+ sb.append("'");
+ }
+ if (isVirtual) {
+ sb.append(" VIRTUAL");
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ UnresolvedMetadataColumn that = (UnresolvedMetadataColumn) o;
+ return isVirtual == that.isVirtual
+ && dataType.equals(that.dataType)
+ && Objects.equals(metadataKey, that.metadataKey);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), dataType, metadataKey, isVirtual);
+ }
+ }
+
+ /**
+ * Declaration of a watermark strategy that will be resolved to {@link WatermarkSpec} during
+ * schema resolution.
+ */
+ public static final class UnresolvedWatermarkSpec {
+
+ private final String columnName;
+ private final Expression watermarkExpression;
+
+ UnresolvedWatermarkSpec(String columnName, Expression watermarkExpression) {
+ this.columnName = columnName;
+ this.watermarkExpression = watermarkExpression;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+
+ public Expression getWatermarkExpression() {
+ return watermarkExpression;
+ }
+
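+ // Example rendering for a hypothetical spec: WATERMARK FOR `ts` AS ts - INTERVAL '5' SECOND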
+ @Override
+ public String toString() {
+ return String.format(
+ "WATERMARK FOR %s AS %s",
+ EncodingUtils.escapeIdentifier(columnName),
+ watermarkExpression.asSummaryString());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ UnresolvedWatermarkSpec that = (UnresolvedWatermarkSpec) o;
+ return columnName.equals(that.columnName)
+ && watermarkExpression.equals(that.watermarkExpression);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(columnName, watermarkExpression);
+ }
+ }
+
+ /** Superclass for all kinds of constraints in an unresolved schema. */
+ public abstract static class UnresolvedConstraint {
+
+ private final String constraintName;
+
+ UnresolvedConstraint(String constraintName) {
+ this.constraintName = constraintName;
+ }
+
+ public String getConstraintName() {
+ return constraintName;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("CONSTRAINT %s", EncodingUtils.escapeIdentifier(constraintName));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ UnresolvedConstraint that = (UnresolvedConstraint) o;
+ return constraintName.equals(that.constraintName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(constraintName);
+ }
+ }
+
+ /**
+ * Declaration of a primary key that will be resolved to {@link UniqueConstraint} during schema
+ * resolution.
+ */
+ public static final class UnresolvedPrimaryKey extends UnresolvedConstraint {
+
+ private final List<String> columnNames;
+
+ UnresolvedPrimaryKey(String constraintName, List<String> columnNames) {
+ super(constraintName);
+ this.columnNames = columnNames;
+ }
+
+ public List<String> getColumnNames() {
+ return columnNames;
+ }
+
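+ // Example rendering for a hypothetical key: CONSTRAINT `pk` PRIMARY KEY (`id`) NOT ENFORCED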
+ @Override
+ public String toString() {
+ return String.format(
+ "%s PRIMARY KEY (%s) NOT ENFORCED",
+ super.toString(),
+ columnNames.stream()
+ .map(EncodingUtils::escapeIdentifier)
+ .collect(Collectors.joining(", ")));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ UnresolvedPrimaryKey that = (UnresolvedPrimaryKey) o;
+ return columnNames.equals(that.columnNames);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), columnNames);
+ }
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/AbstractConstraint.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/AbstractConstraint.java
new file mode 100644
index 0000000..51a541d
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/AbstractConstraint.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Base class for {@link Constraint constraints}. */
+@Internal
+abstract class AbstractConstraint implements Constraint {
+
+ private final String name;
+ private final boolean enforced;
+
+ AbstractConstraint(String name, boolean enforced) {
+ this.name = checkNotNull(name);
+ this.enforced = enforced;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public boolean isEnforced() {
+ return enforced;
+ }
+
+ @Override
+ public String toString() {
+ return asSummaryString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ AbstractConstraint that = (AbstractConstraint) o;
+ return enforced == that.enforced && Objects.equals(name, that.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, enforced);
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Column.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Column.java
new file mode 100644
index 0000000..019a9ef
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Column.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Representation of a column in a {@link ResolvedSchema}.
+ *
+ * <p>A table column describes either a {@link PhysicalColumn}, {@link ComputedColumn}, or {@link
+ * MetadataColumn}.
+ *
+ * <p>Every column is fully resolved. The enclosed {@link DataType} indicates whether the column is
+ * a time attribute and thus might differ from the original data type.
+ */
+@PublicEvolving
+public abstract class Column {
+
+ protected final String name;
+
+ protected final DataType dataType;
+
+ private Column(String name, DataType dataType) {
+ this.name = name;
+ this.dataType = dataType;
+ }
+
+ /** Creates a regular table column that represents physical data. */
+ public static PhysicalColumn physical(String name, DataType dataType) {
+ Preconditions.checkNotNull(name, "Column name must not be null.");
+ Preconditions.checkNotNull(dataType, "Column data type must not be null.");
+ return new PhysicalColumn(name, dataType);
+ }
+
+ /** Creates a computed column that is computed from the given {@link ResolvedExpression}. */
+ public static ComputedColumn computed(String name, ResolvedExpression expression) {
+ Preconditions.checkNotNull(name, "Column name must not be null.");
+ Preconditions.checkNotNull(expression, "Column expression must not be null.");
+ return new ComputedColumn(name, expression.getOutputDataType(), expression);
+ }
+
+ /**
+ * Creates a metadata column from metadata of the given column name.
+ *
+ * <p>The column is not virtual by default.
+ */
+ public static MetadataColumn metadata(String name, DataType dataType) {
+ return metadata(name, dataType, null, false);
+ }
+
+ /**
+ * Creates a metadata column from metadata of the given column name.
+ *
+ * <p>Allows specifying whether the column is virtual.
+ */
+ public static MetadataColumn metadata(String name, DataType type, boolean isVirtual) {
+ return metadata(name, type, null, isVirtual);
+ }
+
+ /**
+ * Creates a metadata column from metadata of the given alias.
+ *
+ * <p>The column is not virtual by default.
+ */
+ public static MetadataColumn metadata(String name, DataType type, String metadataAlias) {
+ Preconditions.checkNotNull(metadataAlias, "Metadata alias must not be null.");
+ return metadata(name, type, metadataAlias, false);
+ }
+
+ /**
+ * Creates a metadata column from metadata of the given column name or from metadata of the
+ * given alias (if not null).
+ *
+ * <p>Allows specifying whether the column is virtual.
+ */
+ public static MetadataColumn metadata(
+ String name, DataType dataType, @Nullable String metadataAlias, boolean isVirtual) {
+ Preconditions.checkNotNull(name, "Column name must not be null.");
+ Preconditions.checkNotNull(dataType, "Column data type must not be null.");
+ return new MetadataColumn(name, dataType, metadataAlias, isVirtual);
+ }
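+
+ // A hedged usage sketch (names and types are hypothetical): constructing the
+ // different column kinds via the factory methods above.
+ //
+ // Column id = Column.physical("id", DataTypes.INT().notNull());
+ // Column part = Column.metadata("part", DataTypes.STRING(), "partition", false);
+ // Column offset = Column.metadata("offset", DataTypes.BIGINT(), true); // virtual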
+
+ /**
+ * Returns whether the given column is a physical column of a table; neither computed nor
+ * metadata.
+ */
+ public abstract boolean isPhysical();
+
+ /** Returns whether the given column is persisted in a sink operation. */
+ public abstract boolean isPersisted();
+
+ /** Returns the data type of this column. */
+ public DataType getDataType() {
+ return this.dataType;
+ }
+
+ /** Returns the name of this column. */
+ public String getName() {
+ return name;
+ }
+
+ /** Returns a string that summarizes this column for printing to a console. */
+ public String asSummaryString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append(EncodingUtils.escapeIdentifier(name));
+ sb.append(" ");
+ sb.append(dataType);
+ explainExtras()
+ .ifPresent(
+ e -> {
+ sb.append(" ");
+ sb.append(e);
+ });
+ return sb.toString();
+ }
+
+ /** Returns an explanation of specific column extras next to name and type. */
+ public abstract Optional<String> explainExtras();
+
+ /** Returns a copy of the column with a replaced {@link DataType}. */
+ public abstract Column copy(DataType newType);
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Column that = (Column) o;
+ return Objects.equals(this.name, that.name) && Objects.equals(this.dataType, that.dataType);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(this.name, this.dataType);
+ }
+
+ @Override
+ public String toString() {
+ return asSummaryString();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Specific kinds of columns
+ // --------------------------------------------------------------------------------------------
+
+ /** Representation of a physical column. */
+ public static final class PhysicalColumn extends Column {
+
+ private PhysicalColumn(String name, DataType dataType) {
+ super(name, dataType);
+ }
+
+ @Override
+ public boolean isPhysical() {
+ return true;
+ }
+
+ @Override
+ public boolean isPersisted() {
+ return true;
+ }
+
+ @Override
+ public Optional<String> explainExtras() {
+ return Optional.empty();
+ }
+
+ @Override
+ public Column copy(DataType newDataType) {
+ return new PhysicalColumn(name, newDataType);
+ }
+ }
+
+ /** Representation of a computed column. */
+ public static final class ComputedColumn extends Column {
+
+ private final ResolvedExpression expression;
+
+ private ComputedColumn(String name, DataType dataType, ResolvedExpression expression) {
+ super(name, dataType);
+ this.expression = expression;
+ }
+
+ @Override
+ public boolean isPhysical() {
+ return false;
+ }
+
+ @Override
+ public boolean isPersisted() {
+ return false;
+ }
+
+ public ResolvedExpression getExpression() {
+ return expression;
+ }
+
+ @Override
+ public Optional<String> explainExtras() {
+ return Optional.of("AS " + expression.asSummaryString());
+ }
+
+ @Override
+ public Column copy(DataType newDataType) {
+ return new ComputedColumn(name, newDataType, expression);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ ComputedColumn that = (ComputedColumn) o;
+ return expression.equals(that.expression);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), expression);
+ }
+ }
+
+ /** Representation of a metadata column. */
+ public static final class MetadataColumn extends Column {
+
+ private final @Nullable String metadataAlias;
+
+ private final boolean isVirtual;
+
+ private MetadataColumn(
+ String name, DataType dataType, @Nullable String metadataAlias, boolean isVirtual) {
+ super(name, dataType);
+ this.metadataAlias = metadataAlias;
+ this.isVirtual = isVirtual;
+ }
+
+ public boolean isVirtual() {
+ return isVirtual;
+ }
+
+ public Optional<String> getMetadataAlias() {
+ return Optional.ofNullable(metadataAlias);
+ }
+
+ @Override
+ public boolean isPhysical() {
+ return false;
+ }
+
+ @Override
+ public boolean isPersisted() {
+ return !isVirtual;
+ }
+
+ @Override
+ public Optional<String> explainExtras() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("METADATA");
+ if (metadataAlias != null) {
+ sb.append(" FROM ");
+ sb.append("'");
+ sb.append(EncodingUtils.escapeSingleQuotes(metadataAlias));
+ sb.append("'");
+ }
+ if (isVirtual) {
+ sb.append(" VIRTUAL");
+ }
+ return Optional.of(sb.toString());
+ }
+
+ @Override
+ public Column copy(DataType newDataType) {
+ return new MetadataColumn(name, newDataType, metadataAlias, isVirtual);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ MetadataColumn that = (MetadataColumn) o;
+ return isVirtual == that.isVirtual && Objects.equals(metadataAlias, that.metadataAlias);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), metadataAlias, isVirtual);
+ }
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Constraint.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Constraint.java
new file mode 100644
index 0000000..dcc0dd8
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Constraint.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Integrity constraints, generally referred to simply as constraints, define the valid states of
+ * SQL-data by constraining the values in the base tables.
+ */
+@PublicEvolving
+public interface Constraint {
+
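+ /** Returns the name of the constraint. */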
+ String getName();
+
+ /**
+ * Constraints can either be enforced or non-enforced. If a constraint is enforced, it will
+ * be checked whenever any SQL statement is executed that results in data or schema changes.
+ * If the constraint is not enforced, the owner of the data is responsible for ensuring data
+ * integrity. Flink will rely on the information being valid and might use it for query
+ * optimizations.
+ */
+ boolean isEnforced();
+
+ /** Returns the kind of constraint, e.g. PRIMARY KEY or UNIQUE. */
+ ConstraintType getType();
+
+ /** Returns a string that summarizes this constraint for printing to a console. */
+ String asSummaryString();
+
+ /**
+ * Type of the constraint.
+ *
+ * <p>Unique constraints:
+ *
+ * <ul>
+ * <li>UNIQUE - is satisfied if and only if there do not exist two rows that have the same
+ * non-null values in the unique columns
+ * <li>PRIMARY KEY - in addition to the UNIQUE constraint, it requires that none of the
+ * values in the specified columns be null. Moreover, only a single PRIMARY KEY can be
+ * defined for a table.
+ * </ul>
+ */
+ enum ConstraintType {
+ PRIMARY_KEY,
+ UNIQUE_KEY
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
new file mode 100644
index 0000000..1a1ec01
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.ROW;
+
+/**
+ * Schema of a table or view consisting of columns, constraints, and watermark specifications.
+ *
+ * <p>This class is the result of resolving a {@link Schema} into a final validated representation.
+ *
+ * <ul>
+ * <li>Data types and functions have been expanded to fully qualified identifiers.
+ * <li>Time attributes are represented in the column's data type.
+ * <li>{@link Expression}s have been translated to {@link ResolvedExpression}.
+ * <li>{@link AbstractDataType}s have been translated to {@link DataType}.
+ * </ul>
+ */
+@PublicEvolving
+public final class ResolvedSchema {
+
+ private final List<Column> columns;
+ private final List<WatermarkSpec> watermarkSpecs;
+ private final @Nullable UniqueConstraint primaryKey;
+
+ ResolvedSchema(
+ List<Column> columns,
+ List<WatermarkSpec> watermarkSpecs,
+ @Nullable UniqueConstraint primaryKey) {
+ this.columns = Preconditions.checkNotNull(columns, "Columns must not be null.");
+ this.watermarkSpecs =
+ Preconditions.checkNotNull(watermarkSpecs, "Watermark specs must not be null.");
+ this.primaryKey = primaryKey;
+ }
+
+ /** Returns the number of {@link Column}s of this schema. */
+ public int getColumnCount() {
+ return columns.size();
+ }
+
+ /** Returns all {@link Column}s of this schema. */
+ public List<Column> getColumns() {
+ return columns;
+ }
+
+ /**
+ * Returns the {@link Column} instance for the given column index.
+ *
+ * @param columnIndex the index of the column
+ */
+ public Optional<Column> getColumn(int columnIndex) {
+ if (columnIndex < 0 || columnIndex >= columns.size()) {
+ return Optional.empty();
+ }
+ return Optional.of(this.columns.get(columnIndex));
+ }
+
+ /**
+ * Returns the {@link Column} instance for the given column name.
+ *
+ * @param columnName the name of the column
+ */
+ public Optional<Column> getColumn(String columnName) {
+ return this.columns.stream()
+ .filter(column -> column.getName().equals(columnName))
+ .findFirst();
+ }
+
+ /**
+ * Returns a list of watermark specifications each consisting of a rowtime attribute and
+ * watermark strategy expression.
+ *
+ * <p>Note: Currently, there is at most one {@link WatermarkSpec} in the list, because we don't
+ * support multiple watermark definitions yet.
+ */
+ public List<WatermarkSpec> getWatermarkSpecs() {
+ return watermarkSpecs;
+ }
+
+ /** Returns the primary key if it has been defined. */
+ public Optional<UniqueConstraint> getPrimaryKey() {
+ return Optional.ofNullable(primaryKey);
+ }
+
+ /**
+ * Converts all columns of this schema into a (possibly nested) row data type.
+ *
+ * <p>This method returns the <b>source-to-query schema</b>.
+ *
+ * <p>Note: The returned row data type contains physical, computed, and metadata columns. Be
+ * careful when using this method in a table source or table sink. In many cases, {@link
+ * #toPhysicalRowDataType()} might be more appropriate.
+ *
+ * @see DataTypes#ROW(DataTypes.Field...)
+ * @see #toPhysicalRowDataType()
+ * @see #toSinkRowDataType()
+ */
+ public DataType toSourceRowDataType() {
+ final DataTypes.Field[] fields =
+ columns.stream()
+ .map(column -> FIELD(column.getName(), column.getDataType()))
+ .toArray(DataTypes.Field[]::new);
+ // the row should never be null
+ return ROW(fields).notNull();
+ }
+
+ /**
+ * Converts all physical columns of this schema into a (possibly nested) row data type.
+ *
+ * <p>Note: The returned row data type contains only physical columns. It does not include
+ * computed or metadata columns.
+ *
+ * @see DataTypes#ROW(DataTypes.Field...)
+ * @see #toSourceRowDataType()
+ * @see #toSinkRowDataType()
+ */
+ public DataType toPhysicalRowDataType() {
+ final DataTypes.Field[] fields =
+ columns.stream()
+ .filter(Column::isPhysical)
+ .map(column -> FIELD(column.getName(), column.getDataType()))
+ .toArray(DataTypes.Field[]::new);
+ // the row should never be null
+ return ROW(fields).notNull();
+ }
+
+ /**
+ * Converts all persisted columns of this schema into a (possibly nested) row data type.
+ *
+ * <p>This method returns the <b>query-to-sink schema</b>.
+ *
+ * <p>Note: Computed columns and virtual columns are excluded in the returned row data type. The
+ * data type contains the columns of {@link #toPhysicalRowDataType()} plus persisted metadata
+ * columns.
+ *
+ * @see DataTypes#ROW(DataTypes.Field...)
+ * @see #toSourceRowDataType()
+ * @see #toPhysicalRowDataType()
+ */
+ public DataType toSinkRowDataType() {
+ final DataTypes.Field[] fields =
+ columns.stream()
+ .filter(Column::isPersisted)
+ .map(column -> FIELD(column.getName(), column.getDataType()))
+ .toArray(DataTypes.Field[]::new);
+ // the row should never be null
+ return ROW(fields).notNull();
+ }
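+
+ // Worked example (hypothetical schema): given a physical column `id`, a computed
+ // column `c`, a persisted metadata column `m`, and a virtual metadata column `v`:
+ // toSourceRowDataType() -> ROW<`id` ..., `c` ..., `m` ..., `v` ...> NOT NULL
+ // toPhysicalRowDataType() -> ROW<`id` ...> NOT NULL
+ // toSinkRowDataType() -> ROW<`id` ..., `m` ...> NOT NULL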
+
+ @Override
+ public String toString() {
+ final List<Object> components = new ArrayList<>();
+ components.addAll(columns);
+ components.addAll(watermarkSpecs);
+ if (primaryKey != null) {
+ components.add(primaryKey);
+ }
+ return components.stream()
+ .map(Objects::toString)
+ .map(s -> " " + s)
+ .collect(Collectors.joining(", \n", "(\n", "\n)"));
+ }
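+
+ // Example rendering of a hypothetical resolved schema:
+ // (
+ //   `id` INT NOT NULL,
+ //   `ts` TIMESTAMP(3),
+ //   WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - INTERVAL '5' SECOND,
+ //   CONSTRAINT `pk` PRIMARY KEY (`id`) NOT ENFORCED
+ // )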
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final ResolvedSchema that = (ResolvedSchema) o;
+ return Objects.equals(columns, that.columns)
+ && Objects.equals(watermarkSpecs, that.watermarkSpecs)
+ && Objects.equals(primaryKey, that.primaryKey);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(columns, watermarkSpecs, primaryKey);
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/SchemaResolver.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/SchemaResolver.java
new file mode 100644
index 0000000..6f2d947
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/SchemaResolver.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Schema.UnresolvedMetadataColumn;
+
+/** Resolves a {@link Schema} to a validated {@link ResolvedSchema}. */
+public interface SchemaResolver {
+
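+ /**
+ * Resolves the given unresolved {@link Schema} into a validated {@link ResolvedSchema}.
+ *
+ * <p>A hedged usage sketch: {@code ResolvedSchema resolved = resolver.resolve(schema);}
+ */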
+ ResolvedSchema resolve(Schema schema);
+
+ /** Returns whether the resolution happens in streaming mode. */
+ boolean isStreamingMode();
+
+ /**
+ * Returns whether metadata columns are supported; if not, an exception is thrown during the
+ * resolution of {@link UnresolvedMetadataColumn}.
+ */
+ boolean supportsMetadata();
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/UniqueConstraint.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/UniqueConstraint.java
new file mode 100644
index 0000000..a60f0d2
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/UniqueConstraint.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A unique key constraint. It can also be declared as a PRIMARY KEY.
+ *
+ * @see ConstraintType
+ */
+@PublicEvolving
+public final class UniqueConstraint extends AbstractConstraint {
+
+ private final List<String> columns;
+ private final ConstraintType type;
+
+ /** Creates a non-enforced {@link ConstraintType#PRIMARY_KEY} constraint. */
+ public static UniqueConstraint primaryKey(String name, List<String> columns) {
+ return new UniqueConstraint(name, false, ConstraintType.PRIMARY_KEY, columns);
+ }
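+
+ // A hedged example (names are hypothetical):
+ // UniqueConstraint.primaryKey("pk", Arrays.asList("f0", "f1")).asSummaryString()
+ // returns roughly: CONSTRAINT `pk` PRIMARY KEY (`f0`, `f1`) NOT ENFORCED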
+
+ private UniqueConstraint(
+ String name, boolean enforced, ConstraintType type, List<String> columns) {
+ super(name, enforced);
+
+ this.columns = checkNotNull(columns);
+ this.type = checkNotNull(type);
+ }
+
+ /** Returns the list of column names on which the constraint is defined. */
+ public List<String> getColumns() {
+ return columns;
+ }
+
+ @Override
+ public ConstraintType getType() {
+ return type;
+ }
+
+ /**
+ * Returns the constraint's summary. All constraint summaries are formatted as
+ *
+ * <pre>
+ * CONSTRAINT [constraint-name] [constraint-type] ([constraint-definition])
+ *
+ * E.g. CONSTRAINT pk PRIMARY KEY (f0, f1) NOT ENFORCED
+ * </pre>
+ */
+ @Override
+ public final String asSummaryString() {
+ final String typeString;
+ switch (getType()) {
+ case PRIMARY_KEY:
+ typeString = "PRIMARY KEY";
+ break;
+ case UNIQUE_KEY:
+ typeString = "UNIQUE";
+ break;
+ default:
+ throw new IllegalStateException("Unknown key type: " + getType());
+ }
+
+ return String.format(
+ "CONSTRAINT %s %s (%s)%s",
+ EncodingUtils.escapeIdentifier(getName()),
+ typeString,
+ columns.stream()
+ .map(EncodingUtils::escapeIdentifier)
+ .collect(Collectors.joining(", ")),
+ isEnforced() ? "" : " NOT ENFORCED");
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ UniqueConstraint that = (UniqueConstraint) o;
+ return Objects.equals(columns, that.columns) && type == that.type;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), columns, type);
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/WatermarkSpec.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/WatermarkSpec.java
new file mode 100644
index 0000000..f3d1e9c
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/WatermarkSpec.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Representation of a watermark specification in a {@link ResolvedSchema}.
+ *
+ * <p>It defines the rowtime attribute and a {@link ResolvedExpression} for watermark generation.
+ */
+@PublicEvolving
+public final class WatermarkSpec {
+
+ private final String rowtimeAttribute;
+
+ private final ResolvedExpression watermarkExpression;
+
+ public WatermarkSpec(String rowtimeAttribute, ResolvedExpression watermarkExpression) {
+ this.rowtimeAttribute =
+ checkNotNull(rowtimeAttribute, "Rowtime attribute must not be null.");
+ this.watermarkExpression =
+ checkNotNull(watermarkExpression, "Watermark expression must not be null.");
+ }
+
+ /**
+ * Returns the name of the rowtime attribute.
+ *
+ * <p>The referenced attribute must be present in the {@link ResolvedSchema} and must be of
+ * {@link TimestampType}.
+ */
+ public String getRowtimeAttribute() {
+ return rowtimeAttribute;
+ }
+
+ /** Returns the {@link ResolvedExpression} for watermark generation. */
+ public ResolvedExpression getWatermarkExpression() {
+ return watermarkExpression;
+ }
+
+ public String asSummaryString() {
+ return "WATERMARK FOR "
+ + EncodingUtils.escapeIdentifier(rowtimeAttribute)
+ + ": "
+ + watermarkExpression.getOutputDataType()
+ + " AS "
+ + watermarkExpression.asSummaryString();
+ }
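+
+ // Example rendering for a hypothetical spec:
+ // WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - INTERVAL '5' SECOND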
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final WatermarkSpec that = (WatermarkSpec) o;
+ return rowtimeAttribute.equals(that.rowtimeAttribute)
+ && watermarkExpression.equals(that.watermarkExpression);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(rowtimeAttribute, watermarkExpression);
+ }
+
+ @Override
+ public String toString() {
+ return asSummaryString();
+ }
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/expressions/utils/ResolvedExpressionMock.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/expressions/utils/ResolvedExpressionMock.java
new file mode 100644
index 0000000..533ee2a
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/expressions/utils/ResolvedExpressionMock.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.utils;
+
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionVisitor;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+
+/** {@link ResolvedExpression} mock for testing purposes. */
+public class ResolvedExpressionMock implements ResolvedExpression {
+
+ private final DataType outputDataType;
+
+ private final Supplier<String> stringRepresentation;
+
+ public ResolvedExpressionMock(DataType outputDataType, Supplier<String> stringRepresentation) {
+ this.outputDataType = outputDataType;
+ this.stringRepresentation = stringRepresentation;
+ }
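+
+ // A hedged usage sketch: mocking a resolved watermark expression in tests
+ // (DataTypes import assumed).
+ // ResolvedExpression mock =
+ // new ResolvedExpressionMock(
+ // DataTypes.TIMESTAMP(3), () -> "ts - INTERVAL '5' SECOND");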
+
+ @Override
+ public String asSummaryString() {
+ return stringRepresentation.get();
+ }
+
+ @Override
+ public List<Expression> getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public <R> R accept(ExpressionVisitor<R> visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return stringRepresentation.get();
+ }
+
+ @Override
+ public DataType getOutputDataType() {
+ return outputDataType;
+ }
+
+ @Override
+ public List<ResolvedExpression> getResolvedChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public String toString() {
+ return asSummaryString();
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
index 247a442..f04e529 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
@@ -21,8 +21,8 @@ package org.apache.flink.table.api
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.internal.TableEnvironmentInternal
+import org.apache.flink.table.descriptors.ConnectorDescriptor
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR
-import org.apache.flink.table.descriptors.{ConnectorDescriptor, Schema}
import org.apache.flink.table.expressions.utils._
import org.apache.flink.table.runtime.utils.CommonTestData
import org.apache.flink.table.sources.{CsvTableSource, TableSource}
@@ -394,7 +394,7 @@ class TableSourceTest extends TableTestBase {
context
}
}).withSchema(
- new Schema()
+ new org.apache.flink.table.descriptors.Schema()
.schema(TableSchema.builder()
.field("id", DataTypes.INT())
.field("name", DataTypes.STRING())