You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/02/26 11:56:51 UTC

[flink] 01/02: [FLINK-21435][table] Implement Schema, ResolvedSchema, SchemaResolver

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 011a34982facc817fef40de74c4b95497830a037
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Feb 3 08:39:21 2021 +0100

    [FLINK-21435][table] Implement Schema, ResolvedSchema, SchemaResolver
    
    This closes #14996.
---
 flink-python/pyflink/table/descriptors.py          |   2 +-
 .../flink/table/catalog/DefaultSchemaResolver.java | 353 +++++++++
 .../expressions/resolver/ExpressionResolver.java   |   7 +-
 .../resolver/rules/ResolveSqlCallRule.java         |   5 +
 .../expressions/resolver/rules/ResolverRule.java   |   3 +
 .../flink/table/catalog/SchemaResolutionTest.java  | 384 +++++++++
 .../java/org/apache/flink/table/api/Schema.java    | 857 +++++++++++++++++++++
 .../flink/table/catalog/AbstractConstraint.java    |  70 ++
 .../org/apache/flink/table/catalog/Column.java     | 330 ++++++++
 .../org/apache/flink/table/catalog/Constraint.java |  63 ++
 .../apache/flink/table/catalog/ResolvedSchema.java | 216 ++++++
 .../apache/flink/table/catalog/SchemaResolver.java |  37 +
 .../flink/table/catalog/UniqueConstraint.java      | 116 +++
 .../apache/flink/table/catalog/WatermarkSpec.java  |  95 +++
 .../expressions/utils/ResolvedExpressionMock.java  |  76 ++
 .../apache/flink/table/api/TableSourceTest.scala   |   4 +-
 16 files changed, 2614 insertions(+), 4 deletions(-)

