You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/03/24 18:41:19 UTC

[iceberg] 04/18: Hive: Fix identity partitioned writes (#2151)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit d71b2493a9007b1e7a16ad6e63d5765f71f486fb
Author: pvary <pv...@cloudera.com>
AuthorDate: Wed Jan 27 15:56:55 2021 +0100

    Hive: Fix identity partitioned writes (#2151)
---
 .../iceberg/mr/hive/HiveIcebergOutputFormat.java   |  2 +-
 .../TestHiveIcebergStorageHandlerWithEngine.java   | 82 +++++++++-------------
 .../org/apache/iceberg/mr/hive/TestTables.java     | 69 +++++++++++++++++-
 3 files changed, 102 insertions(+), 51 deletions(-)

diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
index 6ebc677..823490e 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
@@ -77,7 +77,7 @@ public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Co
         new OutputFileFactory(spec, fileFormat, location, io, encryption, taskAttemptID.getTaskID().getId(),
             taskAttemptID.getId(), jc.get(HiveConf.ConfVars.HIVEQUERYID.varname) + "-" + taskAttemptID.getJobID());
     HiveIcebergRecordWriter writer = new HiveIcebergRecordWriter(schema, spec, fileFormat,
-        new GenericAppenderFactory(schema), outputFileFactory, io, targetFileSize, taskAttemptID);
+        new GenericAppenderFactory(schema, spec), outputFileFactory, io, targetFileSize, taskAttemptID);
 
     return writer;
   }
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index 7200e3d..ec79c12 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -20,9 +20,6 @@
 package org.apache.iceberg.mr.hive;
 
 import java.io.IOException;
-import java.sql.Timestamp;
-import java.time.LocalDateTime;
-import java.time.OffsetDateTime;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -30,16 +27,11 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.hive.HiveSchemaUtil;
-import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.mr.TestHelper;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.types.Type;
@@ -327,19 +319,14 @@ public class TestHiveIcebergStorageHandlerWithEngine {
       if (type.equals(Types.BinaryType.get()) || type.equals(Types.FixedType.ofLength(5))) {
         continue;
       }
-      String tableName = type.typeId().toString().toLowerCase() + "_table_" + i;
       String columnName = type.typeId().toString().toLowerCase() + "_column";
 
       Schema schema = new Schema(required(1, "id", Types.LongType.get()), required(2, columnName, type));
       List<Record> expected = TestHelper.generateRandomRecords(schema, 5, 0L);
 
-      Table table = testTables.createTable(shell, tableName, schema, fileFormat, ImmutableList.of());
-      StringBuilder query = new StringBuilder("INSERT INTO ").append(tableName).append(" VALUES")
-              .append(expected.stream()
-                      .map(r -> String.format("(%s,%s)", r.get(0),
-                              getStringValueForInsert(r.get(1), type)))
-                      .collect(Collectors.joining(",")));
-      shell.executeStatement(query.toString());
+      Table table = testTables.createTable(shell, type.typeId().toString().toLowerCase() + "_table_" + i,
+          schema, PartitionSpec.unpartitioned(), fileFormat, expected);
+
       HiveIcebergTestUtils.validateData(table, expected, 0);
     }
   }
@@ -527,29 +514,44 @@ public class TestHiveIcebergStorageHandlerWithEngine {
         .bucket("customer_id", 3)
         .build();
 
-    TableIdentifier identifier = TableIdentifier.of("default", "partitioned_customers");
+    List<Record> records = TestHelper.generateRandomRecords(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, 4, 0L);
+
+    Table table = testTables.createTable(shell, "partitioned_customers",
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, fileFormat, records);
+
+    HiveIcebergTestUtils.validateData(table, records, 0);
+  }
+
+  @Test
+  public void testIdentityPartitionedWrite() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
 
-    shell.executeStatement("CREATE EXTERNAL TABLE " + identifier +
-        " STORED BY '" + HiveIcebergStorageHandler.class.getName() + "' " +
-        testTables.locationForCreateTableSQL(identifier) +
-        "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
-        SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', " +
-        "'" + InputFormatConfig.PARTITION_SPEC + "'='" +
-        PartitionSpecParser.toJson(spec) + "', " +
-        "'" + TableProperties.DEFAULT_FILE_FORMAT + "'='" + fileFormat + "')");
+    PartitionSpec spec = PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+        .identity("customer_id")
+        .build();
 
     List<Record> records = TestHelper.generateRandomRecords(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, 4, 0L);
 
-    StringBuilder query = new StringBuilder().append("INSERT INTO " + identifier + " VALUES ");
-    records.forEach(record -> query.append("(")
-        .append(record.get(0)).append(",'")
-        .append(record.get(1)).append("','")
-        .append(record.get(2)).append("'),"));
-    query.setLength(query.length() - 1);
+    Table table = testTables.createTable(shell, "partitioned_customers",
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, fileFormat, records);
 
-    shell.executeStatement(query.toString());
+    HiveIcebergTestUtils.validateData(table, records, 0);
+  }
+
+  @Test
+  public void testMultilevelIdentityPartitionedWrite() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    PartitionSpec spec = PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+        .identity("customer_id")
+        .identity("last_name")
+        .build();
+
+    List<Record> records = TestHelper.generateRandomRecords(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, 4, 0L);
+
+    Table table = testTables.createTable(shell, "partitioned_customers",
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, fileFormat, records);
 
