Posted to commits@hive.apache.org by lp...@apache.org on 2022/07/21 11:23:56 UTC
[hive] branch master updated: HIVE-26397: Honour Iceberg sort orders when writing a table. (#3445) (Laszlo Pinter, reviewed by Adam Szita and Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
lpinter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new dd424d77230 HIVE-26397: Honour Iceberg sort orders when writing a table. (#3445) (Laszlo Pinter, reviewed by Adam Szita and Peter Vary)
dd424d77230 is described below
commit dd424d77230f275b78d4465548cad5b57aca66e9
Author: László Pintér <47...@users.noreply.github.com>
AuthorDate: Thu Jul 21 13:23:43 2022 +0200
HIVE-26397: Honour Iceberg sort orders when writing a table. (#3445) (Laszlo Pinter, reviewed by Adam Szita and Peter Vary)
---
.../iceberg/mr/hive/HiveIcebergMetaHook.java | 4 +-
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 116 +++++++++++++++------
.../apache/iceberg/mr/hive/IcebergTableUtil.java | 12 +--
.../java/org/apache/iceberg/mr/TestHelper.java | 19 +++-
.../iceberg/mr/hive/HiveIcebergTestUtils.java | 13 ++-
.../iceberg/mr/hive/TestHiveIcebergInserts.java | 86 +++++++++++++++
.../org/apache/iceberg/mr/hive/TestTables.java | 49 ++++++++-
.../hive/ql/ddl/table/create/CreateTableDesc.java | 6 +-
.../desc/formatter/JsonDescTableFormatter.java | 4 +-
.../desc/formatter/TextDescTableFormatter.java | 15 ++-
.../set/AlterTableSetPartitionSpecAnalyzer.java | 4 +-
.../apache/hadoop/hive/ql/exec/DDLPlanUtils.java | 8 +-
.../hive/ql/metadata/HiveStorageHandler.java | 23 +++-
.../ql/optimizer/SortedDynPartitionOptimizer.java | 38 +++++--
.../hadoop/hive/ql/parse/PartitionTransform.java | 13 ++-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 2 +-
...titionTransformSpec.java => TransformSpec.java} | 6 +-
.../hadoop/hive/ql/plan/DynamicPartitionCtx.java | 24 +++++
18 files changed, 362 insertions(+), 80 deletions(-)
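For context on the feature itself: an Iceberg table's write-time sort order is declared through the SortOrder builder API, the same calls the new tests below exercise. A minimal, self-contained sketch (the schema and column names are illustrative, not taken from the patch):

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.SortOrder;
    import org.apache.iceberg.types.Types;
    import static org.apache.iceberg.NullOrder.NULLS_FIRST;
    import static org.apache.iceberg.NullOrder.NULLS_LAST;
    import static org.apache.iceberg.expressions.Expressions.bucket;
    import static org.apache.iceberg.types.Types.NestedField.optional;

    public class SortOrderSketch {
      public static void main(String[] args) {
        Schema schema = new Schema(
            optional(1, "id", Types.IntegerType.get()),
            optional(2, "data", Types.StringType.get()));
        // Cluster rows by bucket(id, 2), then order by data descending within each bucket.
        SortOrder order = SortOrder.builderFor(schema)
            .asc(bucket("id", 2), NULLS_FIRST)
            .desc("data", NULLS_LAST)
            .build();
        System.out.println(order);
      }
    }

Before this patch Hive ignored such an order on insert; with it, rows are sorted accordingly before they reach the Iceberg writers.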
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index c8f652d577a..1191ca8e431 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.PartitionTransform;
-import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
+import org.apache.hadoop.hive.ql.parse.TransformSpec;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -296,7 +296,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
if (hmsTable.isSetPartitionKeys() && !hmsTable.getPartitionKeys().isEmpty()) {
db.dropPartitions(hmsTable.getDbName(), hmsTable.getTableName(), EMPTY_FILTER, DROP_OPTIONS);
- List<PartitionTransformSpec> spec = PartitionTransform.getPartitionTransformSpec(hmsTable.getPartitionKeys());
+ List<TransformSpec> spec = PartitionTransform.getPartitionTransformSpec(hmsTable.getPartitionKeys());
if (!SessionStateUtil.addResource(conf, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC, spec)) {
throw new MetaException("Query state attached to Session state must not be null. " +
"Partition transform metadata cannot be saved.");
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 25881408a63..51e3d60ad9d 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -57,8 +57,8 @@ import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
-import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.TransformSpec;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -84,11 +84,15 @@ import org.apache.hadoop.mapred.OutputFormat;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.SortDirection;
+import org.apache.iceberg.SortField;
+import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopConfigurable;
@@ -332,27 +336,35 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
}
@Override
- public List<PartitionTransformSpec> getPartitionTransformSpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+ public List<TransformSpec> getPartitionTransformSpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
- return table.spec().fields().stream().map(f -> {
- PartitionTransformSpec spec = new PartitionTransformSpec();
- spec.setColumnName(table.schema().findColumnName(f.sourceId()));
- // right now the only way to fetch the transform type and its params is through the toString() call
- String transformName = f.transform().toString().toUpperCase();
- // if the transform name contains '[' it means it has some config params
- if (transformName.contains("[")) {
- spec.setTransformType(PartitionTransformSpec.TransformType
- .valueOf(transformName.substring(0, transformName.indexOf("["))));
- spec.setTransformParam(Optional.of(Integer
- .valueOf(transformName.substring(transformName.indexOf("[") + 1, transformName.indexOf("]")))));
- } else {
- spec.setTransformType(PartitionTransformSpec.TransformType.valueOf(transformName));
- spec.setTransformParam(Optional.empty());
- }
+ return table.spec().fields().stream().map(f ->
+ getTransformSpec(table, f.transform().toString().toUpperCase(), f.sourceId())
+ ).collect(Collectors.toList());
+ }
+
+ private List<TransformSpec> getSortTransformSpec(Table table) {
+ return table.sortOrder().fields().stream().map(s ->
+ getTransformSpec(table, s.transform().toString().toUpperCase(), s.sourceId())
+ ).collect(Collectors.toList());
+ }
+
+ private TransformSpec getTransformSpec(Table table, String transformName, int sourceId) {
+ TransformSpec spec = new TransformSpec();
+ spec.setColumnName(table.schema().findColumnName(sourceId));
+ // if the transform name contains '[' it means it has some config params
+ if (transformName.contains("[")) {
+ spec.setTransformType(TransformSpec.TransformType
+ .valueOf(transformName.substring(0, transformName.indexOf("["))));
+ spec.setTransformParam(Optional.of(Integer
+ .valueOf(transformName.substring(transformName.indexOf("[") + 1, transformName.indexOf("]")))));
+ } else {
+ spec.setTransformType(TransformSpec.TransformType.valueOf(transformName));
+ spec.setTransformParam(Optional.empty());
+ }
- return spec;
- }).collect(Collectors.toList());
+ return spec;
}
@Override
@@ -367,12 +379,6 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
- if (table.spec().isUnpartitioned()) {
- return null;
- }
-
- // Iceberg currently doesn't have publicly accessible partition transform information, hence use above string parse
- List<PartitionTransformSpec> partitionTransformSpecs = getPartitionTransformSpec(hmsTable);
DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(Maps.newLinkedHashMap(),
hiveConf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
@@ -380,6 +386,39 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs = Lists.newLinkedList();
dpCtx.setCustomSortExpressions(customSortExprs);
+ if (table.spec().isPartitioned()) {
+ addCustomSortExpr(table, hmsTable, writeOperation, customSortExprs, getPartitionTransformSpec(hmsTable));
+ }
+
+ SortOrder sortOrder = table.sortOrder();
+ if (sortOrder.isSorted()) {
+ List<Integer> customSortPositions = Lists.newLinkedList();
+ List<Integer> customSortOrder = Lists.newLinkedList();
+ dpCtx.setCustomSortOrder(customSortOrder);
+ List<Integer> customSortNullOrder = Lists.newLinkedList();
+ dpCtx.setCustomSortNullOrder(customSortNullOrder);
+ for (SortField sortField : sortOrder.fields()) {
+ int pos = 0;
+ for (Types.NestedField field : table.schema().columns()) {
+ if (sortField.sourceId() == field.fieldId()) {
+ customSortPositions.add(pos);
+ customSortOrder.add(sortField.direction() == SortDirection.ASC ? 1 : 0);
+ customSortNullOrder.add(sortField.nullOrder() == NullOrder.NULLS_FIRST ? 0 : 1);
+ break;
+ }
+ pos++;
+ }
+ }
+
+ addCustomSortExpr(table, hmsTable, writeOperation, customSortExprs, getSortTransformSpec(table));
+ }
+
+ return dpCtx;
+ }
+
+ private void addCustomSortExpr(Table table, org.apache.hadoop.hive.ql.metadata.Table hmsTable,
+ Operation writeOperation, List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs,
+ List<TransformSpec> transformSpecs) {
Map<String, Integer> fieldOrderMap = Maps.newHashMap();
List<Types.NestedField> fields = table.schema().columns();
for (int i = 0; i < fields.size(); ++i) {
@@ -387,16 +426,15 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
}
int offset = acidSelectColumns(hmsTable, writeOperation).size();
- for (PartitionTransformSpec spec : partitionTransformSpecs) {
+
+ for (TransformSpec spec : transformSpecs) {
int order = fieldOrderMap.get(spec.getColumnName());
- if (PartitionTransformSpec.TransformType.BUCKET.equals(spec.getTransformType())) {
+ if (TransformSpec.TransformType.BUCKET.equals(spec.getTransformType())) {
customSortExprs.add(BUCKET_SORT_EXPR.apply(order + offset, spec.getTransformParam().get()));
} else {
customSortExprs.add(cols -> cols.get(order + offset).clone());
}
}
-
- return dpCtx;
}
@Override
@@ -570,6 +608,26 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
}
}
+ @Override
+ public boolean supportsSortColumns() {
+ return true;
+ }
+
+ @Override
+ public List<FieldSchema> sortColumns(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+ TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
+ Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
+ if (table.sortOrder().isUnsorted()) {
+ return Collections.emptyList();
+ }
+
+ Schema schema = table.schema();
+ return table.sortOrder().fields().stream().map(s -> new FieldSchema(schema.findColumnName(s.sourceId()),
+ schema.findType(s.sourceId()).toString(),
+ String.format("Transform: %s, Sort direction: %s, Null sort order: %s",
+ s.transform().toString(), s.direction().name(), s.nullOrder().name()))).collect(Collectors.toList());
+ }
+
private void setCommonJobConf(JobConf jobConf) {
jobConf.set("tez.mrreader.config.update.properties", "hive.io.file.readcolumn.names,hive.io.file.readcolumn.ids");
}
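A note on the getTransformSpec() refactor in this file: Iceberg still does not expose a transform's type and parameter directly, so they are recovered by parsing the uppercased Transform#toString() (e.g. "IDENTITY", "BUCKET[16]", "TRUNCATE[4]"). An illustration of what the parsing yields (variable names here are ours, not the patch's):

    String transformName = "BUCKET[16]";   // f.transform().toString().toUpperCase()
    int open = transformName.indexOf('[');
    TransformSpec.TransformType type = TransformSpec.TransformType
        .valueOf(open < 0 ? transformName : transformName.substring(0, open));
    Optional<Integer> param = open < 0
        ? Optional.empty()
        : Optional.of(Integer.valueOf(transformName.substring(open + 1, transformName.indexOf(']'))));
    // type == BUCKET, param == Optional[16]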
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index 3fe2eee39df..344834ec62e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
-import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
+import org.apache.hadoop.hive.ql.parse.TransformSpec;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.iceberg.ManageSnapshots;
import org.apache.iceberg.PartitionSpec;
@@ -98,15 +98,15 @@ public class IcebergTableUtil {
/**
* Create {@link PartitionSpec} based on the partition information stored in
- * {@link PartitionTransformSpec}.
+ * {@link TransformSpec}.
* @param configuration a Hadoop configuration
* @param schema iceberg table schema
* @return iceberg partition spec, always non-null
*/
public static PartitionSpec spec(Configuration configuration, Schema schema) {
- List<PartitionTransformSpec> partitionTransformSpecList = SessionStateUtil
+ List<TransformSpec> partitionTransformSpecList = SessionStateUtil
.getResource(configuration, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC)
- .map(o -> (List<PartitionTransformSpec>) o).orElseGet(() -> null);
+ .map(o -> (List<TransformSpec>) o).orElseGet(() -> null);
if (partitionTransformSpecList == null) {
LOG.debug("Iceberg partition transform spec is not found in QueryState.");
@@ -154,9 +154,9 @@ public class IcebergTableUtil {
UpdatePartitionSpec updatePartitionSpec = table.updateSpec().caseSensitive(false);
table.spec().fields().forEach(field -> updatePartitionSpec.removeField(field.name()));
- List<PartitionTransformSpec> partitionTransformSpecList = SessionStateUtil
+ List<TransformSpec> partitionTransformSpecList = SessionStateUtil
.getResource(configuration, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC)
- .map(o -> (List<PartitionTransformSpec>) o).orElseGet(() -> null);
+ .map(o -> (List<TransformSpec>) o).orElseGet(() -> null);
partitionTransformSpecList.forEach(spec -> {
switch (spec.getTransformType()) {
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java
index df61cfd8c37..5614281a731 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -54,6 +55,7 @@ public class TestHelper {
private final FileFormat fileFormat;
private final TemporaryFolder tmp;
private final Map<String, String> tblProps;
+ private SortOrder order;
private Table table;
@@ -79,6 +81,10 @@ public class TestHelper {
conf.set(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(table.schema()));
}
+ public void setOrder(SortOrder order) {
+ this.order = order;
+ }
+
public Table table() {
return table;
}
@@ -91,13 +97,22 @@ public class TestHelper {
}
public Table createTable(Schema theSchema, PartitionSpec theSpec) {
- Table tbl = tables.create(theSchema, theSpec, properties(), tableIdentifier);
+ return createTable(theSchema, theSpec, null);
+ }
+
+ public Table createTable(Schema theSchema, PartitionSpec theSpec, SortOrder theOrder) {
+ Table tbl;
+ if (theOrder != null) {
+ tbl = tables.create(theSchema, theSpec, theOrder, properties(), tableIdentifier);
+ } else {
+ tbl = tables.create(theSchema, theSpec, properties(), tableIdentifier);
+ }
setTable(tbl);
return tbl;
}
public Table createTable() {
- return createTable(schema, spec);
+ return createTable(schema, spec, order);
}
public Table createUnpartitionedTable() {
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
index 5e217acc82a..dc68ce9a80b 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
@@ -261,8 +261,17 @@ public class HiveIcebergTestUtils {
sortedActual.sort(Comparator.comparingInt(record -> record.get(sortBy).hashCode()));
Assert.assertEquals(sortedExpected.size(), sortedActual.size());
- for (int i = 0; i < sortedExpected.size(); ++i) {
- assertEquals(sortedExpected.get(i), sortedActual.get(i));
+ validateData(sortedExpected, sortedActual);
+ }
+
+ /**
+ * Validates whether the two lists of records are the same.
+ * @param expected The expected list of Records
+ * @param actual The actual list of Records
+ */
+ public static void validateData(List<Record> expected, List<Record> actual) {
+ for (int i = 0; i < expected.size(); ++i) {
+ assertEquals(expected.get(i), actual.get(i));
}
}
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
index 31a589a7c96..0c15ba4e430 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
@@ -25,6 +25,7 @@ import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
@@ -38,6 +39,10 @@ import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
+import static org.apache.iceberg.NullOrder.NULLS_FIRST;
+import static org.apache.iceberg.NullOrder.NULLS_LAST;
+import static org.apache.iceberg.expressions.Expressions.bucket;
+import static org.apache.iceberg.expressions.Expressions.truncate;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
@@ -47,6 +52,87 @@ import static org.apache.iceberg.types.Types.NestedField.required;
*/
public class TestHiveIcebergInserts extends HiveIcebergStorageHandlerWithEngineBase {
+ @Test
+ public void testSortedInsert() throws IOException {
+ TableIdentifier identifier = TableIdentifier.of("default", "sort_table");
+
+ Schema schema = new Schema(
+ optional(1, "id", Types.IntegerType.get(), "unique ID"),
+ optional(2, "data", Types.StringType.get())
+ );
+ SortOrder order = SortOrder.builderFor(schema)
+ .asc("id", NULLS_FIRST)
+ .desc("data", NULLS_LAST)
+ .build();
+
+ testTables.createTable(shell, identifier.name(), schema, order, PartitionSpec.unpartitioned(), fileFormat,
+ ImmutableList.of(), 1, ImmutableMap.of());
+ shell.executeStatement(String.format("INSERT INTO TABLE %s VALUES (4, 'a'), (1, 'a'), (3, 'a'), (2, 'a'), " +
+ "(null, 'a'), (3, 'b'), (3, null)", identifier.name()));
+
+ List<Record> expected = TestHelper.RecordsBuilder.newInstance(schema)
+ .add(null, "a").add(1, "a").add(2, "a").add(3, "b").add(3, "a").add(3, null).add(4, "a")
+ .build();
+ List<Object[]> result = shell.executeStatement(String.format("SELECT * FROM %s", identifier.name()));
+ HiveIcebergTestUtils.validateData(expected, HiveIcebergTestUtils.valueForRow(schema, result));
+ }
+
+ @Test
+ public void testSortedAndTransformedInsert() throws IOException {
+ TableIdentifier identifier = TableIdentifier.of("default", "sort_table");
+
+ SortOrder order = SortOrder.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+ .asc(bucket("customer_id", 2), NULLS_FIRST)
+ .desc(truncate("first_name", 4), NULLS_LAST)
+ .asc("last_name", NULLS_LAST)
+ .build();
+
+ testTables.createTable(shell, identifier.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, order,
+ PartitionSpec.unpartitioned(), fileFormat, ImmutableList.of(), 1, ImmutableMap.of());
+
+ StringBuilder insertQuery = new StringBuilder().append(String.format("INSERT INTO %s VALUES ", identifier.name()));
+ HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2.forEach(record -> insertQuery.append("(")
+ .append(record.get(0)).append(",'")
+ .append(record.get(1)).append("','")
+ .append(record.get(2)).append("'),"));
+ insertQuery.setLength(insertQuery.length() - 1);
+
+ shell.executeStatement(insertQuery.toString());
+ List<Record> expected = TestHelper.RecordsBuilder.newInstance(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+ .add(2L, "Susan", "Morrison").add(1L, "Sharon", "Taylor").add(1L, "Joanna", "Pierce")
+ .add(2L, "Joanna", "Silver").add(2L, "Jake", "Donnel").add(2L, "Bob", "Silver").add(3L, "Trudy", "Henderson")
+ .add(3L, "Trudy", "Johnson").add(3L, "Blake", "Burr").build();
+ List<Object[]> result = shell.executeStatement(String.format("SELECT * FROM %s", identifier.name()));
+ HiveIcebergTestUtils.validateData(expected,
+ HiveIcebergTestUtils.valueForRow(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, result));
+ }
+
+ @Test
+ public void testSortedAndTransformedInsertIntoPartitionedTable() throws IOException {
+ TableIdentifier identifier = TableIdentifier.of("default", "tbl_bucketed");
+ Schema schema = new Schema(
+ optional(1, "a", Types.IntegerType.get()),
+ optional(2, "b", Types.StringType.get()),
+ optional(3, "c", Types.IntegerType.get())
+ );
+ SortOrder order = SortOrder.builderFor(schema)
+ .desc("c", NULLS_FIRST)
+ .asc(truncate("b", 1))
+ .build();
+ PartitionSpec partitionSpec = PartitionSpec.builderFor(schema)
+ .bucket("b", 2)
+ .build();
+ testTables.createTable(shell, identifier.name(), schema, order, partitionSpec, fileFormat, ImmutableList.of(), 1,
+ ImmutableMap.of());
+ shell.executeStatement(String.format("INSERT INTO %s VALUES (1, 'EUR', 10), (5, 'HUF', 30), (2, 'EUR', 10), " +
+ "(8, 'PLN', 20), (6, 'USD', null)", identifier.name()));
+ List<Object[]> result = shell.executeStatement(String.format("SELECT * FROM %s", identifier.name()));
+ List<Record> expected =
+ TestHelper.RecordsBuilder.newInstance(schema).add(1, "EUR", 10).add(2, "EUR", 10).add(6, "USD", null)
+ .add(5, "HUF", 30).add(8, "PLN", 20).build();
+ HiveIcebergTestUtils.validateData(expected, HiveIcebergTestUtils.valueForRow(schema, result));
+ }
+
@Test
public void testInsert() throws IOException {
Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
index f2b58d776c5..efaea9a8a01 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
@@ -251,9 +251,33 @@ abstract class TestTables {
*/
public Table createTable(TestHiveShell shell, String tableName, Schema schema, FileFormat fileFormat,
List<Record> records, int formatVersion, Map<String, String> tblProperties) throws IOException {
+ return createTable(shell, tableName, schema, SortOrder.unsorted(), PartitionSpec.unpartitioned(), fileFormat,
+ records, formatVersion, tblProperties);
+ }
+
+ /**
+ * Creates a Hive test table with the given sort order and partition spec. Creates the Iceberg table/data and
+ * creates the corresponding Hive table as well when needed. The table will be in the 'default' database and will
+ * be populated with the provided List of {@link Record}s.
+ * @param shell The HiveShell used for Hive table creation
+ * @param tableName The name of the test table
+ * @param schema The schema used for the table creation
+ * @param order The sort order used for the table creation
+ * @param partSpec The partition spec used for the table creation
+ * @param fileFormat The file format used for writing the data
+ * @param records The records with which the table is populated
+ * @param formatVersion The version of the spec the table should use (format-version)
+ * @param tblProperties Additional table properties
+ * @return The created table
+ * @throws IOException If there is an error writing data
+ */
+ public Table createTable(TestHiveShell shell, String tableName, Schema schema, SortOrder order,
+ PartitionSpec partSpec, FileFormat fileFormat, List<Record> records, int formatVersion,
+ Map<String, String> tblProperties) throws IOException {
ImmutableMap<String, String> tblProps = ImmutableMap.<String, String>builder().putAll(tblProperties)
.put(TableProperties.FORMAT_VERSION, Integer.toString(formatVersion)).build();
- Table table = createIcebergTable(shell.getHiveConf(), tableName, schema, fileFormat, tblProps, records);
+ Table table = createIcebergTable(shell.getHiveConf(), tableName, schema, order, partSpec, fileFormat, tblProps,
+ records);
String createHiveSQL = createHiveTableSQL(TableIdentifier.of("default", tableName), tblProps);
if (createHiveSQL != null) {
shell.executeStatement(createHiveSQL);
@@ -386,9 +410,30 @@ abstract class TestTables {
*/
public Table createIcebergTable(Configuration configuration, String tableName, Schema schema, FileFormat fileFormat,
Map<String, String> additionalTableProps, List<Record> records) throws IOException {
+ return createIcebergTable(configuration, tableName, schema, SortOrder.unsorted(), PartitionSpec.unpartitioned(),
+ fileFormat, additionalTableProps, records);
+ }
+
+ /**
+ * Creates an Iceberg table/data without creating the corresponding Hive table. The table will be in the 'default'
+ * namespace.
+ * @param configuration The configuration used during the table creation
+ * @param tableName The name of the test table
+ * @param schema The schema used for the table creation
+ * @param order The sort order used for the table creation
+ * @param partSpec The partition spec used for the table creation
+ * @param fileFormat The file format used for writing the data
+ * @param records The records with which the table is populated
+ * @return The created table
+ * @throws IOException If there is an error writing data
+ */
+ public Table createIcebergTable(Configuration configuration, String tableName, Schema schema, SortOrder order,
+ PartitionSpec partSpec, FileFormat fileFormat, Map<String, String> additionalTableProps, List<Record> records)
+ throws IOException {
String identifier = identifier("default." + tableName);
TestHelper helper = new TestHelper(new Configuration(configuration), tables(), identifier, schema,
- PartitionSpec.unpartitioned(), fileFormat, additionalTableProps, temp);
+ partSpec, fileFormat, additionalTableProps, temp);
+ helper.setOrder(order);
Table table = helper.createTable();
if (records != null && !records.isEmpty()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableDesc.java
index b484428cc07..eeac3a6d339 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableDesc.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.PartitionManagementTask;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
@@ -45,7 +44,6 @@ import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -59,7 +57,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ParseUtils;
import org.apache.hadoop.hive.ql.parse.PartitionTransform;
-import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
+import org.apache.hadoop.hive.ql.parse.TransformSpec;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.Explain;
@@ -833,7 +831,7 @@ public class CreateTableDesc implements DDLDesc, Serializable {
if (partCols.isPresent() && !partCols.get().isEmpty()) {
// Add the partition columns to the normal columns and save the transform to the session state
tbl.getSd().getCols().addAll(partCols.get());
- List<PartitionTransformSpec> spec = PartitionTransform.getPartitionTransformSpec(partCols.get());
+ List<TransformSpec> spec = PartitionTransform.getPartitionTransformSpec(partCols.get());
if (!SessionStateUtil.addResource(conf, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC, spec)) {
throw new HiveException("Query state attached to Session state must not be null. " +
"Partition transform metadata cannot be saved.");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/JsonDescTableFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/JsonDescTableFormatter.java
index 1444eb23263..d115935b4ca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/JsonDescTableFormatter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/JsonDescTableFormatter.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.hive.ql.metadata.PrimaryKeyInfo;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.UniqueConstraint;
import org.apache.hadoop.hive.ql.metadata.formatting.MapBuilder;
-import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
+import org.apache.hadoop.hive.ql.parse.TransformSpec;
import java.io.DataOutputStream;
import java.util.ArrayList;
@@ -245,7 +245,7 @@ public class JsonDescTableFormatter extends DescTableFormatter {
}
if (table.isNonNative() && table.getStorageHandler() != null &&
table.getStorageHandler().supportsPartitionTransform()) {
- List<PartitionTransformSpec> specs = table.getStorageHandler().getPartitionTransformSpec(table);
+ List<TransformSpec> specs = table.getStorageHandler().getPartitionTransformSpec(table);
if (!specs.isEmpty()) {
builder.put("partitionSpecInfo", specs.stream().map(s -> {
Map<String, String> result = new LinkedHashMap<>();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/TextDescTableFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/TextDescTableFormatter.java
index 53273b9810c..5583337ce20 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/TextDescTableFormatter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/TextDescTableFormatter.java
@@ -47,7 +47,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.UniqueConstraint;
import org.apache.hadoop.hive.ql.metadata.ForeignKeyInfo.ForeignKeyCol;
import org.apache.hadoop.hive.ql.metadata.UniqueConstraint.UniqueConstraintCol;
-import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
+import org.apache.hadoop.hive.ql.parse.TransformSpec;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.common.util.HiveStringUtils;
@@ -110,12 +110,12 @@ class TextDescTableFormatter extends DescTableFormatter {
if (table.isNonNative() && table.getStorageHandler() != null &&
table.getStorageHandler().supportsPartitionTransform()) {
- List<PartitionTransformSpec> partSpecs = table.getStorageHandler().getPartitionTransformSpec(table);
+ List<TransformSpec> partSpecs = table.getStorageHandler().getPartitionTransformSpec(table);
if (partSpecs != null && !partSpecs.isEmpty()) {
TextMetaDataTable metaDataTable = new TextMetaDataTable();
partitionTransformOutput += LINE_DELIM + "# Partition Transform Information" + LINE_DELIM + "# ";
metaDataTable.addRow(DescTableDesc.PARTITION_TRANSFORM_SPEC_SCHEMA.split("#")[0].split(","));
- for (PartitionTransformSpec spec : partSpecs) {
+ for (TransformSpec spec : partSpecs) {
String[] row = new String[2];
row[0] = spec.getColumnName();
if (spec.getTransformType() != null) {
@@ -278,7 +278,14 @@ class TextDescTableFormatter extends DescTableFormatter {
formatOutput("Num Buckets:", String.valueOf(storageDesc.getNumBuckets()), tableInfo);
formatOutput("Bucket Columns:", storageDesc.getBucketCols().toString(), tableInfo);
}
- formatOutput("Sort Columns:", storageDesc.getSortCols().toString(), tableInfo);
+
+ String sortColumnsInfo;
+ if (table.isNonNative() && table.getStorageHandler() != null && table.getStorageHandler().supportsSortColumns()) {
+ sortColumnsInfo = table.getStorageHandler().sortColumns(table).toString();
+ } else {
+ sortColumnsInfo = storageDesc.getSortCols().toString();
+ }
+ formatOutput("Sort Columns:", sortColumnsInfo, tableInfo);
if (storageDesc.isStoredAsSubDirectories()) {
formatOutput("Stored As SubDirectories:", "Yes", tableInfo);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/set/AlterTableSetPartitionSpecAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/set/AlterTableSetPartitionSpecAnalyzer.java
index 442d927b1b7..4764eedf561 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/set/AlterTableSetPartitionSpecAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/set/AlterTableSetPartitionSpecAnalyzer.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.parse.PartitionTransform;
-import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
+import org.apache.hadoop.hive.ql.parse.TransformSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
@@ -54,7 +54,7 @@ public class AlterTableSetPartitionSpecAnalyzer extends AbstractAlterTableAnalyz
Table table = getTable(tableName);
validateAlterTableType(table, AlterTableType.SETPARTITIONSPEC, false);
inputs.add(new ReadEntity(table));
- List<PartitionTransformSpec> partitionTransformSpec =
+ List<TransformSpec> partitionTransformSpec =
PartitionTransform.getPartitionTransformSpec(command);
if (!SessionStateUtil.addResource(conf, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC,
partitionTransformSpec)) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLPlanUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLPlanUtils.java
index 62248ff4f71..649455163fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLPlanUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLPlanUtils.java
@@ -57,7 +57,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.PrimaryKeyInfo;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.UniqueConstraint;
-import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
+import org.apache.hadoop.hive.ql.parse.TransformSpec;
import org.apache.hadoop.hive.ql.util.DirectionUtils;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
@@ -920,13 +920,13 @@ public class DDLPlanUtils {
private String getPartitionsBySpec(Table table) {
if (table.isNonNative() && table.getStorageHandler() != null &&
table.getStorageHandler().supportsPartitionTransform()) {
- List<PartitionTransformSpec> specs = table.getStorageHandler().getPartitionTransformSpec(table);
+ List<TransformSpec> specs = table.getStorageHandler().getPartitionTransformSpec(table);
if (specs.isEmpty()) {
return "";
}
List<String> partitionTransforms = new ArrayList<>();
- for (PartitionTransformSpec spec : specs) {
- if (spec.getTransformType() == PartitionTransformSpec.TransformType.IDENTITY) {
+ for (TransformSpec spec : specs) {
+ if (spec.getTransformType() == TransformSpec.TransformType.IDENTITY) {
partitionTransforms.add(spec.getColumnName());
} else {
partitionTransforms.add(spec.getTransformType().name() + "(" +
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index b4b3cfca9d8..b67d5d83476 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.hive.ql.Context.Operation;
import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
-import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
+import org.apache.hadoop.hive.ql.parse.TransformSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -346,6 +346,25 @@ public interface HiveStorageHandler extends Configurable {
return Collections.emptyList();
}
+ /**
+ * Check if the underlying storage handler implementation supports sort columns.
+ * @return true if the storage handler can support it
+ */
+ default boolean supportsSortColumns() {
+ return false;
+ }
+
+ /**
+ * Collect the columns that are used to sort the content of the data files.
+ * @param table the table which is being sorted
+ * @return the list of columns that are used during data sorting
+ */
+ default List<FieldSchema> sortColumns(org.apache.hadoop.hive.ql.metadata.Table table) {
+ Preconditions.checkState(supportsSortColumns(), "Should only be called for table formats where data sorting " +
+ "is supported");
+ return Collections.emptyList();
+ }
+
/**
* Check if the underlying storage handler implementation supports partition transformations.
* @return true if the storage handler can support it
@@ -360,7 +379,7 @@ public interface HiveStorageHandler extends Configurable {
* @param table the HMS table, must be non-null
* @return partition transform specification, can be null.
*/
- default List<PartitionTransformSpec> getPartitionTransformSpec(org.apache.hadoop.hive.ql.metadata.Table table) {
+ default List<TransformSpec> getPartitionTransformSpec(org.apache.hadoop.hive.ql.metadata.Table table) {
return null;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
index 2668f269b5f..b57ddd8e6c4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.Utilities.ReduceField;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.SemanticDispatcher;
@@ -61,7 +62,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.type.*;
+import org.apache.hadoop.hive.ql.parse.type.ExprNodeTypeCheck;
import org.apache.hadoop.hive.ql.plan.ColStatistics;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -200,6 +201,8 @@ public class SortedDynPartitionOptimizer extends Transform {
List<Integer> partitionPositions = getPartitionPositions(dpCtx, fsParent.getSchema());
LinkedList<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs =
new LinkedList<>(dpCtx.getCustomSortExpressions());
+ LinkedList<Integer> customSortOrder = new LinkedList<>(dpCtx.getCustomSortOrder());
+ LinkedList<Integer> customNullOrder = new LinkedList<>(dpCtx.getCustomSortNullOrder());
// If custom sort expressions are present, there is an explicit requirement to do sorting
if (customSortExprs.isEmpty() && !shouldDo(partitionPositions, fsParent)) {
@@ -301,8 +304,9 @@ public class SortedDynPartitionOptimizer extends Transform {
fsOp.getConf().setTotalFiles(1);
// Create ReduceSink operator
- ReduceSinkOperator rsOp = getReduceSinkOp(partitionPositions, sortPositions, customSortExprs, sortOrder,
- sortNullOrder, allRSCols, bucketColumns, numBuckets, fsParent, fsOp.getConf().getWriteType());
+ ReduceSinkOperator rsOp = getReduceSinkOp(partitionPositions, sortPositions, sortOrder,
+ sortNullOrder, customSortExprs, customSortOrder, customNullOrder, allRSCols, bucketColumns, numBuckets,
+ fsParent, fsOp.getConf().getWriteType());
// we have to make sure not to reorder the child operators as it might cause weird behavior in the tasks at
// the same level. when there is auto stats gather at the same level as another operation then it might
// cause unnecessary preemption. Maintaining the order here to avoid such preemption and possible errors
@@ -572,8 +576,10 @@ public class SortedDynPartitionOptimizer extends Transform {
}
public ReduceSinkOperator getReduceSinkOp(List<Integer> partitionPositions, List<Integer> sortPositions,
- List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs, List<Integer> sortOrder,
- List<Integer> sortNullOrder, ArrayList<ExprNodeDesc> allCols, ArrayList<ExprNodeDesc> bucketColumns,
+ List<Integer> sortOrder, List<Integer> sortNullOrder,
+ List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs,
+ List<Integer> customSortOrder, List<Integer> customSortNullOrder,
+ ArrayList<ExprNodeDesc> allCols, ArrayList<ExprNodeDesc> bucketColumns,
int numBuckets, Operator<? extends OperatorDesc> parent, AcidUtils.Operation writeType) {
// Order of KEY columns; if a custom sort is present, partition and bucket columns are disregarded:
@@ -601,17 +607,25 @@ public class SortedDynPartitionOptimizer extends Transform {
}
keyColsPosInVal.addAll(sortPositions);
- // by default partition and bucket columns are sorted in ascending order
Integer order = 1;
+ // by default partition and bucket columns are sorted in ascending order
if (sortOrder != null && !sortOrder.isEmpty()) {
if (sortOrder.get(0) == 0) {
order = 0;
}
}
- for (int i = 0; i < keyColsPosInVal.size() + customSortExprs.size(); i++) {
+
+ for (Integer ignored : keyColsPosInVal) {
newSortOrder.add(order);
}
+ if (customSortExprPresent) {
+ for (int i = 0; i < customSortExprs.size() - customSortOrder.size(); i++) {
+ newSortOrder.add(order);
+ }
+ newSortOrder.addAll(customSortOrder);
+ }
+
String orderStr = "";
for (Integer i : newSortOrder) {
if (i == 1) {
@@ -631,10 +645,18 @@ public class SortedDynPartitionOptimizer extends Transform {
nullOrder = 1;
}
}
- for (int i = 0; i < keyColsPosInVal.size() + customSortExprs.size(); i++) {
+
+ for (Integer ignored : keyColsPosInVal) {
newSortNullOrder.add(nullOrder);
}
+ if (customSortExprPresent) {
+ for (int i = 0; i < customSortExprs.size() - customSortNullOrder.size(); i++) {
+ newSortNullOrder.add(nullOrder);
+ }
+ newSortNullOrder.addAll(customSortNullOrder);
+ }
+
String nullOrderStr = "";
for (Integer i : newSortNullOrder) {
if (i == 0) {
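To trace the wiring: the Iceberg handler encodes each SortField as an integer pair (direction ASC -> 1, DESC -> 0; null order NULLS_FIRST -> 0, NULLS_LAST -> 1), and getReduceSinkOp() appends those lists after the default order assigned to the partition/bucket key columns. A worked example, assuming one identity partition key with the defaults (ascending, nulls first) plus the two-field sort order from testSortedInsert (id ASC NULLS_FIRST, data DESC NULLS_LAST):

    // customSortOrder  = [1, 0]        customSortNullOrder = [0, 1]
    // newSortOrder     = [1] + [1, 0]  ->  "++-"  (Hive's '+'/'-' key order string)
    // newSortNullOrder = [0] + [0, 1]  ->  "aaz"  ('a' = nulls first, 'z' = nulls last)

The serialization loops are only partially visible in the hunk above, so the two strings are our reading of Hive's usual ReduceSink conventions rather than a quote from the patch.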
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionTransform.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionTransform.java
index 50a6371a1bd..dbc40e8bd11 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionTransform.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionTransform.java
@@ -18,11 +18,10 @@
package org.apache.hadoop.hive.ql.parse;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec.TransformType;
+import org.apache.hadoop.hive.ql.parse.TransformSpec.TransformType;
import java.util.ArrayList;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -45,9 +44,9 @@ public class PartitionTransform {
* @param fields The partition column fields
* @return list of partition transforms
*/
- public static List<PartitionTransformSpec> getPartitionTransformSpec(List<FieldSchema> fields) {
+ public static List<TransformSpec> getPartitionTransformSpec(List<FieldSchema> fields) {
return fields.stream()
- .map(field -> new PartitionTransformSpec(field.getName(), TransformType.IDENTITY, Optional.empty()))
+ .map(field -> new TransformSpec(field.getName(), TransformType.IDENTITY, Optional.empty()))
.collect(Collectors.toList());
}
@@ -56,10 +55,10 @@ public class PartitionTransform {
* @param node AST Tree node, must be not null
* @return list of partition transforms
*/
- public static List<PartitionTransformSpec> getPartitionTransformSpec(ASTNode node) {
- List<PartitionTransformSpec> partSpecList = new ArrayList<>();
+ public static List<TransformSpec> getPartitionTransformSpec(ASTNode node) {
+ List<TransformSpec> partSpecList = new ArrayList<>();
for (int i = 0; i < node.getChildCount(); i++) {
- PartitionTransformSpec spec = new PartitionTransformSpec();
+ TransformSpec spec = new TransformSpec();
ASTNode child = (ASTNode) node.getChild(i);
for (int j = 0; j < child.getChildCount(); j++) {
ASTNode grandChild = (ASTNode) child.getChild(j);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 485089e4ad3..1e51d51caf2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -13645,7 +13645,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
}
break;
case HiveParser.TOK_TABLEPARTCOLSBYSPEC:
- List<PartitionTransformSpec> partitionTransformSpec =
+ List<TransformSpec> partitionTransformSpec =
PartitionTransform.getPartitionTransformSpec(child);
if (!SessionStateUtil.addResource(conf, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionTransformSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TransformSpec.java
similarity index 90%
rename from ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionTransformSpec.java
rename to ql/src/java/org/apache/hadoop/hive/ql/parse/TransformSpec.java
index 268660f8efa..f25cbda5af8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionTransformSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TransformSpec.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hive.ql.parse;
import java.util.Optional;
-public class PartitionTransformSpec {
+public class TransformSpec {
public enum TransformType {
IDENTITY, YEAR, MONTH, DAY, HOUR, TRUNCATE, BUCKET, VOID
@@ -29,10 +29,10 @@ public class PartitionTransformSpec {
private TransformType transformType;
private Optional<Integer> transformParam;
- public PartitionTransformSpec() {
+ public TransformSpec() {
}
- public PartitionTransformSpec(String columnName, TransformType transformType, Optional<Integer> transformParam) {
+ public TransformSpec(String columnName, TransformType transformType, Optional<Integer> transformParam) {
this.columnName = columnName;
this.transformType = transformType;
this.transformParam = transformParam;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
index 4acc5406fb0..3497f3120cd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
@@ -60,6 +60,8 @@ public class DynamicPartitionCtx implements Serializable {
* schema and returns a single expression. Example for simply referencing column 3: cols -> cols.get(3).clone()
*/
private transient List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExpressions;
+ private transient List<Integer> customSortOrder;
+ private transient List<Integer> customSortNullOrder;
public DynamicPartitionCtx() {
}
@@ -93,6 +95,8 @@ public class DynamicPartitionCtx implements Serializable {
}
this.whiteListPattern = confVal == null || confVal.isEmpty() ? null : Pattern.compile(confVal);
this.customSortExpressions = new LinkedList<>();
+ this.customSortOrder = new LinkedList<>();
+ this.customSortNullOrder = new LinkedList<>();
}
public DynamicPartitionCtx(Map<String, String> partSpec, String defaultPartName,
@@ -126,6 +130,8 @@ public class DynamicPartitionCtx implements Serializable {
}
this.whiteListPattern = confVal == null || confVal.isEmpty() ? null : Pattern.compile(confVal);
this.customSortExpressions = new LinkedList<>();
+ this.customSortOrder = new LinkedList<>();
+ this.customSortNullOrder = new LinkedList<>();
}
public DynamicPartitionCtx(DynamicPartitionCtx dp) {
@@ -141,6 +147,8 @@ public class DynamicPartitionCtx implements Serializable {
this.maxPartsPerNode = dp.maxPartsPerNode;
this.whiteListPattern = dp.whiteListPattern;
this.customSortExpressions = dp.customSortExpressions;
+ this.customSortOrder = dp.customSortOrder;
+ this.customSortNullOrder = dp.customSortNullOrder;
}
public Pattern getWhiteListPattern() {
@@ -234,4 +242,20 @@ public class DynamicPartitionCtx implements Serializable {
public void setCustomSortExpressions(List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExpressions) {
this.customSortExpressions = customSortExpressions;
}
+
+ public List<Integer> getCustomSortOrder() {
+ return customSortOrder;
+ }
+
+ public void setCustomSortOrder(List<Integer> customSortOrder) {
+ this.customSortOrder = customSortOrder;
+ }
+
+ public List<Integer> getCustomSortNullOrder() {
+ return customSortNullOrder;
+ }
+
+ public void setCustomSortNullOrder(List<Integer> customSortNullOrder) {
+ this.customSortNullOrder = customSortNullOrder;
+ }
}
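Finally, a minimal sketch of how the new DynamicPartitionCtx fields fit together, using the encoding established by the Iceberg handler (the literal values are illustrative):

    // Producer side, cf. the DynamicPartitionCtx set-up in HiveIcebergStorageHandler:
    DynamicPartitionCtx dpCtx = new DynamicPartitionCtx();
    dpCtx.setCustomSortOrder(new LinkedList<>(Arrays.asList(1, 0)));      // ASC, DESC
    dpCtx.setCustomSortNullOrder(new LinkedList<>(Arrays.asList(0, 1)));  // NULLS_FIRST, NULLS_LAST

    // Consumer side, cf. SortedDynPartitionOptimizer: copy before mutating.
    List<Integer> customSortOrder = new LinkedList<>(dpCtx.getCustomSortOrder());          // [1, 0]
    List<Integer> customSortNullOrder = new LinkedList<>(dpCtx.getCustomSortNullOrder());  // [0, 1]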