You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2021/10/13 10:11:36 UTC

[hive] branch master updated: HIVE-25610: Handle partition field comments for Iceberg tables (Peter Vary reviewed by Marton Bod)(#2715)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 65d03fc  HIVE-25610: Handle partition field comments for Iceberg tables (Peter Vary reviewed by Marton Bod)(#2715)
65d03fc is described below

commit 65d03fc3a1e40709645ea22a728c8a88468994d1
Author: pvary <pv...@cloudera.com>
AuthorDate: Wed Oct 13 12:11:15 2021 +0200

    HIVE-25610: Handle partition field comments for Iceberg tables (Peter Vary reviewed by Marton Bod)(#2715)
---
 .../apache/iceberg/mr/hive/HiveIcebergSerDe.java   | 78 ++++++++--------------
 .../hive/TestHiveIcebergStorageHandlerNoScan.java  | 28 +++++++-
 .../org/apache/hadoop/hive/ql/plan/PlanUtils.java  |  2 +
 .../apache/hadoop/hive/serde/serdeConstants.java   |  4 ++
 .../apache/hadoop/hive/serde2/AbstractSerDe.java   | 45 ++++++++++++-
 .../hive/metastore/utils/MetaStoreUtils.java       |  2 +-
 6 files changed, 104 insertions(+), 55 deletions(-)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
index b260a2b..6bd4214 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
@@ -19,10 +19,8 @@
 
 package org.apache.iceberg.mr.hive;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -31,19 +29,16 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import javax.annotation.Nullable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.ColumnType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.session.SessionStateUtil;
-import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
@@ -60,6 +55,7 @@ import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector;
 import org.apache.iceberg.mr.mapred.Container;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,7 +66,6 @@ public class HiveIcebergSerDe extends AbstractSerDe {
       " queryable from Hive, since HMS does not know about it.";
 
   private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergSerDe.class);
-  private static final String LIST_COLUMN_COMMENT = "columns.comments";
 
   private ObjectInspector inspector;
   private Schema tableSchema;
@@ -81,6 +76,8 @@ public class HiveIcebergSerDe extends AbstractSerDe {
   @Override
   public void initialize(@Nullable Configuration configuration, Properties serDeProperties,
                          Properties partitionProperties) throws SerDeException {
+    super.initialize(configuration, serDeProperties, partitionProperties);
+
     // HiveIcebergSerDe.initialize is called multiple places in Hive code:
     // - When we are trying to create a table - HiveDDL data is stored at the serDeProperties, but no Iceberg table
     // is created yet.
@@ -113,7 +110,7 @@ public class HiveIcebergSerDe extends AbstractSerDe {
         // provided in the CREATE TABLE query.
         boolean autoConversion = configuration.getBoolean(InputFormatConfig.SCHEMA_AUTO_CONVERSION, false);
         // If we can not load the table try the provided hive schema
-        this.tableSchema = hiveSchemaOrThrow(serDeProperties, e, autoConversion);
+        this.tableSchema = hiveSchemaOrThrow(e, autoConversion);
         // This is only for table creation, it is ok to have an empty partition column list
         this.partitionColumns = ImmutableList.of();
         // create table for CTAS
@@ -160,15 +157,10 @@ public class HiveIcebergSerDe extends AbstractSerDe {
     serDeProperties.setProperty(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(tableSchema));
 
     // build partition spec, if any
-    String partColsString = serDeProperties.getProperty(serdeConstants.LIST_PARTITION_COLUMNS);
-    if (partColsString != null && !partColsString.isEmpty()) {
-      String partColDelimiter = partColsString.contains(String.valueOf(ColumnType.COLUMN_COMMENTS_DELIMITER)) ?
-          String.valueOf(ColumnType.COLUMN_COMMENTS_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
-      String[] partCols = partColsString.split(partColDelimiter);
-      String[] partColTypes = serDeProperties.getProperty(serdeConstants.LIST_PARTITION_COLUMN_TYPES)
-          .split(String.valueOf(SerDeUtils.COLON));
-      List<FieldSchema> partitionFields = IntStream.range(0, partCols.length)
-          .mapToObj(i -> new FieldSchema(partCols[i], partColTypes[i], null))
+    if (!getPartitionColumnNames().isEmpty()) {
+      List<FieldSchema> partitionFields = IntStream.range(0, getPartitionColumnNames().size())
+          .mapToObj(i ->
+               new FieldSchema(getPartitionColumnNames().get(i), getPartitionColumnTypes().get(i).getTypeName(), null))
           .collect(Collectors.toList());
       PartitionSpec spec = HiveSchemaUtil.spec(tableSchema, partitionFields);
       serDeProperties.put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(spec));
@@ -229,46 +221,30 @@ public class HiveIcebergSerDe extends AbstractSerDe {
   }
 
   /**
-   * Gets the hive schema from the serDeProperties, and throws an exception if it is not provided. In the later case
-   * it adds the previousException as a root cause.
-   * @param serDeProperties The source of the hive schema
+   * Gets the hive schema and throws an exception if it is not provided. In the later case it adds the
+   * previousException as a root cause.
    * @param previousException If we had an exception previously
    * @param autoConversion When <code>true</code>, convert unsupported types to more permissive ones, like tinyint to
    *                       int
-   * @return The hive schema parsed from the serDeProperties
+   * @return The hive schema parsed from the serDeProperties provided when the SerDe was initialized
    * @throws SerDeException If there is no schema information in the serDeProperties
    */
-  private static Schema hiveSchemaOrThrow(Properties serDeProperties, Exception previousException,
-                                          boolean autoConversion)
+  private Schema hiveSchemaOrThrow(Exception previousException, boolean autoConversion)
       throws SerDeException {
-    // Read the configuration parameters
-    String columnNames = serDeProperties.getProperty(serdeConstants.LIST_COLUMNS);
-    String columnTypes = serDeProperties.getProperty(serdeConstants.LIST_COLUMN_TYPES);
-    // No constant for column comments and column comments delimiter.
-    String columnComments = serDeProperties.getProperty(LIST_COLUMN_COMMENT);
-    String columnNameDelimiter = serDeProperties.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ?
-        serDeProperties.getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
-    if (columnNames != null && columnTypes != null && columnNameDelimiter != null &&
-        !columnNames.isEmpty() && !columnTypes.isEmpty() && !columnNameDelimiter.isEmpty()) {
-      // Parse the configuration parameters
-      List<String> names = new ArrayList<>();
-      Collections.addAll(names, columnNames.split(columnNameDelimiter));
-      // check if there are partition columns as well
-      String partColNames = serDeProperties.getProperty(serdeConstants.LIST_PARTITION_COLUMNS);
-      if (partColNames != null && !partColNames.isEmpty()) {
-        // add partition col names to regular col names
-        String partColDelimiter = partColNames.contains(String.valueOf(ColumnType.COLUMN_COMMENTS_DELIMITER)) ?
-            String.valueOf(ColumnType.COLUMN_COMMENTS_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
-        Collections.addAll(names, partColNames.split(partColDelimiter));
-        // add partition col types to regular col types
-        columnTypes += SerDeUtils.COLON + serDeProperties.getProperty(serdeConstants.LIST_PARTITION_COLUMN_TYPES);
-      }
-      List<String> comments = new ArrayList<>();
-      if (columnComments != null) {
-        Collections.addAll(comments, columnComments.split(Character.toString(Character.MIN_VALUE)));
-      }
-      Schema hiveSchema = HiveSchemaUtil.convert(names, TypeInfoUtils.getTypeInfosFromTypeString(columnTypes),
-              comments, autoConversion);
+    List<String> names = Lists.newArrayList();
+    names.addAll(getColumnNames());
+    names.addAll(getPartitionColumnNames());
+
+    List<TypeInfo> types = Lists.newArrayList();
+    types.addAll(getColumnTypes());
+    types.addAll(getPartitionColumnTypes());
+
+    List<String> comments = Lists.newArrayList();
+    comments.addAll(getColumnComments());
+    comments.addAll(getPartitionColumnComments());
+
+    if (!names.isEmpty() && !types.isEmpty()) {
+      Schema hiveSchema = HiveSchemaUtil.convert(names, types, comments, autoConversion);
       LOG.info("Using hive schema {}", SchemaParser.toJson(hiveSchema));
       return hiveSchema;
     } else {
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
index 235ed55..266df38 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
@@ -342,7 +342,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
   }
 
   @Test
-  public void testCreateDropTableNonDefaultCatalog() throws TException, InterruptedException {
+  public void testCreateDropTableNonDefaultCatalog() {
     TableIdentifier identifier = TableIdentifier.of("default", "customers");
     String catalogName = "nondefaultcatalog";
     testTables.properties().entrySet()
@@ -722,6 +722,32 @@ public class TestHiveIcebergStorageHandlerNoScan {
   }
 
   @Test
+  public void testCreatePartitionedTableWithColumnComments() {
+    TableIdentifier identifier = TableIdentifier.of("default", "partitioned_with_comment_table");
+    String[] expectedDoc = new String[] {"int column", "string column", null, "partition column", null};
+    shell.executeStatement("CREATE EXTERNAL TABLE partitioned_with_comment_table (" +
+        "t_int INT COMMENT 'int column',  " +
+        "t_string STRING COMMENT 'string column', " +
+        "t_string_2 STRING) " +
+        "PARTITIONED BY (t_string_3 STRING COMMENT 'partition column', t_string_4 STRING) " +
+        "STORED BY ICEBERG " +
+        testTables.locationForCreateTableSQL(identifier) +
+        testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
+    org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
+
+    List<Object[]> rows = shell.executeStatement("DESCRIBE default.partitioned_with_comment_table");
+    List<Types.NestedField> columns = icebergTable.schema().columns();
+    // The partition transform information is 3 extra lines, and 2 more line for the columns
+    Assert.assertEquals(columns.size() + 5, rows.size());
+    for (int i = 0; i < columns.size(); i++) {
+      Types.NestedField field = columns.get(i);
+      Assert.assertArrayEquals(new Object[] {field.name(), HiveSchemaUtil.convert(field.type()).getTypeName(),
+          field.doc() != null ? field.doc() : "from deserializer"}, rows.get(i));
+      Assert.assertEquals(expectedDoc[i], field.doc());
+    }
+  }
+
+  @Test
   public void testAlterTableProperties() {
     TableIdentifier identifier = TableIdentifier.of("default", "customers");
     shell.executeStatement("CREATE EXTERNAL TABLE customers (" +
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
index d57383b..36f75af 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
@@ -268,6 +268,8 @@ public final class PlanUtils {
           MetaStoreUtils.getColumnNamesFromFieldSchema(partCols));
       properties.setProperty(serdeConstants.LIST_PARTITION_COLUMN_TYPES,
           MetaStoreUtils.getColumnTypesFromFieldSchema(partCols, ":"));
+      properties.setProperty(serdeConstants.LIST_PARTITION_COLUMN_COMMENTS,
+          MetaStoreUtils.getColumnCommentsFromFieldSchema(partCols));
     }
 
     if (lastColumnTakesRestOfTheLine) {
diff --git a/serde/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/serde/serdeConstants.java b/serde/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/serde/serdeConstants.java
index 2119e1c..1a64aca 100644
--- a/serde/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/serde/serdeConstants.java
+++ b/serde/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/serde/serdeConstants.java
@@ -97,10 +97,14 @@ public class serdeConstants {
 
   public static final java.lang.String LIST_COLUMN_TYPES = "columns.types";
 
+  public static final java.lang.String LIST_COLUMN_COMMENTS = "columns.comments";
+
   public static final java.lang.String LIST_PARTITION_COLUMNS = "partition.columns";
 
   public static final java.lang.String LIST_PARTITION_COLUMN_TYPES = "partition.columns.types";
 
+  public static final java.lang.String LIST_PARTITION_COLUMN_COMMENTS = "partition.columns.comments";
+
   public static final java.lang.String TIMESTAMP_FORMATS = "timestamp.formats";
 
   public static final java.lang.String COLUMN_NAME_DELIMITER = "column.name.delimiter";
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerDe.java
index d51981c..d5c47ec 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerDe.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerDe.java
@@ -27,6 +27,7 @@ import java.util.Optional;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.ColumnType;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -52,6 +53,11 @@ public abstract class AbstractSerDe implements Deserializer, Serializer {
 
   private List<String> columnNames;
   private List<TypeInfo> columnTypes;
+  private List<String> columnComments;
+
+  private List<String> partitionColumnNames;
+  private List<TypeInfo> partitionColumnTypes;
+  private List<String> partitionColumnComments;
 
   /**
    * Initialize the SerDe. By default, this will use one set of properties,
@@ -76,6 +82,10 @@ public abstract class AbstractSerDe implements Deserializer, Serializer {
     this.properties = SerDeUtils.createOverlayedProperties(tableProperties, partitionProperties);
     this.columnNames = parseColumnNames();
     this.columnTypes = parseColumnTypes();
+    this.columnComments = parseColumnComments(serdeConstants.LIST_COLUMN_COMMENTS);
+    this.partitionColumnNames = parseColumnNames(serdeConstants.LIST_PARTITION_COLUMNS);
+    this.partitionColumnTypes = parseColumnTypes(serdeConstants.LIST_PARTITION_COLUMN_TYPES);
+    this.partitionColumnComments = parseColumnComments(serdeConstants.LIST_PARTITION_COLUMN_COMMENTS);
 
     Preconditions.checkArgument(this.columnNames.size() == this.columnTypes.size(),
         "Column names must match count of column types");
@@ -84,7 +94,11 @@ public abstract class AbstractSerDe implements Deserializer, Serializer {
   }
 
   protected List<String> parseColumnNames() {
-    final String columnNameProperty = this.properties.getProperty(serdeConstants.LIST_COLUMNS, "");
+    return parseColumnNames(serdeConstants.LIST_COLUMNS);
+  }
+
+  protected List<String> parseColumnNames(String key) {
+    final String columnNameProperty = this.properties.getProperty(key, "");
     final String columnNameDelimiter =
         this.properties.getProperty(serdeConstants.COLUMN_NAME_DELIMITER, String.valueOf(SerDeUtils.COMMA));
 
@@ -93,12 +107,23 @@ public abstract class AbstractSerDe implements Deserializer, Serializer {
   }
 
   protected List<TypeInfo> parseColumnTypes() {
-    final String columnTypeProperty = this.properties.getProperty(serdeConstants.LIST_COLUMN_TYPES, "");
+    return parseColumnTypes(serdeConstants.LIST_COLUMN_TYPES);
+  }
+
+  protected List<TypeInfo> parseColumnTypes(String key) {
+    final String columnTypeProperty = this.properties.getProperty(key, "");
 
     return columnTypeProperty.isEmpty() ? Collections.emptyList()
         : Collections.unmodifiableList(TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty));
   }
 
+  protected List<String> parseColumnComments(String key) {
+    final String columnCommentProperty = this.properties.getProperty(key, "");
+
+    return columnCommentProperty.isEmpty() ? Collections.emptyList() : Collections.unmodifiableList(
+        Arrays.asList(columnCommentProperty.split(Character.toString(ColumnType.COLUMN_COMMENTS_DELIMITER))));
+  }
+
   /**
    * Returns the Writable class that would be returned by the serialize method.
    * This is used to initialize SequenceFile header.
@@ -142,6 +167,22 @@ public abstract class AbstractSerDe implements Deserializer, Serializer {
     return columnTypes;
   }
 
+  public List<String> getColumnComments() {
+    return columnComments;
+  }
+
+  public List<String> getPartitionColumnNames() {
+    return partitionColumnNames;
+  }
+
+  public List<TypeInfo> getPartitionColumnTypes() {
+    return partitionColumnTypes;
+  }
+
+  public List<String> getPartitionColumnComments() {
+    return partitionColumnComments;
+  }
+
   public Optional<Configuration> getConfiguration() {
     return configuration;
   }
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
index 48d2bb1..d4d08f3 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
@@ -680,7 +680,7 @@ public class MetaStoreUtils {
       if (!first) {
         colNameBuf.append(columnNameDelimiter);
         colTypeBuf.append(":");
-        colComment.append('\0');
+        colComment.append(ColumnType.COLUMN_COMMENTS_DELIMITER);
       }
       colNameBuf.append(col.getName());
       colTypeBuf.append(col.getType());