You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2021/10/13 10:11:36 UTC
[hive] branch master updated: HIVE-25610: Handle partition field
comments for Iceberg tables (Peter Vary reviewed by Marton Bod)(#2715)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 65d03fc HIVE-25610: Handle partition field comments for Iceberg tables (Peter Vary reviewed by Marton Bod)(#2715)
65d03fc is described below
commit 65d03fc3a1e40709645ea22a728c8a88468994d1
Author: pvary <pv...@cloudera.com>
AuthorDate: Wed Oct 13 12:11:15 2021 +0200
HIVE-25610: Handle partition field comments for Iceberg tables (Peter Vary reviewed by Marton Bod)(#2715)
---
.../apache/iceberg/mr/hive/HiveIcebergSerDe.java | 78 ++++++++--------------
.../hive/TestHiveIcebergStorageHandlerNoScan.java | 28 +++++++-
.../org/apache/hadoop/hive/ql/plan/PlanUtils.java | 2 +
.../apache/hadoop/hive/serde/serdeConstants.java | 4 ++
.../apache/hadoop/hive/serde2/AbstractSerDe.java | 45 ++++++++++++-
.../hive/metastore/utils/MetaStoreUtils.java | 2 +-
6 files changed, 104 insertions(+), 55 deletions(-)
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
index b260a2b..6bd4214 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
@@ -19,10 +19,8 @@
package org.apache.iceberg.mr.hive;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -31,19 +29,16 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.ColumnType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
-import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.Writable;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
@@ -60,6 +55,7 @@ import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector;
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,7 +66,6 @@ public class HiveIcebergSerDe extends AbstractSerDe {
" queryable from Hive, since HMS does not know about it.";
private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergSerDe.class);
- private static final String LIST_COLUMN_COMMENT = "columns.comments";
private ObjectInspector inspector;
private Schema tableSchema;
@@ -81,6 +76,8 @@ public class HiveIcebergSerDe extends AbstractSerDe {
@Override
public void initialize(@Nullable Configuration configuration, Properties serDeProperties,
Properties partitionProperties) throws SerDeException {
+ super.initialize(configuration, serDeProperties, partitionProperties);
+
// HiveIcebergSerDe.initialize is called multiple places in Hive code:
// - When we are trying to create a table - HiveDDL data is stored at the serDeProperties, but no Iceberg table
// is created yet.
@@ -113,7 +110,7 @@ public class HiveIcebergSerDe extends AbstractSerDe {
// provided in the CREATE TABLE query.
boolean autoConversion = configuration.getBoolean(InputFormatConfig.SCHEMA_AUTO_CONVERSION, false);
// If we can not load the table try the provided hive schema
- this.tableSchema = hiveSchemaOrThrow(serDeProperties, e, autoConversion);
+ this.tableSchema = hiveSchemaOrThrow(e, autoConversion);
// This is only for table creation, it is ok to have an empty partition column list
this.partitionColumns = ImmutableList.of();
// create table for CTAS
@@ -160,15 +157,10 @@ public class HiveIcebergSerDe extends AbstractSerDe {
serDeProperties.setProperty(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(tableSchema));
// build partition spec, if any
- String partColsString = serDeProperties.getProperty(serdeConstants.LIST_PARTITION_COLUMNS);
- if (partColsString != null && !partColsString.isEmpty()) {
- String partColDelimiter = partColsString.contains(String.valueOf(ColumnType.COLUMN_COMMENTS_DELIMITER)) ?
- String.valueOf(ColumnType.COLUMN_COMMENTS_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
- String[] partCols = partColsString.split(partColDelimiter);
- String[] partColTypes = serDeProperties.getProperty(serdeConstants.LIST_PARTITION_COLUMN_TYPES)
- .split(String.valueOf(SerDeUtils.COLON));
- List<FieldSchema> partitionFields = IntStream.range(0, partCols.length)
- .mapToObj(i -> new FieldSchema(partCols[i], partColTypes[i], null))
+ if (!getPartitionColumnNames().isEmpty()) {
+ List<FieldSchema> partitionFields = IntStream.range(0, getPartitionColumnNames().size())
+ .mapToObj(i ->
+ new FieldSchema(getPartitionColumnNames().get(i), getPartitionColumnTypes().get(i).getTypeName(), null))
.collect(Collectors.toList());
PartitionSpec spec = HiveSchemaUtil.spec(tableSchema, partitionFields);
serDeProperties.put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(spec));
@@ -229,46 +221,30 @@ public class HiveIcebergSerDe extends AbstractSerDe {
}
/**
- * Gets the hive schema from the serDeProperties, and throws an exception if it is not provided. In the later case
- * it adds the previousException as a root cause.
- * @param serDeProperties The source of the hive schema
+ * Gets the hive schema and throws an exception if it is not provided. In the latter case it adds the
+ * previousException as a root cause.
* @param previousException If we had an exception previously
* @param autoConversion When <code>true</code>, convert unsupported types to more permissive ones, like tinyint to
* int
- * @return The hive schema parsed from the serDeProperties
+ * @return The hive schema parsed from the serDeProperties provided when the SerDe was initialized
* @throws SerDeException If there is no schema information in the serDeProperties
*/
- private static Schema hiveSchemaOrThrow(Properties serDeProperties, Exception previousException,
- boolean autoConversion)
+ private Schema hiveSchemaOrThrow(Exception previousException, boolean autoConversion)
throws SerDeException {
- // Read the configuration parameters
- String columnNames = serDeProperties.getProperty(serdeConstants.LIST_COLUMNS);
- String columnTypes = serDeProperties.getProperty(serdeConstants.LIST_COLUMN_TYPES);
- // No constant for column comments and column comments delimiter.
- String columnComments = serDeProperties.getProperty(LIST_COLUMN_COMMENT);
- String columnNameDelimiter = serDeProperties.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ?
- serDeProperties.getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
- if (columnNames != null && columnTypes != null && columnNameDelimiter != null &&
- !columnNames.isEmpty() && !columnTypes.isEmpty() && !columnNameDelimiter.isEmpty()) {
- // Parse the configuration parameters
- List<String> names = new ArrayList<>();
- Collections.addAll(names, columnNames.split(columnNameDelimiter));
- // check if there are partition columns as well
- String partColNames = serDeProperties.getProperty(serdeConstants.LIST_PARTITION_COLUMNS);
- if (partColNames != null && !partColNames.isEmpty()) {
- // add partition col names to regular col names
- String partColDelimiter = partColNames.contains(String.valueOf(ColumnType.COLUMN_COMMENTS_DELIMITER)) ?
- String.valueOf(ColumnType.COLUMN_COMMENTS_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
- Collections.addAll(names, partColNames.split(partColDelimiter));
- // add partition col types to regular col types
- columnTypes += SerDeUtils.COLON + serDeProperties.getProperty(serdeConstants.LIST_PARTITION_COLUMN_TYPES);
- }
- List<String> comments = new ArrayList<>();
- if (columnComments != null) {
- Collections.addAll(comments, columnComments.split(Character.toString(Character.MIN_VALUE)));
- }
- Schema hiveSchema = HiveSchemaUtil.convert(names, TypeInfoUtils.getTypeInfosFromTypeString(columnTypes),
- comments, autoConversion);
+ List<String> names = Lists.newArrayList();
+ names.addAll(getColumnNames());
+ names.addAll(getPartitionColumnNames());
+
+ List<TypeInfo> types = Lists.newArrayList();
+ types.addAll(getColumnTypes());
+ types.addAll(getPartitionColumnTypes());
+
+ List<String> comments = Lists.newArrayList();
+ comments.addAll(getColumnComments());
+ comments.addAll(getPartitionColumnComments());
+
+ if (!names.isEmpty() && !types.isEmpty()) {
+ Schema hiveSchema = HiveSchemaUtil.convert(names, types, comments, autoConversion);
LOG.info("Using hive schema {}", SchemaParser.toJson(hiveSchema));
return hiveSchema;
} else {
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
index 235ed55..266df38 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
@@ -342,7 +342,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
}
@Test
- public void testCreateDropTableNonDefaultCatalog() throws TException, InterruptedException {
+ public void testCreateDropTableNonDefaultCatalog() {
TableIdentifier identifier = TableIdentifier.of("default", "customers");
String catalogName = "nondefaultcatalog";
testTables.properties().entrySet()
@@ -722,6 +722,32 @@ public class TestHiveIcebergStorageHandlerNoScan {
}
@Test
+ public void testCreatePartitionedTableWithColumnComments() {
+ TableIdentifier identifier = TableIdentifier.of("default", "partitioned_with_comment_table");
+ String[] expectedDoc = new String[] {"int column", "string column", null, "partition column", null};
+ shell.executeStatement("CREATE EXTERNAL TABLE partitioned_with_comment_table (" +
+ "t_int INT COMMENT 'int column', " +
+ "t_string STRING COMMENT 'string column', " +
+ "t_string_2 STRING) " +
+ "PARTITIONED BY (t_string_3 STRING COMMENT 'partition column', t_string_4 STRING) " +
+ "STORED BY ICEBERG " +
+ testTables.locationForCreateTableSQL(identifier) +
+ testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
+ org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
+
+ List<Object[]> rows = shell.executeStatement("DESCRIBE default.partitioned_with_comment_table");
+ List<Types.NestedField> columns = icebergTable.schema().columns();
+ // The partition transform information is 3 extra lines, and 2 more lines for the columns
+ Assert.assertEquals(columns.size() + 5, rows.size());
+ for (int i = 0; i < columns.size(); i++) {
+ Types.NestedField field = columns.get(i);
+ Assert.assertArrayEquals(new Object[] {field.name(), HiveSchemaUtil.convert(field.type()).getTypeName(),
+ field.doc() != null ? field.doc() : "from deserializer"}, rows.get(i));
+ Assert.assertEquals(expectedDoc[i], field.doc());
+ }
+ }
+
+ @Test
public void testAlterTableProperties() {
TableIdentifier identifier = TableIdentifier.of("default", "customers");
shell.executeStatement("CREATE EXTERNAL TABLE customers (" +
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
index d57383b..36f75af 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
@@ -268,6 +268,8 @@ public final class PlanUtils {
MetaStoreUtils.getColumnNamesFromFieldSchema(partCols));
properties.setProperty(serdeConstants.LIST_PARTITION_COLUMN_TYPES,
MetaStoreUtils.getColumnTypesFromFieldSchema(partCols, ":"));
+ properties.setProperty(serdeConstants.LIST_PARTITION_COLUMN_COMMENTS,
+ MetaStoreUtils.getColumnCommentsFromFieldSchema(partCols));
}
if (lastColumnTakesRestOfTheLine) {
diff --git a/serde/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/serde/serdeConstants.java b/serde/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/serde/serdeConstants.java
index 2119e1c..1a64aca 100644
--- a/serde/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/serde/serdeConstants.java
+++ b/serde/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/serde/serdeConstants.java
@@ -97,10 +97,14 @@ public class serdeConstants {
public static final java.lang.String LIST_COLUMN_TYPES = "columns.types";
+ public static final java.lang.String LIST_COLUMN_COMMENTS = "columns.comments";
+
public static final java.lang.String LIST_PARTITION_COLUMNS = "partition.columns";
public static final java.lang.String LIST_PARTITION_COLUMN_TYPES = "partition.columns.types";
+ public static final java.lang.String LIST_PARTITION_COLUMN_COMMENTS = "partition.columns.comments";
+
public static final java.lang.String TIMESTAMP_FORMATS = "timestamp.formats";
public static final java.lang.String COLUMN_NAME_DELIMITER = "column.name.delimiter";
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerDe.java
index d51981c..d5c47ec 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerDe.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerDe.java
@@ -27,6 +27,7 @@ import java.util.Optional;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.ColumnType;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -52,6 +53,11 @@ public abstract class AbstractSerDe implements Deserializer, Serializer {
private List<String> columnNames;
private List<TypeInfo> columnTypes;
+ private List<String> columnComments;
+
+ private List<String> partitionColumnNames;
+ private List<TypeInfo> partitionColumnTypes;
+ private List<String> partitionColumnComments;
/**
* Initialize the SerDe. By default, this will use one set of properties,
@@ -76,6 +82,10 @@ public abstract class AbstractSerDe implements Deserializer, Serializer {
this.properties = SerDeUtils.createOverlayedProperties(tableProperties, partitionProperties);
this.columnNames = parseColumnNames();
this.columnTypes = parseColumnTypes();
+ this.columnComments = parseColumnComments(serdeConstants.LIST_COLUMN_COMMENTS);
+ this.partitionColumnNames = parseColumnNames(serdeConstants.LIST_PARTITION_COLUMNS);
+ this.partitionColumnTypes = parseColumnTypes(serdeConstants.LIST_PARTITION_COLUMN_TYPES);
+ this.partitionColumnComments = parseColumnComments(serdeConstants.LIST_PARTITION_COLUMN_COMMENTS);
Preconditions.checkArgument(this.columnNames.size() == this.columnTypes.size(),
"Column names must match count of column types");
@@ -84,7 +94,11 @@ public abstract class AbstractSerDe implements Deserializer, Serializer {
}
protected List<String> parseColumnNames() {
- final String columnNameProperty = this.properties.getProperty(serdeConstants.LIST_COLUMNS, "");
+ return parseColumnNames(serdeConstants.LIST_COLUMNS);
+ }
+
+ protected List<String> parseColumnNames(String key) {
+ final String columnNameProperty = this.properties.getProperty(key, "");
final String columnNameDelimiter =
this.properties.getProperty(serdeConstants.COLUMN_NAME_DELIMITER, String.valueOf(SerDeUtils.COMMA));
@@ -93,12 +107,23 @@ public abstract class AbstractSerDe implements Deserializer, Serializer {
}
protected List<TypeInfo> parseColumnTypes() {
- final String columnTypeProperty = this.properties.getProperty(serdeConstants.LIST_COLUMN_TYPES, "");
+ return parseColumnTypes(serdeConstants.LIST_COLUMN_TYPES);
+ }
+
+ protected List<TypeInfo> parseColumnTypes(String key) {
+ final String columnTypeProperty = this.properties.getProperty(key, "");
return columnTypeProperty.isEmpty() ? Collections.emptyList()
: Collections.unmodifiableList(TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty));
}
+ protected List<String> parseColumnComments(String key) {
+ final String columnCommentProperty = this.properties.getProperty(key, "");
+
+ return columnCommentProperty.isEmpty() ? Collections.emptyList() : Collections.unmodifiableList(
+ Arrays.asList(columnCommentProperty.split(Character.toString(ColumnType.COLUMN_COMMENTS_DELIMITER))));
+ }
+
/**
* Returns the Writable class that would be returned by the serialize method.
* This is used to initialize SequenceFile header.
@@ -142,6 +167,22 @@ public abstract class AbstractSerDe implements Deserializer, Serializer {
return columnTypes;
}
+ public List<String> getColumnComments() {
+ return columnComments;
+ }
+
+ public List<String> getPartitionColumnNames() {
+ return partitionColumnNames;
+ }
+
+ public List<TypeInfo> getPartitionColumnTypes() {
+ return partitionColumnTypes;
+ }
+
+ public List<String> getPartitionColumnComments() {
+ return partitionColumnComments;
+ }
+
public Optional<Configuration> getConfiguration() {
return configuration;
}
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
index 48d2bb1..d4d08f3 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
@@ -680,7 +680,7 @@ public class MetaStoreUtils {
if (!first) {
colNameBuf.append(columnNameDelimiter);
colTypeBuf.append(":");
- colComment.append('\0');
+ colComment.append(ColumnType.COLUMN_COMMENTS_DELIMITER);
}
colNameBuf.append(col.getName());
colTypeBuf.append(col.getType());