You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/05/08 06:14:03 UTC

[flink] branch master updated: [FLINK-12365][table] Add stats related catalog APIs

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 12d698b  [FLINK-12365][table] Add stats related catalog APIs
12d698b is described below

commit 12d698b6e1c1aef03834963fba8a0974b3729c50
Author: xuefuz <xu...@users.noreply.github.com>
AuthorDate: Tue May 7 23:13:50 2019 -0700

    [FLINK-12365][table] Add stats related catalog APIs
    
    This closes #8314
---
 .../catalog/hive/GenericHiveMetastoreCatalog.java  |  52 ++++++++
 .../flink/table/catalog/hive/HiveCatalog.java      |  43 ++++++
 .../hive/util/GenericHiveMetastoreCatalogUtil.java |   3 +-
 .../hive/GenericHiveMetastoreCatalogTest.java      |   7 -
 .../flink/table/catalog/GenericCatalogTable.java   |  15 +--
 .../table/catalog/GenericInMemoryCatalog.java      | 148 +++++++++++++++++++++
 .../table/catalog/GenericInMemoryCatalogTest.java  |  76 ++++++++++-
 .../apache/flink/table/catalog/CatalogTable.java   |   8 --
 .../flink/table/catalog/ReadableCatalog.java       |  56 +++++++-
 .../table/catalog/ReadableWritableCatalog.java     |  67 ++++++++++
 .../catalog/stats/CatalogColumnStatistics.java     |  71 ++++++++++
 .../CatalogColumnStatisticsDataBase.java}          |  47 ++++---
 .../stats/CatalogColumnStatisticsDataBinary.java   |  62 +++++++++
 .../stats/CatalogColumnStatisticsDataBoolean.java  |  62 +++++++++
 .../stats/CatalogColumnStatisticsDataDate.java     |  73 ++++++++++
 .../stats/CatalogColumnStatisticsDataDouble.java   |  73 ++++++++++
 .../stats/CatalogColumnStatisticsDataLong.java     |  73 ++++++++++
 .../stats/CatalogColumnStatisticsDataString.java   |  73 ++++++++++
 .../catalog/stats/CatalogTableStatistics.java      |  98 ++++++++++++++
 .../catalog/{CatalogTable.java => stats/Date.java} |  39 +++---
 .../flink/table/catalog/CatalogTestUtil.java       | 110 ++++++++++++++-
 21 files changed, 1169 insertions(+), 87 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
index 3156107..843b4a2 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
@@ -37,6 +37,8 @@ import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.table.catalog.hive.util.GenericHiveMetastoreCatalogUtil;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
@@ -311,4 +313,54 @@ public class GenericHiveMetastoreCatalog extends HiveCatalogBase {
 	public boolean functionExists(ObjectPath functionPath) throws CatalogException {
 		throw new UnsupportedOperationException();
 	}
+
+	// ------ statistics ------
+
+	@Override
+	public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
+			throws TableNotExistException, CatalogException {
+
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists)
+			throws TableNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics,
+			boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics,
+			boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+			throws PartitionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+			throws PartitionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 7f96852..e1c2c30 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -35,6 +35,8 @@ import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -194,6 +196,26 @@ public class HiveCatalog extends HiveCatalogBase {
 	}
 
 	@Override
+	public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+
+	}
+
+	@Override
+	public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+
+	}
+
+	@Override
+	public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+
+	}
+
+	@Override
+	public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+
+	}
+
+	@Override
 	public List<String> listFunctions(String dbName) throws DatabaseNotExistException, CatalogException {
 		throw new UnsupportedOperationException();
 	}
@@ -207,4 +229,25 @@ public class HiveCatalog extends HiveCatalogBase {
 	public boolean functionExists(ObjectPath functionPath) throws CatalogException {
 		throw new UnsupportedOperationException();
 	}
+
+	@Override
+	public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/GenericHiveMetastoreCatalogUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/GenericHiveMetastoreCatalogUtil.java
index f3927f9..a88f083 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/GenericHiveMetastoreCatalogUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/GenericHiveMetastoreCatalogUtil.java
@@ -30,7 +30,6 @@ import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalogBaseUtil;
 import org.apache.flink.table.catalog.hive.HiveTableConfig;
 import org.apache.flink.table.catalog.hive.HiveTypeUtil;
-import org.apache.flink.table.plan.stats.TableStats;
 
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -180,7 +179,7 @@ public class GenericHiveMetastoreCatalogUtil {
 			);
 		} else {
 			return new GenericCatalogTable(
-				tableSchema, new TableStats(0), partitionKeys, properties, comment);
+				tableSchema, partitionKeys, properties, comment);
 		}
 	}
 
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
index fb89e53..390ab1d 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.GenericCatalogDatabase;
 import org.apache.flink.table.catalog.GenericCatalogTable;
 import org.apache.flink.table.catalog.GenericCatalogView;
-import org.apache.flink.table.plan.stats.TableStats;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -82,7 +81,6 @@ public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
 
 		CatalogTable table = new GenericCatalogTable(
 			new TableSchema(colNames, types),
-			new TableStats(0),
 			getBatchTableProperties(),
 			TEST_COMMENT
 		);
@@ -122,7 +120,6 @@ public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
 	public CatalogTable createTable() {
 		return new GenericCatalogTable(
 			createTableSchema(),
-			new TableStats(0),
 			getBatchTableProperties(),
 			TEST_COMMENT);
 	}
@@ -131,7 +128,6 @@ public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
 	public CatalogTable createAnotherTable() {
 		return new GenericCatalogTable(
 			createAnotherTableSchema(),
-			new TableStats(0),
 			getBatchTableProperties(),
 			TEST_COMMENT);
 	}
@@ -140,7 +136,6 @@ public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
 	public CatalogTable createStreamingTable() {
 		return new GenericCatalogTable(
 			createTableSchema(),
-			new TableStats(0),
 			getStreamingTableProperties(),
 			TEST_COMMENT);
 	}