diff --git a/flink-python/pyflink/table/descriptors.py b/flink-python/pyflink/table/descriptors.py
index 73bae1b..f44c872 100644
--- a/flink-python/pyflink/table/descriptors.py
+++ b/flink-python/pyflink/table/descriptors.py
@@ -193,7 +193,7 @@ class Schema(Descriptor):
                         event-time attribute.
         """
         gateway = get_gateway()
-        self._j_schema = gateway.jvm.Schema()
+        self._j_schema = gateway.jvm.org.apache.flink.table.descriptors.Schema()
         super(Schema, self).__init__(self._j_schema)
 
         if schema is not None:
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java
new file mode 100644
index 0000000..e53fc4b
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Schema.UnresolvedComputedColumn;
+import org.apache.flink.table.api.Schema.UnresolvedMetadataColumn;
+import org.apache.flink.table.api.Schema.UnresolvedPhysicalColumn;
+import org.apache.flink.table.api.Schema.UnresolvedPrimaryKey;
+import org.apache.flink.table.api.Schema.UnresolvedWatermarkSpec;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Column.ComputedColumn;
+import org.apache.flink.table.catalog.Column.MetadataColumn;
+import org.apache.flink.table.catalog.Column.PhysicalColumn;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.LocalReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver.ExpressionResolverBuilder;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.expressions.ApiExpressionUtils.localRef;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isProctimeAttribute;
+import static org.apache.flink.table.types.utils.DataTypeUtils.replaceLogicalType;
+
+/** Default implementation of {@link SchemaResolver}. */
+@Internal
+class DefaultSchemaResolver implements SchemaResolver {
+
+    private final boolean isStreamingMode;
+    private final boolean supportsMetadata;
+    private final DataTypeFactory dataTypeFactory;
+    private final ExpressionResolverBuilder resolverBuilder;
+
+    DefaultSchemaResolver(
+            boolean isStreamingMode,
+            boolean supportsMetadata,
+            DataTypeFactory dataTypeFactory,
+            ExpressionResolverBuilder resolverBuilder) {
+        this.isStreamingMode = isStreamingMode;
+        this.supportsMetadata = supportsMetadata;
+        this.dataTypeFactory = dataTypeFactory;
+        this.resolverBuilder = resolverBuilder;
+    }
+
+    public SchemaResolver withMetadata(boolean supportsMetadata) {
+        return new DefaultSchemaResolver(
+                isStreamingMode, supportsMetadata, dataTypeFactory, resolverBuilder);
+    }
+
+    @Override
+    public ResolvedSchema resolve(Schema schema) {
+        final List<Column> columns = resolveColumns(schema.getColumns());
+
+        final List<WatermarkSpec> watermarkSpecs =
+                resolveWatermarkSpecs(schema.getWatermarkSpecs(), columns);
+
+        final List<Column> columnsWithRowtime = adjustRowtimeAttributes(watermarkSpecs, columns);
+
+        final UniqueConstraint primaryKey =
+                resolvePrimaryKey(schema.getPrimaryKey().orElse(null), columnsWithRowtime);
+
+        return new ResolvedSchema(columnsWithRowtime, watermarkSpecs, primaryKey);
+    }
+
+    @Override
+    public boolean isStreamingMode() {
+        return isStreamingMode;
+    }
+
+    @Override
+    public boolean supportsMetadata() {
+        return supportsMetadata;
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private List<Column> resolveColumns(List<Schema.UnresolvedColumn> unresolvedColumns) {
+
+        validateDuplicateColumns(unresolvedColumns);
+
+        final Column[] resolvedColumns = new Column[unresolvedColumns.size()];
+        // process source columns first before computed columns
+        for (int pos = 0; pos < unresolvedColumns.size(); pos++) {
+            final Schema.UnresolvedColumn unresolvedColumn = unresolvedColumns.get(pos);
+            if (unresolvedColumn instanceof UnresolvedPhysicalColumn) {
+                resolvedColumns[pos] =
+                        resolvePhysicalColumn((UnresolvedPhysicalColumn) unresolvedColumn);
+            } else if (unresolvedColumn instanceof UnresolvedMetadataColumn) {
+                resolvedColumns[pos] =
+                        resolveMetadataColumn((UnresolvedMetadataColumn) unresolvedColumn);
+            } else if (!(unresolvedColumn instanceof UnresolvedComputedColumn)) {
+                throw new IllegalArgumentException(
+                        "Unknown unresolved column type: " + unresolvedColumn.getClass().getName());
+            }
+        }
+        // fill in computed columns
+        final List<Column> sourceColumns =
+                Stream.of(resolvedColumns).filter(Objects::nonNull).collect(Collectors.toList());
+        for (int pos = 0; pos < unresolvedColumns.size(); pos++) {
+            final Schema.UnresolvedColumn unresolvedColumn = unresolvedColumns.get(pos);
+            if (unresolvedColumn instanceof UnresolvedComputedColumn) {
+                resolvedColumns[pos] =
+                        resolveComputedColumn(
+                                (UnresolvedComputedColumn) unresolvedColumn, sourceColumns);
+            }
+        }
+
+        return Arrays.asList(resolvedColumns);
+    }
+
+    private PhysicalColumn resolvePhysicalColumn(UnresolvedPhysicalColumn unresolvedColumn) {
+        return Column.physical(
+                unresolvedColumn.getName(),
+                dataTypeFactory.createDataType(unresolvedColumn.getDataType()));
+    }
+
+    private MetadataColumn resolveMetadataColumn(UnresolvedMetadataColumn unresolvedColumn) {
+        if (!supportsMetadata) {
+            throw new ValidationException(
+                    "Metadata columns are not supported in a schema at the current location.");
+        }
+        return Column.metadata(
+                unresolvedColumn.getName(),
+                dataTypeFactory.createDataType(unresolvedColumn.getDataType()),
+                unresolvedColumn.getMetadataKey(),
+                unresolvedColumn.isVirtual());
+    }
+
+    private ComputedColumn resolveComputedColumn(
+            UnresolvedComputedColumn unresolvedColumn, List<Column> inputColumns) {
+        final ResolvedExpression resolvedExpression;
+        try {
+            resolvedExpression = resolveExpression(inputColumns, unresolvedColumn.getExpression());
+        } catch (Exception e) {
+            throw new ValidationException(
+                    String.format(
+                            "Invalid expression for computed column '%s'.",
+                            unresolvedColumn.getName()),
+                    e);
+        }
+        return Column.computed(unresolvedColumn.getName(), resolvedExpression);
+    }
+
+    private void validateDuplicateColumns(List<Schema.UnresolvedColumn> columns) {
+        final List<String> names =
+                columns.stream().map(Schema.UnresolvedColumn::getName).collect(Collectors.toList());
+        final List<String> duplicates =
+                names.stream()
+                        .filter(name -> Collections.frequency(names, name) > 1)
+                        .distinct()
+                        .collect(Collectors.toList());
+        if (duplicates.size() > 0) {
+            throw new ValidationException(
+                    String.format(
+                            "Schema must not contain duplicate column names. Found duplicates: %s",
+                            duplicates));
+        }
+    }
+
+    private List<WatermarkSpec> resolveWatermarkSpecs(
+            List<UnresolvedWatermarkSpec> unresolvedWatermarkSpecs, List<Column> inputColumns) {
+        if (unresolvedWatermarkSpecs.size() == 0) {
+            return Collections.emptyList();
+        }
+        if (unresolvedWatermarkSpecs.size() > 1) {
+            throw new ValidationException("Multiple watermark definitions are not supported yet.");
+        }
+        final UnresolvedWatermarkSpec watermarkSpec = unresolvedWatermarkSpecs.get(0);
+
+        // validate time attribute
+        final String timeColumn = watermarkSpec.getColumnName();
+        validateTimeColumn(timeColumn, inputColumns);
+
+        // resolve watermark expression
+        final ResolvedExpression watermarkExpression;
+        try {
+            watermarkExpression =
+                    resolveExpression(inputColumns, watermarkSpec.getWatermarkExpression());
+        } catch (Exception e) {
+            throw new ValidationException(
+                    String.format(
+                            "Invalid expression for watermark '%s'.", watermarkSpec.toString()),
+                    e);
+        }
+        validateWatermarkExpression(watermarkExpression.getOutputDataType().getLogicalType());
+
+        return Collections.singletonList(
+                new WatermarkSpec(watermarkSpec.getColumnName(), watermarkExpression));
+    }
+
+    private void validateTimeColumn(String columnName, List<Column> columns) {
+        final Optional<Column> timeColumn =
+                columns.stream().filter(c -> c.getName().equals(columnName)).findFirst();
+        if (!timeColumn.isPresent()) {
+            throw new ValidationException(
+                    String.format(
+                            "Invalid column name '%s' for rowtime attribute in watermark declaration. Available columns are: %s",
+                            columnName,
+                            columns.stream().map(Column::getName).collect(Collectors.toList())));
+        }
+        final LogicalType timeFieldType = timeColumn.get().getDataType().getLogicalType();
+        if (!hasRoot(timeFieldType, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
+                || getPrecision(timeFieldType) != 3) {
+            throw new ValidationException(
+                    "Invalid data type of time field for watermark definition. "
+                            + "The field must be of type TIMESTAMP(3) WITHOUT TIME ZONE.");
+        }
+        if (isProctimeAttribute(timeFieldType)) {
+            throw new ValidationException(
+                    "A watermark can not be defined for a processing-time attribute.");
+        }
+    }
+
+    private void validateWatermarkExpression(LogicalType watermarkType) {
+        if (!hasRoot(watermarkType, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
+                || getPrecision(watermarkType) != 3) {
+            throw new ValidationException(
+                    "Invalid data type of expression for watermark definition. "
+                            + "The field must be of type TIMESTAMP(3) WITHOUT TIME ZONE.");
+        }
+    }
+
+    /** Updates the data type of columns that are referenced by {@link WatermarkSpec}. */
+    private List<Column> adjustRowtimeAttributes(
+            List<WatermarkSpec> watermarkSpecs, List<Column> columns) {
+        return columns.stream()
+                .map(column -> adjustRowtimeAttribute(watermarkSpecs, column))
+                .collect(Collectors.toList());
+    }
+
+    private Column adjustRowtimeAttribute(List<WatermarkSpec> watermarkSpecs, Column column) {
+        final String name = column.getName();
+        final DataType dataType = column.getDataType();
+        final boolean hasWatermarkSpec =
+                watermarkSpecs.stream().anyMatch(s -> s.getRowtimeAttribute().equals(name));
+        if (hasWatermarkSpec && isStreamingMode) {
+            final TimestampType originalType = (TimestampType) dataType.getLogicalType();
+            final LogicalType rowtimeType =
+                    new TimestampType(
+                            originalType.isNullable(),
+                            TimestampKind.ROWTIME,
+                            originalType.getPrecision());
+            return column.copy(replaceLogicalType(dataType, rowtimeType));
+        }
+        return column;
+    }
+
+    private @Nullable UniqueConstraint resolvePrimaryKey(
+            @Nullable UnresolvedPrimaryKey unresolvedPrimaryKey, List<Column> columns) {
+        if (unresolvedPrimaryKey == null) {
+            return null;
+        }
+
+        final UniqueConstraint primaryKey =
+                UniqueConstraint.primaryKey(
+                        unresolvedPrimaryKey.getConstraintName(),
+                        unresolvedPrimaryKey.getColumnNames());
+
+        validatePrimaryKey(primaryKey, columns);
+
+        return primaryKey;
+    }
+
+    private void validatePrimaryKey(UniqueConstraint primaryKey, List<Column> columns) {
+        final Map<String, Column> columnsByNameLookup =
+                columns.stream().collect(Collectors.toMap(Column::getName, Function.identity()));
+
+        final Set<String> duplicateColumns =
+                primaryKey.getColumns().stream()
+                        .filter(name -> Collections.frequency(primaryKey.getColumns(), name) > 1)
+                        .collect(Collectors.toSet());
+
+        if (!duplicateColumns.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "Invalid primary key '%s'. A primary key must not contain duplicate columns. Found: %s",
+                            primaryKey.getName(), duplicateColumns));
+        }
+
+        for (String columnName : primaryKey.getColumns()) {
+            Column column = columnsByNameLookup.get(columnName);
+            if (column == null) {
+                throw new ValidationException(
+                        String.format(
+                                "Invalid primary key '%s'. Column '%s' does not exist.",
+                                primaryKey.getName(), columnName));
+            }
+
+            if (!column.isPhysical()) {
+                throw new ValidationException(
+                        String.format(
+                                "Invalid primary key '%s'. Column '%s' is not a physical column.",
+                                primaryKey.getName(), columnName));
+            }
+
+            final LogicalType columnType = column.getDataType().getLogicalType();
+            if (columnType.isNullable()) {
+                throw new ValidationException(
+                        String.format(
+                                "Invalid primary key '%s'. Column '%s' is nullable.",
+                                primaryKey.getName(), columnName));
+            }
+        }
+    }
+
+    private ResolvedExpression resolveExpression(List<Column> columns, Expression expression) {
+        final LocalReferenceExpression[] localRefs =
+                columns.stream()
+                        .map(c -> localRef(c.getName(), c.getDataType()))
+                        .toArray(LocalReferenceExpression[]::new);
+        return resolverBuilder
+                .withLocalReferences(localRefs)
+                .build()
+                .resolve(Collections.singletonList(expression))
+                .get(0);
+    }
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
index 39d4a4a..5b9f830 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
@@ -308,6 +308,11 @@ public class ExpressionResolver {
         }
 
         @Override
+        public List<LocalReferenceExpression> getLocalReferences() {
+            return new ArrayList<>(localReferences.values());
+        }
+
+        @Override
         public Optional<LocalOverWindow> getOverWindow(Expression alias) {
             return Optional.ofNullable(localOverWindows.get(alias));
         }
@@ -445,7 +450,7 @@ public class ExpressionResolver {
 
         public ExpressionResolverBuilder withLocalReferences(
                 LocalReferenceExpression... localReferences) {
-            this.localReferences.addAll(Arrays.asList(localReferences));
+            this.localReferences = Arrays.asList(localReferences);
             return this;
         }
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java
index f8bb3b3..cf2d74b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java
@@ -53,10 +53,15 @@ final class ResolveSqlCallRule implements ResolverRule {
             final SqlExpressionResolver resolver = resolutionContext.sqlExpressionResolver();
 
             final TableSchema.Builder builder = TableSchema.builder();
+            // input references
             resolutionContext
                     .referenceLookup()
                     .getAllInputFields()
                     .forEach(f -> builder.field(f.getName(), f.getOutputDataType()));
+            // local references
+            resolutionContext
+                    .getLocalReferences()
+                    .forEach(refs -> builder.field(refs.getName(), refs.getOutputDataType()));
             return resolver.resolveExpression(sqlCall.getSqlExpression(), builder.build());
         }
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java
index c82cf67..1a8c7cf 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java
@@ -84,6 +84,9 @@ public interface ResolverRule {
         /** Access to available local references. */
         Optional<LocalReferenceExpression> getLocalReference(String alias);
 
+        /** Access to available local references. */
+        List<LocalReferenceExpression> getLocalReferences();
+
         /** Access to available local over windows. */
         Optional<LocalOverWindow> getOverWindow(Expression alias);
     }
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
new file mode 100644
index 0000000..3e7c029
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.core.testutils.FlinkMatchers;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.utils.DataTypeFactoryMock;
+import org.apache.flink.table.utils.FunctionLookupMock;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.apache.flink.table.api.Expressions.callSql;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isProctimeAttribute;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isRowtimeAttribute;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@link Schema}, {@link DefaultSchemaResolver}, and {@link ResolvedSchema}. */
+public class SchemaResolutionTest {
+
+    private static final String COMPUTED_SQL = "orig_ts - INTERVAL '60' MINUTE";
+
+    private static final ResolvedExpression COMPUTED_COLUMN_RESOLVED =
+            new ResolvedExpressionMock(DataTypes.TIMESTAMP(3), () -> COMPUTED_SQL);
+
+    private static final String WATERMARK_SQL = "ts - INTERVAL '5' SECOND";
+
+    private static final ResolvedExpression WATERMARK_RESOLVED =
+            new ResolvedExpressionMock(DataTypes.TIMESTAMP(3), () -> WATERMARK_SQL);
+
+    private static final String PROCTIME_SQL = "PROCTIME()";
+
+    private static final ResolvedExpression PROCTIME_RESOLVED =
+            new ResolvedExpressionMock(
+                    fromLogicalToDataType(new TimestampType(false, TimestampKind.PROCTIME, 3)),
+                    () -> PROCTIME_SQL);
+
+    private static final Schema SCHEMA =
+            Schema.newBuilder()
+                    .primaryKeyNamed("primary_constraint", "id") // out of order
+                    .column("id", DataTypes.INT().notNull())
+                    .column("counter", DataTypes.INT().notNull())
+                    .column("payload", "ROW<name STRING, age INT, flag BOOLEAN>")
+                    .columnByMetadata("topic", DataTypes.STRING(), true)
+                    .columnByExpression("ts", callSql(COMPUTED_SQL)) // out of order API expression
+                    .columnByMetadata("orig_ts", DataTypes.TIMESTAMP(3), "timestamp")
+                    .watermark("ts", WATERMARK_SQL)
+                    .columnByExpression("proctime", PROCTIME_SQL)
+                    .build();
+
+    @Test
+    public void testSchemaResolution() {
+        final ResolvedSchema expectedSchema =
+                new ResolvedSchema(
+                        Arrays.asList(
+                                Column.physical("id", DataTypes.INT().notNull()),
+                                Column.physical("counter", DataTypes.INT().notNull()),
+                                Column.physical(
+                                        "payload",
+                                        DataTypes.ROW(
+                                                DataTypes.FIELD("name", DataTypes.STRING()),
+                                                DataTypes.FIELD("age", DataTypes.INT()),
+                                                DataTypes.FIELD("flag", DataTypes.BOOLEAN()))),
+                                Column.metadata("topic", DataTypes.STRING(), true),
+                                Column.computed("ts", COMPUTED_COLUMN_RESOLVED),
+                                Column.metadata("orig_ts", DataTypes.TIMESTAMP(3), "timestamp"),
+                                Column.computed("proctime", PROCTIME_RESOLVED)),
+                        Collections.singletonList(new WatermarkSpec("ts", WATERMARK_RESOLVED)),
+                        UniqueConstraint.primaryKey(
+                                "primary_constraint", Collections.singletonList("id")));
+
+        final ResolvedSchema actualStreamSchema = resolveSchema(SCHEMA, true, true);
+        {
+            assertThat(actualStreamSchema, equalTo(expectedSchema));
+            assertTrue(isRowtimeAttribute(getType(actualStreamSchema, "ts")));
+            assertTrue(isProctimeAttribute(getType(actualStreamSchema, "proctime")));
+        }
+
+        final ResolvedSchema actualBatchSchema = resolveSchema(SCHEMA, false, true);
+        {
+            assertThat(actualBatchSchema, equalTo(expectedSchema));
+            assertFalse(isRowtimeAttribute(getType(actualBatchSchema, "ts")));
+            assertTrue(isProctimeAttribute(getType(actualBatchSchema, "proctime")));
+        }
+    }
+
+    @Test
+    public void testSchemaResolutionErrors() {
+
+        // columns
+
+        testError(
+                Schema.newBuilder().fromSchema(SCHEMA).column("id", DataTypes.STRING()).build(),
+                "Schema must not contain duplicate column names.");
+
+        testError(
+                SCHEMA,
+                "Metadata columns are not supported in a schema at the current location.",
+                true,
+                false);
+
+        testError(
+                Schema.newBuilder().columnByExpression("invalid", callSql("INVALID")).build(),
+                "Invalid expression for computed column 'invalid'.");
+
+        // time attributes and watermarks
+
+        testError(
+                Schema.newBuilder()
+                        .column("ts", DataTypes.BOOLEAN())
+                        .watermark("ts", callSql(WATERMARK_SQL))
+                        .build(),
+                "Invalid data type of time field for watermark definition. The field must be of type TIMESTAMP(3) WITHOUT TIME ZONE.");
+
+        testError(
+                Schema.newBuilder()
+                        .column("ts", DataTypes.TIMESTAMP(3))
+                        .watermark("ts", callSql("INVALID"))
+                        .build(),
+                "Invalid expression for watermark 'WATERMARK FOR `ts` AS [INVALID]'.");
+
+        testError(
+                Schema.newBuilder()
+                        .column("ts", DataTypes.TIMESTAMP(3))
+                        .watermark("other_ts", callSql(WATERMARK_SQL))
+                        .build(),
+                "Invalid column name 'other_ts' for rowtime attribute in watermark declaration. Available columns are: [ts]");
+
+        testError(
+                Schema.newBuilder().fromSchema(SCHEMA).watermark("orig_ts", WATERMARK_SQL).build(),
+                "Multiple watermark definitions are not supported yet.");
+
+        testError(
+                Schema.newBuilder()
+                        .columnByExpression("ts", PROCTIME_SQL)
+                        .watermark("ts", WATERMARK_SQL)
+                        .build(),
+                "A watermark can not be defined for a processing-time attribute.");
+
+        // primary keys
+
+        testError(
+                Schema.newBuilder().column("id", DataTypes.INT()).primaryKey("INVALID").build(),
+                "Column 'INVALID' does not exist.");
+
+        testError(
+                Schema.newBuilder()
+                        .column("nullable_col", DataTypes.INT())
+                        .primaryKey("nullable_col")
+                        .build(),
+                "Column 'nullable_col' is nullable.");
+
+        testError(
+                Schema.newBuilder()
+                        .column("orig_ts", DataTypes.TIMESTAMP(3))
+                        .columnByExpression("ts", COMPUTED_SQL)
+                        .primaryKey("ts")
+                        .build(),
+                "Column 'ts' is not a physical column.");
+
+        testError(
+                Schema.newBuilder().column("id", DataTypes.INT()).primaryKey("id", "id").build(),
+                "Invalid primary key 'PK_id_id'. A primary key must not contain duplicate columns. Found: [id]");
+    }
+
+    @Test
+    public void testUnresolvedSchemaString() {
+        assertThat(
+                SCHEMA.toString(),
+                equalTo(
+                        "(\n"
+                                + "  `id` INT NOT NULL, \n"
+                                + "  `counter` INT NOT NULL, \n"
+                                + "  `payload` [ROW<name STRING, age INT, flag BOOLEAN>], \n"
+                                + "  `topic` METADATA VIRTUAL, \n"
+                                + "  `ts` AS [orig_ts - INTERVAL '60' MINUTE], \n"
+                                + "  `orig_ts` METADATA FROM 'timestamp', \n"
+                                + "  `proctime` AS [PROCTIME()], \n"
+                                + "  WATERMARK FOR `ts` AS [ts - INTERVAL '5' SECOND], \n"
+                                + "  CONSTRAINT `primary_constraint` PRIMARY KEY (`id`) NOT ENFORCED\n"
+                                + ")"));
+    }
+
+    @Test
+    public void testResolvedSchemaString() {
+        final ResolvedSchema resolvedSchema = resolveSchema(SCHEMA);
+        assertThat(
+                resolvedSchema.toString(),
+                equalTo(
+                        "(\n"
+                                + "  `id` INT NOT NULL, \n"
+                                + "  `counter` INT NOT NULL, \n"
+                                + "  `payload` ROW<`name` STRING, `age` INT, `flag` BOOLEAN>, \n"
+                                + "  `topic` STRING METADATA VIRTUAL, \n"
+                                + "  `ts` TIMESTAMP(3) *ROWTIME* AS orig_ts - INTERVAL '60' MINUTE, \n"
+                                + "  `orig_ts` TIMESTAMP(3) METADATA FROM 'timestamp', \n"
+                                + "  `proctime` TIMESTAMP(3) NOT NULL *PROCTIME* AS PROCTIME(), \n"
+                                + "  WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - INTERVAL '5' SECOND, \n"
+                                + "  CONSTRAINT `primary_constraint` PRIMARY KEY (`id`) NOT ENFORCED\n"
+                                + ")"));
+    }
+
+    @Test
+    public void testGeneratedConstraintName() {
+        final Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.STRING())
+                        .column("c", DataTypes.STRING())
+                        .primaryKey("b", "a")
+                        .build();
+        assertThat(
+                schema.getPrimaryKey().orElseThrow(IllegalStateException::new).getConstraintName(),
+                equalTo("PK_b_a"));
+    }
+
+    @Test
+    public void testSinkRowDataType() {
+        final ResolvedSchema resolvedSchema = resolveSchema(SCHEMA);
+        final DataType expectedDataType =
+                DataTypes.ROW(
+                                DataTypes.FIELD("id", DataTypes.INT().notNull()),
+                                DataTypes.FIELD("counter", DataTypes.INT().notNull()),
+                                DataTypes.FIELD(
+                                        "payload",
+                                        DataTypes.ROW(
+                                                DataTypes.FIELD("name", DataTypes.STRING()),
+                                                DataTypes.FIELD("age", DataTypes.INT()),
+                                                DataTypes.FIELD("flag", DataTypes.BOOLEAN()))),
+                                DataTypes.FIELD("orig_ts", DataTypes.TIMESTAMP(3)))
+                        .notNull();
+        assertThat(resolvedSchema.toSinkRowDataType(), equalTo(expectedDataType));
+    }
+
+    @Test
+    public void testPhysicalRowDataType() {
+        final ResolvedSchema resolvedSchema1 = resolveSchema(SCHEMA);
+        final DataType expectedDataType =
+                DataTypes.ROW(
+                                DataTypes.FIELD("id", DataTypes.INT().notNull()),
+                                DataTypes.FIELD("counter", DataTypes.INT().notNull()),
+                                DataTypes.FIELD(
+                                        "payload",
+                                        DataTypes.ROW(
+                                                DataTypes.FIELD("name", DataTypes.STRING()),
+                                                DataTypes.FIELD("age", DataTypes.INT()),
+                                                DataTypes.FIELD("flag", DataTypes.BOOLEAN()))))
+                        .notNull();
+
+        final DataType physicalDataType1 = resolvedSchema1.toPhysicalRowDataType();
+        assertThat(physicalDataType1, equalTo(expectedDataType));
+
+        final ResolvedSchema resolvedSchema2 =
+                resolveSchema(Schema.newBuilder().fromRowDataType(physicalDataType1).build());
+        assertThat(resolvedSchema2.toPhysicalRowDataType(), equalTo(physicalDataType1));
+    }
+
+    @Test
+    public void testSourceRowDataType() {
+        final ResolvedSchema resolvedSchema = resolveSchema(SCHEMA, true, true);
+        final DataType expectedDataType =
+                DataTypes.ROW(
+                                DataTypes.FIELD("id", DataTypes.INT().notNull()),
+                                DataTypes.FIELD("counter", DataTypes.INT().notNull()),
+                                DataTypes.FIELD(
+                                        "payload",
+                                        DataTypes.ROW(
+                                                DataTypes.FIELD("name", DataTypes.STRING()),
+                                                DataTypes.FIELD("age", DataTypes.INT()),
+                                                DataTypes.FIELD("flag", DataTypes.BOOLEAN()))),
+                                DataTypes.FIELD("topic", DataTypes.STRING()),
+                                DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)),
+                                DataTypes.FIELD("orig_ts", DataTypes.TIMESTAMP(3)),
+                                DataTypes.FIELD("proctime", DataTypes.TIMESTAMP(3).notNull()))
+                        .notNull();
+        assertThat(resolvedSchema.toSourceRowDataType(), equalTo(expectedDataType));
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private static void testError(Schema schema, String errorMessage) {
+        testError(schema, errorMessage, true, true);
+    }
+
+    private static void testError(
+            Schema schema, String errorMessage, boolean isStreaming, boolean supportsMetadata) {
+        try {
+            resolveSchema(schema, isStreaming, supportsMetadata);
+            fail("Error message expected: " + errorMessage);
+        } catch (Throwable t) {
+            assertThat(t, FlinkMatchers.containsMessage(errorMessage));
+        }
+    }
+
+    private static ResolvedSchema resolveSchema(Schema schema) {
+        return resolveSchema(schema, true, true);
+    }
+
+    private static ResolvedSchema resolveSchema(
+            Schema schema, boolean isStreamingMode, boolean supportsMetadata) {
+        final SchemaResolver resolver =
+                new DefaultSchemaResolver(
+                        isStreamingMode,
+                        supportsMetadata,
+                        dataTypeFactory(),
+                        expressionResolverBuilder());
+        return resolver.resolve(schema);
+    }
+
+    private static ExpressionResolver.ExpressionResolverBuilder expressionResolverBuilder() {
+        return ExpressionResolver.resolverFor(
+                new TableConfig(),
+                name -> Optional.empty(),
+                new FunctionLookupMock(Collections.emptyMap()),
+                dataTypeFactory(),
+                SchemaResolutionTest::resolveSqlExpression);
+    }
+
+    private static DataTypeFactory dataTypeFactory() {
+        return new DataTypeFactoryMock();
+    }
+
+    private static ResolvedExpression resolveSqlExpression(
+            String sqlExpression, TableSchema inputSchema) {
+        switch (sqlExpression) {
+            case COMPUTED_SQL:
+                assertThat(
+                        inputSchema.getFieldDataType("orig_ts").orElse(null),
+                        equalTo(DataTypes.TIMESTAMP(3)));
+                return COMPUTED_COLUMN_RESOLVED;
+            case WATERMARK_SQL:
+                assertThat(
+                        inputSchema.getFieldDataType("ts").orElse(null),
+                        equalTo(DataTypes.TIMESTAMP(3)));
+                return WATERMARK_RESOLVED;
+            case PROCTIME_SQL:
+                return PROCTIME_RESOLVED;
+            default:
+                throw new UnsupportedOperationException("Unknown SQL expression.");
+        }
+    }
+
+    private static LogicalType getType(ResolvedSchema resolvedSchema, String column) {
+        return resolvedSchema
+                .getColumn(column)
+                .orElseThrow(IllegalStateException::new)
+                .getDataType()
+                .getLogicalType();
+    }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java
new file mode 100644
index 0000000..d749a81
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java
@@ -0,0 +1,857 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.Column.ComputedColumn;
+import org.apache.flink.table.catalog.Column.MetadataColumn;
+import org.apache.flink.table.catalog.Column.PhysicalColumn;
+import org.apache.flink.table.catalog.Constraint;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.SchemaResolver;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.WatermarkSpec;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.SqlCallExpression;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
+
+/**
+ * Schema of a table or view.
+ *
+ * <p>A schema represents the schema part of a {@code CREATE TABLE (schema) WITH (options)} DDL
+ * statement in SQL. It defines columns of different kind, constraints, time attributes, and
+ * watermark strategies. It is possible to reference objects (such as functions or types) across
+ * different catalogs.
+ *
+ * <p>This class is used in the API and catalogs to define an unresolved schema that will be
+ * translated to {@link ResolvedSchema}. Some methods of this class perform basic validation,
+ * however, the main validation happens during the resolution.
+ *
+ * <p>Since an instance of this class is unresolved, it should not be directly persisted. The {@link
+ * #toString()} shows only a summary of the contained objects.
+ */
+@PublicEvolving
+public final class Schema {
+
+    private final List<UnresolvedColumn> columns;
+
+    private final List<UnresolvedWatermarkSpec> watermarkSpecs;
+
+    private final @Nullable UnresolvedPrimaryKey primaryKey;
+
+    private Schema(
+            List<UnresolvedColumn> columns,
+            List<UnresolvedWatermarkSpec> watermarkSpecs,
+            @Nullable UnresolvedPrimaryKey primaryKey) {
+        this.columns = columns;
+        this.watermarkSpecs = watermarkSpecs;
+        this.primaryKey = primaryKey;
+    }
+
+    /** Builder for configuring and creating instances of {@link Schema}. */
+    public static Schema.Builder newBuilder() {
+        return new Builder();
+    }
+
+    public List<UnresolvedColumn> getColumns() {
+        return columns;
+    }
+
+    public List<UnresolvedWatermarkSpec> getWatermarkSpecs() {
+        return watermarkSpecs;
+    }
+
+    public Optional<UnresolvedPrimaryKey> getPrimaryKey() {
+        return Optional.ofNullable(primaryKey);
+    }
+
+    /** Resolves the given {@link Schema} to a validated {@link ResolvedSchema}. */
+    public ResolvedSchema resolve(SchemaResolver resolver) {
+        return resolver.resolve(this);
+    }
+
+    @Override
+    public String toString() {
+        final List<Object> components = new ArrayList<>();
+        components.addAll(columns);
+        components.addAll(watermarkSpecs);
+        if (primaryKey != null) {
+            components.add(primaryKey);
+        }
+        return components.stream()
+                .map(Objects::toString)
+                .map(s -> "  " + s)
+                .collect(Collectors.joining(", \n", "(\n", "\n)"));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Schema schema = (Schema) o;
+        return columns.equals(schema.columns)
+                && watermarkSpecs.equals(schema.watermarkSpecs)
+                && Objects.equals(primaryKey, schema.primaryKey);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(columns, watermarkSpecs, primaryKey);
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /** A builder for constructing an immutable but still unresolved {@link Schema}. */
+    public static final class Builder {
+
+        private final List<UnresolvedColumn> columns;
+
+        private final List<UnresolvedWatermarkSpec> watermarkSpecs;
+
+        private @Nullable UnresolvedPrimaryKey primaryKey;
+
+        private Builder() {
+            columns = new ArrayList<>();
+            watermarkSpecs = new ArrayList<>();
+        }
+
+        /** Adopts all members from the given unresolved schema. */
+        public Builder fromSchema(Schema unresolvedSchema) {
+            columns.addAll(unresolvedSchema.columns);
+            watermarkSpecs.addAll(unresolvedSchema.watermarkSpecs);
+            if (unresolvedSchema.primaryKey != null) {
+                primaryKeyNamed(
+                        unresolvedSchema.primaryKey.getConstraintName(),
+                        unresolvedSchema.primaryKey.getColumnNames());
+            }
+            return this;
+        }
+
+        /** Adopts all members from the given resolved schema. */
+        public Builder fromResolvedSchema(ResolvedSchema resolvedSchema) {
+            addResolvedColumns(resolvedSchema.getColumns());
+            addResolvedWatermarkSpec(resolvedSchema.getWatermarkSpecs());
+            resolvedSchema.getPrimaryKey().ifPresent(this::addResolvedConstraint);
+            return this;
+        }
+
+        /** Adopts all fields of the given row as physical columns of the schema. */
+        public Builder fromRowDataType(DataType dataType) {
+            Preconditions.checkNotNull(dataType, "Data type must not be null.");
+            Preconditions.checkArgument(
+                    hasRoot(dataType.getLogicalType(), LogicalTypeRoot.ROW),
+                    "Data type of ROW expected.");
+            final List<DataType> fieldDataTypes = dataType.getChildren();
+            final List<String> fieldNames = ((RowType) dataType.getLogicalType()).getFieldNames();
+            IntStream.range(0, fieldDataTypes.size())
+                    .forEach(i -> column(fieldNames.get(i), fieldDataTypes.get(i)));
+            return this;
+        }
+
+        /**
+         * Declares a physical column that is appended to this schema.
+         *
+         * <p>Physical columns are regular columns known from databases. They define the names, the
+         * types, and the order of fields in the physical data. Thus, physical columns represent the
+         * payload that is read from and written to an external system. Connectors and formats use
+         * these columns (in the defined order) to configure themselves. Other kinds of columns can
+         * be declared between physical columns but will not influence the final physical schema.
+         *
+         * @param columnName column name
+         * @param dataType data type of the column
+         */
+        public Builder column(String columnName, AbstractDataType<?> dataType) {
+            Preconditions.checkNotNull(columnName, "Column name must not be null.");
+            Preconditions.checkNotNull(dataType, "Data type must not be null.");
+            columns.add(new UnresolvedPhysicalColumn(columnName, dataType));
+            return this;
+        }
+
+        /**
+         * Declares a physical column that is appended to this schema.
+         *
+         * <p>See {@link #column(String, AbstractDataType)} for a detailed explanation.
+         *
+         * <p>This method uses a type string that can be easily persisted in a durable catalog.
+         *
+         * @param columnName column name
+         * @param serializableTypeString data type of the column as a serializable string
+         * @see LogicalType#asSerializableString()
+         */
+        public Builder column(String columnName, String serializableTypeString) {
+            return column(columnName, DataTypes.of(serializableTypeString));
+        }
+
+        /**
+         * Declares a computed column that is appended to this schema.
+         *
+         * <p>Computed columns are virtual columns that are generated by evaluating an expression
+         * that can reference other columns declared in the same table. Both physical columns and
+         * metadata columns can be accessed. The column itself is not physically stored within the
+         * table. The column’s data type is derived automatically from the given expression and does
+         * not have to be declared manually.
+         *
+         * <p>Computed columns are commonly used for defining time attributes. For example, the
+         * computed column can be used if the original field is not TIMESTAMP(3) type or is nested
+         * in a JSON string.
+         *
+         * <p>Any scalar expression can be used for in-memory/temporary tables. However, currently,
+         * only SQL expressions can be persisted in a catalog. User-defined functions (also defined
+         * in different catalogs) are supported.
+         *
+         * <p>Example: {@code .columnByExpression("ts", $("json_obj").get("ts").cast(TIMESTAMP(3))}
+         *
+         * @param columnName column name
+         * @param expression computation of the column
+         */
+        public Builder columnByExpression(String columnName, Expression expression) {
+            Preconditions.checkNotNull(columnName, "Column name must not be null.");
+            Preconditions.checkNotNull(expression, "Expression must not be null.");
+            columns.add(new UnresolvedComputedColumn(columnName, expression));
+            return this;
+        }
+
+        /**
+         * Declares a computed column that is appended to this schema.
+         *
+         * <p>See {@link #columnByExpression(String, Expression)} for a detailed explanation.
+         *
+         * <p>This method uses a SQL expression that can be easily persisted in a durable catalog.
+         *
+         * <p>Example: {@code .columnByExpression("ts", "CAST(json_obj.ts AS TIMESTAMP(3))")}
+         *
+         * @param columnName column name
+         * @param sqlExpression computation of the column using SQL
+         */
+        public Builder columnByExpression(String columnName, String sqlExpression) {
+            return columnByExpression(columnName, new SqlCallExpression(sqlExpression));
+        }
+
+        /**
+         * Declares a metadata column that is appended to this schema.
+         *
+         * <p>Metadata columns allow to access connector and/or format specific fields for every row
+         * of a table. For example, a metadata column can be used to read and write the timestamp
+         * from and to Kafka records for time-based operations. The connector and format
+         * documentation lists the available metadata fields for every component.
+         *
+         * <p>Every metadata field is identified by a string-based key and has a documented data
+         * type. For convenience, the runtime will perform an explicit cast if the data type of the
+         * column differs from the data type of the metadata field. Of course, this requires that
+         * the two data types are compatible.
+         *
+         * <p>By default, a metadata column can be used for both reading and writing. However, in
+         * many cases an external system provides more read-only metadata fields than writable
+         * fields. Therefore, it is possible to exclude metadata columns from persisting by setting
+         * the {@code isVirtual} flag to {@code true}.
+         *
+         * <p>Note: This method assumes that the metadata key is equal to the column name.
+         *
+         * @param columnName column name
+         * @param dataType data type of the column
+         * @param isVirtual whether the column should be persisted or not
+         */
+        public Builder columnByMetadata(
+                String columnName, AbstractDataType<?> dataType, boolean isVirtual) {
+            Preconditions.checkNotNull(columnName, "Column name must not be null.");
+            Preconditions.checkNotNull(dataType, "Data type must not be null.");
+            columns.add(new UnresolvedMetadataColumn(columnName, dataType, null, isVirtual));
+            return this;
+        }
+
+        /**
+         * Declares a metadata column that is appended to this schema.
+         *
+         * <p>Metadata columns allow to access connector and/or format specific fields for every row
+         * of a table. For example, a metadata column can be used to read and write the timestamp
+         * from and to Kafka records for time-based operations. The connector and format
+         * documentation lists the available metadata fields for every component.
+         *
+         * <p>Every metadata field is identified by a string-based key and has a documented data
+         * type. The metadata key can be omitted if the column name should be used as the
+         * identifying metadata key. For convenience, the runtime will perform an explicit cast if
+         * the data type of the column differs from the data type of the metadata field. Of course,
+         * this requires that the two data types are compatible.
+         *
+         * <p>Note: This method assumes that a metadata column can be used for both reading and
+         * writing.
+         *
+         * @param columnName column name
+         * @param dataType data type of the column
+         * @param metadataKey identifying metadata key, if null the column name will be used as
+         *     metadata key
+         */
+        public Builder columnByMetadata(
+                String columnName, AbstractDataType<?> dataType, @Nullable String metadataKey) {
+            Preconditions.checkNotNull(columnName, "Column name must not be null.");
+            Preconditions.checkNotNull(dataType, "Data type must not be null.");
+            columns.add(new UnresolvedMetadataColumn(columnName, dataType, metadataKey, false));
+            return this;
+        }
+
+        /**
+         * Declares a metadata column that is appended to this schema.
+         *
+         * <p>Metadata columns allow to access connector and/or format specific fields for every row
+         * of a table. For example, a metadata column can be used to read and write the timestamp
+         * from and to Kafka records for time-based operations. The connector and format
+         * documentation lists the available metadata fields for every component.
+         *
+         * <p>Every metadata field is identified by a string-based key and has a documented data
+         * type. The metadata key can be omitted if the column name should be used as the
+         * identifying metadata key. For convenience, the runtime will perform an explicit cast if
+         * the data type of the column differs from the data type of the metadata field. Of course,
+         * this requires that the two data types are compatible.
+         *
+         * <p>By default, a metadata column can be used for both reading and writing. However, in
+         * many cases an external system provides more read-only metadata fields than writable
+         * fields. Therefore, it is possible to exclude metadata columns from persisting by setting
+         * the {@code isVirtual} flag to {@code true}.
+         *
+         * @param columnName column name
+         * @param dataType data type of the column
+         * @param metadataKey identifying metadata key, if null the column name will be used as
+         *     metadata key
+         * @param isVirtual whether the column should be persisted or not
+         */
+        public Builder columnByMetadata(
+                String columnName,
+                AbstractDataType<?> dataType,
+                @Nullable String metadataKey,
+                boolean isVirtual) {
+            Preconditions.checkNotNull(columnName, "Column name must not be null.");
+            columns.add(new UnresolvedMetadataColumn(columnName, dataType, metadataKey, isVirtual));
+            return this;
+        }
+
+        /**
+         * Declares that the given column should serve as an event-time (i.e. rowtime) attribute and
+         * specifies a corresponding watermark strategy as an expression.
+         *
+         * <p>The column must be of type {@code TIMESTAMP(3)} and be a top-level column in the
+         * schema. It may be a computed column.
+         *
+         * <p>The watermark generation expression is evaluated by the framework for every record
+         * during runtime. The framework will periodically emit the largest generated watermark. If
+         * the current watermark is still identical to the previous one, or is null, or the value of
+         * the returned watermark is smaller than that of the last emitted one, then no new
+         * watermark will be emitted. A watermark is emitted in an interval defined by the
+         * configuration.
+         *
+         * <p>Any scalar expression can be used for declaring a watermark strategy for
+         * in-memory/temporary tables. However, currently, only SQL expressions can be persisted in
+         * a catalog. The expression's return data type must be {@code TIMESTAMP(3)}. User-defined
+         * functions (also defined in different catalogs) are supported.
+         *
+         * <p>Example: {@code .watermark("ts", $("ts).minus(lit(5).seconds())}
+         *
+         * @param columnName the column name used as a rowtime attribute
+         * @param watermarkExpression the expression used for watermark generation
+         */
+        public Builder watermark(String columnName, Expression watermarkExpression) {
+            Preconditions.checkNotNull(columnName, "Column name must not be null.");
+            Preconditions.checkNotNull(
+                    watermarkExpression, "Watermark expression must not be null.");
+            this.watermarkSpecs.add(new UnresolvedWatermarkSpec(columnName, watermarkExpression));
+            return this;
+        }
+
+        /**
+         * Declares that the given column should serve as an event-time (i.e. rowtime) attribute and
+         * specifies a corresponding watermark strategy as an expression.
+         *
+         * <p>See {@link #watermark(String, Expression)} for a detailed explanation.
+         *
+         * <p>This method uses a SQL expression that can be easily persisted in a durable catalog.
+         *
+         * <p>Example: {@code .watermark("ts", "ts - INTERVAL '5' SECOND")}
+         */
+        public Builder watermark(String columnName, String sqlExpression) {
+            return watermark(columnName, new SqlCallExpression(sqlExpression));
+        }
+
+        /**
+         * Declares a primary key constraint for a set of given columns. Primary key uniquely
+         * identify a row in a table. Neither of columns in a primary can be nullable. The primary
+         * key is informational only. It will not be enforced. It can be used for optimizations. It
+         * is the data owner's responsibility to ensure uniqueness of the data.
+         *
+         * <p>The primary key will be assigned a random name.
+         *
+         * @param columnNames columns that form a unique primary key
+         */
+        public Builder primaryKey(String... columnNames) {
+            Preconditions.checkNotNull(columnNames, "Primary key column names must not be null.");
+            return primaryKey(Arrays.asList(columnNames));
+        }
+
+        /**
+         * Declares a primary key constraint for a set of given columns. Primary key uniquely
+         * identify a row in a table. Neither of columns in a primary can be nullable. The primary
+         * key is informational only. It will not be enforced. It can be used for optimizations. It
+         * is the data owner's responsibility to ensure uniqueness of the data.
+         *
+         * <p>The primary key will be assigned a generated name in the format {@code PK_col1_col2}.
+         *
+         * @param columnNames columns that form a unique primary key
+         */
+        public Builder primaryKey(List<String> columnNames) {
+            Preconditions.checkNotNull(columnNames, "Primary key column names must not be null.");
+            final String generatedConstraintName =
+                    columnNames.stream().collect(Collectors.joining("_", "PK_", ""));
+            return primaryKeyNamed(generatedConstraintName, columnNames);
+        }
+
+        /**
+         * Declares a primary key constraint for a set of given columns. Primary key uniquely
+         * identify a row in a table. Neither of columns in a primary can be nullable. The primary
+         * key is informational only. It will not be enforced. It can be used for optimizations. It
+         * is the data owner's responsibility to ensure uniqueness of the data.
+         *
+         * @param constraintName name for the primary key, can be used to reference the constraint
+         * @param columnNames columns that form a unique primary key
+         */
+        public Builder primaryKeyNamed(String constraintName, String... columnNames) {
+            Preconditions.checkNotNull(columnNames, "Primary key column names must not be null.");
+            return primaryKeyNamed(constraintName, Arrays.asList(columnNames));
+        }
+
+        /**
+         * Declares a primary key constraint for a set of given columns. Primary key uniquely
+         * identify a row in a table. Neither of columns in a primary can be nullable. The primary
+         * key is informational only. It will not be enforced. It can be used for optimizations. It
+         * is the data owner's responsibility to ensure uniqueness of the data.
+         *
+         * @param constraintName name for the primary key, can be used to reference the constraint
+         * @param columnNames columns that form a unique primary key
+         */
+        public Builder primaryKeyNamed(String constraintName, List<String> columnNames) {
+            Preconditions.checkState(
+                    primaryKey == null, "Multiple primary keys are not supported.");
+            Preconditions.checkNotNull(
+                    constraintName, "Primary key constraint name must not be null.");
+            Preconditions.checkArgument(
+                    !StringUtils.isNullOrWhitespaceOnly(constraintName),
+                    "Primary key constraint name must not be empty.");
+            Preconditions.checkArgument(
+                    columnNames != null && columnNames.size() > 0,
+                    "Primary key constraint must be defined for at least a single column.");
+            primaryKey = new UnresolvedPrimaryKey(constraintName, columnNames);
+            return this;
+        }
+
+        /** Returns an instance of an unresolved {@link Schema}. */
+        public Schema build() {
+            return new Schema(columns, watermarkSpecs, primaryKey);
+        }
+
+        // ----------------------------------------------------------------------------------------
+
+        private void addResolvedColumns(List<Column> columns) {
+            columns.forEach(
+                    c -> {
+                        if (c instanceof PhysicalColumn) {
+                            final PhysicalColumn physicalColumn = (PhysicalColumn) c;
+                            column(physicalColumn.getName(), physicalColumn.getDataType());
+                        } else if (c instanceof ComputedColumn) {
+                            final ComputedColumn computedColumn = (ComputedColumn) c;
+                            columnByExpression(
+                                    computedColumn.getName(), computedColumn.getExpression());
+                        } else if (c instanceof MetadataColumn) {
+                            final MetadataColumn metadataColumn = (MetadataColumn) c;
+                            columnByMetadata(
+                                    metadataColumn.getName(),
+                                    metadataColumn.getDataType(),
+                                    metadataColumn.getMetadataAlias().orElse(null),
+                                    metadataColumn.isVirtual());
+                        }
+                    });
+        }
+
+        private void addResolvedWatermarkSpec(List<WatermarkSpec> specs) {
+            specs.forEach(
+                    s ->
+                            watermarkSpecs.add(
+                                    new UnresolvedWatermarkSpec(
+                                            s.getRowtimeAttribute(), s.getWatermarkExpression())));
+        }
+
+        private void addResolvedConstraint(UniqueConstraint constraint) {
+            if (constraint.getType() == Constraint.ConstraintType.PRIMARY_KEY) {
+                primaryKeyNamed(constraint.getName(), constraint.getColumns());
+            } else {
+                throw new IllegalArgumentException("Unsupported constraint type.");
+            }
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Helper classes for representing the schema
+    // --------------------------------------------------------------------------------------------
+
+    /** Super class for all kinds of columns in an unresolved schema. */
+    public abstract static class UnresolvedColumn {
+        final String columnName;
+
+        UnresolvedColumn(String columnName) {
+            this.columnName = columnName;
+        }
+
+        public String getName() {
+            return columnName;
+        }
+
+        @Override
+        public String toString() {
+            return EncodingUtils.escapeIdentifier(columnName);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            UnresolvedColumn that = (UnresolvedColumn) o;
+            return columnName.equals(that.columnName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(columnName);
+        }
+    }
+
+    /**
+     * Declaration of a physical column that will be resolved to {@link PhysicalColumn} during
+     * schema resolution.
+     */
+    public static final class UnresolvedPhysicalColumn extends UnresolvedColumn {
+
+        private final AbstractDataType<?> dataType;
+
+        UnresolvedPhysicalColumn(String columnName, AbstractDataType<?> dataType) {
+            super(columnName);
+            this.dataType = dataType;
+        }
+
+        public AbstractDataType<?> getDataType() {
+            return dataType;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("%s %s", super.toString(), dataType.toString());
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            if (!super.equals(o)) {
+                return false;
+            }
+            UnresolvedPhysicalColumn that = (UnresolvedPhysicalColumn) o;
+            return dataType.equals(that.dataType);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(super.hashCode(), dataType);
+        }
+    }
+
+    /**
+     * Declaration of a computed column that will be resolved to {@link ComputedColumn} during
+     * schema resolution.
+     */
+    public static final class UnresolvedComputedColumn extends UnresolvedColumn {
+
+        private final Expression expression;
+
+        UnresolvedComputedColumn(String columnName, Expression expression) {
+            super(columnName);
+            this.expression = expression;
+        }
+
+        public Expression getExpression() {
+            return expression;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("%s AS %s", super.toString(), expression.asSummaryString());
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            if (!super.equals(o)) {
+                return false;
+            }
+            UnresolvedComputedColumn that = (UnresolvedComputedColumn) o;
+            return expression.equals(that.expression);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(super.hashCode(), expression);
+        }
+    }
+
+    /**
+     * Declaration of a metadata column that will be resolved to {@link MetadataColumn} during
+     * schema resolution.
+     */
+    public static final class UnresolvedMetadataColumn extends UnresolvedColumn {
+
+        private final AbstractDataType<?> dataType;
+        private final @Nullable String metadataKey;
+        private final boolean isVirtual;
+
+        UnresolvedMetadataColumn(
+                String columnName,
+                AbstractDataType<?> dataType,
+                @Nullable String metadataKey,
+                boolean isVirtual) {
+            super(columnName);
+            this.dataType = dataType;
+            this.metadataKey = metadataKey;
+            this.isVirtual = isVirtual;
+        }
+
+        public AbstractDataType<?> getDataType() {
+            return dataType;
+        }
+
+        public @Nullable String getMetadataKey() {
+            return metadataKey;
+        }
+
+        public boolean isVirtual() {
+            return isVirtual;
+        }
+
+        @Override
+        public String toString() {
+            final StringBuilder sb = new StringBuilder();
+            sb.append(super.toString());
+            sb.append(" METADATA");
+            if (metadataKey != null) {
+                sb.append(" FROM '");
+                sb.append(EncodingUtils.escapeSingleQuotes(metadataKey));
+                sb.append("'");
+            }
+            if (isVirtual) {
+                sb.append(" VIRTUAL");
+            }
+            return sb.toString();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            if (!super.equals(o)) {
+                return false;
+            }
+            UnresolvedMetadataColumn that = (UnresolvedMetadataColumn) o;
+            return isVirtual == that.isVirtual
+                    && dataType.equals(that.dataType)
+                    && Objects.equals(metadataKey, that.metadataKey);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(super.hashCode(), dataType, metadataKey, isVirtual);
+        }
+    }
+
+    /**
+     * Declaration of a watermark strategy that will be resolved to {@link WatermarkSpec} during
+     * schema resolution.
+     */
+    public static final class UnresolvedWatermarkSpec {
+
+        private final String columnName;
+        private final Expression watermarkExpression;
+
+        UnresolvedWatermarkSpec(String columnName, Expression watermarkExpression) {
+            this.columnName = columnName;
+            this.watermarkExpression = watermarkExpression;
+        }
+
+        public String getColumnName() {
+            return columnName;
+        }
+
+        public Expression getWatermarkExpression() {
+            return watermarkExpression;
+        }
+
+        @Override
+        public String toString() {
+            return String.format(
+                    "WATERMARK FOR %s AS %s",
+                    EncodingUtils.escapeIdentifier(columnName),
+                    watermarkExpression.asSummaryString());
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            UnresolvedWatermarkSpec that = (UnresolvedWatermarkSpec) o;
+            return columnName.equals(that.columnName)
+                    && watermarkExpression.equals(that.watermarkExpression);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(columnName, watermarkExpression);
+        }
+    }
+
+    /** Super class for all kinds of constraints in an unresolved schema. */
+    public abstract static class UnresolvedConstraint {
+
+        private final String constraintName;
+
+        UnresolvedConstraint(String constraintName) {
+            this.constraintName = constraintName;
+        }
+
+        public String getConstraintName() {
+            return constraintName;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("CONSTRAINT %s", EncodingUtils.escapeIdentifier(constraintName));
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            UnresolvedConstraint that = (UnresolvedConstraint) o;
+            return constraintName.equals(that.constraintName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(constraintName);
+        }
+    }
+
+    /**
+     * Declaration of a primary key that will be resolved to {@link UniqueConstraint} during schema
+     * resolution.
+     */
+    public static final class UnresolvedPrimaryKey extends UnresolvedConstraint {
+
+        private final List<String> columnNames;
+
+        UnresolvedPrimaryKey(String constraintName, List<String> columnNames) {
+            super(constraintName);
+            this.columnNames = columnNames;
+        }
+
+        public List<String> getColumnNames() {
+            return columnNames;
+        }
+
+        @Override
+        public String toString() {
+            return String.format(
+                    "%s PRIMARY KEY (%s) NOT ENFORCED",
+                    super.toString(),
+                    columnNames.stream()
+                            .map(EncodingUtils::escapeIdentifier)
+                            .collect(Collectors.joining(", ")));
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            if (!super.equals(o)) {
+                return false;
+            }
+            UnresolvedPrimaryKey that = (UnresolvedPrimaryKey) o;
+            return columnNames.equals(that.columnNames);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(super.hashCode(), columnNames);
+        }
+    }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/AbstractConstraint.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/AbstractConstraint.java
new file mode 100644
index 0000000..51a541d
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/AbstractConstraint.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Base class for {@link Constraint constraints}. */
+@Internal
+abstract class AbstractConstraint implements Constraint {
+
+    private final String name;
+    private final boolean enforced;
+
+    AbstractConstraint(String name, boolean enforced) {
+        this.name = checkNotNull(name);
+        this.enforced = enforced;
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public boolean isEnforced() {
+        return enforced;
+    }
+
+    @Override
+    public String toString() {
+        return asSummaryString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        AbstractConstraint that = (AbstractConstraint) o;
+        return enforced == that.enforced && Objects.equals(name, that.name);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, enforced);
+    }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Column.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Column.java
new file mode 100644
index 0000000..019a9ef
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Column.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Representation of a column in a {@link ResolvedSchema}.
+ *
+ * <p>A table column describes either a {@link PhysicalColumn}, {@link ComputedColumn}, or {@link
+ * MetadataColumn}.
+ *
+ * <p>Every column is fully resolved. The enclosed {@link DataType} indicates whether the column is
+ * a time attribute and thus might differ from the original data type.
+ */
+@PublicEvolving
+public abstract class Column {
+
+    protected final String name;
+
+    protected final DataType dataType;
+
+    private Column(String name, DataType dataType) {
+        this.name = name;
+        this.dataType = dataType;
+    }
+
+    /** Creates a regular table column that represents physical data. */
+    public static PhysicalColumn physical(String name, DataType dataType) {
+        Preconditions.checkNotNull(name, "Column name can not be null.");
+        Preconditions.checkNotNull(dataType, "Column data type can not be null.");
+        return new PhysicalColumn(name, dataType);
+    }
+
+    /** Creates a computed column that is computed from the given {@link ResolvedExpression}. */
+    public static ComputedColumn computed(String name, ResolvedExpression expression) {
+        Preconditions.checkNotNull(name, "Column name can not be null.");
+        Preconditions.checkNotNull(expression, "Column expression can not be null.");
+        return new ComputedColumn(name, expression.getOutputDataType(), expression);
+    }
+
+    /**
+     * Creates a metadata column from metadata of the given column name.
+     *
+     * <p>The column is not virtual by default.
+     */
+    public static MetadataColumn metadata(String name, DataType dataType) {
+        return metadata(name, dataType, null, false);
+    }
+
+    /**
+     * Creates a metadata column from metadata of the given column name.
+     *
+     * <p>Allows to specify whether the column is virtual or not.
+     */
+    public static MetadataColumn metadata(String name, DataType type, boolean isVirtual) {
+        return metadata(name, type, null, isVirtual);
+    }
+
+    /**
+     * Creates a metadata column from metadata of the given alias.
+     *
+     * <p>The column is not virtual by default.
+     */
+    public static MetadataColumn metadata(String name, DataType type, String metadataAlias) {
+        Preconditions.checkNotNull(metadataAlias, "Metadata alias can not be null.");
+        return metadata(name, type, metadataAlias, false);
+    }
+
+    /**
+     * Creates a metadata column from metadata of the given column name or from metadata of the
+     * given alias (if not null).
+     *
+     * <p>Allows to specify whether the column is virtual or not.
+     */
+    public static MetadataColumn metadata(
+            String name, DataType dataType, @Nullable String metadataAlias, boolean isVirtual) {
+        Preconditions.checkNotNull(name, "Column name can not be null.");
+        Preconditions.checkNotNull(dataType, "Column data type can not be null.");
+        return new MetadataColumn(name, dataType, metadataAlias, isVirtual);
+    }
+
+    /**
+     * Returns whether the given column is a physical column of a table; neither computed nor
+     * metadata.
+     */
+    public abstract boolean isPhysical();
+
+    /** Returns whether the given column is persisted in a sink operation. */
+    public abstract boolean isPersisted();
+
+    /** Returns the data type of this column. */
+    public DataType getDataType() {
+        return this.dataType;
+    }
+
+    /** Returns the name of this column. */
+    public String getName() {
+        return name;
+    }
+
+    /** Returns a string that summarizes this column for printing to a console. */
+    public String asSummaryString() {
+        final StringBuilder sb = new StringBuilder();
+        sb.append(EncodingUtils.escapeIdentifier(name));
+        sb.append(" ");
+        sb.append(dataType);
+        explainExtras()
+                .ifPresent(
+                        e -> {
+                            sb.append(" ");
+                            sb.append(e);
+                        });
+        return sb.toString();
+    }
+
+    /** Returns an explanation of specific column extras next to name and type. */
+    public abstract Optional<String> explainExtras();
+
+    /** Returns a copy of the column with a replaced {@link DataType}. */
+    public abstract Column copy(DataType newType);
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Column that = (Column) o;
+        return Objects.equals(this.name, that.name) && Objects.equals(this.dataType, that.dataType);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(this.name, this.dataType);
+    }
+
+    @Override
+    public String toString() {
+        return asSummaryString();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Specific kinds of columns
+    // --------------------------------------------------------------------------------------------
+
+    /** Representation of a physical column. */
+    public static final class PhysicalColumn extends Column {
+
+        private PhysicalColumn(String name, DataType dataType) {
+            super(name, dataType);
+        }
+
+        @Override
+        public boolean isPhysical() {
+            return true;
+        }
+
+        @Override
+        public boolean isPersisted() {
+            return true;
+        }
+
+        @Override
+        public Optional<String> explainExtras() {
+            return Optional.empty();
+        }
+
+        @Override
+        public Column copy(DataType newDataType) {
+            return new PhysicalColumn(name, newDataType);
+        }
+    }
+
+    /** Representation of a computed column. */
+    public static final class ComputedColumn extends Column {
+
+        private final ResolvedExpression expression;
+
+        private ComputedColumn(String name, DataType dataType, ResolvedExpression expression) {
+            super(name, dataType);
+            this.expression = expression;
+        }
+
+        @Override
+        public boolean isPhysical() {
+            return false;
+        }
+
+        @Override
+        public boolean isPersisted() {
+            return false;
+        }
+
+        public ResolvedExpression getExpression() {
+            return expression;
+        }
+
+        @Override
+        public Optional<String> explainExtras() {
+            return Optional.of("AS " + expression.asSummaryString());
+        }
+
+        @Override
+        public Column copy(DataType newDataType) {
+            return new ComputedColumn(name, newDataType, expression);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            if (!super.equals(o)) {
+                return false;
+            }
+            ComputedColumn that = (ComputedColumn) o;
+            return expression.equals(that.expression);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(super.hashCode(), expression);
+        }
+    }
+
+    /** Representation of a metadata column. */
+    public static final class MetadataColumn extends Column {
+
+        private final @Nullable String metadataAlias;
+
+        private final boolean isVirtual;
+
+        private MetadataColumn(
+                String name, DataType dataType, @Nullable String metadataAlias, boolean isVirtual) {
+            super(name, dataType);
+            this.metadataAlias = metadataAlias;
+            this.isVirtual = isVirtual;
+        }
+
+        public boolean isVirtual() {
+            return isVirtual;
+        }
+
+        public Optional<String> getMetadataAlias() {
+            return Optional.ofNullable(metadataAlias);
+        }
+
+        @Override
+        public boolean isPhysical() {
+            return false;
+        }
+
+        @Override
+        public boolean isPersisted() {
+            return !isVirtual;
+        }
+
+        @Override
+        public Optional<String> explainExtras() {
+            final StringBuilder sb = new StringBuilder();
+            sb.append("METADATA");
+            if (metadataAlias != null) {
+                sb.append(" FROM ");
+                sb.append("'");
+                sb.append(EncodingUtils.escapeSingleQuotes(metadataAlias));
+                sb.append("'");
+            }
+            if (isVirtual) {
+                sb.append(" VIRTUAL");
+            }
+            return Optional.of(sb.toString());
+        }
+
+        @Override
+        public Column copy(DataType newDataType) {
+            return new MetadataColumn(name, newDataType, metadataAlias, isVirtual);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            if (!super.equals(o)) {
+                return false;
+            }
+            MetadataColumn that = (MetadataColumn) o;
+            return isVirtual == that.isVirtual && Objects.equals(metadataAlias, that.metadataAlias);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(super.hashCode(), metadataAlias, isVirtual);
+        }
+    }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Constraint.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Constraint.java
new file mode 100644
index 0000000..dcc0dd8
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Constraint.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Integrity constraints, generally referred to simply as constraints, define the valid states of
+ * SQL-data by constraining the values in the base tables.
+ */
+@PublicEvolving
+public interface Constraint {
+
+    String getName();
+
+    /**
+     * Constraints can either be enforced or non-enforced. If a constraint is enforced it will be
+     * checked whenever any SQL statement is executed that results in data or schema changes. If the
+     * constraint is not enforced the owner of the data is responsible for ensuring data integrity.
+     * Flink will rely the information is valid and might use it for query optimisations.
+     */
+    boolean isEnforced();
+
+    /** Tells what kind of constraint it is, e.g. PRIMARY KEY, UNIQUE, ... */
+    ConstraintType getType();
+
+    /** Prints the constraint in a readable way. */
+    String asSummaryString();
+
+    /**
+     * Type of the constraint.
+     *
+     * <p>Unique constraints:
+     *
+     * <ul>
+     *   <li>UNIQUE - is satisfied if and only if there do not exist two rows that have same
+     *       non-null values in the unique columns
+     *   <li>PRIMARY KEY - additionally to UNIQUE constraint, it requires none of the values in
+     *       specified columns be a null value. Moreover there can be only a single PRIMARY KEY
+     *       defined for a Table.
+     * </ul>
+     */
+    enum ConstraintType {
+        PRIMARY_KEY,
+        UNIQUE_KEY
+    }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
new file mode 100644
index 0000000..1a1ec01
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.ROW;
+
+/**
+ * Schema of a table or view consisting of columns, constraints, and watermark specifications.
+ *
+ * <p>This class is the result of resolving a {@link Schema} into a final validated representation.
+ *
+ * <ul>
+ *   <li>Data types and functions have been expanded to fully qualified identifiers.
+ *   <li>Time attributes are represented in the column's data type.
+ *   <li>{@link Expression}s have been translated to {@link ResolvedExpression}.
+ *   <li>{@link AbstractDataType}s have been translated to {@link DataType}.
+ * </ul>
+ */
+@PublicEvolving
+public final class ResolvedSchema {
+
+    private final List<Column> columns;
+    private final List<WatermarkSpec> watermarkSpecs;
+    private final @Nullable UniqueConstraint primaryKey;
+
+    ResolvedSchema(
+            List<Column> columns,
+            List<WatermarkSpec> watermarkSpecs,
+            @Nullable UniqueConstraint primaryKey) {
+        this.columns = Preconditions.checkNotNull(columns, "Columns must not be null.");
+        this.watermarkSpecs =
+                Preconditions.checkNotNull(watermarkSpecs, "Watermark specs must not be null.");
+        this.primaryKey = primaryKey;
+    }
+
+    /** Returns the number of {@link Column}s of this schema. */
+    public int getColumnCount() {
+        return columns.size();
+    }
+
+    /** Returns all {@link Column}s of this schema. */
+    public List<Column> getColumns() {
+        return columns;
+    }
+
+    /**
+     * Returns the {@link Column} instance for the given column index.
+     *
+     * @param columnIndex the index of the column
+     */
+    public Optional<Column> getColumn(int columnIndex) {
+        if (columnIndex < 0 || columnIndex >= columns.size()) {
+            return Optional.empty();
+        }
+        return Optional.of(this.columns.get(columnIndex));
+    }
+
+    /**
+     * Returns the {@link Column} instance for the given column name.
+     *
+     * @param columnName the name of the column
+     */
+    public Optional<Column> getColumn(String columnName) {
+        return this.columns.stream()
+                .filter(column -> column.getName().equals(columnName))
+                .findFirst();
+    }
+
+    /**
+     * Returns a list of watermark specifications each consisting of a rowtime attribute and
+     * watermark strategy expression.
+     *
+     * <p>Note: Currently, there is at most one {@link WatermarkSpec} in the list, because we don't
+     * support multiple watermark definitions yet.
+     */
+    public List<WatermarkSpec> getWatermarkSpecs() {
+        return watermarkSpecs;
+    }
+
+    /** Returns the primary key if it has been defined. */
+    public Optional<UniqueConstraint> getPrimaryKey() {
+        return Optional.ofNullable(primaryKey);
+    }
+
+    /**
+     * Converts all columns of this schema into a (possibly nested) row data type.
+     *
+     * <p>This method returns the <b>source-to-query schema</b>.
+     *
+     * <p>Note: The returned row data type contains physical, computed, and metadata columns. Be
+     * careful when using this method in a table source or table sink. In many cases, {@link
+     * #toPhysicalRowDataType()} might be more appropriate.
+     *
+     * @see DataTypes#ROW(DataTypes.Field...)
+     * @see #toPhysicalRowDataType()
+     * @see #toSinkRowDataType()
+     */
+    public DataType toSourceRowDataType() {
+        final DataTypes.Field[] fields =
+                columns.stream()
+                        .map(column -> FIELD(column.getName(), column.getDataType()))
+                        .toArray(DataTypes.Field[]::new);
+        // the row should never be null
+        return ROW(fields).notNull();
+    }
+
+    /**
+     * Converts all physical columns of this schema into a (possibly nested) row data type.
+     *
+     * <p>Note: The returned row data type contains only physical columns. It does not include
+     * computed or metadata columns.
+     *
+     * @see DataTypes#ROW(DataTypes.Field...)
+     * @see #toSourceRowDataType()
+     * @see #toSinkRowDataType()
+     */
+    public DataType toPhysicalRowDataType() {
+        final DataTypes.Field[] fields =
+                columns.stream()
+                        .filter(Column::isPhysical)
+                        .map(column -> FIELD(column.getName(), column.getDataType()))
+                        .toArray(DataTypes.Field[]::new);
+        // the row should never be null
+        return ROW(fields).notNull();
+    }
+
+    /**
+     * Converts all persisted columns of this schema into a (possibly nested) row data type.
+     *
+     * <p>This method returns the <b>query-to-sink schema</b>.
+     *
+     * <p>Note: Computed columns and virtual columns are excluded in the returned row data type. The
+     * data type contains the columns of {@link #toPhysicalRowDataType()} plus persisted metadata
+     * columns.
+     *
+     * @see DataTypes#ROW(DataTypes.Field...)
+     * @see #toSourceRowDataType()
+     * @see #toPhysicalRowDataType()
+     */
+    public DataType toSinkRowDataType() {
+        final DataTypes.Field[] fields =
+                columns.stream()
+                        .filter(Column::isPersisted)
+                        .map(column -> FIELD(column.getName(), column.getDataType()))
+                        .toArray(DataTypes.Field[]::new);
+        // the row should never be null
+        return ROW(fields).notNull();
+    }
+
+    @Override
+    public String toString() {
+        final List<Object> components = new ArrayList<>();
+        components.addAll(columns);
+        components.addAll(watermarkSpecs);
+        if (primaryKey != null) {
+            components.add(primaryKey);
+        }
+        return components.stream()
+                .map(Objects::toString)
+                .map(s -> "  " + s)
+                .collect(Collectors.joining(", \n", "(\n", "\n)"));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final ResolvedSchema that = (ResolvedSchema) o;
+        return Objects.equals(columns, that.columns)
+                && Objects.equals(watermarkSpecs, that.watermarkSpecs)
+                && Objects.equals(primaryKey, that.primaryKey);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(columns, watermarkSpecs, primaryKey);
+    }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/SchemaResolver.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/SchemaResolver.java
new file mode 100644
index 0000000..6f2d947
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/SchemaResolver.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Schema.UnresolvedMetadataColumn;
+
+/** Resolves a {@link Schema} to a validated {@link ResolvedSchema}. */
+public interface SchemaResolver {
+
+    ResolvedSchema resolve(Schema schema);
+
+    /** Returns whether the resolution happens in streaming mode. */
+    boolean isStreamingMode();
+
+    /**
+     * Returns whether metadata columns are supported or an exception should be thrown during the
+     * resolution of {@link UnresolvedMetadataColumn}.
+     */
+    boolean supportsMetadata();
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/UniqueConstraint.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/UniqueConstraint.java
new file mode 100644
index 0000000..a60f0d2
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/UniqueConstraint.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A unique key constraint. It can be declared also as a PRIMARY KEY.
+ *
+ * @see ConstraintType
+ */
+@PublicEvolving
+public final class UniqueConstraint extends AbstractConstraint {
+
+    private final List<String> columns;
+    private final ConstraintType type;
+
+    /** Creates a non enforced {@link ConstraintType#PRIMARY_KEY} constraint. */
+    public static UniqueConstraint primaryKey(String name, List<String> columns) {
+        return new UniqueConstraint(name, false, ConstraintType.PRIMARY_KEY, columns);
+    }
+
+    private UniqueConstraint(
+            String name, boolean enforced, ConstraintType type, List<String> columns) {
+        super(name, enforced);
+
+        this.columns = checkNotNull(columns);
+        this.type = checkNotNull(type);
+    }
+
+    /** List of column names for which the primary key was defined. */
+    public List<String> getColumns() {
+        return columns;
+    }
+
+    @Override
+    public ConstraintType getType() {
+        return type;
+    }
+
+    /**
+     * Returns constraint's summary. All constraints summary will be formatted as
+     *
+     * <pre>
+     * CONSTRAINT [constraint-name] [constraint-type] ([constraint-definition])
+     *
+     * E.g CONSTRAINT pk PRIMARY KEY (f0, f1) NOT ENFORCED
+     * </pre>
+     */
+    @Override
+    public final String asSummaryString() {
+        final String typeString;
+        switch (getType()) {
+            case PRIMARY_KEY:
+                typeString = "PRIMARY KEY";
+                break;
+            case UNIQUE_KEY:
+                typeString = "UNIQUE";
+                break;
+            default:
+                throw new IllegalStateException("Unknown key type: " + getType());
+        }
+
+        return String.format(
+                "CONSTRAINT %s %s (%s)%s",
+                EncodingUtils.escapeIdentifier(getName()),
+                typeString,
+                columns.stream()
+                        .map(EncodingUtils::escapeIdentifier)
+                        .collect(Collectors.joining(", ")),
+                isEnforced() ? "" : " NOT ENFORCED");
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        UniqueConstraint that = (UniqueConstraint) o;
+        return Objects.equals(columns, that.columns) && type == that.type;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), columns, type);
+    }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/WatermarkSpec.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/WatermarkSpec.java
new file mode 100644
index 0000000..f3d1e9c
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/WatermarkSpec.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Representation of a watermark specification in a {@link ResolvedSchema}.
+ *
+ * <p>It defines the rowtime attribute and a {@link ResolvedExpression} for watermark generation.
+ */
+@PublicEvolving
+public final class WatermarkSpec {
+
+    private final String rowtimeAttribute;
+
+    private final ResolvedExpression watermarkExpression;
+
+    public WatermarkSpec(String rowtimeAttribute, ResolvedExpression watermarkExpression) {
+        this.rowtimeAttribute =
+                checkNotNull(rowtimeAttribute, "Rowtime attribute must not be null.");
+        this.watermarkExpression =
+                checkNotNull(watermarkExpression, "Watermark expression must not be null.");
+    }
+
+    /**
+     * Returns the name of a rowtime attribute.
+     *
+     * <p>The referenced attribute must be present in the {@link ResolvedSchema} and must be of
+     * {@link TimestampType}.
+     */
+    public String getRowtimeAttribute() {
+        return rowtimeAttribute;
+    }
+
+    /** Returns the {@link ResolvedExpression} for watermark generation. */
+    public ResolvedExpression getWatermarkExpression() {
+        return watermarkExpression;
+    }
+
+    public String asSummaryString() {
+        return "WATERMARK FOR "
+                + String.join(".", EncodingUtils.escapeIdentifier(rowtimeAttribute))
+                + ": "
+                + watermarkExpression.getOutputDataType()
+                + " AS "
+                + watermarkExpression.asSummaryString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final WatermarkSpec that = (WatermarkSpec) o;
+        return rowtimeAttribute.equals(that.rowtimeAttribute)
+                && watermarkExpression.equals(that.watermarkExpression);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(rowtimeAttribute, watermarkExpression);
+    }
+
+    @Override
+    public String toString() {
+        return asSummaryString();
+    }
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/expressions/utils/ResolvedExpressionMock.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/expressions/utils/ResolvedExpressionMock.java
new file mode 100644
index 0000000..533ee2a
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/expressions/utils/ResolvedExpressionMock.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.utils;
+
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionVisitor;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+
+/** {@link ResolvedExpression} mock for testing purposes. */
+public class ResolvedExpressionMock implements ResolvedExpression {
+
+    private final DataType outputDataType;
+
+    private final Supplier<String> stringRepresentation;
+
+    public ResolvedExpressionMock(DataType outputDataType, Supplier<String> stringRepresentation) {
+        this.outputDataType = outputDataType;
+        this.stringRepresentation = stringRepresentation;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return stringRepresentation.get();
+    }
+
+    @Override
+    public List<Expression> getChildren() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public <R> R accept(ExpressionVisitor<R> visitor) {
+        return visitor.visit(this);
+    }
+
+    @Override
+    public String asSerializableString() {
+        return stringRepresentation.get();
+    }
+
+    @Override
+    public DataType getOutputDataType() {
+        return outputDataType;
+    }
+
+    @Override
+    public List<ResolvedExpression> getResolvedChildren() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public String toString() {
+        return asSummaryString();
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
index 247a442..f04e529 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
@@ -21,8 +21,8 @@ package org.apache.flink.table.api
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.internal.TableEnvironmentInternal
+import org.apache.flink.table.descriptors.ConnectorDescriptor
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR
-import org.apache.flink.table.descriptors.{ConnectorDescriptor, Schema}
 import org.apache.flink.table.expressions.utils._
 import org.apache.flink.table.runtime.utils.CommonTestData
 import org.apache.flink.table.sources.{CsvTableSource, TableSource}
@@ -394,7 +394,7 @@ class TableSourceTest extends TableTestBase {
         context
       }
     }).withSchema(
-        new Schema()
+        new org.apache.flink.table.descriptors.Schema()
           .schema(TableSchema.builder()
             .field("id", DataTypes.INT())
             .field("name", DataTypes.STRING())