You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/05 19:08:15 UTC
[flink] branch master updated: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6d7a15e [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
6d7a15e is described below
commit 6d7a15e328e8af6f001d6191c3f167bd8038bf14
Author: zjuwangg <zj...@foxmail.com>
AuthorDate: Wed Jun 12 00:53:00 2019 +0800
[FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
This pull request makes HiveCatalog support Hive table stats related operations.
This closes #8636.
---
.../flink/table/catalog/hive/HiveCatalog.java | 80 ++++++++++++++++++++--
.../hive/client/HiveMetastoreClientWrapper.java | 11 +--
.../flink/table/catalog/hive/client/HiveShim.java | 14 ++++
.../table/catalog/hive/client/HiveShimV1.java | 10 +++
.../table/catalog/hive/client/HiveShimV2.java | 9 +++
.../table/catalog/hive/util/HiveStatsUtil.java | 2 +
.../hive/HiveCatalogGenericMetadataTest.java | 8 +++
.../table/catalog/GenericInMemoryCatalog.java | 9 ++-
.../apache/flink/table/catalog/CatalogTest.java | 79 +++++++++++++++++++++
9 files changed, 208 insertions(+), 14 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index d83692b..fea0de4 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -59,6 +59,7 @@ import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.factories.TableFactory;
import org.apache.flink.util.StringUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.TableType;
@@ -826,7 +827,7 @@ public class HiveCatalog extends AbstractCatalog {
}
private void ensurePartitionedTable(ObjectPath tablePath, Table hiveTable) throws TableNotPartitionedException {
- if (hiveTable.getPartitionKeysSize() == 0) {
+ // Throws TableNotPartitionedException when the Hive table is not partitioned; tablePath is only used in the error.
+ // Goes through the shared isTablePartitioned helper, the same check the table-stats methods use.
+ if (!isTablePartitioned(hiveTable)) {
throw new TableNotPartitionedException(getName(), tablePath);
}
}
@@ -1077,7 +1078,19 @@ public class HiveCatalog extends AbstractCatalog {
@Override
public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
-
+ try {
+ Table hiveTable = getHiveTable(tablePath);
+ // Table-level stats are only kept for non-partitioned tables. For a partitioned table this is a
+ // no-op, which matches getTableStatistics() always reporting UNKNOWN for partitioned tables;
+ // partition stats go through alterPartitionStatistics() instead.
+ if (isTablePartitioned(hiveTable)) {
+ return;
+ }
+ // Only call the metastore when the stored stats actually differ from the new ones.
+ if (compareAndUpdateStatisticsProperties(tableStatistics, hiveTable.getParameters())) {
+ client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), hiveTable);
+ }
+ } catch (TableNotExistException e) {
+ if (!ignoreIfNotExists) {
+ throw e;
+ }
+ } catch (TException e) {
+ throw new CatalogException(String.format("Failed to alter table stats of table %s", tablePath.getFullName()), e);
+ }
}
@Override
@@ -1099,9 +1112,50 @@ public class HiveCatalog extends AbstractCatalog {
}
}
+ /**
+ * Determines whether the statistics stored in {@code parameters} differ from {@code statistics} and,
+ * if they do, overwrites the row count, total size, file count and raw data size entries of
+ * {@code parameters} with the new values.
+ *
+ * <p>Entries missing from {@code parameters} are treated as {@link HiveStatsUtil#DEFAULT_STATS_ZERO_CONST}.
+ *
+ * @param statistics the new catalog table statistics to store
+ * @param parameters the Hive table or partition parameters holding the current statistics; updated in place
+ * @return true if {@code parameters} was modified and needs to be persisted, false otherwise
+ */
+ private static boolean compareAndUpdateStatisticsProperties(CatalogTableStatistics statistics, Map<String, String> parameters) {
+ String oldRowCount = parameters.getOrDefault(StatsSetupConst.ROW_COUNT, HiveStatsUtil.DEFAULT_STATS_ZERO_CONST);
+ String oldTotalSize = parameters.getOrDefault(StatsSetupConst.TOTAL_SIZE, HiveStatsUtil.DEFAULT_STATS_ZERO_CONST);
+ String oldNumFiles = parameters.getOrDefault(StatsSetupConst.NUM_FILES, HiveStatsUtil.DEFAULT_STATS_ZERO_CONST);
+ String oldRawDataSize = parameters.getOrDefault(StatsSetupConst.RAW_DATA_SIZE, HiveStatsUtil.DEFAULT_STATS_ZERO_CONST);
+ boolean needUpdateStatistics = statistics.getRowCount() != Long.parseLong(oldRowCount)
+ || statistics.getTotalSize() != Long.parseLong(oldTotalSize)
+ || statistics.getFileCount() != Integer.parseInt(oldNumFiles)
+ || statistics.getRawDataSize() != Long.parseLong(oldRawDataSize);
+ if (needUpdateStatistics) {
+ parameters.put(StatsSetupConst.ROW_COUNT, String.valueOf(statistics.getRowCount()));
+ parameters.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(statistics.getTotalSize()));
+ parameters.put(StatsSetupConst.NUM_FILES, String.valueOf(statistics.getFileCount()));
+ parameters.put(StatsSetupConst.RAW_DATA_SIZE, String.valueOf(statistics.getRawDataSize()));
+ }
+ return needUpdateStatistics;
+ }
+
+ private static CatalogTableStatistics createCatalogTableStatistics(Map<String, String> parameters) {
+ long rowRount = Long.parseLong(parameters.getOrDefault(StatsSetupConst.ROW_COUNT, HiveStatsUtil.DEFAULT_STATS_ZERO_CONST));
+ long totalSize = Long.parseLong(parameters.getOrDefault(StatsSetupConst.TOTAL_SIZE, HiveStatsUtil.DEFAULT_STATS_ZERO_CONST));
+ int numFiles = Integer.parseInt(parameters.getOrDefault(StatsSetupConst.NUM_FILES, HiveStatsUtil.DEFAULT_STATS_ZERO_CONST));
+ long rawDataSize = Long.parseLong(parameters.getOrDefault(StatsSetupConst.RAW_DATA_SIZE, HiveStatsUtil.DEFAULT_STATS_ZERO_CONST));
+ return new CatalogTableStatistics(rowRount, numFiles, totalSize, rawDataSize);
+ }
+
@Override
public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
-
+ try {
+ Partition hivePartition = getHivePartition(tablePath, partitionSpec);
+ // Write the new stats into the partition's parameters; only call the metastore if something changed.
+ if (compareAndUpdateStatisticsProperties(partitionStatistics, hivePartition.getParameters())) {
+ client.alter_partition(tablePath.getDatabaseName(), tablePath.getObjectName(), hivePartition);
+ }
+ } catch (TableNotExistException | PartitionSpecInvalidException e) {
+ // A missing table or an invalid spec both mean the partition does not exist; honor
+ // ignoreIfNotExists the same way alterTableStatistics does.
+ // NOTE(review): if getHivePartition reports a missing partition as a plain TException, that case
+ // still surfaces as CatalogException below — confirm against getHivePartition.
+ if (!ignoreIfNotExists) {
+ throw new PartitionNotExistException(getName(), tablePath, partitionSpec, e);
+ }
+ } catch (TException e) {
+ throw new CatalogException(String.format("Failed to alter table stats of table %s 's partition %s", tablePath.getFullName(), String.valueOf(partitionSpec)), e);
+ }
}
@Override
@@ -1132,8 +1186,14 @@ public class HiveCatalog extends AbstractCatalog {
}
@Override
- public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException,
+ CatalogException {
+ Table hiveTable = getHiveTable(tablePath);
+ // Partitioned tables report UNKNOWN at table level; their stats are read per partition
+ // via getPartitionStatistics.
+ if (isTablePartitioned(hiveTable)) {
+ return CatalogTableStatistics.UNKNOWN;
+ }
+ return createCatalogTableStatistics(hiveTable.getParameters());
}
@Override
@@ -1156,7 +1216,15 @@ public class HiveCatalog extends AbstractCatalog {
@Override
public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ try {
+ // Stats come from the partition's own parameters; absent entries are read as zero.
+ return createCatalogTableStatistics(getHivePartition(tablePath, partitionSpec).getParameters());
+ } catch (TableNotExistException | PartitionSpecInvalidException e) {
+ throw new PartitionNotExistException(getName(), tablePath, partitionSpec, e);
+ } catch (TException e) {
+ throw new CatalogException(String.format("Failed to get partition stats of table %s 's partition %s",
+ tablePath.getFullName(), String.valueOf(partitionSpec)), e);
+ }
}
@Override
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java
index 6f26ada..e1b25e6 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java
@@ -137,11 +137,6 @@ public class HiveMetastoreClientWrapper implements AutoCloseable {
client.createTable(table);
}
- public void alter_table(String databaseName, String tableName, Table table)
- throws InvalidOperationException, MetaException, TException {
- client.alter_table(databaseName, tableName, table);
- }
-
public void createDatabase(Database database)
throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
client.createDatabase(database);
@@ -234,4 +229,10 @@ public class HiveMetastoreClientWrapper implements AutoCloseable {
HiveShim hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
return hiveShim.getFunction(client, databaseName, functionName);
}
+
+ /**
+ * Alters a Hive table by delegating to the {@link HiveShim} loaded for this wrapper's Hive version,
+ * so version-specific metastore behavior is taken into account.
+ */
+ public void alter_table(String databaseName, String tableName, Table table)
+ throws InvalidOperationException, MetaException, TException {
+ HiveShimLoader.loadHiveShim(hiveVersion).alterTable(client, databaseName, tableName, table);
+ }
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
index b0eab75..320f078 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
@@ -24,7 +24,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.thrift.TException;
@@ -83,4 +86,15 @@ public interface HiveShim {
* @throws IOException if the file/directory cannot be properly moved or deleted
*/
boolean moveToTrash(FileSystem fs, Path path, Configuration conf, boolean purge) throws IOException;
+
+ /**
+ * Alters a Hive table.
+ *
+ * <p>Implementations may adjust {@code table} before handing it to the metastore client in order to
+ * account for version-specific behavior.
+ *
+ * @param client the Hive metastore client
+ * @param databaseName the name of the database to which the table belongs
+ * @param tableName the name of the table to be altered
+ * @param table the new Hive table
+ * @throws InvalidOperationException propagated from the metastore client
+ * @throws MetaException propagated from the metastore client
+ * @throws TException propagated from the metastore client
+ */
+ void alterTable(IMetaStoreClient client, String databaseName, String tableName, Table table)
+ throws InvalidOperationException, MetaException, TException;
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java
index f28fc5c..830d6e6 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java
@@ -24,10 +24,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -95,4 +97,12 @@ public class HiveShimV1 implements HiveShim {
throw new IOException("Failed to move " + path + " to trash", e);
}
}
+
+ @Override
+ public void alterTable(IMetaStoreClient client, String databaseName, String tableName, Table table) throws InvalidOperationException, MetaException, TException {
+ // For Hive-1.2.1, we need to tell HMS not to update stats. Otherwise, the stats we put in the table
+ // parameters can be overridden. The extra config we add here will be removed by HMS after it's used.
+ table.getParameters().put(StatsSetupConst.DO_NOT_UPDATE_STATS, "true");
+ client.alter_table(databaseName, tableName, table);
+ }
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java
index 58bb460..7df0aaf 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java
@@ -29,7 +29,10 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.thrift.TException;
@@ -86,4 +89,10 @@ public class HiveShimV2 implements HiveShim {
throw new IOException("Failed to move " + path + " to trash", e);
}
}
+
+ @Override
+ public void alterTable(IMetaStoreClient client, String databaseName, String tableName, Table table) throws InvalidOperationException, MetaException, TException {
+ // For Hive-2.3.4, we don't need to tell HMS not to update stats.
+ client.alter_table(databaseName, tableName, table);
+ }
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java
index 80baf72..97bfa56 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java
@@ -62,6 +62,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
public class HiveStatsUtil {
private static final Logger LOG = LoggerFactory.getLogger(HiveStatsUtil.class);
+ public static final String DEFAULT_STATS_ZERO_CONST = "0";
+
private HiveStatsUtil() {}
/**
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
index 5bc6ff6..83e0132 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
@@ -266,6 +266,14 @@ public class HiveCatalogGenericMetadataTest extends CatalogTestBase {
public void testListPartitionPartialSpec() throws Exception {
}
+ @Override
+ public void testGetPartitionStats() throws Exception {
+ // Intentionally empty: overrides the inherited CatalogTest case with a no-op for this test class.
+ // NOTE(review): presumably partition stats are not supported for generic metadata here — confirm and state the reason.
+ }
+
+ @Override
+ public void testAlterPartitionTableStats() throws Exception {
+ // Intentionally empty: overrides the inherited CatalogTest case with a no-op for this test class.
+ // NOTE(review): presumably partition stats are not supported for generic metadata here — confirm and state the reason.
+ }
+
// ------ test utils ------
@Override
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
index 9efce69..c0924ca 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
@@ -590,9 +590,12 @@ public class GenericInMemoryCatalog extends AbstractCatalog {
if (!tableExists(tablePath)) {
throw new TableNotExistException(getName(), tablePath);
}
-
- CatalogTableStatistics result = tableStats.get(tablePath);
- return result != null ? result.copy() : CatalogTableStatistics.UNKNOWN;
+ if (!isPartitionedTable(tablePath)) {
+ CatalogTableStatistics result = tableStats.get(tablePath);
+ return result != null ? result.copy() : CatalogTableStatistics.UNKNOWN;
+ } else {
+ return CatalogTableStatistics.UNKNOWN;
+ }
}
@Override
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
index 0c2b632..357eb23 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.junit.After;
import org.junit.AfterClass;
@@ -1056,6 +1057,84 @@ public abstract class CatalogTest {
assertEquals(1, catalog.listPartitions(path1, createAnotherPartitionSpecSubset()).size());
}
+
+ // ------ table and column stats ------
+
+ @Test
+ public void testGetTableStats_TableNotExistException() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ // The database exists, but the table at path1 was never created.
+ exception.expect(TableNotExistException.class);
+ catalog.getTableStatistics(path1);
+ }
+
+ @Test
+ public void testGetPartitionStats() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createPartitionedTable(), false);
+ catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
+
+ // Every counter of a freshly created partition is expected to be zero.
+ CatalogTableStatistics partitionStats = catalog.getPartitionStatistics(path1, createPartitionSpec());
+ assertEquals(0, partitionStats.getFileCount());
+ assertEquals(0, partitionStats.getRawDataSize());
+ assertEquals(0, partitionStats.getTotalSize());
+ assertEquals(0, partitionStats.getRowCount());
+ }
+
+ @Test
+ public void testAlterTableStats() throws Exception {
+ // Non-partitioned table
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createTable(), false);
+
+ CatalogTableStatistics expected = new CatalogTableStatistics(100, 10, 1000, 10000);
+ catalog.alterTableStatistics(path1, expected, false);
+ CatalogTableStatistics actual = catalog.getTableStatistics(path1);
+
+ // fileCount and totalSize are not compared because Hive computes them and sets the real values.
+ assertEquals(expected.getRowCount(), actual.getRowCount());
+ assertEquals(expected.getRawDataSize(), actual.getRawDataSize());
+ }
+
+ @Test
+ public void testAlterTableStats_partitionedTable() throws Exception {
+ // alterTableStatistics() should do nothing for partitioned tables, and
+ // getTableStatistics() should return unknown table stats for them.
+ catalog.createDatabase(db1, createDb(), false);
+ CatalogTable catalogTable = createPartitionedTable();
+ catalog.createTable(path1, catalogTable, false);
+
+ CatalogTableStatistics stats = new CatalogTableStatistics(100, 1, 1000, 10000);
+
+ catalog.alterTableStatistics(path1, stats, false);
+
+ assertEquals(CatalogTableStatistics.UNKNOWN, catalog.getTableStatistics(path1));
+ }
+
+ @Test
+ public void testAlterPartitionTableStats() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createPartitionedTable(), false);
+ CatalogPartitionSpec spec = createPartitionSpec();
+ catalog.createPartition(path1, spec, createPartition(), true);
+
+ CatalogTableStatistics expected = new CatalogTableStatistics(100, 1, 1000, 10000);
+ catalog.alterPartitionStatistics(path1, spec, expected, false);
+
+ // Only row count and raw data size are compared.
+ CatalogTableStatistics actual = catalog.getPartitionStatistics(path1, spec);
+ assertEquals(expected.getRowCount(), actual.getRowCount());
+ assertEquals(expected.getRawDataSize(), actual.getRawDataSize());
+ }
+
+ @Test
+ public void testAlterTableStats_TableNotExistException() throws Exception {
+ exception.expect(TableNotExistException.class);
+ catalog.alterTableStatistics(new ObjectPath(catalog.getDefaultDatabase(), "nonexist"), CatalogTableStatistics.UNKNOWN, false);
+ }
+
+ @Test
+ public void testAlterTableStats_TableNotExistException_ignore() throws Exception {
+ catalog.alterTableStatistics(new ObjectPath("non", "exist"), CatalogTableStatistics.UNKNOWN, true);
+ }
+
+
// ------ utilities ------
/**