@@ -149,7 +144,6 @@ public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
 	public CatalogTable createPartitionedTable() {
 		return new GenericCatalogTable(
 			createTableSchema(),
-			new TableStats(0),
 			createPartitionKeys(),
 			getBatchTableProperties(),
 			TEST_COMMENT);
@@ -159,7 +153,6 @@ public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
 	public CatalogTable createAnotherPartitionedTable() {
 		return new GenericCatalogTable(
 			createAnotherTableSchema(),
-			new TableStats(0),
 			createPartitionKeys(),
 			getBatchTableProperties(),
 			TEST_COMMENT);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
index 73c2dbc..56d8377 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.plan.stats.TableStats;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -35,8 +34,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public class GenericCatalogTable implements CatalogTable {
 	// Schema of the table (column names and types)
 	private final TableSchema tableSchema;
-	// Statistics of the table
-	private final TableStats tableStats;
 	// Partition keys if this is a partitioned table. It's an empty set if the table is not partitioned
 	private final List<String> partitionKeys;
 	// Properties of the table
@@ -46,12 +43,10 @@ public class GenericCatalogTable implements CatalogTable {
 
 	public GenericCatalogTable(
 			TableSchema tableSchema,
-			TableStats tableStats,
 			List<String> partitionKeys,
 			Map<String, String> properties,
 			String comment) {
 		this.tableSchema = checkNotNull(tableSchema, "tableSchema cannot be null");
-		this.tableStats = checkNotNull(tableStats, "tableStats cannot be null");
 		this.partitionKeys = checkNotNull(partitionKeys, "partitionKeys cannot be null");
 		this.properties = checkNotNull(properties, "properties cannot be null");
 		this.comment = comment;
@@ -59,15 +54,9 @@ public class GenericCatalogTable implements CatalogTable {
 
 	public GenericCatalogTable(
 			TableSchema tableSchema,
-			TableStats tableStats,
 			Map<String, String> properties,
 			String description) {
-		this(tableSchema, tableStats, new ArrayList<>(), properties, description);
-	}
-
-	@Override
-	public TableStats getStatistics() {
-		return this.tableStats;
+		this(tableSchema, new ArrayList<>(), properties, description);
 	}
 
 	@Override
@@ -98,7 +87,7 @@ public class GenericCatalogTable implements CatalogTable {
 	@Override
 	public GenericCatalogTable copy() {
 		return new GenericCatalogTable(
-			this.tableSchema.copy(), this.tableStats.copy(), new ArrayList<>(partitionKeys), new HashMap<>(this.properties), comment);
+			this.tableSchema.copy(), new ArrayList<>(partitionKeys), new HashMap<>(this.properties), comment);
 	}
 
 	@Override
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
index dadd390..19655ce 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
@@ -30,6 +30,8 @@ import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.util.StringUtils;
 
 import java.util.ArrayList;
@@ -57,6 +59,11 @@ public class GenericInMemoryCatalog implements ReadableWritableCatalog {
 	private final Map<ObjectPath, CatalogFunction> functions;
 	private final Map<ObjectPath, Map<CatalogPartitionSpec, CatalogPartition>> partitions;
 
+	private final Map<ObjectPath, CatalogTableStatistics> tableStats;
+	private final Map<ObjectPath, CatalogColumnStatistics> tableColumnStats;
+	private final Map<ObjectPath, Map<CatalogPartitionSpec, CatalogTableStatistics>> partitionStats;
+	private final Map<ObjectPath, Map<CatalogPartitionSpec, CatalogColumnStatistics>> partitionColumnStats;
+
 	public GenericInMemoryCatalog(String name) {
 		checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name cannot be null or empty");
 
@@ -66,6 +73,10 @@ public class GenericInMemoryCatalog implements ReadableWritableCatalog {
 		this.tables = new LinkedHashMap<>();
 		this.functions = new LinkedHashMap<>();
 		this.partitions = new LinkedHashMap<>();
+		this.tableStats = new LinkedHashMap<>();
+		this.tableColumnStats = new LinkedHashMap<>();
+		this.partitionStats = new LinkedHashMap<>();
+		this.partitionColumnStats = new LinkedHashMap<>();
 	}
 
 	@Override
@@ -193,6 +204,8 @@ public class GenericInMemoryCatalog implements ReadableWritableCatalog {
 
 			if (isPartitionedTable(tablePath)) {
 				partitions.put(tablePath, new LinkedHashMap<>());
+				partitionStats.put(tablePath, new LinkedHashMap<>());
+				partitionColumnStats.put(tablePath, new LinkedHashMap<>());
 			}
 		}
 	}
@@ -222,8 +235,12 @@ public class GenericInMemoryCatalog implements ReadableWritableCatalog {
 
 		if (tableExists(tablePath)) {
 			tables.remove(tablePath);
+			tableStats.remove(tablePath);
+			tableColumnStats.remove(tablePath);
 
 			partitions.remove(tablePath);
+			partitionStats.remove(tablePath);
+			partitionColumnStats.remove(tablePath);
 		} else if (!ignoreIfNotExists) {
 			throw new TableNotExistException(catalogName, tablePath);
 		}
@@ -243,9 +260,30 @@ public class GenericInMemoryCatalog implements ReadableWritableCatalog {
 			} else {
 				tables.put(newPath, tables.remove(tablePath));
 
+				// table statistics
+				if (tableStats.containsKey(tablePath)) {
+					tableStats.put(newPath, tableStats.remove(tablePath));
+				}
+
+				// table column statistics
+				if (tableColumnStats.containsKey(tablePath)) {
+					tableColumnStats.put(newPath, tableColumnStats.remove(tablePath));
+				}
+
+				// partitions
 				if (partitions.containsKey(tablePath)) {
 					partitions.put(newPath, partitions.remove(tablePath));
 				}
+
+				// partition statistics
+				if (partitionStats.containsKey(tablePath)) {
+					partitionStats.put(newPath, partitionStats.remove(tablePath));
+				}
+
+				// partition column statistics
+				if (partitionColumnStats.containsKey(tablePath)) {
+					partitionColumnStats.put(newPath, partitionColumnStats.remove(tablePath));
+				}
 			}
 		} else if (!ignoreIfNotExists) {
 			throw new TableNotExistException(catalogName, tablePath);
@@ -411,6 +449,8 @@ public class GenericInMemoryCatalog implements ReadableWritableCatalog {
 
 		if (partitionExists(tablePath, partitionSpec)) {
 			partitions.get(tablePath).remove(partitionSpec);
+			partitionStats.get(tablePath).remove(partitionSpec);
+			partitionColumnStats.get(tablePath).remove(partitionSpec);
 		} else if (!ignoreIfNotExists) {
 			throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
 		}
@@ -543,4 +583,112 @@ public class GenericInMemoryCatalog implements ReadableWritableCatalog {
 		return (table instanceof CatalogTable) && ((CatalogTable) table).isPartitioned();
 	}
 
+	// ------ statistics ------
+
+	@Override
+	public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException {
+		checkNotNull(tablePath);
+
+		if (!tableExists(tablePath)) {
+			throw new TableNotExistException(catalogName, tablePath);
+		}
+
+		CatalogTableStatistics result = tableStats.get(tablePath);
+		return result != null ? result.copy() : CatalogTableStatistics.UNKNOWN;
+	}
+
+	@Override
+	public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws TableNotExistException {
+		checkNotNull(tablePath);
+
+		if (!tableExists(tablePath)) {
+			throw new TableNotExistException(catalogName, tablePath);
+		}
+
+		CatalogColumnStatistics result = tableColumnStats.get(tablePath);
+		return result != null ? result.copy() : CatalogColumnStatistics.UNKNOWN;
+	}
+
+	@Override
+	public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+			throws PartitionNotExistException {
+		checkNotNull(tablePath);
+		checkNotNull(partitionSpec);
+
+		if (!partitionExists(tablePath, partitionSpec)) {
+			throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+		}
+
+		CatalogTableStatistics result = partitionStats.get(tablePath).get(partitionSpec);
+		return result != null ? result.copy() : CatalogTableStatistics.UNKNOWN;
+	}
+
+	@Override
+	public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+			throws PartitionNotExistException {
+		checkNotNull(tablePath);
+		checkNotNull(partitionSpec);
+
+		if (!partitionExists(tablePath, partitionSpec)) {
+			throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+		}
+
+		CatalogColumnStatistics result = partitionColumnStats.get(tablePath).get(partitionSpec);
+		return result != null ? result.copy() : CatalogColumnStatistics.UNKNOWN;
+	}
+
+	@Override
+	public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
+			throws TableNotExistException {
+		checkNotNull(tablePath);
+		checkNotNull(tableStatistics);
+
+		if (tableExists(tablePath)) {
+			tableStats.put(tablePath, tableStatistics.copy());
+		} else if (!ignoreIfNotExists) {
+			throw new TableNotExistException(catalogName, tablePath);
+		}
+	}
+
+	@Override
+	public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics,
+			boolean ignoreIfNotExists) throws TableNotExistException {
+		checkNotNull(tablePath);
+		checkNotNull(columnStatistics);
+
+		if (tableExists(tablePath)) {
+			tableColumnStats.put(tablePath, columnStatistics.copy());
+		} else if (!ignoreIfNotExists) {
+			throw new TableNotExistException(catalogName, tablePath);
+		}
+	}
+
+	@Override
+	public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics,
+			boolean ignoreIfNotExists) throws PartitionNotExistException {
+		checkNotNull(tablePath);
+		checkNotNull(partitionSpec);
+		checkNotNull(partitionStatistics);
+
+		if (partitionExists(tablePath, partitionSpec)) {
+			partitionStats.get(tablePath).put(partitionSpec, partitionStatistics.copy());
+		} else if (!ignoreIfNotExists) {
+			throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+		}
+	}
+
+	@Override
+	public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics,
+			boolean ignoreIfNotExists) throws PartitionNotExistException {
+		checkNotNull(tablePath);
+		checkNotNull(partitionSpec);
+		checkNotNull(columnStatistics);
+
+		if (partitionExists(tablePath, partitionSpec)) {
+			partitionColumnStats.get(tablePath).put(partitionSpec, columnStatistics.copy());
+		} else if (!ignoreIfNotExists) {
+			throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+		}
+	}
+
 }
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index 43d9dda..6fabb78 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -26,8 +26,17 @@ import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBinary;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBoolean;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDouble;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.catalog.stats.Date;
 import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.plan.stats.TableStats;
 
 import org.junit.After;
 import org.junit.BeforeClass;
@@ -575,6 +584,47 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 		catalog.dropDatabase(db1, false);
 	}
 
+	// ------ statistics ------
+
+	@Test
+	public void testStatistics() throws Exception {
+		// Table related
+		catalog.createDatabase(db1, createDb(), false);
+		CatalogTable table = createTable();
+		catalog.createTable(path1, table, false);
+
+		CatalogTestUtil.checkEquals(catalog.getTableStatistics(path1), CatalogTableStatistics.UNKNOWN);
+		CatalogTestUtil.checkEquals(catalog.getTableColumnStatistics(path1), CatalogColumnStatistics.UNKNOWN);
+
+		CatalogTableStatistics tableStatistics = new CatalogTableStatistics(5, 2, 100, 575);
+		catalog.alterTableStatistics(path1, tableStatistics, false);
+		CatalogTestUtil.checkEquals(tableStatistics, catalog.getTableStatistics(path1));
+		CatalogColumnStatistics columnStatistics = createColumnStats();
+		catalog.alterTableColumnStatistics(path1, columnStatistics, false);
+		CatalogTestUtil.checkEquals(columnStatistics, catalog.getTableColumnStatistics(path1));
+
+		// Partition related
+		catalog.createDatabase(db2, createDb(), false);
+		CatalogTable table2 = createPartitionedTable();
+		catalog.createTable(path2, table2, false);
+		CatalogPartitionSpec partitionSpec = createPartitionSpec();
+		catalog.createPartition(path2, partitionSpec, createPartition(), false);
+
+		CatalogTestUtil.checkEquals(catalog.getPartitionStatistics(path2, partitionSpec), CatalogTableStatistics.UNKNOWN);
+		CatalogTestUtil.checkEquals(catalog.getPartitionColumnStatistics(path2, partitionSpec), CatalogColumnStatistics.UNKNOWN);
+
+		catalog.alterPartitionStatistics(path2, partitionSpec, tableStatistics, false);
+		CatalogTestUtil.checkEquals(tableStatistics, catalog.getPartitionStatistics(path2, partitionSpec));
+		catalog.alterPartitionColumnStatistics(path2, partitionSpec, columnStatistics, false);
+		CatalogTestUtil.checkEquals(columnStatistics, catalog.getPartitionColumnStatistics(path2, partitionSpec));
+
+		// Clean up
+		catalog.dropTable(path1, false);
+		catalog.dropDatabase(db1, false);
+		catalog.dropTable(path2, false);
+		catalog.dropDatabase(db2, false);
+	}
+
 	// ------ utilities ------
 
 	@Override
@@ -604,7 +654,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 	public GenericCatalogTable createStreamingTable() {
 		return new GenericCatalogTable(
 			createTableSchema(),
-			new TableStats(0),
 			getStreamingTableProperties(),
 			TEST_COMMENT);
 	}
@@ -613,7 +662,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 	public CatalogTable createTable() {
 		return new GenericCatalogTable(
 			createTableSchema(),
-			new TableStats(0),
 			getBatchTableProperties(),
 			TEST_COMMENT);
 	}
@@ -622,7 +670,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 	public CatalogTable createAnotherTable() {
 		return new GenericCatalogTable(
 			createAnotherTableSchema(),
-			new TableStats(0),
 			getBatchTableProperties(),
 			TEST_COMMENT);
 	}
@@ -631,7 +678,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 	public CatalogTable createPartitionedTable() {
 		return new GenericCatalogTable(
 			createTableSchema(),
-			new TableStats(0),
 			createPartitionKeys(),
 			getBatchTableProperties(),
 			TEST_COMMENT);
@@ -641,7 +687,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 	public CatalogTable createAnotherPartitionedTable() {
 		return new GenericCatalogTable(
 			createAnotherTableSchema(),
-			new TableStats(0),
 			createPartitionKeys(),
 			getBatchTableProperties(),
 			TEST_COMMENT);
@@ -709,6 +754,24 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 			"This is another view");
 	}
 
+	private CatalogColumnStatistics createColumnStats() {
+		CatalogColumnStatisticsDataBoolean booleanColStats = new CatalogColumnStatisticsDataBoolean(55L, 45L, 5L);
+		CatalogColumnStatisticsDataLong longColStats = new CatalogColumnStatisticsDataLong(-123L, 763322L, 23L, 79L);
+		CatalogColumnStatisticsDataString stringColStats = new CatalogColumnStatisticsDataString(152L, 43.5D, 20L, 0L);
+		CatalogColumnStatisticsDataDate dateColStats = new CatalogColumnStatisticsDataDate(new Date(71L),
+			new Date(17923L), 1321, 0L);
+		CatalogColumnStatisticsDataDouble doubleColStats = new CatalogColumnStatisticsDataDouble(-123.35D, 7633.22D, 23L, 79L);
+		CatalogColumnStatisticsDataBinary binaryColStats = new CatalogColumnStatisticsDataBinary(755L, 43.5D, 20L);
+		Map<String, CatalogColumnStatisticsDataBase> colStatsMap = new HashMap<>(6);
+		colStatsMap.put("b1", booleanColStats);
+		colStatsMap.put("l2", longColStats);
+		colStatsMap.put("s3", stringColStats);
+		colStatsMap.put("d4", dateColStats);
+		colStatsMap.put("dd5", doubleColStats);
+		colStatsMap.put("bb6", binaryColStats);
+		return new CatalogColumnStatistics(colStatsMap);
+	}
+
 	protected CatalogFunction createFunction() {
 		return new GenericCatalogFunction(MyScalarFunction.class.getName());
 	}
@@ -734,4 +797,5 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 			return String.valueOf(i);
 		}
 	}
+
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
index 545fad9..e38e930 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.catalog;
 
-import org.apache.flink.table.plan.stats.TableStats;
-
 import java.util.List;
 
 /**
@@ -27,12 +25,6 @@ import java.util.List;
  */
 public interface CatalogTable extends CatalogBaseTable {
 	/**
-	 * Get the statistics of the table.
-	 * @return table statistics
-	 */
-	TableStats getStatistics();
-
-	/**
 	 * Check if the table is partitioned or not.
 	 *
 	 * @return true if the table is partitioned; otherwise, false
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
index 5586348..15adabb 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
@@ -24,6 +24,8 @@ import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 
 import java.util.List;
 
@@ -194,12 +196,12 @@ public interface ReadableCatalog {
 	/**
 	 * List the names of all functions in the given database. An empty list is returned if none is registered.
 	 *
-	 * @param dbName name of the database.
+	 * @param databaseName name of the database.
 	 * @return a list of the names of the functions in this database
 	 * @throws DatabaseNotExistException if the database does not exist
 	 * @throws CatalogException in case of any runtime exception
 	 */
-	List<String> listFunctions(String dbName) throws DatabaseNotExistException, CatalogException;
+	List<String> listFunctions(String databaseName) throws DatabaseNotExistException, CatalogException;
 
 	/**
 	 * Get the function.
@@ -221,4 +223,54 @@ public interface ReadableCatalog {
 	 */
 	boolean functionExists(ObjectPath functionPath) throws CatalogException;
 
+	// ------ statistics ------
+
+	/**
+	 * Get the statistics of a table.
+	 *
+	 * @param tablePath path of the table
+	 * @return statistics of the given table
+	 *
+	 * @throws TableNotExistException if the table does not exist in the catalog
+	 * @throws CatalogException	in case of any runtime exception
+	 */
+	CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException;
+
+	/**
+	 * Get the column statistics of a table.
+	 *
+	 * @param tablePath path of the table
+	 * @return column statistics of the given table
+	 *
+	 * @throws TableNotExistException if the table does not exist in the catalog
+	 * @throws CatalogException	in case of any runtime exception
+	 */
+	CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException;
+
+	/**
+	 * Get the statistics of a partition.
+	 *
+	 * @param tablePath path of the table
+	 * @param partitionSpec partition spec of the partition
+	 * @return statistics of the given partition
+	 *
+	 * @throws PartitionNotExistException if the partition does not exist
+	 * @throws CatalogException	in case of any runtime exception
+	 */
+	CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+		throws PartitionNotExistException, CatalogException;
+
+	/**
+	 * Get the column statistics of a partition.
+	 *
+	 * @param tablePath path of the table
+	 * @param partitionSpec partition spec of the partition
+	 * @return column statistics of the given partition
+	 *
+	 * @throws PartitionNotExistException if the partition does not exist
+	 * @throws CatalogException	in case of any runtime exception
+	 */
+	CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+		throws PartitionNotExistException, CatalogException;
+
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
index 60bc93d..a398b72 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
@@ -30,6 +30,8 @@ import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 
 /**
  * An interface responsible for manipulating catalog metadata.
@@ -233,4 +235,69 @@ public interface ReadableWritableCatalog extends ReadableCatalog {
 	 */
 	void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
 		throws FunctionNotExistException, CatalogException;
+
+	// ------ statistics ------
+
+	/**
+	 * Update the statistics of a table.
+	 *
+	 * @param tablePath path of the table
+	 * @param tableStatistics new statistics to update
+	 * @param ignoreIfNotExists flag to specify behavior if the table does not exist:
+	 *                          if set to false, throw an exception,
+	 *                          if set to true, nothing happens.
+	 *
+	 * @throws TableNotExistException if the table does not exist in the catalog
+	 * @throws CatalogException	in case of any runtime exception
+	 */
+	void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
+		throws TableNotExistException, CatalogException;
+
+	/**
+	 * Update the column statistics of a table.
+	 *
+	 * @param tablePath path of the table
+	 * @param columnStatistics new column statistics to update
+	 * @param ignoreIfNotExists flag to specify behavior if the table does not exist:
+	 *                          if set to false, throw an exception,
+	 *                          if set to true, nothing happens.
+	 *
+	 * @throws TableNotExistException if the table does not exist in the catalog
+	 * @throws CatalogException	in case of any runtime exception
+	 */
+	void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists)
+		throws TableNotExistException, CatalogException;
+
+	/**
+	 * Update the statistics of a table partition.
+	 *
+	 * @param tablePath path of the table
+	 * @param partitionSpec partition spec of the partition
+	 * @param partitionStatistics new statistics to update
+	 * @param ignoreIfNotExists flag to specify behavior if the partition does not exist:
+	 *                          if set to false, throw an exception,
+	 *                          if set to true, nothing happens.
+	 *
+	 * @throws PartitionNotExistException if the partition does not exist
+	 * @throws CatalogException	in case of any runtime exception
+	 */
+	void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics,
+		boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException;
+
+	/**
+	 * Update the column statistics of a table partition.
+	 *
+	 * @param tablePath path of the table
+	 * @param partitionSpec partition spec of the partition
+	 * @@param columnStatistics new column statistics to update
+	 * @param ignoreIfNotExists flag to specify behavior if the partition does not exist:
+	 *                          if set to false, throw an exception,
+	 *                          if set to true, nothing happens.
+	 *
+	 * @throws PartitionNotExistException if the partition does not exist
+	 * @throws CatalogException	in case of any runtime exception
+	 */
+	void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics,
+		boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException;
+
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatistics.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatistics.java
new file mode 100644
index 0000000..e024107
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatistics.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.stats;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Column statistics of a table or partition.
+ */
+public class CatalogColumnStatistics {
+	public static final CatalogColumnStatistics UNKNOWN = new CatalogColumnStatistics(new HashMap<>());
+
+	/**
+	 * A map of column name and column statistic data.
+	 */
+	private final Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData;
+
+	private final Map<String, String> properties;
+
+	public CatalogColumnStatistics(Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData) {
+		this(columnStatisticsData, new HashMap<>());
+	}
+
+	public CatalogColumnStatistics(Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData, Map<String, String> properties) {
+		checkNotNull(columnStatisticsData);
+		checkNotNull(properties);
+
+		this.columnStatisticsData = columnStatisticsData;
+		this.properties = properties;
+	}
+
+	public Map<String, CatalogColumnStatisticsDataBase> getColumnStatisticsData() {
+		return columnStatisticsData;
+	}
+
+	public Map<String, String> getProperties() {
+		return this.properties;
+	}
+
+	/**
+	 * Create a deep copy of "this" instance.
+	 * @return a deep copy
+	 */
+	public CatalogColumnStatistics copy() {
+		Map<String, CatalogColumnStatisticsDataBase> copy = new HashMap<>(columnStatisticsData.size());
+		for (Map.Entry<String, CatalogColumnStatisticsDataBase> entry : columnStatisticsData.entrySet()) {
+			copy.put(entry.getKey(), entry.getValue().copy());
+		}
+		return new CatalogColumnStatistics(copy, new HashMap<>(this.properties));
+	}
+
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBase.java
similarity index 51%
copy from flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
copy to flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBase.java
index 545fad9..1c4983b 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBase.java
@@ -16,33 +16,44 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.catalog;
+package org.apache.flink.table.catalog.stats;
 
-import org.apache.flink.table.plan.stats.TableStats;
-
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
- * Represents a table in a catalog.
+ * Column statistics value base class.
  */
-public interface CatalogTable extends CatalogBaseTable {
+public abstract class CatalogColumnStatisticsDataBase {
 	/**
-	 * Get the statistics of the table.
-	 * @return table statistics
+	 * number of null values.
 	 */
-	TableStats getStatistics();
+	private final long nullCount;
 
-	/**
-	 * Check if the table is partitioned or not.
-	 *
-	 * @return true if the table is partitioned; otherwise, false
-	 */
-	boolean isPartitioned();
+	private final Map<String, String> properties;
+
+	public CatalogColumnStatisticsDataBase(long nullCount) {
+		this(nullCount, new HashMap<>());
+	}
+
+	public CatalogColumnStatisticsDataBase(long nullCount, Map<String, String> properties) {
+		this.nullCount = nullCount;
+		this.properties = properties;
+	}
+
+	public long getNullCount() {
+		return nullCount;
+	}
+
+	public Map<String, String> getProperties() {
+		return this.properties;
+	}
 
 	/**
-	 * Get the partition keys of the table. This will be an empty set if the table is not partitioned.
+	 * Create a deep copy of "this" instance.
 	 *
-	 * @return partition keys of the table
+	 * @return a deep copy
 	 */
-	List<String> getPartitionKeys();
+	public abstract CatalogColumnStatisticsDataBase copy();
+
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBinary.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBinary.java
new file mode 100644
index 0000000..c777aae
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBinary.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.stats;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Column statistics value of binary type.
+ */
+public class CatalogColumnStatisticsDataBinary extends CatalogColumnStatisticsDataBase {
+	/**
+	 * max length of all values.
+	 */
+	private final long maxLength;
+
+	/**
+	 * average length of all values.
+	 */
+	private final double avgLength;
+
+	public CatalogColumnStatisticsDataBinary(long maxLength, double avgLength, long nullCount) {
+		super(nullCount);
+		this.maxLength = maxLength;
+		this.avgLength = avgLength;
+	}
+
+	public CatalogColumnStatisticsDataBinary(long maxLength, double avgLength, long nullCount, Map<String, String> properties) {
+		super(nullCount, properties);
+		this.maxLength = maxLength;
+		this.avgLength = avgLength;
+	}
+
+	public long getMaxLength() {
+		return maxLength;
+	}
+
+	public double getAvgLength() {
+		return avgLength;
+	}
+
+	public CatalogColumnStatisticsDataBinary copy() {
+		return new CatalogColumnStatisticsDataBinary(maxLength, avgLength, getNullCount(), new HashMap<>(getProperties()));
+	}
+
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBoolean.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBoolean.java
new file mode 100644
index 0000000..f499918
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBoolean.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.stats;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Column statistics value of boolean type.
+ */
+public class CatalogColumnStatisticsDataBoolean extends CatalogColumnStatisticsDataBase {
+	/**
+	 * number of "true" values.
+	 */
+	private final long trueCount;
+
+	/**
+	 * number of "false" values.
+	 */
+	private final long falseCount;
+
+	public CatalogColumnStatisticsDataBoolean(long trueCount, long falseCount, long nullCount) {
+		super(nullCount);
+		this.trueCount = trueCount;
+		this.falseCount = falseCount;
+	}
+
+	public CatalogColumnStatisticsDataBoolean(long trueCount, long falseCount, long nullCount, Map<String, String> properties) {
+		super(nullCount, properties);
+		this.trueCount = trueCount;
+		this.falseCount = falseCount;
+	}
+
+	public Long getTrueCount() {
+		return trueCount;
+	}
+
+	public Long getFalseCount() {
+		return falseCount;
+	}
+
+	public CatalogColumnStatisticsDataBoolean copy() {
+		return new CatalogColumnStatisticsDataBoolean(trueCount, falseCount, getNullCount(), new HashMap<>(getProperties()));
+	}
+
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDate.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDate.java
new file mode 100644
index 0000000..fe7fad2
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDate.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.stats;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Column statistics value of date type.
+ */
+public class CatalogColumnStatisticsDataDate extends CatalogColumnStatisticsDataBase {
+	/**
+	 * mim value.
+	 */
+	private final Date min;
+
+	/**
+	 * max value.
+	 */
+	private final Date max;
+
+	/**
+	 * number of distinct values.
+	 */
+	private final long ndv;
+
+	public CatalogColumnStatisticsDataDate(Date min, Date max, long ndv, long nullCount) {
+		super(nullCount);
+		this.min = min;
+		this.max = max;
+		this.ndv = ndv;
+	}
+
+	public CatalogColumnStatisticsDataDate(Date min, Date max, long ndv, long nullCount, Map<String, String> properties) {
+		super(nullCount, properties);
+		this.min = min;
+		this.max = max;
+		this.ndv = ndv;
+	}
+
+	public Date getMin() {
+		return min;
+	}
+
+	public Date getMax() {
+		return max;
+	}
+
+	public long getNdv() {
+		return ndv;
+	}
+
+	public CatalogColumnStatisticsDataDate copy() {
+		return new CatalogColumnStatisticsDataDate(min, max, ndv, getNullCount(), new HashMap<>(getProperties()));
+	}
+
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDouble.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDouble.java
new file mode 100644
index 0000000..b7ec6db
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDouble.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.stats;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Column statistics value of double type.
+ */
+public class CatalogColumnStatisticsDataDouble extends CatalogColumnStatisticsDataBase {
+	/**
+	 * mim value.
+	 */
+	private final double min;
+
+	/**
+	 * max value.
+	 */
+	private final double max;
+
+	/**
+	 * number of distinct values.
+	 */
+	private final long ndv;
+
+	public CatalogColumnStatisticsDataDouble(double min, double max, long ndv, long nullCount) {
+		super(nullCount);
+		this.min = min;
+		this.max = max;
+		this.ndv = ndv;
+	}
+
+	public CatalogColumnStatisticsDataDouble(double min, double max, long ndv, long nullCount, Map<String, String> properties) {
+		super(nullCount, properties);
+		this.min = min;
+		this.max = max;
+		this.ndv = ndv;
+	}
+
+	public double getMin() {
+		return min;
+	}
+
+	public double getMax() {
+		return max;
+	}
+
+	public long getNdv() {
+		return ndv;
+	}
+
+	public CatalogColumnStatisticsDataDouble copy() {
+		return new CatalogColumnStatisticsDataDouble(min, max, ndv, getNullCount(), new HashMap<>(getProperties()));
+	}
+
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataLong.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataLong.java
new file mode 100644
index 0000000..65b54c3
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataLong.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.stats;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Column statistics value of long type.
+ */
+public class CatalogColumnStatisticsDataLong extends CatalogColumnStatisticsDataBase {
+	/**
+	 * mim value.
+	 */
+	private final long min;
+
+	/**
+	 * max value.
+	 */
+	private final long max;
+
+	/**
+	 * number of distinct values.
+	 */
+	private final long ndv;
+
+	public CatalogColumnStatisticsDataLong(long min, long max, long ndv, long nullCount) {
+		super(nullCount);
+		this.min = min;
+		this.max = max;
+		this.ndv = ndv;
+	}
+
+	public CatalogColumnStatisticsDataLong(long min, long max, long ndv, long nullCount, Map<String, String> properties) {
+		super(nullCount, properties);
+		this.min = min;
+		this.max = max;
+		this.ndv = ndv;
+	}
+
+	public long getMin() {
+		return min;
+	}
+
+	public long getMax() {
+		return max;
+	}
+
+	public long getNdv() {
+		return ndv;
+	}
+
+	public CatalogColumnStatisticsDataLong copy() {
+		return new CatalogColumnStatisticsDataLong(min, max, ndv, getNullCount(), new HashMap<>(getProperties()));
+	}
+
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataString.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataString.java
new file mode 100644
index 0000000..2fdecc8
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataString.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.stats;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Column statistics value of string type.
+ */
+public class CatalogColumnStatisticsDataString extends CatalogColumnStatisticsDataBase {
+	/**
+	 * max length of all values.
+	 */
+	private final long maxLength;
+
+	/**
+	 * average length of all values.
+	 */
+	private final double avgLength;
+
+	/**
+	 * number of distinct values.
+	 */
+	private final long ndv;
+
+	public CatalogColumnStatisticsDataString(long maxLength, double avgLength, long ndv, long nullCount) {
+		super(nullCount);
+		this.maxLength = maxLength;
+		this.avgLength = avgLength;
+		this.ndv = ndv;
+	}
+
+	public CatalogColumnStatisticsDataString(long maxLength, double avgLength, long ndv, long nullCount, Map<String, String> properties) {
+		super(nullCount, properties);
+		this.maxLength = maxLength;
+		this.avgLength = avgLength;
+		this.ndv = ndv;
+	}
+
+	public long getMaxLength() {
+		return maxLength;
+	}
+
+	public double getAvgLength() {
+		return avgLength;
+	}
+
+	public long getNdv() {
+		return ndv;
+	}
+
+	public CatalogColumnStatisticsDataString copy() {
+		return new CatalogColumnStatisticsDataString(maxLength, avgLength, ndv, getNullCount(), new HashMap<>(getProperties()));
+	}
+
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogTableStatistics.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogTableStatistics.java
new file mode 100644
index 0000000..55433a3
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogTableStatistics.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.stats;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Statistics for a non-partitioned table or a partition of a partitioned table.
+ */
+public class CatalogTableStatistics {
+	public static final CatalogTableStatistics UNKNOWN = new CatalogTableStatistics(0, 0, 0, 0);
+
+	/**
+	 * The number of rows in the table or partition.
+	 */
+	private final long rowCount;
+
+	/**
+	 * The number of files on disk.
+	 */
+	private final int fileCount;
+
+	/**
+	 * The total size in bytes.
+	 */
+	private final long totalSize;
+
+	/**
+	 * The raw data size (size when loaded in memory) in bytes.
+	 */
+	private final long rawDataSize;
+
+	private Map<String, String> properties;
+
+	public CatalogTableStatistics(long rowCount, int fileCount, long totalSize, long rawDataSize) {
+		this(rowCount, fileCount, totalSize, rawDataSize, new HashMap<>());
+	}
+
+	public CatalogTableStatistics(long rowCount, int fileCount, long totalSize, long rawDataSize,
+			Map<String, String> properties) {
+		this.rowCount = rowCount;
+		this.fileCount = fileCount;
+		this.totalSize = totalSize;
+		this.rawDataSize = rawDataSize;
+		this.properties = properties;
+	}
+
+	/**
+	 * The number of rows.
+	 */
+	public long getRowCount() {
+		return this.rowCount;
+	}
+
+	public int getFileCount() {
+		return this.fileCount;
+	}
+
+	public long getTotalSize() {
+		return this.totalSize;
+	}
+
+	public long getRawDataSize() {
+		return this.rawDataSize;
+	}
+
+	public Map<String, String> getProperties() {
+		return this.properties;
+	}
+
+	/**
+	 * Create a deep copy of "this" instance.
+	 *
+	 * @return a deep copy
+	 */
+	public CatalogTableStatistics copy() {
+		return new CatalogTableStatistics(this.rowCount, this.fileCount, this.totalSize, this.rawDataSize,
+			new HashMap<>(this.properties));
+	}
+
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/Date.java
similarity index 55%
copy from flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
copy to flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/Date.java
index 545fad9..059a10c 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/Date.java
@@ -16,33 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.catalog;
-
-import org.apache.flink.table.plan.stats.TableStats;
-
-import java.util.List;
+package org.apache.flink.table.catalog.stats;
 
 /**
- * Represents a table in a catalog.
+ * Class representing a date value in statistics.
  */
-public interface CatalogTable extends CatalogBaseTable {
-	/**
-	 * Get the statistics of the table.
-	 * @return table statistics
-	 */
-	TableStats getStatistics();
+public class Date {
+	private long daysSinceEpoch;
+
+	public Date(long daysSinceEpoch) {
+		this.daysSinceEpoch = daysSinceEpoch;
+	}
+
+	public long getDaysSinceEpoch() {
+		return daysSinceEpoch;
+	}
 
-	/**
-	 * Check if the table is partitioned or not.
-	 *
-	 * @return true if the table is partitioned; otherwise, false
-	 */
-	boolean isPartitioned();
+	public Date copy() {
+		return new Date(daysSinceEpoch);
+	}
 
-	/**
-	 * Get the partition keys of the table. This will be an empty set if the table is not partitioned.
-	 *
-	 * @return partition keys of the table
-	 */
-	List<String> getPartitionKeys();
 }
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
index bf44bf1..b131698 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
@@ -18,9 +18,21 @@
 
 package org.apache.flink.table.catalog;
 
-import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBinary;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBoolean;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDouble;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.catalog.stats.Date;
+
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Utility class for catalog testing.
@@ -29,16 +41,11 @@ public class CatalogTestUtil {
 
 	public static void checkEquals(CatalogTable t1, CatalogTable t2) {
 		assertEquals(t1.getSchema(), t2.getSchema());
-		checkEquals(t1.getStatistics(), t2.getStatistics());
 		assertEquals(t1.getComment(), t2.getComment());
 		assertEquals(t1.getProperties(), t2.getProperties());
 		assertEquals(t1.getPartitionKeys(), t2.getPartitionKeys());
 		assertEquals(t1.isPartitioned(), t2.isPartitioned());
-	}
-
-	public static void checkEquals(TableStats ts1, TableStats ts2) {
-		assertEquals(ts1.getRowCount(), ts2.getRowCount());
-		assertEquals(ts1.getColumnStats().size(), ts2.getColumnStats().size());
+		assertEquals(t1.getDescription(), t2.getDescription());
 	}
 
 	public static void checkEquals(CatalogView v1, CatalogView v2) {
@@ -61,4 +68,93 @@ public class CatalogTestUtil {
 	public static void checkEquals(CatalogPartition p1, CatalogPartition p2) {
 		assertEquals(p1.getProperties(), p2.getProperties());
 	}
+
+	static void checkEquals(CatalogTableStatistics ts1, CatalogTableStatistics ts2) {
+		assertEquals(ts1.getRowCount(), ts2.getRowCount());
+		assertEquals(ts1.getFileCount(), ts2.getFileCount());
+		assertEquals(ts1.getTotalSize(), ts2.getTotalSize());
+		assertEquals(ts1.getRawDataSize(), ts2.getRawDataSize());
+		assertEquals(ts1.getProperties(), ts2.getProperties());
+	}
+
+	static void checkEquals(CatalogColumnStatistics cs1, CatalogColumnStatistics cs2) {
+		checkEquals(cs1.getColumnStatisticsData(), cs2.getColumnStatisticsData());
+		assertEquals(cs1.getProperties(), cs2.getProperties());
+	}
+
+	private static void checkEquals(Map<String, CatalogColumnStatisticsDataBase> m1, Map<String, CatalogColumnStatisticsDataBase> m2) {
+		assertEquals(m1.size(), m2.size());
+		for (Map.Entry<String, CatalogColumnStatisticsDataBase> entry : m2.entrySet()) {
+			assertTrue(m1.containsKey(entry.getKey()));
+			checkEquals(m2.get(entry.getKey()), entry.getValue());
+		}
+	}
+
+	private static void checkEquals(CatalogColumnStatisticsDataBase v1, CatalogColumnStatisticsDataBase v2) {
+		assertEquals(v1.getClass(), v2.getClass());
+		if (v1 instanceof CatalogColumnStatisticsDataBoolean) {
+			checkEquals((CatalogColumnStatisticsDataBoolean) v1, (CatalogColumnStatisticsDataBoolean) v2);
+		} else if (v1 instanceof CatalogColumnStatisticsDataLong) {
+			checkEquals((CatalogColumnStatisticsDataLong) v1, (CatalogColumnStatisticsDataLong) v2);
+		} else if (v1 instanceof CatalogColumnStatisticsDataBinary) {
+			checkEquals((CatalogColumnStatisticsDataBinary) v1, (CatalogColumnStatisticsDataBinary) v2);
+		} else if (v1 instanceof CatalogColumnStatisticsDataDate) {
+			checkEquals((CatalogColumnStatisticsDataDate) v1, (CatalogColumnStatisticsDataDate) v2);
+		} else if (v1 instanceof CatalogColumnStatisticsDataString) {
+			checkEquals((CatalogColumnStatisticsDataString) v1, (CatalogColumnStatisticsDataString) v2);
+		} else if (v1 instanceof CatalogColumnStatisticsDataDouble) {
+			checkEquals((CatalogColumnStatisticsDataDouble) v1, (CatalogColumnStatisticsDataDouble) v2);
+		}
+	}
+
+	private static void checkEquals(CatalogColumnStatisticsDataBoolean v1, CatalogColumnStatisticsDataBoolean v2) {
+		assertEquals(v1.getFalseCount(), v2.getFalseCount());
+		assertEquals(v1.getTrueCount(), v2.getTrueCount());
+		assertEquals(v1.getNullCount(), v2.getNullCount());
+		assertEquals(v1.getProperties(), v2.getProperties());
+	}
+
+	private static void checkEquals(CatalogColumnStatisticsDataLong v1, CatalogColumnStatisticsDataLong v2) {
+		assertEquals(v1.getMin(), v2.getMin());
+		assertEquals(v1.getMax(), v2.getMax());
+		assertEquals(v1.getNdv(), v2.getNdv());
+		assertEquals(v1.getNullCount(), v2.getNullCount());
+		assertEquals(v1.getProperties(), v2.getProperties());
+	}
+
+	private static void checkEquals(CatalogColumnStatisticsDataDouble v1, CatalogColumnStatisticsDataDouble v2) {
+		assertEquals(v1.getMin(), v2.getMin(), 0.05D);
+		assertEquals(v1.getMax(), v2.getMax(), 0.05D);
+		assertEquals(v1.getNdv(), v2.getNdv());
+		assertEquals(v1.getNullCount(), v2.getNullCount());
+		assertEquals(v1.getProperties(), v2.getProperties());
+	}
+
+	private static void checkEquals(CatalogColumnStatisticsDataString v1, CatalogColumnStatisticsDataString v2) {
+		assertEquals(v1.getMaxLength(), v2.getMaxLength());
+		assertEquals(v1.getAvgLength(), v2.getAvgLength(), 0.05D);
+		assertEquals(v1.getNdv(), v2.getNdv());
+		assertEquals(v1.getNullCount(), v2.getNullCount());
+		assertEquals(v1.getProperties(), v2.getProperties());
+	}
+
+	private static void checkEquals(CatalogColumnStatisticsDataBinary v1, CatalogColumnStatisticsDataBinary v2) {
+		assertEquals(v1.getMaxLength(), v2.getMaxLength());
+		assertEquals(v1.getAvgLength(), v2.getAvgLength(), 0.05D);
+		assertEquals(v1.getNullCount(), v2.getNullCount());
+		assertEquals(v1.getProperties(), v2.getProperties());
+	}
+
+	private static void checkEquals(CatalogColumnStatisticsDataDate v1, CatalogColumnStatisticsDataDate v2) {
+		checkEquals(v1.getMin(), v2.getMin());
+		checkEquals(v1.getMax(), v2.getMax());
+		assertEquals(v1.getNdv(), v2.getNdv());
+		assertEquals(v1.getNullCount(), v2.getNullCount());
+		assertEquals(v1.getProperties(), v2.getProperties());
+	}
+
+	private static void checkEquals(Date v1, Date v2) {
+		assertEquals(v1.getDaysSinceEpoch(), v2.getDaysSinceEpoch());
+	}
+
 }