You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/06/08 10:47:41 UTC
[flink] branch release-1.11 updated: [FLINK-18046][hive] Decimal column stats not supported for Hive table
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 406b5d2 [FLINK-18046][hive] Decimal column stats not supported for Hive table
406b5d2 is described below
commit 406b5d24404c3f72ce84d8308f47aa76246000d3
Author: Rui Li <li...@apache.org>
AuthorDate: Mon Jun 8 18:41:11 2020 +0800
[FLINK-18046][hive] Decimal column stats not supported for Hive table
This closes #12424
---
.../table/catalog/hive/util/HiveStatsUtil.java | 51 ++++++++++++++++++++++
.../catalog/hive/HiveCatalogHiveMetadataTest.java | 10 +++--
2 files changed, 58 insertions(+), 3 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java
index 9fd4394..d4ae6d2 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java
@@ -31,12 +31,15 @@ import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Decimal;
+import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
@@ -50,6 +53,9 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -171,6 +177,20 @@ public class HiveStatsUtil {
stringStats.isSetAvgColLen() ? stringStats.getAvgColLen() : null,
stringStats.isSetNumDVs() ? stringStats.getNumDVs() : null,
stringStats.isSetNumDVs() ? stringStats.getNumNulls() : null);
+ } else if (stats.isSetDecimalStats()) {
+ DecimalColumnStatsData decimalStats = stats.getDecimalStats();
+ // for now, just return CatalogColumnStatisticsDataDouble for decimal columns
+ Double max = null;
+ if (decimalStats.isSetHighValue()) {
+ max = toHiveDecimal(decimalStats.getHighValue()).doubleValue();
+ }
+ Double min = null;
+ if (decimalStats.isSetLowValue()) {
+ min = toHiveDecimal(decimalStats.getLowValue()).doubleValue();
+ }
+ Long ndv = decimalStats.isSetNumDVs() ? decimalStats.getNumDVs() : null;
+ Long nullCount = decimalStats.isSetNumNulls() ? decimalStats.getNumNulls() : null;
+ return new CatalogColumnStatisticsDataDouble(min, max, ndv, nullCount);
} else {
LOG.warn("Flink does not support converting ColumnStatisticsData '{}' for Hive column type '{}' yet.", stats, colType);
return null;
@@ -288,11 +308,42 @@ public class HiveStatsUtil {
}
return ColumnStatisticsData.binaryStats(hiveBinaryColumnStats);
}
+ } else if (type.equals(LogicalTypeRoot.DECIMAL)) {
+ if (colStat instanceof CatalogColumnStatisticsDataDouble) {
+ CatalogColumnStatisticsDataDouble flinkStats = (CatalogColumnStatisticsDataDouble) colStat;
+ DecimalColumnStatsData hiveStats = new DecimalColumnStatsData();
+ if (flinkStats.getMax() != null) {
+ // in older versions we cannot create HiveDecimal from Double, so convert Double to BigDecimal first
+ hiveStats.setHighValue(toThriftDecimal(HiveDecimal.create(BigDecimal.valueOf(flinkStats.getMax()))));
+ }
+ if (flinkStats.getMin() != null) {
+ hiveStats.setLowValue(toThriftDecimal(HiveDecimal.create(BigDecimal.valueOf(flinkStats.getMin()))));
+ }
+ if (flinkStats.getNdv() != null) {
+ hiveStats.setNumDVs(flinkStats.getNdv());
+ }
+ if (flinkStats.getNullCount() != null) {
+ hiveStats.setNumNulls(flinkStats.getNullCount());
+ }
+ return ColumnStatisticsData.decimalStats(hiveStats);
+ }
}
throw new CatalogException(String.format("Flink does not support converting ColumnStats '%s' for Hive column " +
"type '%s' yet", colStat, colType));
}
+ /**
+ * Converts a {@link HiveDecimal} into the Thrift {@link Decimal} used by metastore column statistics.
+ *
+ * @param hiveDecimal the value to convert; must not be null
+ * @return a Thrift decimal carrying the unscaled bytes ({@link BigInteger#toByteArray()}) and the scale
+ */
+ private static Decimal toThriftDecimal(HiveDecimal hiveDecimal) {
+ // Decimal's all-args constructor differs across Hive versions (changed in 3.x), so create it with the
+ // no-arg constructor and populate every field through its setter instead.
+ byte[] unscaledBytes = hiveDecimal.unscaledValue().toByteArray();
+ short scale = (short) hiveDecimal.scale();
+ Decimal thriftDecimal = new Decimal();
+ thriftDecimal.setUnscaled(ByteBuffer.wrap(unscaledBytes));
+ thriftDecimal.setScale(scale);
+ return thriftDecimal;
+ }
+
+ /**
+ * Converts a Thrift {@link Decimal} read from the metastore back into a {@link HiveDecimal}.
+ *
+ * @param decimal the Thrift value holding unscaled bytes and a scale
+ * @return the equivalent HiveDecimal as built by {@code HiveDecimal.create(BigInteger, int)}
+ */
+ private static HiveDecimal toHiveDecimal(Decimal decimal) {
+ // the unscaled bytes are decoded with BigInteger(byte[]), mirroring BigInteger#toByteArray in toThriftDecimal
+ BigInteger unscaled = new BigInteger(decimal.getUnscaled());
+ int scale = decimal.getScale();
+ return HiveDecimal.create(unscaled, scale);
+ }
+
public static int parsePositiveIntStat(Map<String, String> parameters, String key) {
String value = parameters.get(key);
if (value == null) {
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index 88ef5f6..c0144ab 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -123,9 +123,11 @@ public class HiveCatalogHiveMetadataTest extends HiveCatalogMetadataTestBase {
.field("third", DataTypes.BOOLEAN())
.field("fourth", DataTypes.DOUBLE())
.field("fifth", DataTypes.BIGINT())
- .field("sixth", DataTypes.BYTES());
+ .field("sixth", DataTypes.BYTES())
+ .field("seventh", DataTypes.DECIMAL(10, 3))
+ .field("eighth", DataTypes.DECIMAL(30, 3));
if (supportDateStats) {
- builder.field("seventh", DataTypes.DATE());
+ builder.field("ninth", DataTypes.DATE());
}
TableSchema tableSchema = builder.build();
CatalogTable catalogTable = new CatalogTableImpl(tableSchema, getBatchTableProperties(), TEST_COMMENT);
@@ -137,8 +139,10 @@ public class HiveCatalogHiveMetadataTest extends HiveCatalogMetadataTestBase {
columnStatisticsDataBaseMap.put("fourth", new CatalogColumnStatisticsDataDouble(15.02, 20.01, 3L, 10L));
columnStatisticsDataBaseMap.put("fifth", new CatalogColumnStatisticsDataLong(0L, 20L, 3L, 2L));
columnStatisticsDataBaseMap.put("sixth", new CatalogColumnStatisticsDataBinary(150L, 20D, 3L));
+ columnStatisticsDataBaseMap.put("seventh", new CatalogColumnStatisticsDataDouble(1.23, 99.456, 100L, 0L));
+ columnStatisticsDataBaseMap.put("eighth", new CatalogColumnStatisticsDataDouble(0.123, 123456.789, 5723L, 19L));
if (supportDateStats) {
- columnStatisticsDataBaseMap.put("seventh", new CatalogColumnStatisticsDataDate(
+ columnStatisticsDataBaseMap.put("ninth", new CatalogColumnStatisticsDataDate(
new Date(71L), new Date(17923L), 132L, 0L));
}
CatalogColumnStatistics catalogColumnStatistics = new CatalogColumnStatistics(columnStatisticsDataBaseMap);