You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/05/08 06:14:03 UTC
[flink] branch master updated: [FLINK-12365][table] Add stats related catalog APIs
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 12d698b [FLINK-12365][table] Add stats related catalog APIs
12d698b is described below
commit 12d698b6e1c1aef03834963fba8a0974b3729c50
Author: xuefuz <xu...@users.noreply.github.com>
AuthorDate: Tue May 7 23:13:50 2019 -0700
[FLINK-12365][table] Add stats related catalog APIs
This closes #8314
---
.../catalog/hive/GenericHiveMetastoreCatalog.java | 52 ++++++++
.../flink/table/catalog/hive/HiveCatalog.java | 43 ++++++
.../hive/util/GenericHiveMetastoreCatalogUtil.java | 3 +-
.../hive/GenericHiveMetastoreCatalogTest.java | 7 -
.../flink/table/catalog/GenericCatalogTable.java | 15 +--
.../table/catalog/GenericInMemoryCatalog.java | 148 +++++++++++++++++++++
.../table/catalog/GenericInMemoryCatalogTest.java | 76 ++++++++++-
.../apache/flink/table/catalog/CatalogTable.java | 8 --
.../flink/table/catalog/ReadableCatalog.java | 56 +++++++-
.../table/catalog/ReadableWritableCatalog.java | 67 ++++++++++
.../catalog/stats/CatalogColumnStatistics.java | 71 ++++++++++
.../CatalogColumnStatisticsDataBase.java} | 47 ++++---
.../stats/CatalogColumnStatisticsDataBinary.java | 62 +++++++++
.../stats/CatalogColumnStatisticsDataBoolean.java | 62 +++++++++
.../stats/CatalogColumnStatisticsDataDate.java | 73 ++++++++++
.../stats/CatalogColumnStatisticsDataDouble.java | 73 ++++++++++
.../stats/CatalogColumnStatisticsDataLong.java | 73 ++++++++++
.../stats/CatalogColumnStatisticsDataString.java | 73 ++++++++++
.../catalog/stats/CatalogTableStatistics.java | 98 ++++++++++++++
.../catalog/{CatalogTable.java => stats/Date.java} | 39 +++---
.../flink/table/catalog/CatalogTestUtil.java | 110 ++++++++++++++-
21 files changed, 1169 insertions(+), 87 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
index 3156107..843b4a2 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
@@ -37,6 +37,8 @@ import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.hive.util.GenericHiveMetastoreCatalogUtil;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
@@ -311,4 +313,54 @@ public class GenericHiveMetastoreCatalog extends HiveCatalogBase {
public boolean functionExists(ObjectPath functionPath) throws CatalogException {
throw new UnsupportedOperationException();
}
+
+ // ------ statistics ------
+
+ @Override
+ public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics,
+ boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics,
+ boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws PartitionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws PartitionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 7f96852..e1c2c30 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -35,6 +35,8 @@ import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
@@ -194,6 +196,26 @@ public class HiveCatalog extends HiveCatalogBase {
}
@Override
+ public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+
+ }
+
+ @Override
+ public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+
+ }
+
+ @Override
+ public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+
+ }
+
+ @Override
+ public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+
+ }
+
+ @Override
public List<String> listFunctions(String dbName) throws DatabaseNotExistException, CatalogException {
throw new UnsupportedOperationException();
}
@@ -207,4 +229,25 @@ public class HiveCatalog extends HiveCatalogBase {
public boolean functionExists(ObjectPath functionPath) throws CatalogException {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/GenericHiveMetastoreCatalogUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/GenericHiveMetastoreCatalogUtil.java
index f3927f9..a88f083 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/GenericHiveMetastoreCatalogUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/GenericHiveMetastoreCatalogUtil.java
@@ -30,7 +30,6 @@ import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalogBaseUtil;
import org.apache.flink.table.catalog.hive.HiveTableConfig;
import org.apache.flink.table.catalog.hive.HiveTypeUtil;
-import org.apache.flink.table.plan.stats.TableStats;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.Database;
@@ -180,7 +179,7 @@ public class GenericHiveMetastoreCatalogUtil {
);
} else {
return new GenericCatalogTable(
- tableSchema, new TableStats(0), partitionKeys, properties, comment);
+ tableSchema, partitionKeys, properties, comment);
}
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
index fb89e53..390ab1d 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.GenericCatalogDatabase;
import org.apache.flink.table.catalog.GenericCatalogTable;
import org.apache.flink.table.catalog.GenericCatalogView;
-import org.apache.flink.table.plan.stats.TableStats;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -82,7 +81,6 @@ public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
CatalogTable table = new GenericCatalogTable(
new TableSchema(colNames, types),
- new TableStats(0),
getBatchTableProperties(),
TEST_COMMENT
);
@@ -122,7 +120,6 @@ public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
public CatalogTable createTable() {
return new GenericCatalogTable(
createTableSchema(),
- new TableStats(0),
getBatchTableProperties(),
TEST_COMMENT);
}
@@ -131,7 +128,6 @@ public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
public CatalogTable createAnotherTable() {
return new GenericCatalogTable(
createAnotherTableSchema(),
- new TableStats(0),
getBatchTableProperties(),
TEST_COMMENT);
}
@@ -140,7 +136,6 @@ public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
public CatalogTable createStreamingTable() {
return new GenericCatalogTable(
createTableSchema(),
- new TableStats(0),
getStreamingTableProperties(),
TEST_COMMENT);
}
@@ -149,7 +144,6 @@ public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
public CatalogTable createPartitionedTable() {
return new GenericCatalogTable(
createTableSchema(),
- new TableStats(0),
createPartitionKeys(),
getBatchTableProperties(),
TEST_COMMENT);
@@ -159,7 +153,6 @@ public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
public CatalogTable createAnotherPartitionedTable() {
return new GenericCatalogTable(
createAnotherTableSchema(),
- new TableStats(0),
createPartitionKeys(),
getBatchTableProperties(),
TEST_COMMENT);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
index 73c2dbc..56d8377 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.catalog;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.plan.stats.TableStats;
import java.util.ArrayList;
import java.util.HashMap;
@@ -35,8 +34,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
public class GenericCatalogTable implements CatalogTable {
// Schema of the table (column names and types)
private final TableSchema tableSchema;
- // Statistics of the table
- private final TableStats tableStats;
// Partition keys if this is a partitioned table. It's an empty set if the table is not partitioned
private final List<String> partitionKeys;
// Properties of the table
@@ -46,12 +43,10 @@ public class GenericCatalogTable implements CatalogTable {
public GenericCatalogTable(
TableSchema tableSchema,
- TableStats tableStats,
List<String> partitionKeys,
Map<String, String> properties,
String comment) {
this.tableSchema = checkNotNull(tableSchema, "tableSchema cannot be null");
- this.tableStats = checkNotNull(tableStats, "tableStats cannot be null");
this.partitionKeys = checkNotNull(partitionKeys, "partitionKeys cannot be null");
this.properties = checkNotNull(properties, "properties cannot be null");
this.comment = comment;
@@ -59,15 +54,9 @@ public class GenericCatalogTable implements CatalogTable {
public GenericCatalogTable(
TableSchema tableSchema,
- TableStats tableStats,
Map<String, String> properties,
String description) {
- this(tableSchema, tableStats, new ArrayList<>(), properties, description);
- }
-
- @Override
- public TableStats getStatistics() {
- return this.tableStats;
+ this(tableSchema, new ArrayList<>(), properties, description);
}
@Override
@@ -98,7 +87,7 @@ public class GenericCatalogTable implements CatalogTable {
@Override
public GenericCatalogTable copy() {
return new GenericCatalogTable(
- this.tableSchema.copy(), this.tableStats.copy(), new ArrayList<>(partitionKeys), new HashMap<>(this.properties), comment);
+ this.tableSchema.copy(), new ArrayList<>(partitionKeys), new HashMap<>(this.properties), comment);
}
@Override
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
index dadd390..19655ce 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
@@ -30,6 +30,8 @@ import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.util.StringUtils;
import java.util.ArrayList;
@@ -57,6 +59,11 @@ public class GenericInMemoryCatalog implements ReadableWritableCatalog {
private final Map<ObjectPath, CatalogFunction> functions;
private final Map<ObjectPath, Map<CatalogPartitionSpec, CatalogPartition>> partitions;
+ private final Map<ObjectPath, CatalogTableStatistics> tableStats;
+ private final Map<ObjectPath, CatalogColumnStatistics> tableColumnStats;
+ private final Map<ObjectPath, Map<CatalogPartitionSpec, CatalogTableStatistics>> partitionStats;
+ private final Map<ObjectPath, Map<CatalogPartitionSpec, CatalogColumnStatistics>> partitionColumnStats;
+
public GenericInMemoryCatalog(String name) {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name cannot be null or empty");
@@ -66,6 +73,10 @@ public class GenericInMemoryCatalog implements ReadableWritableCatalog {
this.tables = new LinkedHashMap<>();
this.functions = new LinkedHashMap<>();
this.partitions = new LinkedHashMap<>();
+ this.tableStats = new LinkedHashMap<>();
+ this.tableColumnStats = new LinkedHashMap<>();
+ this.partitionStats = new LinkedHashMap<>();
+ this.partitionColumnStats = new LinkedHashMap<>();
}
@Override
@@ -193,6 +204,8 @@ public class GenericInMemoryCatalog implements ReadableWritableCatalog {
if (isPartitionedTable(tablePath)) {
partitions.put(tablePath, new LinkedHashMap<>());
+ partitionStats.put(tablePath, new LinkedHashMap<>());
+ partitionColumnStats.put(tablePath, new LinkedHashMap<>());
}
}
}
@@ -222,8 +235,12 @@ public class GenericInMemoryCatalog implements ReadableWritableCatalog {
if (tableExists(tablePath)) {
tables.remove(tablePath);
+ tableStats.remove(tablePath);
+ tableColumnStats.remove(tablePath);
partitions.remove(tablePath);
+ partitionStats.remove(tablePath);
+ partitionColumnStats.remove(tablePath);
} else if (!ignoreIfNotExists) {
throw new TableNotExistException(catalogName, tablePath);
}
@@ -243,9 +260,30 @@ public class GenericInMemoryCatalog implements ReadableWritableCatalog {
} else {
tables.put(newPath, tables.remove(tablePath));
+ // table statistics
+ if (tableStats.containsKey(tablePath)) {
+ tableStats.put(newPath, tableStats.remove(tablePath));
+ }
+
+ // table column statistics
+ if (tableColumnStats.containsKey(tablePath)) {
+ tableColumnStats.put(newPath, tableColumnStats.remove(tablePath));
+ }
+
+ // partitions
if (partitions.containsKey(tablePath)) {
partitions.put(newPath, partitions.remove(tablePath));
}
+
+ // partition statistics
+ if (partitionStats.containsKey(tablePath)) {
+ partitionStats.put(newPath, partitionStats.remove(tablePath));
+ }
+
+ // partition column statistics
+ if (partitionColumnStats.containsKey(tablePath)) {
+ partitionColumnStats.put(newPath, partitionColumnStats.remove(tablePath));
+ }
}
} else if (!ignoreIfNotExists) {
throw new TableNotExistException(catalogName, tablePath);
@@ -411,6 +449,8 @@ public class GenericInMemoryCatalog implements ReadableWritableCatalog {
if (partitionExists(tablePath, partitionSpec)) {
partitions.get(tablePath).remove(partitionSpec);
+ partitionStats.get(tablePath).remove(partitionSpec);
+ partitionColumnStats.get(tablePath).remove(partitionSpec);
} else if (!ignoreIfNotExists) {
throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
}
@@ -543,4 +583,112 @@ public class GenericInMemoryCatalog implements ReadableWritableCatalog {
return (table instanceof CatalogTable) && ((CatalogTable) table).isPartitioned();
}
+ // ------ statistics ------
+
+ @Override
+ public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException {
+ checkNotNull(tablePath);
+
+ if (!tableExists(tablePath)) {
+ throw new TableNotExistException(catalogName, tablePath);
+ }
+
+ CatalogTableStatistics result = tableStats.get(tablePath);
+ return result != null ? result.copy() : CatalogTableStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws TableNotExistException {
+ checkNotNull(tablePath);
+
+ if (!tableExists(tablePath)) {
+ throw new TableNotExistException(catalogName, tablePath);
+ }
+
+ CatalogColumnStatistics result = tableColumnStats.get(tablePath);
+ return result != null ? result.copy() : CatalogColumnStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws PartitionNotExistException {
+ checkNotNull(tablePath);
+ checkNotNull(partitionSpec);
+
+ if (!partitionExists(tablePath, partitionSpec)) {
+ throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+ }
+
+ CatalogTableStatistics result = partitionStats.get(tablePath).get(partitionSpec);
+ return result != null ? result.copy() : CatalogTableStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws PartitionNotExistException {
+ checkNotNull(tablePath);
+ checkNotNull(partitionSpec);
+
+ if (!partitionExists(tablePath, partitionSpec)) {
+ throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+ }
+
+ CatalogColumnStatistics result = partitionColumnStats.get(tablePath).get(partitionSpec);
+ return result != null ? result.copy() : CatalogColumnStatistics.UNKNOWN;
+ }
+
+ @Override
+ public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
+ throws TableNotExistException {
+ checkNotNull(tablePath);
+ checkNotNull(tableStatistics);
+
+ if (tableExists(tablePath)) {
+ tableStats.put(tablePath, tableStatistics.copy());
+ } else if (!ignoreIfNotExists) {
+ throw new TableNotExistException(catalogName, tablePath);
+ }
+ }
+
+ @Override
+ public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics,
+ boolean ignoreIfNotExists) throws TableNotExistException {
+ checkNotNull(tablePath);
+ checkNotNull(columnStatistics);
+
+ if (tableExists(tablePath)) {
+ tableColumnStats.put(tablePath, columnStatistics.copy());
+ } else if (!ignoreIfNotExists) {
+ throw new TableNotExistException(catalogName, tablePath);
+ }
+ }
+
+ @Override
+ public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics,
+ boolean ignoreIfNotExists) throws PartitionNotExistException {
+ checkNotNull(tablePath);
+ checkNotNull(partitionSpec);
+ checkNotNull(partitionStatistics);
+
+ if (partitionExists(tablePath, partitionSpec)) {
+ partitionStats.get(tablePath).put(partitionSpec, partitionStatistics.copy());
+ } else if (!ignoreIfNotExists) {
+ throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+ }
+ }
+
+ @Override
+ public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics,
+ boolean ignoreIfNotExists) throws PartitionNotExistException {
+ checkNotNull(tablePath);
+ checkNotNull(partitionSpec);
+ checkNotNull(columnStatistics);
+
+ if (partitionExists(tablePath, partitionSpec)) {
+ partitionColumnStats.get(tablePath).put(partitionSpec, columnStatistics.copy());
+ } else if (!ignoreIfNotExists) {
+ throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+ }
+ }
+
}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index 43d9dda..6fabb78 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -26,8 +26,17 @@ import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBinary;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBoolean;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDouble;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.catalog.stats.Date;
import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.plan.stats.TableStats;
import org.junit.After;
import org.junit.BeforeClass;
@@ -575,6 +584,47 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
catalog.dropDatabase(db1, false);
}
+ // ------ statistics ------
+
+ @Test
+ public void testStatistics() throws Exception {
+ // Table related
+ catalog.createDatabase(db1, createDb(), false);
+ CatalogTable table = createTable();
+ catalog.createTable(path1, table, false);
+
+ CatalogTestUtil.checkEquals(catalog.getTableStatistics(path1), CatalogTableStatistics.UNKNOWN);
+ CatalogTestUtil.checkEquals(catalog.getTableColumnStatistics(path1), CatalogColumnStatistics.UNKNOWN);
+
+ CatalogTableStatistics tableStatistics = new CatalogTableStatistics(5, 2, 100, 575);
+ catalog.alterTableStatistics(path1, tableStatistics, false);
+ CatalogTestUtil.checkEquals(tableStatistics, catalog.getTableStatistics(path1));
+ CatalogColumnStatistics columnStatistics = createColumnStats();
+ catalog.alterTableColumnStatistics(path1, columnStatistics, false);
+ CatalogTestUtil.checkEquals(columnStatistics, catalog.getTableColumnStatistics(path1));
+
+ // Partition related
+ catalog.createDatabase(db2, createDb(), false);
+ CatalogTable table2 = createPartitionedTable();
+ catalog.createTable(path2, table2, false);
+ CatalogPartitionSpec partitionSpec = createPartitionSpec();
+ catalog.createPartition(path2, partitionSpec, createPartition(), false);
+
+ CatalogTestUtil.checkEquals(catalog.getPartitionStatistics(path2, partitionSpec), CatalogTableStatistics.UNKNOWN);
+ CatalogTestUtil.checkEquals(catalog.getPartitionColumnStatistics(path2, partitionSpec), CatalogColumnStatistics.UNKNOWN);
+
+ catalog.alterPartitionStatistics(path2, partitionSpec, tableStatistics, false);
+ CatalogTestUtil.checkEquals(tableStatistics, catalog.getPartitionStatistics(path2, partitionSpec));
+ catalog.alterPartitionColumnStatistics(path2, partitionSpec, columnStatistics, false);
+ CatalogTestUtil.checkEquals(columnStatistics, catalog.getPartitionColumnStatistics(path2, partitionSpec));
+
+ // Clean up
+ catalog.dropTable(path1, false);
+ catalog.dropDatabase(db1, false);
+ catalog.dropTable(path2, false);
+ catalog.dropDatabase(db2, false);
+ }
+
// ------ utilities ------
@Override
@@ -604,7 +654,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
public GenericCatalogTable createStreamingTable() {
return new GenericCatalogTable(
createTableSchema(),
- new TableStats(0),
getStreamingTableProperties(),
TEST_COMMENT);
}
@@ -613,7 +662,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
public CatalogTable createTable() {
return new GenericCatalogTable(
createTableSchema(),
- new TableStats(0),
getBatchTableProperties(),
TEST_COMMENT);
}
@@ -622,7 +670,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
public CatalogTable createAnotherTable() {
return new GenericCatalogTable(
createAnotherTableSchema(),
- new TableStats(0),
getBatchTableProperties(),
TEST_COMMENT);
}
@@ -631,7 +678,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
public CatalogTable createPartitionedTable() {
return new GenericCatalogTable(
createTableSchema(),
- new TableStats(0),
createPartitionKeys(),
getBatchTableProperties(),
TEST_COMMENT);
@@ -641,7 +687,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
public CatalogTable createAnotherPartitionedTable() {
return new GenericCatalogTable(
createAnotherTableSchema(),
- new TableStats(0),
createPartitionKeys(),
getBatchTableProperties(),
TEST_COMMENT);
@@ -709,6 +754,24 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
"This is another view");
}
+ private CatalogColumnStatistics createColumnStats() {
+ CatalogColumnStatisticsDataBoolean booleanColStats = new CatalogColumnStatisticsDataBoolean(55L, 45L, 5L);
+ CatalogColumnStatisticsDataLong longColStats = new CatalogColumnStatisticsDataLong(-123L, 763322L, 23L, 79L);
+ CatalogColumnStatisticsDataString stringColStats = new CatalogColumnStatisticsDataString(152L, 43.5D, 20L, 0L);
+ CatalogColumnStatisticsDataDate dateColStats = new CatalogColumnStatisticsDataDate(new Date(71L),
+ new Date(17923L), 1321, 0L);
+ CatalogColumnStatisticsDataDouble doubleColStats = new CatalogColumnStatisticsDataDouble(-123.35D, 7633.22D, 23L, 79L);
+ CatalogColumnStatisticsDataBinary binaryColStats = new CatalogColumnStatisticsDataBinary(755L, 43.5D, 20L);
+ Map<String, CatalogColumnStatisticsDataBase> colStatsMap = new HashMap<>(6);
+ colStatsMap.put("b1", booleanColStats);
+ colStatsMap.put("l2", longColStats);
+ colStatsMap.put("s3", stringColStats);
+ colStatsMap.put("d4", dateColStats);
+ colStatsMap.put("dd5", doubleColStats);
+ colStatsMap.put("bb6", binaryColStats);
+ return new CatalogColumnStatistics(colStatsMap);
+ }
+
protected CatalogFunction createFunction() {
return new GenericCatalogFunction(MyScalarFunction.class.getName());
}
@@ -734,4 +797,5 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
return String.valueOf(i);
}
}
+
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
index 545fad9..e38e930 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
@@ -18,8 +18,6 @@
package org.apache.flink.table.catalog;
-import org.apache.flink.table.plan.stats.TableStats;
-
import java.util.List;
/**
@@ -27,12 +25,6 @@ import java.util.List;
*/
public interface CatalogTable extends CatalogBaseTable {
/**
- * Get the statistics of the table.
- * @return table statistics
- */
- TableStats getStatistics();
-
- /**
* Check if the table is partitioned or not.
*
* @return true if the table is partitioned; otherwise, false
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
index 5586348..15adabb 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
@@ -24,6 +24,8 @@ import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import java.util.List;
@@ -194,12 +196,12 @@ public interface ReadableCatalog {
/**
* List the names of all functions in the given database. An empty list is returned if none is registered.
*
- * @param dbName name of the database.
+ * @param databaseName name of the database.
* @return a list of the names of the functions in this database
* @throws DatabaseNotExistException if the database does not exist
* @throws CatalogException in case of any runtime exception
*/
- List<String> listFunctions(String dbName) throws DatabaseNotExistException, CatalogException;
+ List<String> listFunctions(String databaseName) throws DatabaseNotExistException, CatalogException;
/**
* Get the function.
@@ -221,4 +223,54 @@ public interface ReadableCatalog {
*/
boolean functionExists(ObjectPath functionPath) throws CatalogException;
+ // ------ statistics ------
+
+ /**
+ * Get the statistics of a table.
+ *
+ * @param tablePath path of the table
+ * @return statistics of the given table
+ *
+ * @throws TableNotExistException if the table does not exist in the catalog
+ * @throws CatalogException in case of any runtime exception
+ */
+ CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException;
+
+ /**
+ * Get the column statistics of a table.
+ *
+ * @param tablePath path of the table
+ * @return column statistics of the given table
+ *
+ * @throws TableNotExistException if the table does not exist in the catalog
+ * @throws CatalogException in case of any runtime exception
+ */
+ CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException;
+
+ /**
+ * Get the statistics of a partition.
+ *
+ * @param tablePath path of the table
+ * @param partitionSpec partition spec of the partition
+ * @return statistics of the given partition
+ *
+ * @throws PartitionNotExistException if the partition does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws PartitionNotExistException, CatalogException;
+
+ /**
+ * Get the column statistics of a partition.
+ *
+ * @param tablePath path of the table
+ * @param partitionSpec partition spec of the partition
+ * @return column statistics of the given partition
+ *
+ * @throws PartitionNotExistException if the partition does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws PartitionNotExistException, CatalogException;
+
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
index 60bc93d..a398b72 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
@@ -30,6 +30,8 @@ import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
/**
* An interface responsible for manipulating catalog metadata.
@@ -233,4 +235,69 @@ public interface ReadableWritableCatalog extends ReadableCatalog {
*/
void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
throws FunctionNotExistException, CatalogException;
+
+ // ------ statistics ------
+
+ /**
+ * Update the statistics of a table.
+ *
+ * @param tablePath path of the table
+ * @param tableStatistics new statistics to update
+ * @param ignoreIfNotExists flag to specify behavior if the table does not exist:
+ * if set to false, throw an exception,
+ * if set to true, nothing happens.
+ *
+ * @throws TableNotExistException if the table does not exist in the catalog
+ * @throws CatalogException in case of any runtime exception
+ */
+ void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException;
+
+ /**
+ * Update the column statistics of a table.
+ *
+ * @param tablePath path of the table
+ * @param columnStatistics new column statistics to update
+ * @param ignoreIfNotExists flag to specify behavior if the table does not exist:
+ * if set to false, throw an exception,
+ * if set to true, nothing happens.
+ *
+ * @throws TableNotExistException if the table does not exist in the catalog
+ * @throws CatalogException in case of any runtime exception
+ */
+ void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException;
+
+ /**
+ * Update the statistics of a table partition.
+ *
+ * @param tablePath path of the table
+ * @param partitionSpec partition spec of the partition
+ * @param partitionStatistics new statistics to update
+ * @param ignoreIfNotExists flag to specify behavior if the partition does not exist:
+ * if set to false, throw an exception,
+ * if set to true, nothing happens.
+ *
+ * @throws PartitionNotExistException if the partition does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics,
+ boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException;
+
+ /**
+ * Update the column statistics of a table partition.
+ *
+ * @param tablePath path of the table
+ * @param partitionSpec partition spec of the partition
+ * @param columnStatistics new column statistics to update
+ * @param ignoreIfNotExists flag to specify behavior if the partition does not exist:
+ * if set to false, throw an exception,
+ * if set to true, nothing happens.
+ *
+ * @throws PartitionNotExistException if the partition does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics,
+ boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException;
+
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatistics.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatistics.java
new file mode 100644
index 0000000..e024107
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatistics.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.stats;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Column statistics of a table or partition.
+ */
+public class CatalogColumnStatistics {
+ public static final CatalogColumnStatistics UNKNOWN = new CatalogColumnStatistics(new HashMap<>());
+
+ /**
+ * A map of column name and column statistic data.
+ */
+ private final Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData;
+
+ private final Map<String, String> properties;
+
+ public CatalogColumnStatistics(Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData) {
+ this(columnStatisticsData, new HashMap<>());
+ }
+
+ public CatalogColumnStatistics(Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData, Map<String, String> properties) {
+ checkNotNull(columnStatisticsData);
+ checkNotNull(properties);
+
+ this.columnStatisticsData = columnStatisticsData;
+ this.properties = properties;
+ }
+
+ public Map<String, CatalogColumnStatisticsDataBase> getColumnStatisticsData() {
+ return columnStatisticsData;
+ }
+
+ public Map<String, String> getProperties() {
+ return this.properties;
+ }
+
+ /**
+ * Create a deep copy of "this" instance.
+ * @return a deep copy
+ */
+ public CatalogColumnStatistics copy() {
+ Map<String, CatalogColumnStatisticsDataBase> copy = new HashMap<>(columnStatisticsData.size());
+ for (Map.Entry<String, CatalogColumnStatisticsDataBase> entry : columnStatisticsData.entrySet()) {
+ copy.put(entry.getKey(), entry.getValue().copy());
+ }
+ return new CatalogColumnStatistics(copy, new HashMap<>(this.properties));
+ }
+
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBase.java
similarity index 51%
copy from flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
copy to flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBase.java
index 545fad9..1c4983b 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBase.java
@@ -16,33 +16,44 @@
* limitations under the License.
*/
-package org.apache.flink.table.catalog;
+package org.apache.flink.table.catalog.stats;
-import org.apache.flink.table.plan.stats.TableStats;
-
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
/**
- * Represents a table in a catalog.
+ * Column statistics value base class.
*/
-public interface CatalogTable extends CatalogBaseTable {
+public abstract class CatalogColumnStatisticsDataBase {
/**
- * Get the statistics of the table.
- * @return table statistics
+ * number of null values.
*/
- TableStats getStatistics();
+ private final long nullCount;
- /**
- * Check if the table is partitioned or not.
- *
- * @return true if the table is partitioned; otherwise, false
- */
- boolean isPartitioned();
+ private final Map<String, String> properties;
+
+ public CatalogColumnStatisticsDataBase(long nullCount) {
+ this(nullCount, new HashMap<>());
+ }
+
+ public CatalogColumnStatisticsDataBase(long nullCount, Map<String, String> properties) {
+ this.nullCount = nullCount;
+ this.properties = properties;
+ }
+
+ public long getNullCount() {
+ return nullCount;
+ }
+
+ public Map<String, String> getProperties() {
+ return this.properties;
+ }
/**
- * Get the partition keys of the table. This will be an empty set if the table is not partitioned.
+ * Create a deep copy of "this" instance.
*
- * @return partition keys of the table
+ * @return a deep copy
*/
- List<String> getPartitionKeys();
+ public abstract CatalogColumnStatisticsDataBase copy();
+
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBinary.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBinary.java
new file mode 100644
index 0000000..c777aae
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBinary.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.stats;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Column statistics value of binary type.
+ */
+public class CatalogColumnStatisticsDataBinary extends CatalogColumnStatisticsDataBase {
+ /**
+ * max length of all values.
+ */
+ private final long maxLength;
+
+ /**
+ * average length of all values.
+ */
+ private final double avgLength;
+
+ public CatalogColumnStatisticsDataBinary(long maxLength, double avgLength, long nullCount) {
+ super(nullCount);
+ this.maxLength = maxLength;
+ this.avgLength = avgLength;
+ }
+
+ public CatalogColumnStatisticsDataBinary(long maxLength, double avgLength, long nullCount, Map<String, String> properties) {
+ super(nullCount, properties);
+ this.maxLength = maxLength;
+ this.avgLength = avgLength;
+ }
+
+ public long getMaxLength() {
+ return maxLength;
+ }
+
+ public double getAvgLength() {
+ return avgLength;
+ }
+
+ public CatalogColumnStatisticsDataBinary copy() {
+ return new CatalogColumnStatisticsDataBinary(maxLength, avgLength, getNullCount(), new HashMap<>(getProperties()));
+ }
+
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBoolean.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBoolean.java
new file mode 100644
index 0000000..f499918
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBoolean.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.stats;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Column statistics value of boolean type.
+ */
+public class CatalogColumnStatisticsDataBoolean extends CatalogColumnStatisticsDataBase {
+ /**
+ * number of "true" values.
+ */
+ private final long trueCount;
+
+ /**
+ * number of "false" values.
+ */
+ private final long falseCount;
+
+ public CatalogColumnStatisticsDataBoolean(long trueCount, long falseCount, long nullCount) {
+ super(nullCount);
+ this.trueCount = trueCount;
+ this.falseCount = falseCount;
+ }
+
+ public CatalogColumnStatisticsDataBoolean(long trueCount, long falseCount, long nullCount, Map<String, String> properties) {
+ super(nullCount, properties);
+ this.trueCount = trueCount;
+ this.falseCount = falseCount;
+ }
+
+ public Long getTrueCount() {
+ return trueCount;
+ }
+
+ public Long getFalseCount() {
+ return falseCount;
+ }
+
+ public CatalogColumnStatisticsDataBoolean copy() {
+ return new CatalogColumnStatisticsDataBoolean(trueCount, falseCount, getNullCount(), new HashMap<>(getProperties()));
+ }
+
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDate.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDate.java
new file mode 100644
index 0000000..fe7fad2
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDate.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.stats;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Column statistics value of date type.
+ */
+public class CatalogColumnStatisticsDataDate extends CatalogColumnStatisticsDataBase {
+ /**
+ * min value.
+ */
+ private final Date min;
+
+ /**
+ * max value.
+ */
+ private final Date max;
+
+ /**
+ * number of distinct values.
+ */
+ private final long ndv;
+
+ public CatalogColumnStatisticsDataDate(Date min, Date max, long ndv, long nullCount) {
+ super(nullCount);
+ this.min = min;
+ this.max = max;
+ this.ndv = ndv;
+ }
+
+ public CatalogColumnStatisticsDataDate(Date min, Date max, long ndv, long nullCount, Map<String, String> properties) {
+ super(nullCount, properties);
+ this.min = min;
+ this.max = max;
+ this.ndv = ndv;
+ }
+
+ public Date getMin() {
+ return min;
+ }
+
+ public Date getMax() {
+ return max;
+ }
+
+ public long getNdv() {
+ return ndv;
+ }
+
+ public CatalogColumnStatisticsDataDate copy() {
+ return new CatalogColumnStatisticsDataDate(min, max, ndv, getNullCount(), new HashMap<>(getProperties()));
+ }
+
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDouble.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDouble.java
new file mode 100644
index 0000000..b7ec6db
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDouble.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.stats;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Column statistics value of double type.
+ */
+public class CatalogColumnStatisticsDataDouble extends CatalogColumnStatisticsDataBase {
+ /**
+ * min value.
+ */
+ private final double min;
+
+ /**
+ * max value.
+ */
+ private final double max;
+
+ /**
+ * number of distinct values.
+ */
+ private final long ndv;
+
+ public CatalogColumnStatisticsDataDouble(double min, double max, long ndv, long nullCount) {
+ super(nullCount);
+ this.min = min;
+ this.max = max;
+ this.ndv = ndv;
+ }
+
+ public CatalogColumnStatisticsDataDouble(double min, double max, long ndv, long nullCount, Map<String, String> properties) {
+ super(nullCount, properties);
+ this.min = min;
+ this.max = max;
+ this.ndv = ndv;
+ }
+
+ public double getMin() {
+ return min;
+ }
+
+ public double getMax() {
+ return max;
+ }
+
+ public long getNdv() {
+ return ndv;
+ }
+
+ public CatalogColumnStatisticsDataDouble copy() {
+ return new CatalogColumnStatisticsDataDouble(min, max, ndv, getNullCount(), new HashMap<>(getProperties()));
+ }
+
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataLong.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataLong.java
new file mode 100644
index 0000000..65b54c3
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataLong.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.stats;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Column statistics value of long type.
+ */
+public class CatalogColumnStatisticsDataLong extends CatalogColumnStatisticsDataBase {
+ /**
+ * min value.
+ */
+ private final long min;
+
+ /**
+ * max value.
+ */
+ private final long max;
+
+ /**
+ * number of distinct values.
+ */
+ private final long ndv;
+
+ public CatalogColumnStatisticsDataLong(long min, long max, long ndv, long nullCount) {
+ super(nullCount);
+ this.min = min;
+ this.max = max;
+ this.ndv = ndv;
+ }
+
+ public CatalogColumnStatisticsDataLong(long min, long max, long ndv, long nullCount, Map<String, String> properties) {
+ super(nullCount, properties);
+ this.min = min;
+ this.max = max;
+ this.ndv = ndv;
+ }
+
+ public long getMin() {
+ return min;
+ }
+
+ public long getMax() {
+ return max;
+ }
+
+ public long getNdv() {
+ return ndv;
+ }
+
+ public CatalogColumnStatisticsDataLong copy() {
+ return new CatalogColumnStatisticsDataLong(min, max, ndv, getNullCount(), new HashMap<>(getProperties()));
+ }
+
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataString.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataString.java
new file mode 100644
index 0000000..2fdecc8
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataString.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.stats;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Column statistics value of string type.
+ */
+public class CatalogColumnStatisticsDataString extends CatalogColumnStatisticsDataBase {
+ /**
+ * max length of all values.
+ */
+ private final long maxLength;
+
+ /**
+ * average length of all values.
+ */
+ private final double avgLength;
+
+ /**
+ * number of distinct values.
+ */
+ private final long ndv;
+
+ public CatalogColumnStatisticsDataString(long maxLength, double avgLength, long ndv, long nullCount) {
+ super(nullCount);
+ this.maxLength = maxLength;
+ this.avgLength = avgLength;
+ this.ndv = ndv;
+ }
+
+ public CatalogColumnStatisticsDataString(long maxLength, double avgLength, long ndv, long nullCount, Map<String, String> properties) {
+ super(nullCount, properties);
+ this.maxLength = maxLength;
+ this.avgLength = avgLength;
+ this.ndv = ndv;
+ }
+
+ public long getMaxLength() {
+ return maxLength;
+ }
+
+ public double getAvgLength() {
+ return avgLength;
+ }
+
+ public long getNdv() {
+ return ndv;
+ }
+
+ public CatalogColumnStatisticsDataString copy() {
+ return new CatalogColumnStatisticsDataString(maxLength, avgLength, ndv, getNullCount(), new HashMap<>(getProperties()));
+ }
+
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogTableStatistics.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogTableStatistics.java
new file mode 100644
index 0000000..55433a3
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogTableStatistics.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.stats;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Statistics for a non-partitioned table or a partition of a partitioned table.
+ */
+public class CatalogTableStatistics {
+ public static final CatalogTableStatistics UNKNOWN = new CatalogTableStatistics(0, 0, 0, 0);
+
+ /**
+ * The number of rows in the table or partition.
+ */
+ private final long rowCount;
+
+ /**
+ * The number of files on disk.
+ */
+ private final int fileCount;
+
+ /**
+ * The total size in bytes.
+ */
+ private final long totalSize;
+
+ /**
+ * The raw data size (size when loaded in memory) in bytes.
+ */
+ private final long rawDataSize;
+
+ private Map<String, String> properties;
+
+ public CatalogTableStatistics(long rowCount, int fileCount, long totalSize, long rawDataSize) {
+ this(rowCount, fileCount, totalSize, rawDataSize, new HashMap<>());
+ }
+
+ public CatalogTableStatistics(long rowCount, int fileCount, long totalSize, long rawDataSize,
+ Map<String, String> properties) {
+ this.rowCount = rowCount;
+ this.fileCount = fileCount;
+ this.totalSize = totalSize;
+ this.rawDataSize = rawDataSize;
+ this.properties = properties;
+ }
+
+ /**
+ * The number of rows.
+ */
+ public long getRowCount() {
+ return this.rowCount;
+ }
+
+ public int getFileCount() {
+ return this.fileCount;
+ }
+
+ public long getTotalSize() {
+ return this.totalSize;
+ }
+
+ public long getRawDataSize() {
+ return this.rawDataSize;
+ }
+
+ public Map<String, String> getProperties() {
+ return this.properties;
+ }
+
+ /**
+ * Create a deep copy of "this" instance.
+ *
+ * @return a deep copy
+ */
+ public CatalogTableStatistics copy() {
+ return new CatalogTableStatistics(this.rowCount, this.fileCount, this.totalSize, this.rawDataSize,
+ new HashMap<>(this.properties));
+ }
+
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/Date.java
similarity index 55%
copy from flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
copy to flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/Date.java
index 545fad9..059a10c 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/Date.java
@@ -16,33 +16,24 @@
* limitations under the License.
*/
-package org.apache.flink.table.catalog;
-
-import org.apache.flink.table.plan.stats.TableStats;
-
-import java.util.List;
+package org.apache.flink.table.catalog.stats;
/**
- * Represents a table in a catalog.
+ * Class representing a date value in statistics.
*/
-public interface CatalogTable extends CatalogBaseTable {
- /**
- * Get the statistics of the table.
- * @return table statistics
- */
- TableStats getStatistics();
+public class Date {
+ private long daysSinceEpoch;
+
+ public Date(long daysSinceEpoch) {
+ this.daysSinceEpoch = daysSinceEpoch;
+ }
+
+ public long getDaysSinceEpoch() {
+ return daysSinceEpoch;
+ }
- /**
- * Check if the table is partitioned or not.
- *
- * @return true if the table is partitioned; otherwise, false
- */
- boolean isPartitioned();
+ public Date copy() {
+ return new Date(daysSinceEpoch);
+ }
- /**
- * Get the partition keys of the table. This will be an empty set if the table is not partitioned.
- *
- * @return partition keys of the table
- */
- List<String> getPartitionKeys();
}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
index bf44bf1..b131698 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
@@ -18,9 +18,21 @@
package org.apache.flink.table.catalog;
-import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBinary;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBoolean;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDouble;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.catalog.stats.Date;
+
+import java.util.Map;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
/**
* Utility class for catalog testing.
@@ -29,16 +41,11 @@ public class CatalogTestUtil {
public static void checkEquals(CatalogTable t1, CatalogTable t2) {
assertEquals(t1.getSchema(), t2.getSchema());
- checkEquals(t1.getStatistics(), t2.getStatistics());
assertEquals(t1.getComment(), t2.getComment());
assertEquals(t1.getProperties(), t2.getProperties());
assertEquals(t1.getPartitionKeys(), t2.getPartitionKeys());
assertEquals(t1.isPartitioned(), t2.isPartitioned());
- }
-
- public static void checkEquals(TableStats ts1, TableStats ts2) {
- assertEquals(ts1.getRowCount(), ts2.getRowCount());
- assertEquals(ts1.getColumnStats().size(), ts2.getColumnStats().size());
+ assertEquals(t1.getDescription(), t2.getDescription());
}
public static void checkEquals(CatalogView v1, CatalogView v2) {
@@ -61,4 +68,93 @@ public class CatalogTestUtil {
public static void checkEquals(CatalogPartition p1, CatalogPartition p2) {
assertEquals(p1.getProperties(), p2.getProperties());
}
+
+ /**
+ * Asserts that two table statistics objects are equal by comparing row count,
+ * file count, total size, raw data size and properties.
+ *
+ * @param ts1 first table statistics (passed as the expected value to the assertions)
+ * @param ts2 second table statistics (passed as the actual value to the assertions)
+ */
+ static void checkEquals(CatalogTableStatistics ts1, CatalogTableStatistics ts2) {
+ assertEquals(ts1.getRowCount(), ts2.getRowCount());
+ assertEquals(ts1.getFileCount(), ts2.getFileCount());
+ assertEquals(ts1.getTotalSize(), ts2.getTotalSize());
+ assertEquals(ts1.getRawDataSize(), ts2.getRawDataSize());
+ assertEquals(ts1.getProperties(), ts2.getProperties());
+ }
+
+ /**
+ * Asserts that two column statistics objects are equal: the column statistics data
+ * is compared through the nested checkEquals overloads, and the properties are
+ * compared with assertEquals.
+ *
+ * @param cs1 first column statistics (expected)
+ * @param cs2 second column statistics (actual)
+ */
+ static void checkEquals(CatalogColumnStatistics cs1, CatalogColumnStatistics cs2) {
+ checkEquals(cs1.getColumnStatisticsData(), cs2.getColumnStatisticsData());
+ assertEquals(cs1.getProperties(), cs2.getProperties());
+ }
+
+ /**
+ * Asserts that two maps of column statistics data have the same size, the same keys,
+ * and equal statistics data for every key.
+ *
+ * @param m1 first map (expected)
+ * @param m2 second map (actual)
+ */
+ private static void checkEquals(Map<String, CatalogColumnStatisticsDataBase> m1, Map<String, CatalogColumnStatisticsDataBase> m2) {
+ assertEquals(m1.size(), m2.size());
+ for (Map.Entry<String, CatalogColumnStatisticsDataBase> entry : m2.entrySet()) {
+ assertTrue(m1.containsKey(entry.getKey()));
+ // Look the counterpart up in m1: comparing m2's value with itself would always pass.
+ checkEquals(m1.get(entry.getKey()), entry.getValue());
+ }
+ }
+
+ /**
+ * Asserts that two column statistics data objects have the same runtime class and
+ * then delegates to the type-specific checkEquals overload for that class.
+ *
+ * @param v1 first statistics data (expected)
+ * @param v2 second statistics data (actual)
+ * @throws AssertionError if the classes differ, the type-specific comparison fails,
+ * or the runtime class is not one of the handled statistics data types
+ */
+ private static void checkEquals(CatalogColumnStatisticsDataBase v1, CatalogColumnStatisticsDataBase v2) {
+ // Guarantees that the casts of v2 below are safe.
+ assertEquals(v1.getClass(), v2.getClass());
+ if (v1 instanceof CatalogColumnStatisticsDataBoolean) {
+ checkEquals((CatalogColumnStatisticsDataBoolean) v1, (CatalogColumnStatisticsDataBoolean) v2);
+ } else if (v1 instanceof CatalogColumnStatisticsDataLong) {
+ checkEquals((CatalogColumnStatisticsDataLong) v1, (CatalogColumnStatisticsDataLong) v2);
+ } else if (v1 instanceof CatalogColumnStatisticsDataBinary) {
+ checkEquals((CatalogColumnStatisticsDataBinary) v1, (CatalogColumnStatisticsDataBinary) v2);
+ } else if (v1 instanceof CatalogColumnStatisticsDataDate) {
+ checkEquals((CatalogColumnStatisticsDataDate) v1, (CatalogColumnStatisticsDataDate) v2);
+ } else if (v1 instanceof CatalogColumnStatisticsDataString) {
+ checkEquals((CatalogColumnStatisticsDataString) v1, (CatalogColumnStatisticsDataString) v2);
+ } else if (v1 instanceof CatalogColumnStatisticsDataDouble) {
+ checkEquals((CatalogColumnStatisticsDataDouble) v1, (CatalogColumnStatisticsDataDouble) v2);
+ } else {
+ // Fail loudly instead of silently passing when a statistics type has no comparison here.
+ throw new AssertionError("Unsupported column statistics data type: " + v1.getClass().getName());
+ }
+ }
+
+ /**
+ * Asserts that two boolean column statistics are equal by comparing false count,
+ * true count, null count and properties.
+ *
+ * @param v1 first statistics data (expected)
+ * @param v2 second statistics data (actual)
+ */
+ private static void checkEquals(CatalogColumnStatisticsDataBoolean v1, CatalogColumnStatisticsDataBoolean v2) {
+ assertEquals(v1.getFalseCount(), v2.getFalseCount());
+ assertEquals(v1.getTrueCount(), v2.getTrueCount());
+ assertEquals(v1.getNullCount(), v2.getNullCount());
+ assertEquals(v1.getProperties(), v2.getProperties());
+ }
+
+ /**
+ * Asserts that two long column statistics are equal by comparing min, max,
+ * number of distinct values (ndv), null count and properties.
+ *
+ * @param v1 first statistics data (expected)
+ * @param v2 second statistics data (actual)
+ */
+ private static void checkEquals(CatalogColumnStatisticsDataLong v1, CatalogColumnStatisticsDataLong v2) {
+ assertEquals(v1.getMin(), v2.getMin());
+ assertEquals(v1.getMax(), v2.getMax());
+ assertEquals(v1.getNdv(), v2.getNdv());
+ assertEquals(v1.getNullCount(), v2.getNullCount());
+ assertEquals(v1.getProperties(), v2.getProperties());
+ }
+
+ /**
+ * Asserts that two double column statistics are equal. Min and max are compared
+ * with a tolerance of 0.05; ndv, null count and properties are compared exactly.
+ *
+ * @param v1 first statistics data (expected)
+ * @param v2 second statistics data (actual)
+ */
+ private static void checkEquals(CatalogColumnStatisticsDataDouble v1, CatalogColumnStatisticsDataDouble v2) {
+ assertEquals(v1.getMin(), v2.getMin(), 0.05D);
+ assertEquals(v1.getMax(), v2.getMax(), 0.05D);
+ assertEquals(v1.getNdv(), v2.getNdv());
+ assertEquals(v1.getNullCount(), v2.getNullCount());
+ assertEquals(v1.getProperties(), v2.getProperties());
+ }
+
+ /**
+ * Asserts that two string column statistics are equal. The average length is
+ * compared with a tolerance of 0.05; max length, ndv, null count and properties
+ * are compared exactly.
+ *
+ * @param v1 first statistics data (expected)
+ * @param v2 second statistics data (actual)
+ */
+ private static void checkEquals(CatalogColumnStatisticsDataString v1, CatalogColumnStatisticsDataString v2) {
+ assertEquals(v1.getMaxLength(), v2.getMaxLength());
+ assertEquals(v1.getAvgLength(), v2.getAvgLength(), 0.05D);
+ assertEquals(v1.getNdv(), v2.getNdv());
+ assertEquals(v1.getNullCount(), v2.getNullCount());
+ assertEquals(v1.getProperties(), v2.getProperties());
+ }
+
+ /**
+ * Asserts that two binary column statistics are equal. The average length is
+ * compared with a tolerance of 0.05; max length, null count and properties are
+ * compared exactly.
+ *
+ * @param v1 first statistics data (expected)
+ * @param v2 second statistics data (actual)
+ */
+ private static void checkEquals(CatalogColumnStatisticsDataBinary v1, CatalogColumnStatisticsDataBinary v2) {
+ assertEquals(v1.getMaxLength(), v2.getMaxLength());
+ assertEquals(v1.getAvgLength(), v2.getAvgLength(), 0.05D);
+ assertEquals(v1.getNullCount(), v2.getNullCount());
+ assertEquals(v1.getProperties(), v2.getProperties());
+ }
+
+ /**
+ * Asserts that two date column statistics are equal. Min and max are compared
+ * through the Date overload of checkEquals; ndv, null count and properties are
+ * compared with assertEquals.
+ *
+ * @param v1 first statistics data (expected)
+ * @param v2 second statistics data (actual)
+ */
+ private static void checkEquals(CatalogColumnStatisticsDataDate v1, CatalogColumnStatisticsDataDate v2) {
+ checkEquals(v1.getMin(), v2.getMin());
+ checkEquals(v1.getMax(), v2.getMax());
+ assertEquals(v1.getNdv(), v2.getNdv());
+ assertEquals(v1.getNullCount(), v2.getNullCount());
+ assertEquals(v1.getProperties(), v2.getProperties());
+ }
+
+ /**
+ * Asserts that two statistics date values carry the same days-since-epoch count.
+ *
+ * @param v1 first date value (expected)
+ * @param v2 second date value (actual)
+ */
+ private static void checkEquals(Date v1, Date v2) {
+ assertEquals(v1.getDaysSinceEpoch(), v2.getDaysSinceEpoch());
+ }
+
}