You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/03/24 18:41:19 UTC
[iceberg] 04/18: Hive: Fix identity partitioned writes (#2151)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit d71b2493a9007b1e7a16ad6e63d5765f71f486fb
Author: pvary <pv...@cloudera.com>
AuthorDate: Wed Jan 27 15:56:55 2021 +0100
Hive: Fix identity partitioned writes (#2151)
---
.../iceberg/mr/hive/HiveIcebergOutputFormat.java | 2 +-
.../TestHiveIcebergStorageHandlerWithEngine.java | 82 +++++++++-------------
.../org/apache/iceberg/mr/hive/TestTables.java | 69 +++++++++++++++++-
3 files changed, 102 insertions(+), 51 deletions(-)
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
index 6ebc677..823490e 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
@@ -77,7 +77,7 @@ public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Co
new OutputFileFactory(spec, fileFormat, location, io, encryption, taskAttemptID.getTaskID().getId(),
taskAttemptID.getId(), jc.get(HiveConf.ConfVars.HIVEQUERYID.varname) + "-" + taskAttemptID.getJobID());
HiveIcebergRecordWriter writer = new HiveIcebergRecordWriter(schema, spec, fileFormat,
- new GenericAppenderFactory(schema), outputFileFactory, io, targetFileSize, taskAttemptID);
+ new GenericAppenderFactory(schema, spec), outputFileFactory, io, targetFileSize, taskAttemptID);
return writer;
}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index 7200e3d..ec79c12 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -20,9 +20,6 @@
package org.apache.iceberg.mr.hive;
import java.io.IOException;
-import java.sql.Timestamp;
-import java.time.LocalDateTime;
-import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -30,16 +27,11 @@ import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hive.HiveSchemaUtil;
-import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.TestHelper;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.types.Type;
@@ -327,19 +319,14 @@ public class TestHiveIcebergStorageHandlerWithEngine {
if (type.equals(Types.BinaryType.get()) || type.equals(Types.FixedType.ofLength(5))) {
continue;
}
- String tableName = type.typeId().toString().toLowerCase() + "_table_" + i;
String columnName = type.typeId().toString().toLowerCase() + "_column";
Schema schema = new Schema(required(1, "id", Types.LongType.get()), required(2, columnName, type));
List<Record> expected = TestHelper.generateRandomRecords(schema, 5, 0L);
- Table table = testTables.createTable(shell, tableName, schema, fileFormat, ImmutableList.of());
- StringBuilder query = new StringBuilder("INSERT INTO ").append(tableName).append(" VALUES")
- .append(expected.stream()
- .map(r -> String.format("(%s,%s)", r.get(0),
- getStringValueForInsert(r.get(1), type)))
- .collect(Collectors.joining(",")));
- shell.executeStatement(query.toString());
+ Table table = testTables.createTable(shell, type.typeId().toString().toLowerCase() + "_table_" + i,
+ schema, PartitionSpec.unpartitioned(), fileFormat, expected);
+
HiveIcebergTestUtils.validateData(table, expected, 0);
}
}
@@ -527,29 +514,44 @@ public class TestHiveIcebergStorageHandlerWithEngine {
.bucket("customer_id", 3)
.build();
- TableIdentifier identifier = TableIdentifier.of("default", "partitioned_customers");
+ List<Record> records = TestHelper.generateRandomRecords(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, 4, 0L);
+
+ Table table = testTables.createTable(shell, "partitioned_customers",
+ HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, fileFormat, records);
+
+ HiveIcebergTestUtils.validateData(table, records, 0);
+ }
+
+ @Test
+ public void testIdentityPartitionedWrite() throws IOException {
+ Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
- shell.executeStatement("CREATE EXTERNAL TABLE " + identifier +
- " STORED BY '" + HiveIcebergStorageHandler.class.getName() + "' " +
- testTables.locationForCreateTableSQL(identifier) +
- "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
- SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', " +
- "'" + InputFormatConfig.PARTITION_SPEC + "'='" +
- PartitionSpecParser.toJson(spec) + "', " +
- "'" + TableProperties.DEFAULT_FILE_FORMAT + "'='" + fileFormat + "')");
+ PartitionSpec spec = PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+ .identity("customer_id")
+ .build();
List<Record> records = TestHelper.generateRandomRecords(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, 4, 0L);
- StringBuilder query = new StringBuilder().append("INSERT INTO " + identifier + " VALUES ");
- records.forEach(record -> query.append("(")
- .append(record.get(0)).append(",'")
- .append(record.get(1)).append("','")
- .append(record.get(2)).append("'),"));
- query.setLength(query.length() - 1);
+ Table table = testTables.createTable(shell, "partitioned_customers",
+ HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, fileFormat, records);
- shell.executeStatement(query.toString());
+ HiveIcebergTestUtils.validateData(table, records, 0);
+ }
+
+ @Test
+ public void testMultilevelIdentityPartitionedWrite() throws IOException {
+ Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+ PartitionSpec spec = PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+ .identity("customer_id")
+ .identity("last_name")
+ .build();
+
+ List<Record> records = TestHelper.generateRandomRecords(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, 4, 0L);
+
+ Table table = testTables.createTable(shell, "partitioned_customers",
+ HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, fileFormat, records);
- Table table = testTables.loadTable(identifier);
HiveIcebergTestUtils.validateData(table, records, 0);
}
@@ -613,18 +615,4 @@ public class TestHiveIcebergStorageHandlerWithEngine {
}
return query;
}
-
- private String getStringValueForInsert(Object value, Type type) {
- String template = "\'%s\'";
- if (type.equals(Types.TimestampType.withoutZone())) {
- return String.format(template, Timestamp.valueOf((LocalDateTime) value).toString());
- } else if (type.equals(Types.TimestampType.withZone())) {
- return String.format(template, Timestamp.from(((OffsetDateTime) value).toInstant()).toString());
- } else if (type.equals(Types.BooleanType.get())) {
- // in hive2 boolean type values must not be surrounded in apostrophes. Otherwise the value is translated to true.
- return value.toString();
- } else {
- return String.format(template, value.toString());
- }
- }
}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
index 4bc27a7..375fff3 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
@@ -22,16 +22,23 @@ package org.apache.iceberg.mr.hive;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.Tables;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -47,6 +54,8 @@ import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ObjectArrays;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.rules.TemporaryFolder;
@@ -117,9 +126,9 @@ abstract class TestTables {
}
/**
- * Creates a Hive test table. Creates the Iceberg table/data and creates the corresponding Hive table as well when
- * needed. The table will be in the 'default' database. The table will be populated with the provided List of
- * {@link Record}s.
+ * Creates a non-partitioned Hive test table. Creates the Iceberg table/data and creates the corresponding Hive
+ * table as well when needed. The table will be in the 'default' database. The table will be populated with the
+ * provided List of {@link Record}s.
* @param shell The HiveShell used for Hive table creation
* @param tableName The name of the test table
* @param schema The schema used for the table creation
@@ -140,6 +149,46 @@ abstract class TestTables {
}
/**
+ * Creates a Hive test table with the given partition spec using Hive SQL. The table will be in the 'default'
+ * database. It is populated with the provided List of {@link Record}s using a single Hive INSERT statement.
+ * @param shell The HiveShell used for Hive table creation
+ * @param tableName The name of the test table
+ * @param schema The schema used for the table creation
+ * @param spec The partition specification for the table
+ * @param fileFormat The file format used for writing the data
+ * @param records The records used to populate the table; must be non-empty, otherwise the generated
+ *                INSERT statement contains no VALUES rows
+ * @return The created table, loaded after the INSERT statement has been executed
+ */
+ public Table createTable(TestHiveShell shell, String tableName, Schema schema, PartitionSpec spec,
+ FileFormat fileFormat, List<Record> records) {
+ TableIdentifier identifier = TableIdentifier.of("default", tableName);
+ shell.executeStatement("CREATE EXTERNAL TABLE " + identifier +
+ " STORED BY '" + HiveIcebergStorageHandler.class.getName() + "' " +
+ locationForCreateTableSQL(identifier) +
+ "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
+ SchemaParser.toJson(schema) + "', " +
+ "'" + InputFormatConfig.PARTITION_SPEC + "'='" +
+ PartitionSpecParser.toJson(spec) + "', " +
+ "'" + TableProperties.DEFAULT_FILE_FORMAT + "'='" + fileFormat + "')");
+
+ StringBuilder query = new StringBuilder().append("INSERT INTO " + identifier + " VALUES ");
+
+ records.forEach(record -> {
+ query.append("(");
+ query.append(record.struct().fields().stream()
+ .map(field -> getStringValueForInsert(record.getField(field.name()), field.type()))
+ .collect(Collectors.joining(",")));
+ query.append("),");
+ });
+ query.setLength(query.length() - 1);
+
+ shell.executeStatement(query.toString());
+
+ return loadTable(identifier);
+ }
+
+ /**
* Creates a Hive test table. Creates the Iceberg table/data and creates the corresponding Hive table as well when
* needed. The table will be in the 'default' database. The table will be populated with the provided with randomly
* generated {@link Record}s.
@@ -346,6 +395,20 @@ abstract class TestTables {
return "/" + Joiner.on("/").join(identifier.namespace().levels()) + "/" + identifier.name();
}
+ private String getStringValueForInsert(Object value, Type type) {
+ String template = "\'%s\'";
+ if (type.equals(Types.TimestampType.withoutZone())) {
+ return String.format(template, Timestamp.valueOf((LocalDateTime) value).toString());
+ } else if (type.equals(Types.TimestampType.withZone())) {
+ return String.format(template, Timestamp.from(((OffsetDateTime) value).toInstant()).toString());
+ } else if (type.equals(Types.BooleanType.get())) {
+ // In Hive 2, boolean values must not be surrounded by apostrophes; otherwise the value is translated to true.
+ return value.toString();
+ } else {
+ return String.format(template, value.toString());
+ }
+ }
+
enum TestTableType {
HADOOP_TABLE {
public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) {