Posted to commits@flink.apache.org by lz...@apache.org on 2020/06/08 10:47:41 UTC

[flink] branch release-1.11 updated: [FLINK-18046][hive] Decimal column stats not supported for Hive table

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 406b5d2  [FLINK-18046][hive] Decimal column stats not supported for Hive table
406b5d2 is described below

commit 406b5d24404c3f72ce84d8308f47aa76246000d3
Author: Rui Li <li...@apache.org>
AuthorDate: Mon Jun 8 18:41:11 2020 +0800

    [FLINK-18046][hive] Decimal column stats not supported for Hive table
    
    
    This closes #12424
---
 .../table/catalog/hive/util/HiveStatsUtil.java     | 51 ++++++++++++++++++++++
 .../catalog/hive/HiveCatalogHiveMetadataTest.java  | 10 +++--
 2 files changed, 58 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java
index 9fd4394..d4ae6d2 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java
@@ -31,12 +31,15 @@ import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Decimal;
+import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
@@ -50,6 +53,9 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -171,6 +177,20 @@ public class HiveStatsUtil {
 					stringStats.isSetAvgColLen() ? stringStats.getAvgColLen() : null,
 					stringStats.isSetNumDVs() ? stringStats.getNumDVs() : null,
 					stringStats.isSetNumNulls() ? stringStats.getNumNulls() : null);
+		} else if (stats.isSetDecimalStats()) {
+			DecimalColumnStatsData decimalStats = stats.getDecimalStats();
+			// for now, just return CatalogColumnStatisticsDataDouble for decimal columns
+			Double max = null;
+			if (decimalStats.isSetHighValue()) {
+				max = toHiveDecimal(decimalStats.getHighValue()).doubleValue();
+			}
+			Double min = null;
+			if (decimalStats.isSetLowValue()) {
+				min = toHiveDecimal(decimalStats.getLowValue()).doubleValue();
+			}
+			Long ndv = decimalStats.isSetNumDVs() ? decimalStats.getNumDVs() : null;
+			Long nullCount = decimalStats.isSetNumNulls() ? decimalStats.getNumNulls() : null;
+			return new CatalogColumnStatisticsDataDouble(min, max, ndv, nullCount);
 		} else {
 			LOG.warn("Flink does not support converting ColumnStatisticsData '{}' for Hive column type '{}' yet.", stats, colType);
 			return null;
@@ -288,11 +308,42 @@ public class HiveStatsUtil {
 				}
 				return ColumnStatisticsData.binaryStats(hiveBinaryColumnStats);
 			}
+		} else if (type.equals(LogicalTypeRoot.DECIMAL)) {
+			if (colStat instanceof CatalogColumnStatisticsDataDouble) {
+				CatalogColumnStatisticsDataDouble flinkStats = (CatalogColumnStatisticsDataDouble) colStat;
+				DecimalColumnStatsData hiveStats = new DecimalColumnStatsData();
+				if (flinkStats.getMax() != null) {
+					// older Hive versions cannot create a HiveDecimal from a Double, so convert the Double to a BigDecimal first
+					hiveStats.setHighValue(toThriftDecimal(HiveDecimal.create(BigDecimal.valueOf(flinkStats.getMax()))));
+				}
+				if (flinkStats.getMin() != null) {
+					hiveStats.setLowValue(toThriftDecimal(HiveDecimal.create(BigDecimal.valueOf(flinkStats.getMin()))));
+				}
+				if (flinkStats.getNdv() != null) {
+					hiveStats.setNumDVs(flinkStats.getNdv());
+				}
+				if (flinkStats.getNullCount() != null) {
+					hiveStats.setNumNulls(flinkStats.getNullCount());
+				}
+				return ColumnStatisticsData.decimalStats(hiveStats);
+			}
 		}
 		throw new CatalogException(String.format("Flink does not support converting ColumnStats '%s' for Hive column " +
 												"type '%s' yet", colStat, colType));
 	}
 
+	private static Decimal toThriftDecimal(HiveDecimal hiveDecimal) {
+		// the Decimal constructor signature changed in Hive 3.x, so use the default constructor and set each field individually
+		Decimal res = new Decimal();
+		res.setUnscaled(ByteBuffer.wrap(hiveDecimal.unscaledValue().toByteArray()));
+		res.setScale((short) hiveDecimal.scale());
+		return res;
+	}
+
+	private static HiveDecimal toHiveDecimal(Decimal decimal) {
+		return HiveDecimal.create(new BigInteger(decimal.getUnscaled()), decimal.getScale());
+	}
+
 	public static int parsePositiveIntStat(Map<String, String> parameters, String key) {
 		String value = parameters.get(key);
 		if (value == null) {
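The two helpers above split a decimal into the (unscaled bytes, scale) pair carried by the
metastore's thrift Decimal and reassemble it on the read path. A minimal, self-contained
sketch of that round trip using only JDK classes (the class name and the sample value
123456.789 are illustrative, not taken from the commit):

    import java.math.BigDecimal;
    import java.math.BigInteger;

    public class DecimalStatsRoundTrip {
        public static void main(String[] args) {
            // Split a decimal into unscaled bytes plus a scale, as
            // toThriftDecimal() does with a HiveDecimal.
            BigDecimal original = new BigDecimal("123456.789");
            byte[] unscaled = original.unscaledValue().toByteArray();
            int scale = original.scale();

            // Reassemble it from the two parts, as toHiveDecimal() does.
            BigDecimal restored = new BigDecimal(new BigInteger(unscaled), scale);
            System.out.println(restored); // prints 123456.789

            // Flink then keeps only a double approximation in
            // CatalogColumnStatisticsDataDouble, so values beyond double
            // precision lose exactness.
            System.out.println(restored.doubleValue());
        }
    }
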
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index 88ef5f6..c0144ab 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -123,9 +123,11 @@ public class HiveCatalogHiveMetadataTest extends HiveCatalogMetadataTestBase {
 				.field("third", DataTypes.BOOLEAN())
 				.field("fourth", DataTypes.DOUBLE())
 				.field("fifth", DataTypes.BIGINT())
-				.field("sixth", DataTypes.BYTES());
+				.field("sixth", DataTypes.BYTES())
+				.field("seventh", DataTypes.DECIMAL(10, 3))
+				.field("eighth", DataTypes.DECIMAL(30, 3));
 		if (supportDateStats) {
-			builder.field("seventh", DataTypes.DATE());
+			builder.field("ninth", DataTypes.DATE());
 		}
 		TableSchema tableSchema = builder.build();
 		CatalogTable catalogTable = new CatalogTableImpl(tableSchema, getBatchTableProperties(), TEST_COMMENT);
@@ -137,8 +139,10 @@ public class HiveCatalogHiveMetadataTest extends HiveCatalogMetadataTestBase {
 		columnStatisticsDataBaseMap.put("fourth", new CatalogColumnStatisticsDataDouble(15.02, 20.01, 3L, 10L));
 		columnStatisticsDataBaseMap.put("fifth", new CatalogColumnStatisticsDataLong(0L, 20L, 3L, 2L));
 		columnStatisticsDataBaseMap.put("sixth", new CatalogColumnStatisticsDataBinary(150L, 20D, 3L));
+		columnStatisticsDataBaseMap.put("seventh", new CatalogColumnStatisticsDataDouble(1.23, 99.456, 100L, 0L));
+		columnStatisticsDataBaseMap.put("eighth", new CatalogColumnStatisticsDataDouble(0.123, 123456.789, 5723L, 19L));
 		if (supportDateStats) {
-			columnStatisticsDataBaseMap.put("seventh", new CatalogColumnStatisticsDataDate(
+			columnStatisticsDataBaseMap.put("ninth", new CatalogColumnStatisticsDataDate(
 					new Date(71L), new Date(17923L), 132L, 0L));
 		}
 		CatalogColumnStatistics catalogColumnStatistics = new CatalogColumnStatistics(columnStatisticsDataBaseMap);
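
The test writes these stats through the generic Catalog API and reads them back. A hedged
sketch of that flow (the catalog instance, the "db.tbl" path, and the "price" column are
hypothetical stand-ins, not names from the commit):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.table.catalog.Catalog;
    import org.apache.flink.table.catalog.ObjectPath;
    import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
    import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
    import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDouble;

    class DecimalStatsExample {
        static void roundTrip(Catalog catalog) throws Exception {
            // Decimal columns use the double stats holder; constructor order
            // is (min, max, ndv, nullCount).
            Map<String, CatalogColumnStatisticsDataBase> columnStats = new HashMap<>();
            columnStats.put("price",
                    new CatalogColumnStatisticsDataDouble(0.123, 123456.789, 5723L, 19L));

            ObjectPath path = new ObjectPath("db", "tbl");
            catalog.alterTableColumnStatistics(path, new CatalogColumnStatistics(columnStats), false);

            // Reading back exercises the decimal branch added to the
            // conversion in the first hunk above.
            CatalogColumnStatistics readBack = catalog.getTableColumnStatistics(path);
            System.out.println(readBack.getColumnStatisticsData().get("price"));
        }
    }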