Posted to commits@flink.apache.org by tw...@apache.org on 2021/02/26 11:56:50 UTC

[flink] branch master updated (da3b2ad -> ebde3ab)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from da3b2ad  [hotfix] Add logging to the DefaultAllocatedSlotPool
     new 011a349  [FLINK-21435][table] Implement Schema, ResolvedSchema, SchemaResolver
     new ebde3ab  [hotfix][docs] Fix computed columns documentation

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/content.zh/docs/dev/table/sql/create.md       |   6 +-
 docs/content/docs/dev/table/sql/create.md          |   6 +-
 flink-python/pyflink/table/descriptors.py          |   2 +-
 .../flink/table/catalog/DefaultSchemaResolver.java | 353 +++++++++
 .../expressions/resolver/ExpressionResolver.java   |   7 +-
 .../resolver/rules/ResolveSqlCallRule.java         |   5 +
 .../expressions/resolver/rules/ResolverRule.java   |   3 +
 .../flink/table/catalog/SchemaResolutionTest.java  | 384 +++++++++
 .../java/org/apache/flink/table/api/Schema.java    | 857 +++++++++++++++++++++
 .../flink/table/catalog/AbstractConstraint.java    |  70 ++
 .../org/apache/flink/table/catalog/Column.java     | 330 ++++++++
 .../org/apache/flink/table/catalog/Constraint.java |  63 ++
 .../apache/flink/table/catalog/ResolvedSchema.java | 216 ++++++
 .../apache/flink/table/catalog/SchemaResolver.java |  37 +
 .../flink/table/catalog/UniqueConstraint.java      | 116 +++
 .../apache/flink/table/catalog/WatermarkSpec.java  |  95 +++
 .../expressions/utils/ResolvedExpressionMock.java  |  76 ++
 .../apache/flink/table/api/TableSourceTest.scala   |   4 +-
 18 files changed, 2620 insertions(+), 10 deletions(-)
 create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java
 create mode 100644 flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/AbstractConstraint.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Column.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Constraint.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/SchemaResolver.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/UniqueConstraint.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/WatermarkSpec.java
 create mode 100644 flink-table/flink-table-common/src/test/java/org/apache/flink/table/expressions/utils/ResolvedExpressionMock.java


[flink] 02/02: [hotfix][docs] Fix computed columns documentation

Posted by tw...@apache.org.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ebde3abd3357b1489b219fe3269048c1f4ec41b7
Author: Timo Walther <tw...@apache.org>
AuthorDate: Thu Feb 25 18:28:24 2021 +0100

    [hotfix][docs] Fix computed columns documentation
---
 docs/content.zh/docs/dev/table/sql/create.md | 6 +++---
 docs/content/docs/dev/table/sql/create.md    | 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/sql/create.md b/docs/content.zh/docs/dev/table/sql/create.md
index 5426ae1..0101285 100644
--- a/docs/content.zh/docs/dev/table/sql/create.md
+++ b/docs/content.zh/docs/dev/table/sql/create.md
@@ -304,9 +304,9 @@ MyTable(`timestamp` BIGINT, `user_id` BIGINT, `name` STRING)
 Computed columns are virtual columns that are generated using the syntax `column_name AS computed_column_expression`.
 
 A computed column evaluates an expression that can reference other columns declared in the same table.
-Both physical columns and metadata columns can be accessed if they preceed the computed column in the
-schema declaration. The column itself is not physically stored within the table. The column's data type
-is derived automatically from the given expression and does not have to be declared manually.
+Both physical columns and metadata columns can be accessed. The column itself is not physically stored
+within the table. The column's data type is derived automatically from the given expression and does
+not have to be declared manually.
 
 The planner will transform computed columns into a regular projection after the source. For optimization
 or [watermark strategy push down]({{< ref "docs/dev/table/sourcesSinks" >}}), the evaluation might be spread
diff --git a/docs/content/docs/dev/table/sql/create.md b/docs/content/docs/dev/table/sql/create.md
index 4bc822e..0e7e2e9 100644
--- a/docs/content/docs/dev/table/sql/create.md
+++ b/docs/content/docs/dev/table/sql/create.md
@@ -304,9 +304,9 @@ MyTable(`timestamp` BIGINT, `user_id` BIGINT, `name` STRING)
 Computed columns are virtual columns that are generated using the syntax `column_name AS computed_column_expression`.
 
 A computed column evaluates an expression that can reference other columns declared in the same table.
-Both physical columns and metadata columns can be accessed if they preceed the computed column in the
-schema declaration. The column itself is not physically stored within the table. The column's data type
-is derived automatically from the given expression and does not have to be declared manually.
+Both physical columns and metadata columns can be accessed. The column itself is not physically stored
+within the table. The column's data type is derived automatically from the given expression and does
+not have to be declared manually.
 
 The planner will transform computed columns into a regular projection after the source. For optimization
 or [watermark strategy push down]({{< ref "docs/dev/table/sourcesSinks" >}}), the evaluation might be spread
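
The documentation change above drops the requirement that referenced columns precede the computed column. One way to picture why declaration order need not matter is a two-pass resolution, which is also the shape of resolveColumns in the DefaultSchemaResolver diff of commit 01/02: source columns (physical and metadata) are collected first, and computed columns are checked against that complete set afterwards. The following is a minimal, dependency-free sketch of that idea and not Flink's implementation. The names TwoPassResolutionSketch, ColumnDecl, and resolve are hypothetical, and expressions are reduced to a plain list of referenced column names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified model of two-pass column resolution; not Flink's API. */
final class TwoPassResolutionSketch {

    /** A declared column; computed columns list the source columns they read. */
    static final class ColumnDecl {
        final String name;
        final boolean computed;
        final List<String> references;

        ColumnDecl(String name, boolean computed, String... references) {
            this.name = name;
            this.computed = computed;
            this.references = Arrays.asList(references);
        }
    }

    /** Returns the column names in declaration order, or throws on an unknown reference. */
    static List<String> resolve(List<ColumnDecl> decls) {
        // Pass 1: collect every source column, regardless of its position.
        Set<String> sourceColumns = new LinkedHashSet<>();
        for (ColumnDecl decl : decls) {
            if (!decl.computed) {
                sourceColumns.add(decl.name);
            }
        }
        // Pass 2: validate computed columns against the complete set of source columns.
        List<String> resolved = new ArrayList<>();
        for (ColumnDecl decl : decls) {
            if (decl.computed) {
                for (String ref : decl.references) {
                    if (!sourceColumns.contains(ref)) {
                        throw new IllegalArgumentException(
                                "Invalid expression for computed column '"
                                        + decl.name
                                        + "': unknown column '"
                                        + ref
                                        + "'");
                    }
                }
            }
            resolved.add(decl.name);
        }
        return resolved;
    }

    public static void main(String[] args) {
        // "ts" is declared before the metadata column "orig_ts" that it reads.
        List<ColumnDecl> decls =
                Arrays.asList(
                        new ColumnDecl("ts", true, "orig_ts"),
                        new ColumnDecl("orig_ts", false));
        System.out.println(resolve(decls));
    }
}
```

In this sketch, as in the diff, a computed column is checked only against source columns, so one computed column referencing another is rejected.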


[flink] 01/02: [FLINK-21435][table] Implement Schema, ResolvedSchema, SchemaResolver

Posted by tw...@apache.org.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 011a34982facc817fef40de74c4b95497830a037
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Feb 3 08:39:21 2021 +0100

    [FLINK-21435][table] Implement Schema, ResolvedSchema, SchemaResolver
    
    This closes #14996.
---
 flink-python/pyflink/table/descriptors.py          |   2 +-
 .../flink/table/catalog/DefaultSchemaResolver.java | 353 +++++++++
 .../expressions/resolver/ExpressionResolver.java   |   7 +-
 .../resolver/rules/ResolveSqlCallRule.java         |   5 +
 .../expressions/resolver/rules/ResolverRule.java   |   3 +
 .../flink/table/catalog/SchemaResolutionTest.java  | 384 +++++++++
 .../java/org/apache/flink/table/api/Schema.java    | 857 +++++++++++++++++++++
 .../flink/table/catalog/AbstractConstraint.java    |  70 ++
 .../org/apache/flink/table/catalog/Column.java     | 330 ++++++++
 .../org/apache/flink/table/catalog/Constraint.java |  63 ++
 .../apache/flink/table/catalog/ResolvedSchema.java | 216 ++++++
 .../apache/flink/table/catalog/SchemaResolver.java |  37 +
 .../flink/table/catalog/UniqueConstraint.java      | 116 +++
 .../apache/flink/table/catalog/WatermarkSpec.java  |  95 +++
 .../expressions/utils/ResolvedExpressionMock.java  |  76 ++
 .../apache/flink/table/api/TableSourceTest.scala   |   4 +-
 16 files changed, 2614 insertions(+), 4 deletions(-)

diff --git a/flink-python/pyflink/table/descriptors.py b/flink-python/pyflink/table/descriptors.py
index 73bae1b..f44c872 100644
--- a/flink-python/pyflink/table/descriptors.py
+++ b/flink-python/pyflink/table/descriptors.py
@@ -193,7 +193,7 @@ class Schema(Descriptor):
                         event-time attribute.
         """
         gateway = get_gateway()
-        self._j_schema = gateway.jvm.Schema()
+        self._j_schema = gateway.jvm.org.apache.flink.table.descriptors.Schema()
         super(Schema, self).__init__(self._j_schema)
 
         if schema is not None:
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java
new file mode 100644
index 0000000..e53fc4b
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Schema.UnresolvedComputedColumn;
+import org.apache.flink.table.api.Schema.UnresolvedMetadataColumn;
+import org.apache.flink.table.api.Schema.UnresolvedPhysicalColumn;
+import org.apache.flink.table.api.Schema.UnresolvedPrimaryKey;
+import org.apache.flink.table.api.Schema.UnresolvedWatermarkSpec;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Column.ComputedColumn;
+import org.apache.flink.table.catalog.Column.MetadataColumn;
+import org.apache.flink.table.catalog.Column.PhysicalColumn;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.LocalReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver.ExpressionResolverBuilder;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.expressions.ApiExpressionUtils.localRef;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isProctimeAttribute;
+import static org.apache.flink.table.types.utils.DataTypeUtils.replaceLogicalType;
+
+/** Default implementation of {@link SchemaResolver}. */
+@Internal
+class DefaultSchemaResolver implements SchemaResolver {
+
+    private final boolean isStreamingMode;
+    private final boolean supportsMetadata;
+    private final DataTypeFactory dataTypeFactory;
+    private final ExpressionResolverBuilder resolverBuilder;
+
+    DefaultSchemaResolver(
+            boolean isStreamingMode,
+            boolean supportsMetadata,
+            DataTypeFactory dataTypeFactory,
+            ExpressionResolverBuilder resolverBuilder) {
+        this.isStreamingMode = isStreamingMode;
+        this.supportsMetadata = supportsMetadata;
+        this.dataTypeFactory = dataTypeFactory;
+        this.resolverBuilder = resolverBuilder;
+    }
+
+    public SchemaResolver withMetadata(boolean supportsMetadata) {
+        return new DefaultSchemaResolver(
+                isStreamingMode, supportsMetadata, dataTypeFactory, resolverBuilder);
+    }
+
+    @Override
+    public ResolvedSchema resolve(Schema schema) {
+        final List<Column> columns = resolveColumns(schema.getColumns());
+
+        final List<WatermarkSpec> watermarkSpecs =
+                resolveWatermarkSpecs(schema.getWatermarkSpecs(), columns);
+
+        final List<Column> columnsWithRowtime = adjustRowtimeAttributes(watermarkSpecs, columns);
+
+        final UniqueConstraint primaryKey =
+                resolvePrimaryKey(schema.getPrimaryKey().orElse(null), columnsWithRowtime);
+
+        return new ResolvedSchema(columnsWithRowtime, watermarkSpecs, primaryKey);
+    }
+
+    @Override
+    public boolean isStreamingMode() {
+        return isStreamingMode;
+    }
+
+    @Override
+    public boolean supportsMetadata() {
+        return supportsMetadata;
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private List<Column> resolveColumns(List<Schema.UnresolvedColumn> unresolvedColumns) {
+
+        validateDuplicateColumns(unresolvedColumns);
+
+        final Column[] resolvedColumns = new Column[unresolvedColumns.size()];
+        // process source columns first before computed columns
+        for (int pos = 0; pos < unresolvedColumns.size(); pos++) {
+            final Schema.UnresolvedColumn unresolvedColumn = unresolvedColumns.get(pos);
+            if (unresolvedColumn instanceof UnresolvedPhysicalColumn) {
+                resolvedColumns[pos] =
+                        resolvePhysicalColumn((UnresolvedPhysicalColumn) unresolvedColumn);
+            } else if (unresolvedColumn instanceof UnresolvedMetadataColumn) {
+                resolvedColumns[pos] =
+                        resolveMetadataColumn((UnresolvedMetadataColumn) unresolvedColumn);
+            } else if (!(unresolvedColumn instanceof UnresolvedComputedColumn)) {
+                throw new IllegalArgumentException(
+                        "Unknown unresolved column type: " + unresolvedColumn.getClass().getName());
+            }
+        }
+        // fill in computed columns
+        final List<Column> sourceColumns =
+                Stream.of(resolvedColumns).filter(Objects::nonNull).collect(Collectors.toList());
+        for (int pos = 0; pos < unresolvedColumns.size(); pos++) {
+            final Schema.UnresolvedColumn unresolvedColumn = unresolvedColumns.get(pos);
+            if (unresolvedColumn instanceof UnresolvedComputedColumn) {
+                resolvedColumns[pos] =
+                        resolveComputedColumn(
+                                (UnresolvedComputedColumn) unresolvedColumn, sourceColumns);
+            }
+        }
+
+        return Arrays.asList(resolvedColumns);
+    }
+
+    private PhysicalColumn resolvePhysicalColumn(UnresolvedPhysicalColumn unresolvedColumn) {
+        return Column.physical(
+                unresolvedColumn.getName(),
+                dataTypeFactory.createDataType(unresolvedColumn.getDataType()));
+    }
+
+    private MetadataColumn resolveMetadataColumn(UnresolvedMetadataColumn unresolvedColumn) {
+        if (!supportsMetadata) {
+            throw new ValidationException(
+                    "Metadata columns are not supported in a schema at the current location.");
+        }
+        return Column.metadata(
+                unresolvedColumn.getName(),
+                dataTypeFactory.createDataType(unresolvedColumn.getDataType()),
+                unresolvedColumn.getMetadataKey(),
+                unresolvedColumn.isVirtual());
+    }
+
+    private ComputedColumn resolveComputedColumn(
+            UnresolvedComputedColumn unresolvedColumn, List<Column> inputColumns) {
+        final ResolvedExpression resolvedExpression;
+        try {
+            resolvedExpression = resolveExpression(inputColumns, unresolvedColumn.getExpression());
+        } catch (Exception e) {
+            throw new ValidationException(
+                    String.format(
+                            "Invalid expression for computed column '%s'.",
+                            unresolvedColumn.getName()),
+                    e);
+        }
+        return Column.computed(unresolvedColumn.getName(), resolvedExpression);
+    }
+
+    private void validateDuplicateColumns(List<Schema.UnresolvedColumn> columns) {
+        final List<String> names =
+                columns.stream().map(Schema.UnresolvedColumn::getName).collect(Collectors.toList());
+        final List<String> duplicates =
+                names.stream()
+                        .filter(name -> Collections.frequency(names, name) > 1)
+                        .distinct()
+                        .collect(Collectors.toList());
+        if (duplicates.size() > 0) {
+            throw new ValidationException(
+                    String.format(
+                            "Schema must not contain duplicate column names. Found duplicates: %s",
+                            duplicates));
+        }
+    }
+
+    private List<WatermarkSpec> resolveWatermarkSpecs(
+            List<UnresolvedWatermarkSpec> unresolvedWatermarkSpecs, List<Column> inputColumns) {
+        if (unresolvedWatermarkSpecs.size() == 0) {
+            return Collections.emptyList();
+        }
+        if (unresolvedWatermarkSpecs.size() > 1) {
+            throw new ValidationException("Multiple watermark definitions are not supported yet.");
+        }
+        final UnresolvedWatermarkSpec watermarkSpec = unresolvedWatermarkSpecs.get(0);
+
+        // validate time attribute
+        final String timeColumn = watermarkSpec.getColumnName();
+        validateTimeColumn(timeColumn, inputColumns);
+
+        // resolve watermark expression
+        final ResolvedExpression watermarkExpression;
+        try {
+            watermarkExpression =
+                    resolveExpression(inputColumns, watermarkSpec.getWatermarkExpression());
+        } catch (Exception e) {
+            throw new ValidationException(
+                    String.format(
+                            "Invalid expression for watermark '%s'.", watermarkSpec.toString()),
+                    e);
+        }
+        validateWatermarkExpression(watermarkExpression.getOutputDataType().getLogicalType());
+
+        return Collections.singletonList(
+                new WatermarkSpec(watermarkSpec.getColumnName(), watermarkExpression));
+    }
+
+    private void validateTimeColumn(String columnName, List<Column> columns) {
+        final Optional<Column> timeColumn =
+                columns.stream().filter(c -> c.getName().equals(columnName)).findFirst();
+        if (!timeColumn.isPresent()) {
+            throw new ValidationException(
+                    String.format(
+                            "Invalid column name '%s' for rowtime attribute in watermark declaration. Available columns are: %s",
+                            columnName,
+                            columns.stream().map(Column::getName).collect(Collectors.toList())));
+        }
+        final LogicalType timeFieldType = timeColumn.get().getDataType().getLogicalType();
+        if (!hasRoot(timeFieldType, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
+                || getPrecision(timeFieldType) != 3) {
+            throw new ValidationException(
+                    "Invalid data type of time field for watermark definition. "
+                            + "The field must be of type TIMESTAMP(3) WITHOUT TIME ZONE.");
+        }
+        if (isProctimeAttribute(timeFieldType)) {
+            throw new ValidationException(
+                    "A watermark can not be defined for a processing-time attribute.");
+        }
+    }
+
+    private void validateWatermarkExpression(LogicalType watermarkType) {
+        if (!hasRoot(watermarkType, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
+                || getPrecision(watermarkType) != 3) {
+            throw new ValidationException(
+                    "Invalid data type of expression for watermark definition. "
+                            + "The field must be of type TIMESTAMP(3) WITHOUT TIME ZONE.");
+        }
+    }
+
+    /** Updates the data type of columns that are referenced by {@link WatermarkSpec}. */
+    private List<Column> adjustRowtimeAttributes(
+            List<WatermarkSpec> watermarkSpecs, List<Column> columns) {
+        return columns.stream()
+                .map(column -> adjustRowtimeAttribute(watermarkSpecs, column))
+                .collect(Collectors.toList());
+    }
+
+    private Column adjustRowtimeAttribute(List<WatermarkSpec> watermarkSpecs, Column column) {
+        final String name = column.getName();
+        final DataType dataType = column.getDataType();
+        final boolean hasWatermarkSpec =
+                watermarkSpecs.stream().anyMatch(s -> s.getRowtimeAttribute().equals(name));
+        if (hasWatermarkSpec && isStreamingMode) {
+            final TimestampType originalType = (TimestampType) dataType.getLogicalType();
+            final LogicalType rowtimeType =
+                    new TimestampType(
+                            originalType.isNullable(),
+                            TimestampKind.ROWTIME,
+                            originalType.getPrecision());
+            return column.copy(replaceLogicalType(dataType, rowtimeType));
+        }
+        return column;
+    }
+
+    private @Nullable UniqueConstraint resolvePrimaryKey(
+            @Nullable UnresolvedPrimaryKey unresolvedPrimaryKey, List<Column> columns) {
+        if (unresolvedPrimaryKey == null) {
+            return null;
+        }
+
+        final UniqueConstraint primaryKey =
+                UniqueConstraint.primaryKey(
+                        unresolvedPrimaryKey.getConstraintName(),
+                        unresolvedPrimaryKey.getColumnNames());
+
+        validatePrimaryKey(primaryKey, columns);
+
+        return primaryKey;
+    }
+
+    private void validatePrimaryKey(UniqueConstraint primaryKey, List<Column> columns) {
+        final Map<String, Column> columnsByNameLookup =
+                columns.stream().collect(Collectors.toMap(Column::getName, Function.identity()));
+
+        final Set<String> duplicateColumns =
+                primaryKey.getColumns().stream()
+                        .filter(name -> Collections.frequency(primaryKey.getColumns(), name) > 1)
+                        .collect(Collectors.toSet());
+
+        if (!duplicateColumns.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "Invalid primary key '%s'. A primary key must not contain duplicate columns. Found: %s",
+                            primaryKey.getName(), duplicateColumns));
+        }
+
+        for (String columnName : primaryKey.getColumns()) {
+            Column column = columnsByNameLookup.get(columnName);
+            if (column == null) {
+                throw new ValidationException(
+                        String.format(
+                                "Invalid primary key '%s'. Column '%s' does not exist.",
+                                primaryKey.getName(), columnName));
+            }
+
+            if (!column.isPhysical()) {
+                throw new ValidationException(
+                        String.format(
+                                "Invalid primary key '%s'. Column '%s' is not a physical column.",
+                                primaryKey.getName(), columnName));
+            }
+
+            final LogicalType columnType = column.getDataType().getLogicalType();
+            if (columnType.isNullable()) {
+                throw new ValidationException(
+                        String.format(
+                                "Invalid primary key '%s'. Column '%s' is nullable.",
+                                primaryKey.getName(), columnName));
+            }
+        }
+    }
+
+    private ResolvedExpression resolveExpression(List<Column> columns, Expression expression) {
+        final LocalReferenceExpression[] localRefs =
+                columns.stream()
+                        .map(c -> localRef(c.getName(), c.getDataType()))
+                        .toArray(LocalReferenceExpression[]::new);
+        return resolverBuilder
+                .withLocalReferences(localRefs)
+                .build()
+                .resolve(Collections.singletonList(expression))
+                .get(0);
+    }
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
index 39d4a4a..5b9f830 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
@@ -308,6 +308,11 @@ public class ExpressionResolver {
         }
 
         @Override
+        public List<LocalReferenceExpression> getLocalReferences() {
+            return new ArrayList<>(localReferences.values());
+        }
+
+        @Override
         public Optional<LocalOverWindow> getOverWindow(Expression alias) {
             return Optional.ofNullable(localOverWindows.get(alias));
         }
@@ -445,7 +450,7 @@ public class ExpressionResolver {
 
         public ExpressionResolverBuilder withLocalReferences(
                 LocalReferenceExpression... localReferences) {
-            this.localReferences.addAll(Arrays.asList(localReferences));
+            this.localReferences = Arrays.asList(localReferences);
             return this;
         }
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java
index f8bb3b3..cf2d74b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java
@@ -53,10 +53,15 @@ final class ResolveSqlCallRule implements ResolverRule {
             final SqlExpressionResolver resolver = resolutionContext.sqlExpressionResolver();
 
             final TableSchema.Builder builder = TableSchema.builder();
+            // input references
             resolutionContext
                     .referenceLookup()
                     .getAllInputFields()
                     .forEach(f -> builder.field(f.getName(), f.getOutputDataType()));
+            // local references
+            resolutionContext
+                    .getLocalReferences()
+                    .forEach(refs -> builder.field(refs.getName(), refs.getOutputDataType()));
             return resolver.resolveExpression(sqlCall.getSqlExpression(), builder.build());
         }
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java
index c82cf67..1a8c7cf 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java
@@ -84,6 +84,9 @@ public interface ResolverRule {
         /** Access to available local references. */
         Optional<LocalReferenceExpression> getLocalReference(String alias);
 
+        /** Access to available local references. */
+        List<LocalReferenceExpression> getLocalReferences();
+
         /** Access to available local over windows. */
         Optional<LocalOverWindow> getOverWindow(Expression alias);
     }
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
new file mode 100644
index 0000000..3e7c029
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.core.testutils.FlinkMatchers;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.utils.DataTypeFactoryMock;
+import org.apache.flink.table.utils.FunctionLookupMock;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.apache.flink.table.api.Expressions.callSql;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isProctimeAttribute;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isRowtimeAttribute;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@link Schema}, {@link DefaultSchemaResolver}, and {@link ResolvedSchema}. */
+public class SchemaResolutionTest {
+
+    private static final String COMPUTED_SQL = "orig_ts - INTERVAL '60' MINUTE";
+
+    private static final ResolvedExpression COMPUTED_COLUMN_RESOLVED =
+            new ResolvedExpressionMock(DataTypes.TIMESTAMP(3), () -> COMPUTED_SQL);
+
+    private static final String WATERMARK_SQL = "ts - INTERVAL '5' SECOND";
+
+    private static final ResolvedExpression WATERMARK_RESOLVED =
+            new ResolvedExpressionMock(DataTypes.TIMESTAMP(3), () -> WATERMARK_SQL);
+
+    private static final String PROCTIME_SQL = "PROCTIME()";
+
+    private static final ResolvedExpression PROCTIME_RESOLVED =
+            new ResolvedExpressionMock(
+                    fromLogicalToDataType(new TimestampType(false, TimestampKind.PROCTIME, 3)),
+                    () -> PROCTIME_SQL);
+
+    private static final Schema SCHEMA =
+            Schema.newBuilder()
+                    .primaryKeyNamed("primary_constraint", "id") // out of order
+                    .column("id", DataTypes.INT().notNull())
+                    .column("counter", DataTypes.INT().notNull())
+                    .column("payload", "ROW<name STRING, age INT, flag BOOLEAN>")
+                    .columnByMetadata("topic", DataTypes.STRING(), true)
+                    .columnByExpression("ts", callSql(COMPUTED_SQL)) // out of order API expression
+                    .columnByMetadata("orig_ts", DataTypes.TIMESTAMP(3), "timestamp")
+                    .watermark("ts", WATERMARK_SQL)
+                    .columnByExpression("proctime", PROCTIME_SQL)
+                    .build();
+
+    @Test
+    public void testSchemaResolution() {
+        final ResolvedSchema expectedSchema =
+                new ResolvedSchema(
+                        Arrays.asList(
+                                Column.physical("id", DataTypes.INT().notNull()),
+                                Column.physical("counter", DataTypes.INT().notNull()),
+                                Column.physical(
+                                        "payload",
+                                        DataTypes.ROW(
+                                                DataTypes.FIELD("name", DataTypes.STRING()),
+                                                DataTypes.FIELD("age", DataTypes.INT()),
+                                                DataTypes.FIELD("flag", DataTypes.BOOLEAN()))),
+                                Column.metadata("topic", DataTypes.STRING(), true),
+                                Column.computed("ts", COMPUTED_COLUMN_RESOLVED),
+                                Column.metadata("orig_ts", DataTypes.TIMESTAMP(3), "timestamp"),
+                                Column.computed("proctime", PROCTIME_RESOLVED)),
+                        Collections.singletonList(new WatermarkSpec("ts", WATERMARK_RESOLVED)),
+                        UniqueConstraint.primaryKey(
+                                "primary_constraint", Collections.singletonList("id")));
+
+        final ResolvedSchema actualStreamSchema = resolveSchema(SCHEMA, true, true);
+        {
+            assertThat(actualStreamSchema, equalTo(expectedSchema));
+            assertTrue(isRowtimeAttribute(getType(actualStreamSchema, "ts")));
+            assertTrue(isProctimeAttribute(getType(actualStreamSchema, "proctime")));
+        }
+
+        final ResolvedSchema actualBatchSchema = resolveSchema(SCHEMA, false, true);
+        {
+            assertThat(actualBatchSchema, equalTo(expectedSchema));
+            assertFalse(isRowtimeAttribute(getType(actualBatchSchema, "ts")));
+            assertTrue(isProctimeAttribute(getType(actualBatchSchema, "proctime")));
+        }
+    }
+
+    @Test
+    public void testSchemaResolutionErrors() {
+
+        // columns
+
+        testError(
+                Schema.newBuilder().fromSchema(SCHEMA).column("id", DataTypes.STRING()).build(),
+                "Schema must not contain duplicate column names.");
+
+        testError(
+                SCHEMA,
+                "Metadata columns are not supported in a schema at the current location.",
+                true,
+                false);
+
+        testError(
+                Schema.newBuilder().columnByExpression("invalid", callSql("INVALID")).build(),
+                "Invalid expression for computed column 'invalid'.");
+
+        // time attributes and watermarks
+
+        testError(
+                Schema.newBuilder()
+                        .column("ts", DataTypes.BOOLEAN())
+                        .watermark("ts", callSql(WATERMARK_SQL))
+                        .build(),
+                "Invalid data type of time field for watermark definition. The field must be of type TIMESTAMP(3) WITHOUT TIME ZONE.");
+
+        testError(
+                Schema.newBuilder()
+                        .column("ts", DataTypes.TIMESTAMP(3))
+                        .watermark("ts", callSql("INVALID"))
+                        .build(),
+                "Invalid expression for watermark 'WATERMARK FOR `ts` AS [INVALID]'.");
+
+        testError(
+                Schema.newBuilder()
+                        .column("ts", DataTypes.TIMESTAMP(3))
+                        .watermark("other_ts", callSql(WATERMARK_SQL))
+                        .build(),
+                "Invalid column name 'other_ts' for rowtime attribute in watermark declaration. Available columns are: [ts]");
+
+        testError(
+                Schema.newBuilder().fromSchema(SCHEMA).watermark("orig_ts", WATERMARK_SQL).build(),
+                "Multiple watermark definitions are not supported yet.");
+
+        testError(
+                Schema.newBuilder()
+                        .columnByExpression("ts", PROCTIME_SQL)
+                        .watermark("ts", WATERMARK_SQL)
+                        .build(),
+                "A watermark can not be defined for a processing-time attribute.");
+
+        // primary keys
+
+        testError(
+                Schema.newBuilder().column("id", DataTypes.INT()).primaryKey("INVALID").build(),
+                "Column 'INVALID' does not exist.");
+
+        testError(
+                Schema.newBuilder()
+                        .column("nullable_col", DataTypes.INT())
+                        .primaryKey("nullable_col")
+                        .build(),
+                "Column 'nullable_col' is nullable.");
+
+        testError(
+                Schema.newBuilder()
+                        .column("orig_ts", DataTypes.TIMESTAMP(3))
+                        .columnByExpression("ts", COMPUTED_SQL)
+                        .primaryKey("ts")
+                        .build(),
+                "Column 'ts' is not a physical column.");
+
+        testError(
+                Schema.newBuilder().column("id", DataTypes.INT()).primaryKey("id", "id").build(),
+                "Invalid primary key 'PK_id_id'. A primary key must not contain duplicate columns. Found: [id]");
+    }
+
+    @Test
+    public void testUnresolvedSchemaString() {
+        assertThat(
+                SCHEMA.toString(),
+                equalTo(
+                        "(\n"
+                                + "  `id` INT NOT NULL, \n"
+                                + "  `counter` INT NOT NULL, \n"
+                                + "  `payload` [ROW<name STRING, age INT, flag BOOLEAN>], \n"
+                                + "  `topic` METADATA VIRTUAL, \n"
+                                + "  `ts` AS [orig_ts - INTERVAL '60' MINUTE], \n"
+                                + "  `orig_ts` METADATA FROM 'timestamp', \n"
+                                + "  `proctime` AS [PROCTIME()], \n"
+                                + "  WATERMARK FOR `ts` AS [ts - INTERVAL '5' SECOND], \n"
+                                + "  CONSTRAINT `primary_constraint` PRIMARY KEY (`id`) NOT ENFORCED\n"
+                                + ")"));
+    }
+
+    @Test
+    public void testResolvedSchemaString() {
+        final ResolvedSchema resolvedSchema = resolveSchema(SCHEMA);
+        assertThat(
+                resolvedSchema.toString(),
+                equalTo(
+                        "(\n"
+                                + "  `id` INT NOT NULL, \n"
+                                + "  `counter` INT NOT NULL, \n"
+                                + "  `payload` ROW<`name` STRING, `age` INT, `flag` BOOLEAN>, \n"
+                                + "  `topic` STRING METADATA VIRTUAL, \n"
+                                + "  `ts` TIMESTAMP(3) *ROWTIME* AS orig_ts - INTERVAL '60' MINUTE, \n"
+                                + "  `orig_ts` TIMESTAMP(3) METADATA FROM 'timestamp', \n"
+                                + "  `proctime` TIMESTAMP(3) NOT NULL *PROCTIME* AS PROCTIME(), \n"
+                                + "  WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - INTERVAL '5' SECOND, \n"
+                                + "  CONSTRAINT `primary_constraint` PRIMARY KEY (`id`) NOT ENFORCED\n"
+                                + ")"));
+    }
+
+    @Test
+    public void testGeneratedConstraintName() {
+        final Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.STRING())
+                        .column("c", DataTypes.STRING())
+                        .primaryKey("b", "a")
+                        .build();
+        assertThat(
+                schema.getPrimaryKey().orElseThrow(IllegalStateException::new).getConstraintName(),
+                equalTo("PK_b_a"));
+    }
+
+    @Test
+    public void testSinkRowDataType() {
+        final ResolvedSchema resolvedSchema = resolveSchema(SCHEMA);
+        final DataType expectedDataType =
+                DataTypes.ROW(
+                                DataTypes.FIELD("id", DataTypes.INT().notNull()),
+                                DataTypes.FIELD("counter", DataTypes.INT().notNull()),
+                                DataTypes.FIELD(
+                                        "payload",
+                                        DataTypes.ROW(
+                                                DataTypes.FIELD("name", DataTypes.STRING()),
+                                                DataTypes.FIELD("age", DataTypes.INT()),
+                                                DataTypes.FIELD("flag", DataTypes.BOOLEAN()))),
+                                DataTypes.FIELD("orig_ts", DataTypes.TIMESTAMP(3)))
+                        .notNull();
+        assertThat(resolvedSchema.toSinkRowDataType(), equalTo(expectedDataType));
+    }
+
+    @Test
+    public void testPhysicalRowDataType() {
+        final ResolvedSchema resolvedSchema1 = resolveSchema(SCHEMA);
+        final DataType expectedDataType =
+                DataTypes.ROW(
+                                DataTypes.FIELD("id", DataTypes.INT().notNull()),
+                                DataTypes.FIELD("counter", DataTypes.INT().notNull()),
+                                DataTypes.FIELD(
+                                        "payload",
+                                        DataTypes.ROW(
+                                                DataTypes.FIELD("name", DataTypes.STRING()),
+                                                DataTypes.FIELD("age", DataTypes.INT()),
+                                                DataTypes.FIELD("flag", DataTypes.BOOLEAN()))))
+                        .notNull();
+
+        final DataType physicalDataType1 = resolvedSchema1.toPhysicalRowDataType();
+        assertThat(physicalDataType1, equalTo(expectedDataType));
+
+        final ResolvedSchema resolvedSchema2 =
+                resolveSchema(Schema.newBuilder().fromRowDataType(physicalDataType1).build());
+        assertThat(resolvedSchema2.toPhysicalRowDataType(), equalTo(physicalDataType1));
+    }
+
+    @Test
+    public void testSourceRowDataType() {
+        final ResolvedSchema resolvedSchema = resolveSchema(SCHEMA, true, true);
+        final DataType expectedDataType =
+                DataTypes.ROW(
+                                DataTypes.FIELD("id", DataTypes.INT().notNull()),
+                                DataTypes.FIELD("counter", DataTypes.INT().notNull()),
+                                DataTypes.FIELD(
+                                        "payload",
+                                        DataTypes.ROW(
+                                                DataTypes.FIELD("name", DataTypes.STRING()),
+                                                DataTypes.FIELD("age", DataTypes.INT()),
+                                                DataTypes.FIELD("flag", DataTypes.BOOLEAN()))),
+                                DataTypes.FIELD("topic", DataTypes.STRING()),
+                                DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)),
+                                DataTypes.FIELD("orig_ts", DataTypes.TIMESTAMP(3)),
+                                DataTypes.FIELD("proctime", DataTypes.TIMESTAMP(3).notNull()))
+                        .notNull();
+        assertThat(resolvedSchema.toSourceRowDataType(), equalTo(expectedDataType));
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private static void testError(Schema schema, String errorMessage) {
+        testError(schema, errorMessage, true, true);
+    }
+
+    private static void testError(
+            Schema schema, String errorMessage, boolean isStreaming, boolean supportsMetadata) {
+        try {
+            resolveSchema(schema, isStreaming, supportsMetadata);
+            fail("Error message expected: " + errorMessage);
+        } catch (Exception e) {
+            assertThat(e, FlinkMatchers.containsMessage(errorMessage));
+        }
+    }
+
+    private static ResolvedSchema resolveSchema(Schema schema) {
+        return resolveSchema(schema, true, true);
+    }
+
+    private static ResolvedSchema resolveSchema(
+            Schema schema, boolean isStreamingMode, boolean supportsMetadata) {
+        final SchemaResolver resolver =
+                new DefaultSchemaResolver(
+                        isStreamingMode,
+                        supportsMetadata,
+                        dataTypeFactory(),
+                        expressionResolverBuilder());
+        return resolver.resolve(schema);
+    }
+
+    private static ExpressionResolver.ExpressionResolverBuilder expressionResolverBuilder() {
+        return ExpressionResolver.resolverFor(
+                new TableConfig(),
+                name -> Optional.empty(),
+                new FunctionLookupMock(Collections.emptyMap()),
+                dataTypeFactory(),
+                SchemaResolutionTest::resolveSqlExpression);
+    }
+
+    private static DataTypeFactory dataTypeFactory() {
+        return new DataTypeFactoryMock();
+    }
+
+    private static ResolvedExpression resolveSqlExpression(
+            String sqlExpression, TableSchema inputSchema) {
+        switch (sqlExpression) {
+            case COMPUTED_SQL:
+                assertThat(
+                        inputSchema.getFieldDataType("orig_ts").orElse(null),
+                        equalTo(DataTypes.TIMESTAMP(3)));
+                return COMPUTED_COLUMN_RESOLVED;
+            case WATERMARK_SQL:
+                assertThat(
+                        inputSchema.getFieldDataType("ts").orElse(null),
+                        equalTo(DataTypes.TIMESTAMP(3)));
+                return WATERMARK_RESOLVED;
+            case PROCTIME_SQL:
+                return PROCTIME_RESOLVED;
+            default:
+                throw new UnsupportedOperationException("Unknown SQL expression.");
+        }
+    }
+
+    private static LogicalType getType(ResolvedSchema resolvedSchema, String column) {
+        return resolvedSchema
+                .getColumn(column)
+                .orElseThrow(IllegalStateException::new)
+                .getDataType()
+                .getLogicalType();
+    }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java
new file mode 100644
index 0000000..d749a81
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java
@@ -0,0 +1,857 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.Column.ComputedColumn;
+import org.apache.flink.table.catalog.Column.MetadataColumn;
+import org.apache.flink.table.catalog.Column.PhysicalColumn;
+import org.apache.flink.table.catalog.Constraint;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.SchemaResolver;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.WatermarkSpec;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.SqlCallExpression;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
+
+/**
+ * Schema of a table or view.
+ *
+ * <p>A schema represents the schema part of a {@code CREATE TABLE (schema) WITH (options)} DDL
+ * statement in SQL. It defines columns of different kinds, constraints, time attributes, and
+ * watermark strategies. It is possible to reference objects (such as functions or types) across
+ * different catalogs.
+ *
+ * <p>This class is used in the API and catalogs to define an unresolved schema that will be
+ * translated to {@link ResolvedSchema}. Some methods of this class perform basic validation;
+ * however, the main validation happens during resolution.
+ *
+ * <p>Since an instance of this class is unresolved, it should not be directly persisted. The {@link
+ * #toString()} shows only a summary of the contained objects.
+ */
+@PublicEvolving
+public final class Schema {
+
+    private final List<UnresolvedColumn> columns;
+
+    private final List<UnresolvedWatermarkSpec> watermarkSpecs;
+
+    private final @Nullable UnresolvedPrimaryKey primaryKey;
+
+    private Schema(
+            List<UnresolvedColumn> columns,
+            List<UnresolvedWatermarkSpec> watermarkSpecs,
+            @Nullable UnresolvedPrimaryKey primaryKey) {
+        this.columns = columns;
+        this.watermarkSpecs = watermarkSpecs;
+        this.primaryKey = primaryKey;
+    }
+
+    /** Builder for configuring and creating instances of {@link Schema}. */
+    public static Schema.Builder newBuilder() {
+        return new Builder();
+    }
+
+    public List<UnresolvedColumn> getColumns() {
+        return columns;
+    }
+
+    public List<UnresolvedWatermarkSpec> getWatermarkSpecs() {
+        return watermarkSpecs;
+    }
+
+    public Optional<UnresolvedPrimaryKey> getPrimaryKey() {
+        return Optional.ofNullable(primaryKey);
+    }
+
+    /** Resolves the given {@link Schema} to a validated {@link ResolvedSchema}. */
+    public ResolvedSchema resolve(SchemaResolver resolver) {
+        return resolver.resolve(this);
+    }
+
+    @Override
+    public String toString() {
+        final List<Object> components = new ArrayList<>();
+        components.addAll(columns);
+        components.addAll(watermarkSpecs);
+        if (primaryKey != null) {
+            components.add(primaryKey);
+        }
+        return components.stream()
+                .map(Objects::toString)
+                .map(s -> "  " + s)
+                .collect(Collectors.joining(", \n", "(\n", "\n)"));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Schema schema = (Schema) o;
+        return columns.equals(schema.columns)
+                && watermarkSpecs.equals(schema.watermarkSpecs)
+                && Objects.equals(primaryKey, schema.primaryKey);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(columns, watermarkSpecs, primaryKey);
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /** A builder for constructing an immutable but still unresolved {@link Schema}. */
+    public static final class Builder {
+
+        private final List<UnresolvedColumn> columns;
+
+        private final List<UnresolvedWatermarkSpec> watermarkSpecs;
+
+        private @Nullable UnresolvedPrimaryKey primaryKey;
+
+        private Builder() {
+            columns = new ArrayList<>();
+            watermarkSpecs = new ArrayList<>();
+        }
+
+        /** Adopts all members from the given unresolved schema. */
+        public Builder fromSchema(Schema unresolvedSchema) {
+            columns.addAll(unresolvedSchema.columns);
+            watermarkSpecs.addAll(unresolvedSchema.watermarkSpecs);
+            if (unresolvedSchema.primaryKey != null) {
+                primaryKeyNamed(
+                        unresolvedSchema.primaryKey.getConstraintName(),
+                        unresolvedSchema.primaryKey.getColumnNames());
+            }
+            return this;
+        }
+
+        /** Adopts all members from the given resolved schema. */
+        public Builder fromResolvedSchema(ResolvedSchema resolvedSchema) {
+            addResolvedColumns(resolvedSchema.getColumns());
+            addResolvedWatermarkSpec(resolvedSchema.getWatermarkSpecs());
+            resolvedSchema.getPrimaryKey().ifPresent(this::addResolvedConstraint);
+            return this;
+        }
+
+        /** Adopts all fields of the given row as physical columns of the schema. */
+        public Builder fromRowDataType(DataType dataType) {
+            Preconditions.checkNotNull(dataType, "Data type must not be null.");
+            Preconditions.checkArgument(
+                    hasRoot(dataType.getLogicalType(), LogicalTypeRoot.ROW),
+                    "Data type of ROW expected.");
+            final List<DataType> fieldDataTypes = dataType.getChildren();
+            final List<String> fieldNames = ((RowType) dataType.getLogicalType()).getFieldNames();
+            IntStream.range(0, fieldDataTypes.size())
+                    .forEach(i -> column(fieldNames.get(i), fieldDataTypes.get(i)));
+            return this;
+        }
+
+        /**
+         * Declares a physical column that is appended to this schema.
+         *
+         * <p>Physical columns are regular columns known from databases. They define the names, the
+         * types, and the order of fields in the physical data. Thus, physical columns represent the
+         * payload that is read from and written to an external system. Connectors and formats use
+         * these columns (in the defined order) to configure themselves. Other kinds of columns can
+         * be declared between physical columns but will not influence the final physical schema.
+         *
+         * @param columnName column name
+         * @param dataType data type of the column
+         */
+        public Builder column(String columnName, AbstractDataType<?> dataType) {
+            Preconditions.checkNotNull(columnName, "Column name must not be null.");
+            Preconditions.checkNotNull(dataType, "Data type must not be null.");
+            columns.add(new UnresolvedPhysicalColumn(columnName, dataType));
+            return this;
+        }
+
+        /**
+         * Declares a physical column that is appended to this schema.
+         *
+         * <p>See {@link #column(String, AbstractDataType)} for a detailed explanation.
+         *
+         * <p>This method uses a type string that can be easily persisted in a durable catalog.
+         *
+         * @param columnName column name
+         * @param serializableTypeString data type of the column as a serializable string
+         * @see LogicalType#asSerializableString()
+         */
+        public Builder column(String columnName, String serializableTypeString) {
+            return column(columnName, DataTypes.of(serializableTypeString));
+        }
+
+        /**
+         * Declares a computed column that is appended to this schema.
+         *
+         * <p>Computed columns are virtual columns that are generated by evaluating an expression
+         * that can reference other columns declared in the same table. Both physical columns and
+         * metadata columns can be accessed. The column itself is not physically stored within the
+         * table. The column’s data type is derived automatically from the given expression and does
+         * not have to be declared manually.
+         *
+         * <p>Computed columns are commonly used for defining time attributes. For example, a
+         * computed column can be used if the original field is not of type TIMESTAMP(3) or is
+         * nested in a JSON string.
+         *
+         * <p>Any scalar expression can be used for in-memory/temporary tables. However, currently,
+         * only SQL expressions can be persisted in a catalog. User-defined functions (also defined
+         * in different catalogs) are supported.
+         *
+         * <p>Example: {@code .columnByExpression("ts", $("json_obj").get("ts").cast(TIMESTAMP(3)))}
+         *
+         * @param columnName column name
+         * @param expression computation of the column
+         */
+        public Builder columnByExpression(String columnName, Expression expression) {
+            Preconditions.checkNotNull(columnName, "Column name must not be null.");
+            Preconditions.checkNotNull(expression, "Expression must not be null.");
+            columns.add(new UnresolvedComputedColumn(columnName, expression));
+            return this;
+        }
+
+        /**
+         * Declares a computed column that is appended to this schema.
+         *
+         * <p>See {@link #columnByExpression(String, Expression)} for a detailed explanation.
+         *
+         * <p>This method uses a SQL expression that can be easily persisted in a durable catalog.
+         *
+         * <p>Example: {@code .columnByExpression("ts", "CAST(json_obj.ts AS TIMESTAMP(3))")}
+         *
+         * @param columnName column name
+         * @param sqlExpression computation of the column using SQL
+         */
+        public Builder columnByExpression(String columnName, String sqlExpression) {
+            return columnByExpression(columnName, new SqlCallExpression(sqlExpression));
+        }
+
+        /**
+         * Declares a metadata column that is appended to this schema.
+         *
+         * <p>Metadata columns allow accessing connector and/or format specific fields for every row
+         * of a table. For example, a metadata column can be used to read and write the timestamp
+         * from and to Kafka records for time-based operations. The connector and format
+         * documentation lists the available metadata fields for every component.
+         *
+         * <p>Every metadata field is identified by a string-based key and has a documented data
+         * type. For convenience, the runtime will perform an explicit cast if the data type of the
+         * column differs from the data type of the metadata field. Of course, this requires that
+         * the two data types are compatible.
+         *
+         * <p>By default, a metadata column can be used for both reading and writing. However, in
+         * many cases an external system provides more read-only metadata fields than writable
+         * fields. Therefore, it is possible to exclude metadata columns from persisting by setting
+         * the {@code isVirtual} flag to {@code true}.
+         *
+         * <p>Note: This method assumes that the metadata key is equal to the column name.
+         *
+         * @param columnName column name
+         * @param dataType data type of the column
+         * @param isVirtual whether the column should be excluded from persisting
+         */
+        public Builder columnByMetadata(
+                String columnName, AbstractDataType<?> dataType, boolean isVirtual) {
+            Preconditions.checkNotNull(columnName, "Column name must not be null.");
+            Preconditions.checkNotNull(dataType, "Data type must not be null.");
+            columns.add(new UnresolvedMetadataColumn(columnName, dataType, null, isVirtual));
+            return this;
+        }
+
+        /**
+         * Declares a metadata column that is appended to this schema.
+         *
+         * <p>Metadata columns allow accessing connector and/or format specific fields for every row
+         * of a table. For example, a metadata column can be used to read and write the timestamp
+         * from and to Kafka records for time-based operations. The connector and format
+         * documentation lists the available metadata fields for every component.
+         *
+         * <p>Every metadata field is identified by a string-based key and has a documented data
+         * type. The metadata key can be omitted if the column name should be used as the
+         * identifying metadata key. For convenience, the runtime will perform an explicit cast if
+         * the data type of the column differs from the data type of the metadata field. Of course,
+         * this requires that the two data types are compatible.
+         *
+         * <p>Note: This method assumes that a metadata column can be used for both reading and
+         * writing.
+         *
+         * @param columnName column name
+         * @param dataType data type of the column
+         * @param metadataKey identifying metadata key, if null the column name will be used as
+         *     metadata key
+         */
+        public Builder columnByMetadata(
+                String columnName, AbstractDataType<?> dataType, @Nullable String metadataKey) {
+            Preconditions.checkNotNull(columnName, "Column name must not be null.");
+            Preconditions.checkNotNull(dataType, "Data type must not be null.");
+            columns.add(new UnresolvedMetadataColumn(columnName, dataType, metadataKey, false));
+            return this;
+        }
+
+        /**
+         * Declares a metadata column that is appended to this schema.
+         *
+         * <p>Metadata columns allow accessing connector and/or format specific fields for every row
+         * of a table. For example, a metadata column can be used to read and write the timestamp
+         * from and to Kafka records for time-based operations. The connector and format
+         * documentation lists the available metadata fields for every component.
+         *
+         * <p>Every metadata field is identified by a string-based key and has a documented data
+         * type. The metadata key can be omitted if the column name should be used as the
+         * identifying metadata key. For convenience, the runtime will perform an explicit cast if
+         * the data type of the column differs from the data type of the metadata field. Of course,
+         * this requires that the two data types are compatible.
+         *
+         * <p>By default, a metadata column can be used for both reading and writing. However, in
+         * many cases an external system provides more read-only metadata fields than writable
+         * fields. Therefore, it is possible to exclude metadata columns from persisting by setting
+         * the {@code isVirtual} flag to {@code true}.
+         *
+         * @param columnName column name
+         * @param dataType data type of the column
+         * @param metadataKey identifying metadata key, if null the column name will be used as
+         *     metadata key
+         * @param isVirtual whether the column is virtual, i.e. excluded from persisting
+         */
+        public Builder columnByMetadata(
+                String columnName,
+                AbstractDataType<?> dataType,
+                @Nullable String metadataKey,
+                boolean isVirtual) {
+            Preconditions.checkNotNull(columnName, "Column name must not be null.");
+            Preconditions.checkNotNull(dataType, "Data type must not be null.");
+            columns.add(new UnresolvedMetadataColumn(columnName, dataType, metadataKey, isVirtual));
+            return this;
+        }
+
+        /**
+         * Declares that the given column should serve as an event-time (i.e. rowtime) attribute and
+         * specifies a corresponding watermark strategy as an expression.
+         *
+         * <p>The column must be of type {@code TIMESTAMP(3)} and be a top-level column in the
+         * schema. It may be a computed column.
+         *
+         * <p>The watermark generation expression is evaluated by the framework for every record
+         * during runtime. The framework will periodically emit the largest generated watermark. If
+         * the current watermark is still identical to the previous one, or is null, or the value of
+         * the returned watermark is smaller than that of the last emitted one, then no new
+         * watermark will be emitted. A watermark is emitted in an interval defined by the
+         * configuration.
+         *
+         * <p>Any scalar expression can be used for declaring a watermark strategy for
+         * in-memory/temporary tables. However, currently, only SQL expressions can be persisted in
+         * a catalog. The expression's return data type must be {@code TIMESTAMP(3)}. User-defined
+         * functions (also defined in different catalogs) are supported.
+         *
+         * <p>Example: {@code .watermark("ts", $("ts").minus(lit(5).seconds()))}
+         *
+         * @param columnName the column name used as a rowtime attribute
+         * @param watermarkExpression the expression used for watermark generation
+         */
+        public Builder watermark(String columnName, Expression watermarkExpression) {
+            Preconditions.checkNotNull(columnName, "Column name must not be null.");
+            Preconditions.checkNotNull(
+                    watermarkExpression, "Watermark expression must not be null.");
+            this.watermarkSpecs.add(new UnresolvedWatermarkSpec(columnName, watermarkExpression));
+            return this;
+        }
+
+        /**
+         * Declares that the given column should serve as an event-time (i.e. rowtime) attribute and
+         * specifies a corresponding watermark strategy as an expression.
+         *
+         * <p>See {@link #watermark(String, Expression)} for a detailed explanation.
+         *
+         * <p>This method uses a SQL expression that can be easily persisted in a durable catalog.
+         *
+         * <p>Example: {@code .watermark("ts", "ts - INTERVAL '5' SECOND")}
+         */
+        public Builder watermark(String columnName, String sqlExpression) {
+            return watermark(columnName, new SqlCallExpression(sqlExpression));
+        }
+
+        /**
+         * Declares a primary key constraint for a set of given columns. A primary key uniquely
+         * identifies a row in a table. None of the columns in a primary key can be nullable. The
+         * primary key is informational only. It will not be enforced. It can be used for
+         * optimizations. It is the data owner's responsibility to ensure uniqueness of the data.
+         *
+         * <p>The primary key will be assigned a generated name in the format {@code PK_col1_col2}.
+         *
+         * @param columnNames columns that form a unique primary key
+         */
+        public Builder primaryKey(String... columnNames) {
+            Preconditions.checkNotNull(columnNames, "Primary key column names must not be null.");
+            return primaryKey(Arrays.asList(columnNames));
+        }
+
+        /**
+         * Declares a primary key constraint for a set of given columns. A primary key uniquely
+         * identifies a row in a table. None of the columns in a primary key can be nullable. The
+         * primary key is informational only. It will not be enforced. It can be used for
+         * optimizations. It is the data owner's responsibility to ensure uniqueness of the data.
+         *
+         * <p>The primary key will be assigned a generated name in the format {@code PK_col1_col2}.
+         *
+         * @param columnNames columns that form a unique primary key
+         */
+        public Builder primaryKey(List<String> columnNames) {
+            Preconditions.checkNotNull(columnNames, "Primary key column names must not be null.");
+            final String generatedConstraintName =
+                    columnNames.stream().collect(Collectors.joining("_", "PK_", ""));
+            return primaryKeyNamed(generatedConstraintName, columnNames);
+        }
+
+        /**
+         * Declares a primary key constraint for a set of given columns. A primary key uniquely
+         * identifies a row in a table. None of the columns in a primary key can be nullable. The
+         * primary key is informational only. It will not be enforced. It can be used for
+         * optimizations. It is the data owner's responsibility to ensure uniqueness of the data.
+         *
+         * @param constraintName name for the primary key, can be used to reference the constraint
+         * @param columnNames columns that form a unique primary key
+         */
+        public Builder primaryKeyNamed(String constraintName, String... columnNames) {
+            Preconditions.checkNotNull(columnNames, "Primary key column names must not be null.");
+            return primaryKeyNamed(constraintName, Arrays.asList(columnNames));
+        }
+
+        /**
+         * Declares a primary key constraint for a set of given columns. A primary key uniquely
+         * identifies a row in a table. None of the columns in a primary key can be nullable. The
+         * primary key is informational only. It will not be enforced. It can be used for
+         * optimizations. It is the data owner's responsibility to ensure uniqueness of the data.
+         *
+         * @param constraintName name for the primary key, can be used to reference the constraint
+         * @param columnNames columns that form a unique primary key
+         */
+        public Builder primaryKeyNamed(String constraintName, List<String> columnNames) {
+            Preconditions.checkState(
+                    primaryKey == null, "Multiple primary keys are not supported.");
+            Preconditions.checkNotNull(
+                    constraintName, "Primary key constraint name must not be null.");
+            Preconditions.checkArgument(
+                    !StringUtils.isNullOrWhitespaceOnly(constraintName),
+                    "Primary key constraint name must not be empty.");
+            Preconditions.checkArgument(
+                    columnNames != null && columnNames.size() > 0,
+                    "Primary key constraint must be defined for at least a single column.");
+            primaryKey = new UnresolvedPrimaryKey(constraintName, columnNames);
+            return this;
+        }
+
+        /** Returns an instance of an unresolved {@link Schema}. */
+        public Schema build() {
+            return new Schema(columns, watermarkSpecs, primaryKey);
+        }
+
+        // ----------------------------------------------------------------------------------------
+
+        private void addResolvedColumns(List<Column> columns) {
+            columns.forEach(
+                    c -> {
+                        if (c instanceof PhysicalColumn) {
+                            final PhysicalColumn physicalColumn = (PhysicalColumn) c;
+                            column(physicalColumn.getName(), physicalColumn.getDataType());
+                        } else if (c instanceof ComputedColumn) {
+                            final ComputedColumn computedColumn = (ComputedColumn) c;
+                            columnByExpression(
+                                    computedColumn.getName(), computedColumn.getExpression());
+                        } else if (c instanceof MetadataColumn) {
+                            final MetadataColumn metadataColumn = (MetadataColumn) c;
+                            columnByMetadata(
+                                    metadataColumn.getName(),
+                                    metadataColumn.getDataType(),
+                                    metadataColumn.getMetadataAlias().orElse(null),
+                                    metadataColumn.isVirtual());
+                        }
+                    });
+        }
+
+        private void addResolvedWatermarkSpec(List<WatermarkSpec> specs) {
+            specs.forEach(
+                    s ->
+                            watermarkSpecs.add(
+                                    new UnresolvedWatermarkSpec(
+                                            s.getRowtimeAttribute(), s.getWatermarkExpression())));
+        }
+
+        private void addResolvedConstraint(UniqueConstraint constraint) {
+            if (constraint.getType() == Constraint.ConstraintType.PRIMARY_KEY) {
+                primaryKeyNamed(constraint.getName(), constraint.getColumns());
+            } else {
+                throw new IllegalArgumentException("Unsupported constraint type.");
+            }
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Helper classes for representing the schema
+    // --------------------------------------------------------------------------------------------
+
+    /** Super class for all kinds of columns in an unresolved schema. */
+    public abstract static class UnresolvedColumn {
+        final String columnName;
+
+        UnresolvedColumn(String columnName) {
+            this.columnName = columnName;
+        }
+
+        public String getName() {
+            return columnName;
+        }
+
+        @Override
+        public String toString() {
+            return EncodingUtils.escapeIdentifier(columnName);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            UnresolvedColumn that = (UnresolvedColumn) o;
+            return columnName.equals(that.columnName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(columnName);
+        }
+    }
+
+    /**
+     * Declaration of a physical column that will be resolved to {@link PhysicalColumn} during
+     * schema resolution.
+     */
+    public static final class UnresolvedPhysicalColumn extends UnresolvedColumn {
+
+        private final AbstractDataType<?> dataType;
+
+        UnresolvedPhysicalColumn(String columnName, AbstractDataType<?> dataType) {
+            super(columnName);
+            this.dataType = dataType;
+        }
+
+        public AbstractDataType<?> getDataType() {
+            return dataType;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("%s %s", super.toString(), dataType.toString());
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            if (!super.equals(o)) {
+                return false;
+            }
+            UnresolvedPhysicalColumn that = (UnresolvedPhysicalColumn) o;
+            return dataType.equals(that.dataType);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(super.hashCode(), dataType);
+        }
+    }
+
+    /**
+     * Declaration of a computed column that will be resolved to {@link ComputedColumn} during
+     * schema resolution.
+     */
+    public static final class UnresolvedComputedColumn extends UnresolvedColumn {
+
+        private final Expression expression;
+
+        UnresolvedComputedColumn(String columnName, Expression expression) {
+            super(columnName);
+            this.expression = expression;
+        }
+
+        public Expression getExpression() {
+            return expression;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("%s AS %s", super.toString(), expression.asSummaryString());
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            if (!super.equals(o)) {
+                return false;
+            }
+            UnresolvedComputedColumn that = (UnresolvedComputedColumn) o;
+            return expression.equals(that.expression);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(super.hashCode(), expression);
+        }
+    }
+
+    /**
+     * Declaration of a metadata column that will be resolved to {@link MetadataColumn} during
+     * schema resolution.
+     */
+    public static final class UnresolvedMetadataColumn extends UnresolvedColumn {
+
+        private final AbstractDataType<?> dataType;
+        private final @Nullable String metadataKey;
+        private final boolean isVirtual;
+
+        UnresolvedMetadataColumn(
+                String columnName,
+                AbstractDataType<?> dataType,
+                @Nullable String metadataKey,
+                boolean isVirtual) {
+            super(columnName);
+            this.dataType = dataType;
+            this.metadataKey = metadataKey;
+            this.isVirtual = isVirtual;
+        }
+
+        public AbstractDataType<?> getDataType() {
+            return dataType;
+        }
+
+        public @Nullable String getMetadataKey() {
+            return metadataKey;
+        }
+
+        public boolean isVirtual() {
+            return isVirtual;
+        }
+
+        @Override
+        public String toString() {
+            final StringBuilder sb = new StringBuilder();
+            sb.append(super.toString());
+            sb.append(" METADATA");
+            if (metadataKey != null) {
+                sb.append(" FROM '");
+                sb.append(EncodingUtils.escapeSingleQuotes(metadataKey));
+                sb.append("'");
+            }
+            if (isVirtual) {
+                sb.append(" VIRTUAL");
+            }
+            return sb.toString();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            if (!super.equals(o)) {
+                return false;
+            }
+            UnresolvedMetadataColumn that = (UnresolvedMetadataColumn) o;
+            return isVirtual == that.isVirtual
+                    && dataType.equals(that.dataType)
+                    && Objects.equals(metadataKey, that.metadataKey);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(super.hashCode(), dataType, metadataKey, isVirtual);
+        }
+    }
+
+    /**
+     * Declaration of a watermark strategy that will be resolved to {@link WatermarkSpec} during
+     * schema resolution.
+     */
+    public static final class UnresolvedWatermarkSpec {
+
+        private final String columnName;
+        private final Expression watermarkExpression;
+
+        UnresolvedWatermarkSpec(String columnName, Expression watermarkExpression) {
+            this.columnName = columnName;
+            this.watermarkExpression = watermarkExpression;
+        }
+
+        public String getColumnName() {
+            return columnName;
+        }
+
+        public Expression getWatermarkExpression() {
+            return watermarkExpression;
+        }
+
+        @Override
+        public String toString() {
+            return String.format(
+                    "WATERMARK FOR %s AS %s",
+                    EncodingUtils.escapeIdentifier(columnName),
+                    watermarkExpression.asSummaryString());
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            UnresolvedWatermarkSpec that = (UnresolvedWatermarkSpec) o;
+            return columnName.equals(that.columnName)
+                    && watermarkExpression.equals(that.watermarkExpression);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(columnName, watermarkExpression);
+        }
+    }
+
+    /** Super class for all kinds of constraints in an unresolved schema. */
+    public abstract static class UnresolvedConstraint {
+
+        private final String constraintName;
+
+        UnresolvedConstraint(String constraintName) {
+            this.constraintName = constraintName;
+        }
+
+        public String getConstraintName() {
+            return constraintName;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("CONSTRAINT %s", EncodingUtils.escapeIdentifier(constraintName));
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            UnresolvedConstraint that = (UnresolvedConstraint) o;
+            return constraintName.equals(that.constraintName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(constraintName);
+        }
+    }
+
+    /**
+     * Declaration of a primary key that will be resolved to {@link UniqueConstraint} during schema
+     * resolution.
+     */
+    public static final class UnresolvedPrimaryKey extends UnresolvedConstraint {
+
+        private final List<String> columnNames;
+
+        UnresolvedPrimaryKey(String constraintName, List<String> columnNames) {
+            super(constraintName);
+            this.columnNames = columnNames;
+        }
+
+        public List<String> getColumnNames() {
+            return columnNames;
+        }
+
+        @Override
+        public String toString() {
+            return String.format(
+                    "%s PRIMARY KEY (%s) NOT ENFORCED",
+                    super.toString(),
+                    columnNames.stream()
+                            .map(EncodingUtils::escapeIdentifier)
+                            .collect(Collectors.joining(", ")));
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            if (!super.equals(o)) {
+                return false;
+            }
+            UnresolvedPrimaryKey that = (UnresolvedPrimaryKey) o;
+            return columnNames.equals(that.columnNames);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(super.hashCode(), columnNames);
+        }
+    }
+}
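
Note on the default constraint name: primaryKey(List<String>) in the file above derives the name by joining the column names with "_" behind a "PK_" prefix. The following is a minimal sketch of that naming rule using only the JDK. The class PrimaryKeyNameSketch and its method generatedName are hypothetical names chosen for illustration and are not part of this commit.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical stand-in that mirrors how the builder derives a default primary key name. */
final class PrimaryKeyNameSketch {

    /** Joins the column names with "_" and prefixes the result with "PK_". */
    static String generatedName(List<String> columnNames) {
        return columnNames.stream().collect(Collectors.joining("_", "PK_", ""));
    }

    public static void main(String[] args) {
        // Composite key: both column names end up in the constraint name.
        System.out.println(generatedName(Arrays.asList("user_id", "region")));
        // Single-column key.
        System.out.println(generatedName(Arrays.asList("id")));
    }
}
```

An empty list would produce just "PK_" under this rule. In the builder that case does not survive, because primaryKeyNamed requires at least one column.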
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/AbstractConstraint.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/AbstractConstraint.java
new file mode 100644
index 0000000..51a541d
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/AbstractConstraint.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Base class for {@link Constraint constraints}. */
+@Internal
+abstract class AbstractConstraint implements Constraint {
+
+    private final String name;
+    private final boolean enforced;
+
+    AbstractConstraint(String name, boolean enforced) {
+        this.name = checkNotNull(name);
+        this.enforced = enforced;
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public boolean isEnforced() {
+        return enforced;
+    }
+
+    @Override
+    public String toString() {
+        return asSummaryString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        AbstractConstraint that = (AbstractConstraint) o;
+        return enforced == that.enforced && Objects.equals(name, that.name);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, enforced);
+    }
+}
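
Note on equality: the equals method of AbstractConstraint above compares getClass() instead of using instanceof, so two constraints of different concrete subclasses are never equal, even when name and enforcement flag match. The following is a minimal sketch of that pattern using only the JDK. All class names in it (ConstraintEqualitySketch, Base, KeySketch, OtherSketch) are hypothetical and are not part of this commit.

```java
import java.util.Objects;

/** Hypothetical demo of the getClass()-based equals pattern; not Flink code. */
final class ConstraintEqualitySketch {

    /** Copies the equals/hashCode shape of the base class in the diff above. */
    abstract static class Base {
        private final String name;
        private final boolean enforced;

        Base(String name, boolean enforced) {
            this.name = Objects.requireNonNull(name);
            this.enforced = enforced;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            // Exact class match: instances of sibling subclasses are never equal.
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            Base that = (Base) o;
            return enforced == that.enforced && Objects.equals(name, that.name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, enforced);
        }
    }

    static final class KeySketch extends Base {
        KeySketch(String name, boolean enforced) {
            super(name, enforced);
        }
    }

    static final class OtherSketch extends Base {
        OtherSketch(String name, boolean enforced) {
            super(name, enforced);
        }
    }

    public static void main(String[] args) {
        Base a = new KeySketch("PK_id", false);
        Base b = new KeySketch("PK_id", false);
        Base c = new OtherSketch("PK_id", false);
        System.out.println(a.equals(b)); // same class, same fields
        System.out.println(a.equals(c)); // different class, same fields
    }
}
```

The hash code in this sketch depends only on the fields, so instances of sibling classes with identical fields share a hash code while still comparing as unequal. That is permitted by the equals/hashCode contract.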
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Column.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Column.java
new file mode 100644
index 0000000..019a9ef
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Column.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Representation of a column in a {@link ResolvedSchema}.
+ *
+ * <p>A table column describes either a {@link PhysicalColumn}, {@link ComputedColumn}, or {@link
+ * MetadataColumn}.
+ *
+ * <p>Every column is fully resolved. The enclosed {@link DataType} indicates whether the column is
+ * a time attribute and thus might differ from the original data type.
+ */
+@PublicEvolving
+public abstract class Column {
+
+    protected final String name;
+
+    protected final DataType dataType;
+
+    private Column(String name, DataType dataType) {
+        this.name = name;
+        this.dataType = dataType;
+    }
+
+    /** Creates a regular table column that represents physical data. */
+    public static PhysicalColumn physical(String name, DataType dataType) {
+        Preconditions.checkNotNull(name, "Column name can not be null.");
+        Preconditions.checkNotNull(dataType, "Column data type can not be null.");
+        return new PhysicalColumn(name, dataType);
+    }
+
+    /** Creates a computed column that is computed from the given {@link ResolvedExpression}. */
+    public static ComputedColumn computed(String name, ResolvedExpression expression) {
+        Preconditions.checkNotNull(name, "Column name can not be null.");
+        Preconditions.checkNotNull(expression, "Column expression can not be null.");
+        return new ComputedColumn(name, expression.getOutputDataType(), expression);
+    }
+
+    /**
+     * Creates a metadata column from metadata of the given column name.
+     *
+     * <p>The column is not virtual by default.
+     */
+    public static MetadataColumn metadata(String name, DataType dataType) {
+        return metadata(name, dataType, null, false);
+    }
+
+    /**
+     * Creates a metadata column from metadata of the given column name.
+     *
+     * <p>Allows specifying whether the column is virtual or not.
+     */
+    public static MetadataColumn metadata(String name, DataType type, boolean isVirtual) {
+        return metadata(name, type, null, isVirtual);
+    }
+
+    /**
+     * Creates a metadata column from metadata of the given alias.
+     *
+     * <p>The column is not virtual by default.
+     */
+    public static MetadataColumn metadata(String name, DataType type, String metadataAlias) {
+        Preconditions.checkNotNull(metadataAlias, "Metadata alias can not be null.");
+        return metadata(name, type, metadataAlias, false);
+    }
+
+    /**
+     * Creates a metadata column from metadata of the given column name or from metadata of the
+     * given alias (if not null).
+     *
+     * <p>Allows specifying whether the column is virtual or not.
+     */
+    public static MetadataColumn metadata(
+            String name, DataType dataType, @Nullable String metadataAlias, boolean isVirtual) {
+        Preconditions.checkNotNull(name, "Column name can not be null.");
+        Preconditions.checkNotNull(dataType, "Column data type can not be null.");
+        return new MetadataColumn(name, dataType, metadataAlias, isVirtual);
+    }
+
+    /**
+     * Returns whether the given column is a physical column of a table; neither computed nor
+     * metadata.
+     */
+    public abstract boolean isPhysical();
+
+    /** Returns whether the given column is persisted in a sink operation. */
+    public abstract boolean isPersisted();
+
+    /** Returns the data type of this column. */
+    public DataType getDataType() {
+        return this.dataType;
+    }
+
+    /** Returns the name of this column. */
+    public String getName() {
+        return name;
+    }
+
+    /** Returns a string that summarizes this column for printing to a console. */
+    public String asSummaryString() {
+        final StringBuilder sb = new StringBuilder();
+        sb.append(EncodingUtils.escapeIdentifier(name));
+        sb.append(" ");
+        sb.append(dataType);
+        explainExtras()
+                .ifPresent(
+                        e -> {
+                            sb.append(" ");
+                            sb.append(e);
+                        });
+        return sb.toString();
+    }
+
+    /** Returns an explanation of specific column extras next to name and type. */
+    public abstract Optional<String> explainExtras();
+
+    /** Returns a copy of the column with a replaced {@link DataType}. */
+    public abstract Column copy(DataType newType);
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Column that = (Column) o;
+        return Objects.equals(this.name, that.name) && Objects.equals(this.dataType, that.dataType);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(this.name, this.dataType);
+    }
+
+    @Override
+    public String toString() {
+        return asSummaryString();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Specific kinds of columns
+    // --------------------------------------------------------------------------------------------
+
+    /** Representation of a physical column. */
+    public static final class PhysicalColumn extends Column {
+
+        private PhysicalColumn(String name, DataType dataType) {
+            super(name, dataType);
+        }
+
+        @Override
+        public boolean isPhysical() {
+            return true;
+        }
+
+        @Override
+        public boolean isPersisted() {
+            return true;
+        }
+
+        @Override
+        public Optional<String> explainExtras() {
+            return Optional.empty();
+        }
+
+        @Override
+        public Column copy(DataType newDataType) {
+            return new PhysicalColumn(name, newDataType);
+        }
+    }
+
+    /** Representation of a computed column. */
+    public static final class ComputedColumn extends Column {
+
+        private final ResolvedExpression expression;
+
+        private ComputedColumn(String name, DataType dataType, ResolvedExpression expression) {
+            super(name, dataType);
+            this.expression = expression;
+        }
+
+        @Override
+        public boolean isPhysical() {
+            return false;
+        }
+
+        @Override
+        public boolean isPersisted() {
+            return false;
+        }
+
+        public ResolvedExpression getExpression() {
+            return expression;
+        }
+
+        @Override
+        public Optional<String> explainExtras() {
+            return Optional.of("AS " + expression.asSummaryString());
+        }
+
+        @Override
+        public Column copy(DataType newDataType) {
+            return new ComputedColumn(name, newDataType, expression);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            if (!super.equals(o)) {
+                return false;
+            }
+            ComputedColumn that = (ComputedColumn) o;
+            return expression.equals(that.expression);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(super.hashCode(), expression);
+        }
+    }
+
+    /** Representation of a metadata column. */
+    public static final class MetadataColumn extends Column {
+
+        private final @Nullable String metadataAlias;
+
+        private final boolean isVirtual;
+
+        private MetadataColumn(
+                String name, DataType dataType, @Nullable String metadataAlias, boolean isVirtual) {
+            super(name, dataType);
+            this.metadataAlias = metadataAlias;
+            this.isVirtual = isVirtual;
+        }
+
+        public boolean isVirtual() {
+            return isVirtual;
+        }
+
+        public Optional<String> getMetadataAlias() {
+            return Optional.ofNullable(metadataAlias);
+        }
+
+        @Override
+        public boolean isPhysical() {
+            return false;
+        }
+
+        @Override
+        public boolean isPersisted() {
+            return !isVirtual;
+        }
+
+        @Override
+        public Optional<String> explainExtras() {
+            final StringBuilder sb = new StringBuilder();
+            sb.append("METADATA");
+            if (metadataAlias != null) {
+                sb.append(" FROM ");
+                sb.append("'");
+                sb.append(EncodingUtils.escapeSingleQuotes(metadataAlias));
+                sb.append("'");
+            }
+            if (isVirtual) {
+                sb.append(" VIRTUAL");
+            }
+            return Optional.of(sb.toString());
+        }
+
+        @Override
+        public Column copy(DataType newDataType) {
+            return new MetadataColumn(name, newDataType, metadataAlias, isVirtual);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            if (!super.equals(o)) {
+                return false;
+            }
+            MetadataColumn that = (MetadataColumn) o;
+            return isVirtual == that.isVirtual && Objects.equals(metadataAlias, that.metadataAlias);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(super.hashCode(), metadataAlias, isVirtual);
+        }
+    }
+}
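Note on `MetadataColumn#explainExtras()` above: the string it builds can be reproduced with plain JDK code. The sketch below is a simplified stand-in, not the Flink class. The class name `MetadataExtrasSketch` is hypothetical, and the local `escapeSingleQuotes` helper assumes that `EncodingUtils.escapeSingleQuotes` doubles each single quote.

```java
/** Hypothetical stand-in that mirrors the string building of MetadataColumn#explainExtras(). */
final class MetadataExtrasSketch {

    /** Assumption: single quotes are escaped by doubling them, as in SQL string literals. */
    static String escapeSingleQuotes(String s) {
        return s.replace("'", "''");
    }

    /** Builds "METADATA", optionally followed by " FROM '<alias>'" and " VIRTUAL". */
    static String explainExtras(String metadataAlias, boolean isVirtual) {
        final StringBuilder sb = new StringBuilder("METADATA");
        if (metadataAlias != null) {
            sb.append(" FROM '").append(escapeSingleQuotes(metadataAlias)).append("'");
        }
        if (isVirtual) {
            sb.append(" VIRTUAL");
        }
        return sb.toString();
    }
}
```

Under these assumptions, `explainExtras("timestamp", true)` yields `METADATA FROM 'timestamp' VIRTUAL`, and a column without alias that is not virtual yields just `METADATA`.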
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Constraint.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Constraint.java
new file mode 100644
index 0000000..dcc0dd8
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Constraint.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Integrity constraints, generally referred to simply as constraints, define the valid states of
+ * SQL-data by constraining the values in the base tables.
+ */
+@PublicEvolving
+public interface Constraint {
+
+    String getName();
+
+    /**
+     * Constraints can either be enforced or non-enforced. If a constraint is enforced it will be
+     * checked whenever any SQL statement is executed that results in data or schema changes. If the
+     * constraint is not enforced the owner of the data is responsible for ensuring data integrity.
+     * Flink will rely on the information being valid and might use it for query optimisations.
+     */
+    boolean isEnforced();
+
+    /** Tells what kind of constraint it is, e.g. PRIMARY KEY, UNIQUE, ... */
+    ConstraintType getType();
+
+    /** Prints the constraint in a readable way. */
+    String asSummaryString();
+
+    /**
+     * Type of the constraint.
+     *
+     * <p>Unique constraints:
+     *
+     * <ul>
+     *   <li>UNIQUE - is satisfied if and only if there do not exist two rows that have the same
+     *       non-null values in the unique columns
+     *   <li>PRIMARY KEY - in addition to the UNIQUE constraint, it requires that none of the
+     *       values in the specified columns be null. Moreover, there can be only a single
+     *       PRIMARY KEY defined for a table.
+     * </ul>
+     */
+    enum ConstraintType {
+        PRIMARY_KEY,
+        UNIQUE_KEY
+    }
+}
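Note on the `ConstraintType` Javadoc above: the two definitions can be made concrete with a small check over key tuples. This is only an illustration of the wording, under the assumption that a row with a null in any key column never counts as a duplicate. The class and method names are hypothetical, and Flink itself does not run such a check for non-enforced constraints.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration of the UNIQUE and PRIMARY KEY definitions. */
final class ConstraintSemanticsSketch {

    /** UNIQUE: no two rows share the same non-null values in the key columns. */
    static boolean satisfiesUnique(List<List<Object>> keyTuples) {
        final Set<List<Object>> seen = new HashSet<>();
        for (List<Object> key : keyTuples) {
            if (key.contains(null)) {
                // assumption: a tuple containing a null is never treated as a duplicate
                continue;
            }
            if (!seen.add(key)) {
                return false;
            }
        }
        return true;
    }

    /** PRIMARY KEY: UNIQUE plus no null value in any key column. */
    static boolean satisfiesPrimaryKey(List<List<Object>> keyTuples) {
        for (List<Object> key : keyTuples) {
            if (key.contains(null)) {
                return false;
            }
        }
        return satisfiesUnique(keyTuples);
    }
}
```

The key tuples should be built with `Arrays.asList(...)` rather than `List.of(...)` when they may contain nulls, because `List.of` rejects null elements.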
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
new file mode 100644
index 0000000..1a1ec01
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.ROW;
+
+/**
+ * Schema of a table or view consisting of columns, constraints, and watermark specifications.
+ *
+ * <p>This class is the result of resolving a {@link Schema} into a final validated representation.
+ *
+ * <ul>
+ *   <li>Data types and functions have been expanded to fully qualified identifiers.
+ *   <li>Time attributes are represented in the column's data type.
+ *   <li>{@link Expression}s have been translated to {@link ResolvedExpression}.
+ *   <li>{@link AbstractDataType}s have been translated to {@link DataType}.
+ * </ul>
+ */
+@PublicEvolving
+public final class ResolvedSchema {
+
+    private final List<Column> columns;
+    private final List<WatermarkSpec> watermarkSpecs;
+    private final @Nullable UniqueConstraint primaryKey;
+
+    ResolvedSchema(
+            List<Column> columns,
+            List<WatermarkSpec> watermarkSpecs,
+            @Nullable UniqueConstraint primaryKey) {
+        this.columns = Preconditions.checkNotNull(columns, "Columns must not be null.");
+        this.watermarkSpecs =
+                Preconditions.checkNotNull(watermarkSpecs, "Watermark specs must not be null.");
+        this.primaryKey = primaryKey;
+    }
+
+    /** Returns the number of {@link Column}s of this schema. */
+    public int getColumnCount() {
+        return columns.size();
+    }
+
+    /** Returns all {@link Column}s of this schema. */
+    public List<Column> getColumns() {
+        return columns;
+    }
+
+    /**
+     * Returns the {@link Column} instance for the given column index.
+     *
+     * @param columnIndex the index of the column
+     */
+    public Optional<Column> getColumn(int columnIndex) {
+        if (columnIndex < 0 || columnIndex >= columns.size()) {
+            return Optional.empty();
+        }
+        return Optional.of(this.columns.get(columnIndex));
+    }
+
+    /**
+     * Returns the {@link Column} instance for the given column name.
+     *
+     * @param columnName the name of the column
+     */
+    public Optional<Column> getColumn(String columnName) {
+        return this.columns.stream()
+                .filter(column -> column.getName().equals(columnName))
+                .findFirst();
+    }
+
+    /**
+     * Returns a list of watermark specifications each consisting of a rowtime attribute and
+     * watermark strategy expression.
+     *
+     * <p>Note: Currently, there is at most one {@link WatermarkSpec} in the list, because we don't
+     * support multiple watermark definitions yet.
+     */
+    public List<WatermarkSpec> getWatermarkSpecs() {
+        return watermarkSpecs;
+    }
+
+    /** Returns the primary key if it has been defined. */
+    public Optional<UniqueConstraint> getPrimaryKey() {
+        return Optional.ofNullable(primaryKey);
+    }
+
+    /**
+     * Converts all columns of this schema into a (possibly nested) row data type.
+     *
+     * <p>This method returns the <b>source-to-query schema</b>.
+     *
+     * <p>Note: The returned row data type contains physical, computed, and metadata columns. Be
+     * careful when using this method in a table source or table sink. In many cases, {@link
+     * #toPhysicalRowDataType()} might be more appropriate.
+     *
+     * @see DataTypes#ROW(DataTypes.Field...)
+     * @see #toPhysicalRowDataType()
+     * @see #toSinkRowDataType()
+     */
+    public DataType toSourceRowDataType() {
+        final DataTypes.Field[] fields =
+                columns.stream()
+                        .map(column -> FIELD(column.getName(), column.getDataType()))
+                        .toArray(DataTypes.Field[]::new);
+        // the row should never be null
+        return ROW(fields).notNull();
+    }
+
+    /**
+     * Converts all physical columns of this schema into a (possibly nested) row data type.
+     *
+     * <p>Note: The returned row data type contains only physical columns. It does not include
+     * computed or metadata columns.
+     *
+     * @see DataTypes#ROW(DataTypes.Field...)
+     * @see #toSourceRowDataType()
+     * @see #toSinkRowDataType()
+     */
+    public DataType toPhysicalRowDataType() {
+        final DataTypes.Field[] fields =
+                columns.stream()
+                        .filter(Column::isPhysical)
+                        .map(column -> FIELD(column.getName(), column.getDataType()))
+                        .toArray(DataTypes.Field[]::new);
+        // the row should never be null
+        return ROW(fields).notNull();
+    }
+
+    /**
+     * Converts all persisted columns of this schema into a (possibly nested) row data type.
+     *
+     * <p>This method returns the <b>query-to-sink schema</b>.
+     *
+     * <p>Note: Computed columns and virtual columns are excluded in the returned row data type. The
+     * data type contains the columns of {@link #toPhysicalRowDataType()} plus persisted metadata
+     * columns.
+     *
+     * @see DataTypes#ROW(DataTypes.Field...)
+     * @see #toSourceRowDataType()
+     * @see #toPhysicalRowDataType()
+     */
+    public DataType toSinkRowDataType() {
+        final DataTypes.Field[] fields =
+                columns.stream()
+                        .filter(Column::isPersisted)
+                        .map(column -> FIELD(column.getName(), column.getDataType()))
+                        .toArray(DataTypes.Field[]::new);
+        // the row should never be null
+        return ROW(fields).notNull();
+    }
+
+    @Override
+    public String toString() {
+        final List<Object> components = new ArrayList<>();
+        components.addAll(columns);
+        components.addAll(watermarkSpecs);
+        if (primaryKey != null) {
+            components.add(primaryKey);
+        }
+        return components.stream()
+                .map(Objects::toString)
+                .map(s -> "  " + s)
+                .collect(Collectors.joining(", \n", "(\n", "\n)"));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final ResolvedSchema that = (ResolvedSchema) o;
+        return Objects.equals(columns, that.columns)
+                && Objects.equals(watermarkSpecs, that.watermarkSpecs)
+                && Objects.equals(primaryKey, that.primaryKey);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(columns, watermarkSpecs, primaryKey);
+    }
+}
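Note on the three row conversions in `ResolvedSchema` above: they differ only in the column filter. The following sketch models that with plain JDK types instead of Flink's `Column` and `DataType`. The class `RowProjectionSketch`, its nested `Col` type, and the example column names are hypothetical. The flag values follow `isPhysical()` and `isPersisted()` as defined for the physical, computed, and metadata columns in this commit.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Hypothetical model of toSourceRowDataType, toPhysicalRowDataType, toSinkRowDataType. */
final class RowProjectionSketch {

    /** Minimal stand-in for a column, reduced to its name and the two flags. */
    static final class Col {
        final String name;
        final boolean physical;
        final boolean persisted;

        Col(String name, boolean physical, boolean persisted) {
            this.name = name;
            this.physical = physical;
            this.persisted = persisted;
        }
    }

    static List<Col> exampleColumns() {
        return Arrays.asList(
                new Col("id", true, true), // physical column
                new Col("total", false, false), // computed column
                new Col("ts", false, true), // persisted metadata column
                new Col("offset", false, false)); // virtual metadata column
    }

    private static List<String> names(List<Col> cols, Predicate<Col> keep) {
        return cols.stream().filter(keep).map(c -> c.name).collect(Collectors.toList());
    }

    /** Source-to-query schema: all columns. */
    static List<String> sourceFields(List<Col> cols) {
        return names(cols, c -> true);
    }

    /** Physical columns only. */
    static List<String> physicalFields(List<Col> cols) {
        return names(cols, c -> c.physical);
    }

    /** Query-to-sink schema: physical plus persisted metadata columns. */
    static List<String> sinkFields(List<Col> cols) {
        return names(cols, c -> c.persisted);
    }
}
```

For the example columns, the source projection keeps all four fields, the physical projection keeps only `id`, and the sink projection keeps `id` and `ts`.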
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/SchemaResolver.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/SchemaResolver.java
new file mode 100644
index 0000000..6f2d947
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/SchemaResolver.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Schema.UnresolvedMetadataColumn;
+
+/** Resolves a {@link Schema} to a validated {@link ResolvedSchema}. */
+public interface SchemaResolver {
+
+    ResolvedSchema resolve(Schema schema);
+
+    /** Returns whether the resolution happens in streaming mode. */
+    boolean isStreamingMode();
+
+    /**
+     * Returns whether metadata columns are supported or an exception should be thrown during the
+     * resolution of {@link UnresolvedMetadataColumn}.
+     */
+    boolean supportsMetadata();
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/UniqueConstraint.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/UniqueConstraint.java
new file mode 100644
index 0000000..a60f0d2
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/UniqueConstraint.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A unique key constraint. It can also be declared as a PRIMARY KEY.
+ *
+ * @see ConstraintType
+ */
+@PublicEvolving
+public final class UniqueConstraint extends AbstractConstraint {
+
+    private final List<String> columns;
+    private final ConstraintType type;
+
+    /** Creates a non-enforced {@link ConstraintType#PRIMARY_KEY} constraint. */
+    public static UniqueConstraint primaryKey(String name, List<String> columns) {
+        return new UniqueConstraint(name, false, ConstraintType.PRIMARY_KEY, columns);
+    }
+
+    private UniqueConstraint(
+            String name, boolean enforced, ConstraintType type, List<String> columns) {
+        super(name, enforced);
+
+        this.columns = checkNotNull(columns);
+        this.type = checkNotNull(type);
+    }
+
+    /** List of column names for which the constraint was defined. */
+    public List<String> getColumns() {
+        return columns;
+    }
+
+    @Override
+    public ConstraintType getType() {
+        return type;
+    }
+
+    /**
+     * Returns the constraint's summary. All constraint summaries are formatted as
+     *
+     * <pre>
+     * CONSTRAINT [constraint-name] [constraint-type] ([constraint-definition])
+     *
+     * E.g. CONSTRAINT pk PRIMARY KEY (f0, f1) NOT ENFORCED
+     * </pre>
+     */
+    @Override
+    public final String asSummaryString() {
+        final String typeString;
+        switch (getType()) {
+            case PRIMARY_KEY:
+                typeString = "PRIMARY KEY";
+                break;
+            case UNIQUE_KEY:
+                typeString = "UNIQUE";
+                break;
+            default:
+                throw new IllegalStateException("Unknown key type: " + getType());
+        }
+
+        return String.format(
+                "CONSTRAINT %s %s (%s)%s",
+                EncodingUtils.escapeIdentifier(getName()),
+                typeString,
+                columns.stream()
+                        .map(EncodingUtils::escapeIdentifier)
+                        .collect(Collectors.joining(", ")),
+                isEnforced() ? "" : " NOT ENFORCED");
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        UniqueConstraint that = (UniqueConstraint) o;
+        return Objects.equals(columns, that.columns) && type == that.type;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), columns, type);
+    }
+}
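Note on `UniqueConstraint#asSummaryString()` above: the format string can be exercised with a plain JDK sketch. It assumes that `EncodingUtils.escapeIdentifier` wraps a name in backticks and doubles any embedded backtick. The local helper below encodes that assumption, and the class name `ConstraintSummarySketch` is hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical stand-in that mirrors the format of UniqueConstraint#asSummaryString(). */
final class ConstraintSummarySketch {

    /** Assumption: identifiers are quoted with backticks, embedded backticks are doubled. */
    static String escapeIdentifier(String s) {
        return "`" + s.replace("`", "``") + "`";
    }

    /** Formats "CONSTRAINT <name> <type> (<columns>)" plus " NOT ENFORCED" if applicable. */
    static String summary(String name, String typeString, List<String> columns, boolean enforced) {
        return String.format(
                "CONSTRAINT %s %s (%s)%s",
                escapeIdentifier(name),
                typeString,
                columns.stream()
                        .map(ConstraintSummarySketch::escapeIdentifier)
                        .collect(Collectors.joining(", ")),
                enforced ? "" : " NOT ENFORCED");
    }
}
```

Under that escaping assumption, a non-enforced primary key `pk` on `f0` and `f1` is rendered with quoted identifiers, so the output differs slightly from the unquoted example in the Javadoc.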
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/WatermarkSpec.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/WatermarkSpec.java
new file mode 100644
index 0000000..f3d1e9c
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/WatermarkSpec.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Representation of a watermark specification in a {@link ResolvedSchema}.
+ *
+ * <p>It defines the rowtime attribute and a {@link ResolvedExpression} for watermark generation.
+ */
+@PublicEvolving
+public final class WatermarkSpec {
+
+    private final String rowtimeAttribute;
+
+    private final ResolvedExpression watermarkExpression;
+
+    public WatermarkSpec(String rowtimeAttribute, ResolvedExpression watermarkExpression) {
+        this.rowtimeAttribute =
+                checkNotNull(rowtimeAttribute, "Rowtime attribute must not be null.");
+        this.watermarkExpression =
+                checkNotNull(watermarkExpression, "Watermark expression must not be null.");
+    }
+
+    /**
+     * Returns the name of a rowtime attribute.
+     *
+     * <p>The referenced attribute must be present in the {@link ResolvedSchema} and must be of
+     * {@link TimestampType}.
+     */
+    public String getRowtimeAttribute() {
+        return rowtimeAttribute;
+    }
+
+    /** Returns the {@link ResolvedExpression} for watermark generation. */
+    public ResolvedExpression getWatermarkExpression() {
+        return watermarkExpression;
+    }
+
+    public String asSummaryString() {
+        return "WATERMARK FOR "
+                + EncodingUtils.escapeIdentifier(rowtimeAttribute)
+                + ": "
+                + watermarkExpression.getOutputDataType()
+                + " AS "
+                + watermarkExpression.asSummaryString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final WatermarkSpec that = (WatermarkSpec) o;
+        return rowtimeAttribute.equals(that.rowtimeAttribute)
+                && watermarkExpression.equals(that.watermarkExpression);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(rowtimeAttribute, watermarkExpression);
+    }
+
+    @Override
+    public String toString() {
+        return asSummaryString();
+    }
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/expressions/utils/ResolvedExpressionMock.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/expressions/utils/ResolvedExpressionMock.java
new file mode 100644
index 0000000..533ee2a
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/expressions/utils/ResolvedExpressionMock.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.utils;
+
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionVisitor;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+
+/** {@link ResolvedExpression} mock for testing purposes. */
+public class ResolvedExpressionMock implements ResolvedExpression {
+
+    private final DataType outputDataType;
+
+    private final Supplier<String> stringRepresentation;
+
+    public ResolvedExpressionMock(DataType outputDataType, Supplier<String> stringRepresentation) {
+        this.outputDataType = outputDataType;
+        this.stringRepresentation = stringRepresentation;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return stringRepresentation.get();
+    }
+
+    @Override
+    public List<Expression> getChildren() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public <R> R accept(ExpressionVisitor<R> visitor) {
+        return visitor.visit(this);
+    }
+
+    @Override
+    public String asSerializableString() {
+        return stringRepresentation.get();
+    }
+
+    @Override
+    public DataType getOutputDataType() {
+        return outputDataType;
+    }
+
+    @Override
+    public List<ResolvedExpression> getResolvedChildren() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public String toString() {
+        return asSummaryString();
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
index 247a442..f04e529 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
@@ -21,8 +21,8 @@ package org.apache.flink.table.api
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.internal.TableEnvironmentInternal
+import org.apache.flink.table.descriptors.ConnectorDescriptor
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR
-import org.apache.flink.table.descriptors.{ConnectorDescriptor, Schema}
 import org.apache.flink.table.expressions.utils._
 import org.apache.flink.table.runtime.utils.CommonTestData
 import org.apache.flink.table.sources.{CsvTableSource, TableSource}
@@ -394,7 +394,7 @@ class TableSourceTest extends TableTestBase {
         context
       }
     }).withSchema(
-        new Schema()
+        new org.apache.flink.table.descriptors.Schema()
           .schema(TableSchema.builder()
             .field("id", DataTypes.INT())
             .field("name", DataTypes.STRING())