-    Table table = testTables.loadTable(identifier);
     HiveIcebergTestUtils.validateData(table, records, 0);
   }
 
@@ -613,18 +615,4 @@ public class TestHiveIcebergStorageHandlerWithEngine {
     }
     return query;
   }
-
-  private String getStringValueForInsert(Object value, Type type) {
-    String template = "\'%s\'";
-    if (type.equals(Types.TimestampType.withoutZone())) {
-      return String.format(template, Timestamp.valueOf((LocalDateTime) value).toString());
-    } else if (type.equals(Types.TimestampType.withZone())) {
-      return String.format(template, Timestamp.from(((OffsetDateTime) value).toInstant()).toString());
-    } else if (type.equals(Types.BooleanType.get())) {
-      // in hive2 boolean type values must not be surrounded in apostrophes. Otherwise the value is translated to true.
-      return value.toString();
-    } else {
-      return String.format(template, value.toString());
-    }
-  }
 }
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
index 4bc27a7..375fff3 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
@@ -22,16 +22,23 @@ package org.apache.iceberg.mr.hive;
 import java.io.File;
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.Tables;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -47,6 +54,8 @@ import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ObjectArrays;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 import org.junit.Assert;
 import org.junit.rules.TemporaryFolder;
 
@@ -117,9 +126,9 @@ abstract class TestTables {
   }
 
   /**
-   * Creates a Hive test table. Creates the Iceberg table/data and creates the corresponding Hive table as well when
-   * needed. The table will be in the 'default' database. The table will be populated with the provided List of
-   * {@link Record}s.
+   * Creates a non-partitioned Hive test table. Creates the Iceberg table/data and creates the corresponding Hive
+   * table as well when needed. The table will be in the 'default' database. The table will be populated with the
+   * provided List of {@link Record}s.
    * @param shell The HiveShell used for Hive table creation
    * @param tableName The name of the test table
    * @param schema The schema used for the table creation
@@ -140,6 +149,46 @@ abstract class TestTables {
   }
 
   /**
+   * Creates a Hive test table with the given partition spec (which may be unpartitioned) using Hive SQL. The table
+   * will be in the 'default' database and is populated with the provided {@link Record}s via a single Hive INSERT.
+   * No INSERT is issued when {@code records} is empty, so an empty table can be created as well.
+   * @param shell The HiveShell used for Hive table creation
+   * @param tableName The name of the test table
+   * @param schema The schema used for the table creation
+   * @param spec The partition specification for the table
+   * @param fileFormat The file format used for writing the data
+   * @param records The records with which the table is populated
+   * @return The created table
+   */
+  public Table createTable(TestHiveShell shell, String tableName, Schema schema, PartitionSpec spec,
+      FileFormat fileFormat, List<Record> records) {
+    TableIdentifier identifier = TableIdentifier.of("default", tableName);
+    shell.executeStatement("CREATE EXTERNAL TABLE " + identifier +
+        " STORED BY '" + HiveIcebergStorageHandler.class.getName() + "' " +
+        locationForCreateTableSQL(identifier) +
+        "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
+        SchemaParser.toJson(schema) + "', " +
+        "'" + InputFormatConfig.PARTITION_SPEC + "'='" +
+        PartitionSpecParser.toJson(spec) + "', " +
+        "'" + TableProperties.DEFAULT_FILE_FORMAT + "'='" + fileFormat + "')");
+
+    // Each record becomes one "(v1,v2,...)" tuple. An INSERT without any tuple is invalid SQL, so the
+    // statement is only issued when there is at least one record.
+    if (!records.isEmpty()) {
+      String values = records.stream()
+          .map(record -> record.struct().fields().stream()
+              .map(field -> getStringValueForInsert(record.getField(field.name()), field.type()))
+              .collect(Collectors.joining(",", "(", ")")))
+          .collect(Collectors.joining(","));
+      String query = "INSERT INTO " + identifier + " VALUES " + values;
+
+      shell.executeStatement(query);
+    }
+
+    return loadTable(identifier);
+  }
+
+  /**
    * Creates a Hive test table. Creates the Iceberg table/data and creates the corresponding Hive table as well when
    * needed. The table will be in the 'default' database. The table will be populated with the provided with randomly
    * generated {@link Record}s.
@@ -346,6 +395,20 @@ abstract class TestTables {
     return "/" + Joiner.on("/").join(identifier.namespace().levels()) + "/" + identifier.name();
   }
 
+  private String getStringValueForInsert(Object value, Type type) {
+    String template = "'%s'";
+    // NULL and booleans are written as bare literals: Hive 2 reads a quoted boolean literal as true.
+    if (value == null || type.equals(Types.BooleanType.get())) {
+      return String.valueOf(value);
+    } else if (type.equals(Types.TimestampType.withoutZone())) {
+      return String.format(template, Timestamp.valueOf((LocalDateTime) value).toString());
+    } else if (type.equals(Types.TimestampType.withZone())) {
+      return String.format(template, Timestamp.from(((OffsetDateTime) value).toInstant()).toString());
+    } else {
+      return String.format(template, value.toString());
+    }
+  }
+
   enum TestTableType {
     HADOOP_TABLE {
       public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) {