Posted to commits@impala.apache.org by jo...@apache.org on 2020/10/14 22:48:35 UTC
[impala] 04/06: IMPALA-10165: Implement Bucket and Truncate partition transforms for Iceberg tables
This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 13a78fc1b080baed16ac42888a54c60e9eb7a894
Author: Gabor Kaszab <ga...@cloudera.com>
AuthorDate: Tue Sep 22 14:53:41 2020 +0200
IMPALA-10165: Implement Bucket and Truncate partition transforms for Iceberg tables
This patch adds support for the Iceberg Bucket and Truncate partition
transforms. Both accept a parameter: the number of buckets and the
truncation width, respectively.
Usage:
CREATE TABLE tbl_name (i int, p1 int, p2 timestamp)
PARTITION BY SPEC (
p1 BUCKET 10,
p1 TRUNCATE 5
) STORED AS ICEBERG
TBLPROPERTIES ('iceberg.catalog'='hadoop.tables');
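The following sketches show a few invalid specs and the analysis errors they
are expected to raise (illustrative only; the table name err_tbl and its
columns are made up here, but the error strings are the ones added in
IcebergPartitionTransform.analyze() and exercised in AnalyzeStmtsTest below):

CREATE TABLE err_tbl (i int, p1 int)
PARTITION BY SPEC (p1 BUCKET) STORED AS ICEBERG
TBLPROPERTIES ('iceberg.catalog'='hadoop.tables');
-- AnalysisException: BUCKET and TRUNCATE partition transforms should have a parameter.

CREATE TABLE err_tbl (i int, p1 int)
PARTITION BY SPEC (p1 TRUNCATE 0) STORED AS ICEBERG
TBLPROPERTIES ('iceberg.catalog'='hadoop.tables');
-- AnalysisException: The parameter of a partition transform should be greater than zero.

CREATE TABLE err_tbl (i int, p1 timestamp)
PARTITION BY SPEC (p1 DAY 10) STORED AS ICEBERG
TBLPROPERTIES ('iceberg.catalog'='hadoop.tables');
-- AnalysisException: Only BUCKET and TRUNCATE partition transforms accept a parameter.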
Testing:
- Extended AnalyzeStmtsTest to cover creating partitioned Iceberg
tables with the new partition transforms.
- Extended ParserTest.
- Extended iceberg-create.test to create Iceberg tables with the new
partition transforms.
- Extended show-create-table.test to check that the new partition
transforms are displayed with their parameters in the SHOW CREATE
TABLE output, as sketched below.
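A sketch of the relevant PARTITION BY SPEC portion of that expected output,
assembled from the rows added to show-create-table.test in this patch
(columns p1..p4 come from that test):

PARTITION BY SPEC
(
  p1 YEAR,
  p2 IDENTITY,
  p2 BUCKET 500,
  p2 TRUNCATE 15,
  p3 HOUR,
  p4 BUCKET 10,
  p4 TRUNCATE 5
)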
Change-Id: Idc75cd23045b274885607c45886319f4f6da19de
Reviewed-on: http://gerrit.cloudera.org:8080/16551
Reviewed-by: Gabor Kaszab <ga...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
common/thrift/CatalogObjects.thrift | 11 +-
fe/src/main/cup/sql-parser.cup | 17 ++-
.../apache/impala/analysis/CreateTableStmt.java | 14 +-
.../impala/analysis/IcebergPartitionField.java | 19 ++-
.../impala/analysis/IcebergPartitionTransform.java | 88 +++++++++++
.../org/apache/impala/catalog/FeIcebergTable.java | 12 +-
.../org/apache/impala/catalog/IcebergTable.java | 9 +-
.../java/org/apache/impala/util/IcebergUtil.java | 162 +++++++++++++++++----
.../apache/impala/analysis/AnalyzeStmtsTest.java | 22 +++
.../org/apache/impala/analysis/ParserTest.java | 4 +
.../queries/QueryTest/iceberg-create.test | 34 ++++-
.../queries/QueryTest/show-create-table.test | 27 +++-
12 files changed, 352 insertions(+), 67 deletions(-)
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index 457d0dd..acac4c3 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -128,7 +128,7 @@ enum TAccessLevel {
WRITE_ONLY = 3
}
-enum TIcebergPartitionTransform {
+enum TIcebergPartitionTransformType {
IDENTITY = 0
HOUR = 1
DAY = 2
@@ -522,12 +522,19 @@ struct TKuduTable {
4: required list<TKuduPartitionParam> partition_by
}
+struct TIcebergPartitionTransform {
+ 1: required TIcebergPartitionTransformType transform_type
+
+ // Parameter for BUCKET and TRUNCATE transforms.
+ 2: optional i32 transform_param
+}
+
struct TIcebergPartitionField {
1: required i32 source_id
2: required i32 field_id
3: required string orig_field_name
4: required string field_name
- 5: required TIcebergPartitionTransform field_type
+ 5: required TIcebergPartitionTransform transform
}
struct TIcebergPartitionSpec {
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index 4bf208e..e85a55c 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -38,6 +38,7 @@ import org.apache.impala.analysis.TableSampleClause;
import org.apache.impala.analysis.AlterTableAddDropRangePartitionStmt.Operation;
import org.apache.impala.analysis.IcebergPartitionSpec;
import org.apache.impala.analysis.IcebergPartitionField;
+import org.apache.impala.analysis.IcebergPartitionTransform;
import org.apache.impala.catalog.ArrayType;
import org.apache.impala.catalog.MapType;
import org.apache.impala.catalog.RowFormat;
@@ -58,7 +59,6 @@ import org.apache.impala.thrift.TShowStatsOp;
import org.apache.impala.thrift.TTablePropertyType;
import org.apache.impala.thrift.TPrincipalType;
import org.apache.impala.thrift.TSortingOrder;
-import org.apache.impala.thrift.TIcebergPartitionTransform;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.util.IcebergUtil;
import org.apache.impala.common.NotImplementedException;
@@ -495,7 +495,8 @@ nonterminal IcebergPartitionSpec iceberg_partition_spec_def;
nonterminal List<IcebergPartitionField> iceberg_partition_field_list,
iceberg_partition_field_defs;
nonterminal IcebergPartitionField iceberg_partition_field_def;
-nonterminal TIcebergPartitionTransform iceberg_partition_transform;
+nonterminal IcebergPartitionTransform iceberg_partition_transform;
+nonterminal String iceberg_partition_transform_type;
// Options for DDL commands - CREATE/DROP/ALTER
nonterminal HdfsCachingOp cache_op_val, opt_cache_op_val;
nonterminal BigDecimal opt_cache_op_replication;
@@ -1787,10 +1788,20 @@ iceberg_partition_field_def ::=
;
iceberg_partition_transform ::=
- IDENT:transfrom_type
+ iceberg_partition_transform_type:transfrom_type INTEGER_LITERAL:transform_param
+ {: RESULT = IcebergUtil.getPartitionTransform(transfrom_type,
+ transform_param.intValue()); :}
+ | IDENT:transfrom_type
{: RESULT = IcebergUtil.getPartitionTransform(transfrom_type); :}
;
+iceberg_partition_transform_type ::=
+ IDENT:transform_type
+ {: RESULT = transform_type; :}
+ | KW_TRUNCATE
+ {: RESULT = "TRUNCATE"; :}
+;
+
create_udf_stmt ::=
KW_CREATE KW_FUNCTION if_not_exists_val:if_not_exists
function_name:fn_name function_def_args:fn_args
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index 990496c..ff6df71 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -271,7 +271,7 @@ public class CreateTableStmt extends StatementBase {
}
if (getFileFormat() == THdfsFileFormat.ICEBERG) {
- analyzeIcebergFormat();
+ analyzeIcebergFormat(analyzer);
}
// If lineage logging is enabled, compute minimal lineage graph.
@@ -565,7 +565,7 @@ public class CreateTableStmt extends StatementBase {
/**
* For iceberg file format, add related storage handler
*/
- private void analyzeIcebergFormat() throws AnalysisException {
+ private void analyzeIcebergFormat(Analyzer analyzer) throws AnalysisException {
// A managed table cannot have 'external.table.purge' property set
if (!isExternal() && Boolean.parseBoolean(
getTblProperties().get(IcebergTable.TBL_PROP_EXTERNAL_TABLE_PURGE))) {
@@ -585,7 +585,7 @@ public class CreateTableStmt extends StatementBase {
}
// Check partition columns for managed iceberg table
- checkPartitionColumns();
+ checkPartitionColumns(analyzer);
}
String handler = getTblProperties().get(IcebergTable.KEY_STORAGE_HANDLER);
@@ -627,13 +627,17 @@ public class CreateTableStmt extends StatementBase {
/**
* For iceberg table, partition column must be from source column
*/
- private void checkPartitionColumns() throws AnalysisException {
+ private void checkPartitionColumns(Analyzer analyzer) throws AnalysisException {
// This check is unnecessary for iceberg table without partition spec
List<IcebergPartitionSpec> specs = tableDef_.getIcebergPartitionSpecs();
if (specs == null || specs.isEmpty()) return;
// Iceberg table only has one partition spec now
- List<IcebergPartitionField> fields = specs.get(0).getIcebergPartitionFields();
+ IcebergPartitionSpec spec = specs.get(0);
+ // Analyzes the partition spec and the underlying partition fields.
+ spec.analyze(analyzer);
+
+ List<IcebergPartitionField> fields = spec.getIcebergPartitionFields();
Preconditions.checkState(fields != null && !fields.isEmpty());
for (IcebergPartitionField field : fields) {
String fieldName = field.getFieldName();
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionField.java b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionField.java
index ef927e5..8ebd34c 100644
--- a/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionField.java
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionField.java
@@ -19,7 +19,6 @@ package org.apache.impala.analysis;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.thrift.TIcebergPartitionField;
-import org.apache.impala.thrift.TIcebergPartitionTransform;
/**
* Represents a PartitionField of iceberg
@@ -40,21 +39,21 @@ public class IcebergPartitionField extends StmtNode {
// Holds the column name in the table.
private String origFieldName_;
- //Column partition type
- private TIcebergPartitionTransform fieldType_;
+ // Partition transform type and transform param for this partition field.
+ private IcebergPartitionTransform transform_;
public IcebergPartitionField(int sourceId, int fieldId, String origFieldName,
- String fieldName, TIcebergPartitionTransform fieldType) {
+ String fieldName, IcebergPartitionTransform transform) {
sourceId_ = sourceId;
fieldId_ = fieldId;
origFieldName_ = origFieldName;
fieldName_ = fieldName;
- fieldType_ = fieldType;
+ transform_ = transform;
}
// This constructor is called when creating a partitioned Iceberg table.
- public IcebergPartitionField(String fieldName, TIcebergPartitionTransform fieldType) {
- this(0, 0, fieldName, fieldName, fieldType);
+ public IcebergPartitionField(String fieldName, IcebergPartitionTransform transform) {
+ this(0, 0, fieldName, fieldName, transform);
}
public String getFieldName() {
@@ -67,7 +66,7 @@ public class IcebergPartitionField extends StmtNode {
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
- return;
+ transform_.analyze(analyzer);
}
@Override
@@ -78,7 +77,7 @@ public class IcebergPartitionField extends StmtNode {
@Override
public String toSql(ToSqlOptions options) {
StringBuilder builder = new StringBuilder();
- builder.append(origFieldName_+ " " + fieldType_.toString());
+ builder.append(origFieldName_+ " " + transform_.toSql());
return builder.toString();
}
@@ -88,7 +87,7 @@ public class IcebergPartitionField extends StmtNode {
result.setSource_id(sourceId_);
result.setOrig_field_name(origFieldName_);
result.setField_name(fieldName_);
- result.setField_type(fieldType_);
+ result.setTransform(transform_.toThrift());
return result;
}
}
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionTransform.java b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionTransform.java
new file mode 100644
index 0000000..6db788c
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionTransform.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.thrift.TIcebergPartitionTransform;
+import org.apache.impala.thrift.TIcebergPartitionTransformType;
+
+/**
+ * Represents the partition transform part of an Iceberg partition field.
+*/
+public class IcebergPartitionTransform extends StmtNode {
+
+ // Stores the transform type such as HOUR, YEAR, etc.
+ private TIcebergPartitionTransformType transformType_;
+
+ // Stores the parameter of BUCKET or TRUNCATE partition transforms (numBuckets,
+ // width respectively). This is null for partition transforms that don't have a
+ // parameter.
+ private Integer transformParam_;
+
+ // Constructor for parameterless partition transforms.
+ public IcebergPartitionTransform(TIcebergPartitionTransformType transformType) {
+ this(transformType, null);
+ }
+
+ public IcebergPartitionTransform(TIcebergPartitionTransformType transformType,
+ Integer transformParam) {
+ transformType_ = transformType;
+ transformParam_ = transformParam;
+ }
+
+ @Override
+ public void analyze(Analyzer analyzer) throws AnalysisException {
+ if (transformType_ == TIcebergPartitionTransformType.BUCKET ||
+ transformType_ == TIcebergPartitionTransformType.TRUNCATE) {
+ if (transformParam_ == null) {
+ throw new AnalysisException("BUCKET and TRUNCATE partition transforms should " +
+ "have a parameter.");
+ }
+ if (transformParam_ <= 0) {
+ throw new AnalysisException("The parameter of a partition transform should " +
+ "be greater than zero.");
+ }
+ } else {
+ if (transformParam_ != null) {
+ throw new AnalysisException("Only BUCKET and TRUNCATE partition transforms " +
+ "accept a parameter.");
+ }
+ }
+ return;
+ }
+
+ @Override
+ public final String toSql() {
+ return toSql(ToSqlOptions.DEFAULT);
+ }
+
+ @Override
+ public String toSql(ToSqlOptions options) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(transformType_.toString());
+ if (transformParam_ != null) builder.append(" " + transformParam_.toString());
+ return builder.toString();
+ }
+
+ public TIcebergPartitionTransform toThrift() {
+ TIcebergPartitionTransform transform = new TIcebergPartitionTransform();
+ transform.setTransform_type(transformType_);
+ if (transformParam_ != null) transform.setTransform_param(transformParam_);
+ return transform;
+ }
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
index 11dddd1..dd7c05e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -244,15 +244,17 @@ public interface FeIcebergTable extends FeFsTable {
TableMetadata metadata = IcebergUtil.getIcebergTableMetadata(table);
if (!metadata.specs().isEmpty()) {
- // Just show the latest PartitionSpec from iceberg table metadata
- PartitionSpec latestSpec = metadata.specs().get(metadata.specs().size() - 1);
+ // Just show the current PartitionSpec from Iceberg table metadata
+ PartitionSpec latestSpec = metadata.spec();
+ HashMap<String, Integer> transformParams =
+ IcebergUtil.getPartitionTransformParams(latestSpec);
for(PartitionField field : latestSpec.fields()) {
TResultRowBuilder builder = new TResultRowBuilder();
builder.add(latestSpec.specId());
builder.add(field.sourceId());
builder.add(field.fieldId());
builder.add(field.name());
- builder.add(IcebergUtil.getPartitionTransform(field).toString());
+ builder.add(IcebergUtil.getPartitionTransform(field, transformParams).toSql());
result.addToRows(builder.get());
}
}
@@ -369,10 +371,12 @@ public interface FeIcebergTable extends FeFsTable {
List<IcebergPartitionSpec> ret = new ArrayList<>();
for (PartitionSpec spec : metadata.specs()) {
List<IcebergPartitionField> fields = new ArrayList<>();;
+ HashMap<String, Integer> transformParams =
+ IcebergUtil.getPartitionTransformParams(spec);
for (PartitionField field : spec.fields()) {
fields.add(new IcebergPartitionField(field.sourceId(), field.fieldId(),
spec.schema().findColumnName(field.sourceId()), field.name(),
- IcebergUtil.getPartitionTransform(field)));
+ IcebergUtil.getPartitionTransform(field, transformParams)));
}
ret.add(new IcebergPartitionSpec(spec.specId(), fields));
}
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index 0845c4f..b0f384a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.types.Types;
import org.apache.impala.analysis.IcebergPartitionField;
import org.apache.impala.analysis.IcebergPartitionSpec;
+import org.apache.impala.analysis.IcebergPartitionTransform;
import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.THdfsFileDesc;
@@ -312,8 +313,14 @@ public class IcebergTable extends Table implements FeIcebergTable {
if (param.getPartition_fields() != null) {
List<IcebergPartitionField> fields = new ArrayList<>();
for (TIcebergPartitionField field : param.getPartition_fields()) {
+ Integer transformParam = null;
+ if (field.getTransform().isSetTransform_param()) {
+ transformParam = field.getTransform().getTransform_param();
+ }
fields.add(new IcebergPartitionField(field.getSource_id(), field.getField_id(),
- field.getOrig_field_name(), field.getField_name(), field.getField_type()));
+ field.getOrig_field_name(), field.getField_name(),
+ new IcebergPartitionTransform(field.getTransform().getTransform_type(),
+ transformParam)));
}
ret.add(new IcebergPartitionSpec(param.getPartition_id(),
fields));
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index af4ac31..79444f1 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -18,10 +18,14 @@
package org.apache.impala.util;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import com.google.common.collect.Maps;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
+
+import org.apache.impala.common.Pair;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.DataFile;
@@ -34,7 +38,10 @@ import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.transforms.PartitionSpecVisitor;
+import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.types.Types;
+import org.apache.impala.analysis.IcebergPartitionTransform;
import org.apache.impala.catalog.ArrayType;
import org.apache.impala.catalog.Catalog;
import org.apache.impala.catalog.FeIcebergTable;
@@ -54,6 +61,7 @@ import org.apache.impala.thrift.TIcebergCatalog;
import org.apache.impala.thrift.TIcebergFileFormat;
import org.apache.impala.thrift.TIcebergPartitionField;
import org.apache.impala.thrift.TIcebergPartitionTransform;
+import org.apache.impala.thrift.TIcebergPartitionTransformType;
public class IcebergUtil {
@@ -154,19 +162,27 @@ public class IcebergUtil {
List<TIcebergPartitionField> partitionFields =
params.getPartition_spec().getPartition_fields();
for (TIcebergPartitionField partitionField : partitionFields) {
- if (partitionField.getField_type() == TIcebergPartitionTransform.IDENTITY) {
+ TIcebergPartitionTransformType transformType =
+ partitionField.getTransform().getTransform_type();
+ if (transformType == TIcebergPartitionTransformType.IDENTITY) {
builder.identity(partitionField.getField_name());
- } else if (partitionField.getField_type() == TIcebergPartitionTransform.HOUR) {
+ } else if (transformType == TIcebergPartitionTransformType.HOUR) {
builder.hour(partitionField.getField_name());
- } else if (partitionField.getField_type() == TIcebergPartitionTransform.DAY) {
+ } else if (transformType == TIcebergPartitionTransformType.DAY) {
builder.day(partitionField.getField_name());
- } else if (partitionField.getField_type() == TIcebergPartitionTransform.MONTH) {
+ } else if (transformType == TIcebergPartitionTransformType.MONTH) {
builder.month(partitionField.getField_name());
- } else if (partitionField.getField_type() == TIcebergPartitionTransform.YEAR) {
+ } else if (transformType == TIcebergPartitionTransformType.YEAR) {
builder.year(partitionField.getField_name());
+ } else if (transformType == TIcebergPartitionTransformType.BUCKET) {
+ builder.bucket(partitionField.getField_name(),
+ partitionField.getTransform().getTransform_param());
+ } else if (transformType == TIcebergPartitionTransformType.TRUNCATE) {
+ builder.truncate(partitionField.getField_name(),
+ partitionField.getTransform().getTransform_param());
} else {
throw new ImpalaRuntimeException(String.format("Skip partition: %s, %s",
- partitionField.getField_name(), partitionField.getField_type()));
+ partitionField.getField_name(), transformType));
}
}
return builder.build();
@@ -212,36 +228,126 @@ public class IcebergUtil {
return null;
}
- /**
- * Build TIcebergPartitionTransform by iceberg PartitionField
- */
- public static TIcebergPartitionTransform getPartitionTransform(PartitionField field)
+ public static IcebergPartitionTransform getPartitionTransform(
+ PartitionField field, HashMap<String, Integer> transformParams)
throws TableLoadingException {
String type = field.transform().toString();
- return getPartitionTransform(type);
+ String transformMappingKey = getPartitonTransformMappingKey(field.sourceId(),
+ getPartitionTransformType(type));
+ return getPartitionTransform(type, transformParams.get(transformMappingKey));
}
- public static TIcebergPartitionTransform getPartitionTransform(String type)
+ public static IcebergPartitionTransform getPartitionTransform(String transformType,
+ Integer transformParam) throws TableLoadingException {
+ return new IcebergPartitionTransform(getPartitionTransformType(transformType),
+ transformParam);
+ }
+
+ public static IcebergPartitionTransform getPartitionTransform(String transformType)
throws TableLoadingException {
- type = type.toUpperCase();
- if ("IDENTITY".equals(type)) {
- return TIcebergPartitionTransform.IDENTITY;
- } else if ("HOUR".equals(type)) {
- return TIcebergPartitionTransform.HOUR;
- } else if ("DAY".equals(type)) {
- return TIcebergPartitionTransform.DAY;
- } else if ("MONTH".equals(type)) {
- return TIcebergPartitionTransform.MONTH;
- } else if ("YEAR".equals(type)) {
- return TIcebergPartitionTransform.YEAR;
- } else if ("BUCKET".equals(type)) {
- return TIcebergPartitionTransform.BUCKET;
- } else if ("TRUNCATE".equals(type)) {
- return TIcebergPartitionTransform.TRUNCATE;
+ return getPartitionTransform(transformType, null);
+ }
+
+ public static TIcebergPartitionTransformType getPartitionTransformType(
+ String transformType) throws TableLoadingException {
+ transformType = transformType.toUpperCase();
+ if ("IDENTITY".equals(transformType)) {
+ return TIcebergPartitionTransformType.IDENTITY;
+ } else if ("HOUR".equals(transformType)) {
+ return TIcebergPartitionTransformType.HOUR;
+ } else if ("DAY".equals(transformType)) {
+ return TIcebergPartitionTransformType.DAY;
+ } else if ("MONTH".equals(transformType)) {
+ return TIcebergPartitionTransformType.MONTH;
+ } else if ("YEAR".equals(transformType)) {
+ return TIcebergPartitionTransformType.YEAR;
+ } else if (transformType != null && transformType.startsWith("BUCKET")) {
+ return TIcebergPartitionTransformType.BUCKET;
+ } else if (transformType != null && transformType.startsWith("TRUNCATE")) {
+ return TIcebergPartitionTransformType.TRUNCATE;
} else {
throw new TableLoadingException("Unsupported iceberg partition type: " +
- type);
+ transformType);
+ }
+ }
+
+ private static String getPartitonTransformMappingKey(int sourceId,
+ TIcebergPartitionTransformType transformType) {
+ return sourceId + "_" + transformType.toString();
+ }
+
+ /**
+ * Takes a PartitionSpec and returns a mapping between each field in the
+ * PartitionSpec and its transform's parameter. Only Bucket and Truncate transforms
+ * have a parameter; for the other transforms the mapped value is null.
+ * The source ID and the transform type are needed together to uniquely identify a
+ * specific field in the PartitionSpec. (Unfortunately, fieldId is not available in
+ * the Visitor class below.)
+ * The reason for implementing the PartitionSpecVisitor below is that Iceberg doesn't
+ * expose the interface of the transform types outside of their package, so the only
+ * way to get a transform's parameter is to implement this visitor class.
+ */
+ public static HashMap<String, Integer> getPartitionTransformParams(PartitionSpec spec)
+ throws TableLoadingException {
+ List<Pair<String, Integer>> transformParams = PartitionSpecVisitor.visit(
+ spec.schema(), spec, new PartitionSpecVisitor<Pair<String, Integer>>() {
+ @Override
+ public Pair<String, Integer> identity(String sourceName, int sourceId) {
+ String mappingKey = getPartitonTransformMappingKey(sourceId,
+ TIcebergPartitionTransformType.IDENTITY);
+ return new Pair<String, Integer>(mappingKey, null);
+ }
+
+ @Override
+ public Pair<String, Integer> bucket(String sourceName, int sourceId,
+ int numBuckets) {
+ String mappingKey = getPartitonTransformMappingKey(sourceId,
+ TIcebergPartitionTransformType.BUCKET);
+ return new Pair<String, Integer>(mappingKey, numBuckets);
+ }
+
+ @Override
+ public Pair<String, Integer> truncate(String sourceName, int sourceId,
+ int width) {
+ String mappingKey = getPartitonTransformMappingKey(sourceId,
+ TIcebergPartitionTransformType.TRUNCATE);
+ return new Pair<String, Integer>(mappingKey, width);
+ }
+
+ @Override
+ public Pair<String, Integer> year(String sourceName, int sourceId) {
+ String mappingKey = getPartitonTransformMappingKey(sourceId,
+ TIcebergPartitionTransformType.YEAR);
+ return new Pair<String, Integer>(mappingKey, null);
+ }
+
+ @Override
+ public Pair<String, Integer> month(String sourceName, int sourceId) {
+ String mappingKey = getPartitonTransformMappingKey(sourceId,
+ TIcebergPartitionTransformType.MONTH);
+ return new Pair<String, Integer>(mappingKey, null);
+ }
+
+ @Override
+ public Pair<String, Integer> day(String sourceName, int sourceId) {
+ String mappingKey = getPartitonTransformMappingKey(sourceId,
+ TIcebergPartitionTransformType.DAY);
+ return new Pair<String, Integer>(mappingKey, null);
+ }
+
+ @Override
+ public Pair<String, Integer> hour(String sourceName, int sourceId) {
+ String mappingKey = getPartitonTransformMappingKey(sourceId,
+ TIcebergPartitionTransformType.HOUR);
+ return new Pair<String, Integer>(mappingKey, null);
+ }
+ });
+ // Move the content of the List into a HashMap for faster querying in the future.
+ HashMap<String, Integer> result = Maps.newHashMap();
+ for (Pair<String, Integer> transformParam : transformParams) {
+ result.put(transformParam.first, transformParam.second);
}
+ return result;
}
/**
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index 5b4d4db..bbce69b 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -4836,4 +4836,26 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
+ "or they should both return 'STRING' or 'VARCHAR' or 'CHAR' types, "
+ "but they return types 'NULL_TYPE' and 'STRING'.");
}
+
+ @Test
+ public void testCreatePartitionedIcebergTable() throws ImpalaException {
+ String tblProperties = " TBLPROPERTIES ('iceberg.catalog'='hadoop.tables')";
+ AnalyzesOk("CREATE TABLE tbl1 (i int, p1 int, p2 timestamp) " +
+ "PARTITION BY SPEC (p1 BUCKET 10, p1 TRUNCATE 5, p2 DAY) STORED AS ICEBERG" +
+ tblProperties);
+ AnalysisError("CREATE TABLE tbl1 (i int, p1 int, p2 timestamp) " +
+ "PARTITION BY SPEC (p1 BUCKET, p2 DAY) STORED AS ICEBERG" + tblProperties,
+ "BUCKET and TRUNCATE partition transforms should have a parameter.");
+ AnalysisError("CREATE TABLE tbl1 (i int, p1 int, p2 timestamp) " +
+ "PARTITION BY SPEC (p1 BUCKET 0, p2 DAY) STORED AS ICEBERG" + tblProperties,
+ "The parameter of a partition transform should be greater than zero.");
+ AnalysisError("CREATE TABLE tbl1 (i int, p1 int, p2 timestamp) " +
+ "PARTITION BY SPEC (p1 TRUNCATE 0, p2 DAY) STORED AS ICEBERG" + tblProperties,
+ "The parameter of a partition transform should be greater than zero.");
+
+ AnalysisError("CREATE TABLE tbl1 (i int, p1 int, p2 timestamp) " +
+ "PARTITION BY SPEC (p1 BUCKET 10, p2 DAY 10) STORED AS ICEBERG" + tblProperties,
+ "Only BUCKET and TRUNCATE partition transforms accept a parameter.");
+ }
+
}
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index 582125b..73111e5 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -2984,6 +2984,10 @@ public class ParserTest extends FrontendTestBase {
ParserError("CREATE TABLE Foo(a int PRIMARY KEY, b int BLOCK_SIZE 1+1) " +
"STORED AS KUDU");
ParserError("CREATE TABLE Foo(a int PRIMARY KEY BLOCK_SIZE -1) STORED AS KUDU");
+
+ // An Iceberg TRUNCATE partition transform without a parameter results in a parse error.
+ ParserError("CREATE TABLE tbl1 (i int, p1 int) PARTITION BY SPEC (p1 TRUNCATE) " +
+ "STORED AS ICEBERG TBLPROPERTIES ('iceberg.catalog'='hadoop.tables')");
}
@Test
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-create.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-create.test
index 436b326..24f5239 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-create.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-create.test
@@ -12,8 +12,10 @@ CREATE TABLE iceberg_test1(
PARTITION BY SPEC
(
level IDENTITY,
+ level TRUNCATE 10,
event_time IDENTITY,
event_time HOUR,
+ event_time BUCKET 1000,
register_time DAY
)
STORED AS ICEBERG
@@ -38,9 +40,11 @@ STRING,STRING,STRING
SHOW PARTITIONS iceberg_test1;
---- RESULTS
0,1,1000,'level','IDENTITY'
-0,2,1001,'event_time','IDENTITY'
-0,2,1002,'event_time_hour','HOUR'
-0,3,1003,'register_time_day','DAY'
+0,1,1001,'level_trunc','TRUNCATE 10'
+0,2,1002,'event_time','IDENTITY'
+0,2,1003,'event_time_hour','HOUR'
+0,2,1004,'event_time_bucket','BUCKET 1000'
+0,3,1005,'register_time_day','DAY'
---- TYPES
BIGINT,BIGINT,BIGINT,STRING,STRING
====
@@ -76,7 +80,9 @@ CREATE TABLE iceberg_test4(
)
PARTITION BY SPEC
(
- level IDENTITY
+ level IDENTITY,
+ level BUCKET 12345,
+ level TRUNCATE 15
)
STORED AS ICEBERG
LOCATION '/$DATABASE.iceberg_test_with_location'
@@ -101,6 +107,8 @@ STRING,STRING,STRING
SHOW PARTITIONS iceberg_test_external;
---- RESULTS
0,1,1000,'level','IDENTITY'
+0,1,1001,'level_bucket','BUCKET 12345'
+0,1,1002,'level_trunc','TRUNCATE 15'
---- TYPES
BIGINT,BIGINT,BIGINT,STRING,STRING
====
@@ -123,6 +131,8 @@ STRING,STRING,STRING
SHOW PARTITIONS iceberg_test_external_empty_column;
---- RESULTS
0,1,1000,'level','IDENTITY'
+0,1,1001,'level_bucket','BUCKET 12345'
+0,1,1002,'level_trunc','TRUNCATE 15'
---- TYPES
BIGINT,BIGINT,BIGINT,STRING,STRING
====
@@ -146,8 +156,10 @@ CREATE TABLE iceberg_test5(
PARTITION BY SPEC
(
level IDENTITY,
+ level TRUNCATE 10,
event_time IDENTITY,
event_time HOUR,
+ event_time BUCKET 1000,
register_time DAY
)
STORED AS ICEBERG
@@ -173,9 +185,11 @@ STRING,STRING,STRING
SHOW PARTITIONS iceberg_test5;
---- RESULTS
0,1,1000,'level','IDENTITY'
-0,2,1001,'event_time','IDENTITY'
-0,2,1002,'event_time_hour','HOUR'
-0,3,1003,'register_time_day','DAY'
+0,1,1001,'level_trunc','TRUNCATE 10'
+0,2,1002,'event_time','IDENTITY'
+0,2,1003,'event_time_hour','HOUR'
+0,2,1004,'event_time_bucket','BUCKET 1000'
+0,3,1005,'register_time_day','DAY'
---- TYPES
BIGINT,BIGINT,BIGINT,STRING,STRING
====
@@ -190,7 +204,9 @@ CREATE TABLE iceberg_test6(
)
PARTITION BY SPEC
(
- level IDENTITY
+ level IDENTITY,
+ level BUCKET 12345,
+ level TRUNCATE 10
)
STORED AS ICEBERG
TBLPROPERTIES('iceberg.catalog'='hadoop.catalog',
@@ -215,6 +231,8 @@ STRING,STRING,STRING
SHOW PARTITIONS iceberg_test6_external;
---- RESULTS
0,1,1000,'level','IDENTITY'
+0,1,1001,'level_bucket','BUCKET 12345'
+0,1,1002,'level_trunc','TRUNCATE 10'
---- TYPES
BIGINT,BIGINT,BIGINT,STRING,STRING
====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
index e46d5e8..5c649b6 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
@@ -630,12 +630,17 @@ CREATE TABLE iceberg_test1_partitioned (
level STRING,
p1 DATE,
p2 STRING,
- p3 TIMESTAMP
+ p3 TIMESTAMP,
+ p4 INT
)
PARTITION BY SPEC (
p1 YEAR,
p2 IDENTITY,
- p3 HOUR
+ p2 BUCKET 500,
+ p2 TRUNCATE 15,
+ p3 HOUR,
+ p4 BUCKET 10,
+ p4 TRUNCATE 5
)
STORED AS ICEBERG
TBLPROPERTIES('iceberg.file_format'='parquet',
@@ -645,13 +650,18 @@ CREATE TABLE show_create_table_test_db.iceberg_test1_partitioned (
level STRING,
p1 DATE,
p2 STRING,
- p3 TIMESTAMP
+ p3 TIMESTAMP,
+ p4 INT
)
PARTITION BY SPEC
(
p1 YEAR,
p2 IDENTITY,
- p3 HOUR
+ p2 BUCKET 500,
+ p2 TRUNCATE 15,
+ p3 HOUR,
+ p4 BUCKET 10,
+ p4 TRUNCATE 5
)
STORED AS ICEBERG
TBLPROPERTIES('iceberg.file_format'='parquet',
@@ -661,13 +671,18 @@ CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_test1_partitioned (
level STRING,
p1 DATE,
p2 STRING,
- p3 TIMESTAMP
+ p3 TIMESTAMP,
+ p4 INT
)
PARTITION BY SPEC
(
p1 YEAR,
p2 IDENTITY,
- p3 HOUR
+ p2 BUCKET 500,
+ p2 TRUNCATE 15,
+ p3 HOUR,
+ p4 BUCKET 10,
+ p4 TRUNCATE 5,
)
STORED AS ICEBERG
TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg.catalog'='hadoop.catalog',