You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/12/15 23:00:28 UTC
[02/50] [abbrv] incubator-impala git commit: IMPALA-4561: Replace DISTRIBUTE BY with PARTITION BY in CREATE TABLE
IMPALA-4561: Replace DISTRIBUTE BY with PARTITION BY in CREATE TABLE
Change-Id: I0e07c41eabb4c8cb95754cf04293cbd9e03d6ab2
Reviewed-on: http://gerrit.cloudera.org:8080/5317
Reviewed-by: Dimitris Tsirogiannis <dt...@cloudera.com>
Tested-by: Internal Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/cba93f1a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/cba93f1a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/cba93f1a
Branch: refs/heads/hadoop-next
Commit: cba93f1ac3d4c0219c6266924493ad19c8c10556
Parents: f837754
Author: Dimitris Tsirogiannis <dt...@cloudera.com>
Authored: Thu Dec 1 20:43:43 2016 -0800
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Tue Dec 6 10:41:53 2016 +0000
----------------------------------------------------------------------
common/thrift/CatalogObjects.thrift | 20 +-
common/thrift/JniCatalog.thrift | 4 +-
fe/src/main/cup/sql-parser.cup | 56 ++---
.../AlterTableAddDropRangePartitionStmt.java | 4 +-
.../analysis/CreateTableAsSelectStmt.java | 3 +-
.../apache/impala/analysis/CreateTableStmt.java | 38 ++--
.../apache/impala/analysis/DistributeParam.java | 211 -------------------
.../impala/analysis/KuduPartitionParam.java | 211 +++++++++++++++++++
.../apache/impala/analysis/RangePartition.java | 18 +-
.../apache/impala/analysis/TableDataLayout.java | 21 +-
.../org/apache/impala/analysis/TableDef.java | 6 +-
.../org/apache/impala/analysis/ToSqlUtils.java | 14 +-
.../org/apache/impala/catalog/KuduTable.java | 81 +++----
.../impala/service/KuduCatalogOpExecutor.java | 36 ++--
.../apache/impala/analysis/AnalyzeDDLTest.java | 164 +++++++-------
.../impala/analysis/AuthorizationTest.java | 2 +-
.../org/apache/impala/analysis/ParserTest.java | 58 ++---
testdata/bin/generate-schema-statements.py | 2 +-
.../functional/functional_schema_template.sql | 28 +--
testdata/datasets/tpcds/tpcds_kudu_template.sql | 48 ++---
testdata/datasets/tpch/tpch_kudu_template.sql | 16 +-
testdata/datasets/tpch/tpch_schema_template.sql | 16 +-
.../queries/PlannerTest/lineage.test | 4 +-
.../queries/QueryTest/kudu-scan-node.test | 6 +-
.../QueryTest/kudu-timeouts-catalogd.test | 2 +-
.../queries/QueryTest/kudu_alter.test | 6 +-
.../queries/QueryTest/kudu_create.test | 24 +--
.../queries/QueryTest/kudu_delete.test | 6 +-
.../queries/QueryTest/kudu_describe.test | 2 +-
.../queries/QueryTest/kudu_insert.test | 10 +-
.../queries/QueryTest/kudu_partition_ddl.test | 28 +--
.../queries/QueryTest/kudu_stats.test | 2 +-
.../queries/QueryTest/kudu_update.test | 2 +-
.../queries/QueryTest/kudu_upsert.test | 4 +-
tests/query_test/test_cancellation.py | 2 +-
tests/query_test/test_kudu.py | 38 ++--
tests/shell/test_shell_commandline.py | 2 +-
37 files changed, 598 insertions(+), 597 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/common/thrift/CatalogObjects.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index 10cb777..de89b51 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -357,8 +357,8 @@ struct TDataSourceTable {
2: required string init_string
}
-// Parameters needed for hash distribution
-struct TDistributeByHashParam {
+// Parameters needed for hash partitioning
+struct TKuduPartitionByHashParam {
1: required list<string> columns
2: required i32 num_buckets
}
@@ -370,16 +370,16 @@ struct TRangePartition {
4: optional bool is_upper_bound_inclusive
}
-// A range distribution is identified by a list of columns and a list of range partitions.
-struct TDistributeByRangeParam {
+// A range partitioning is identified by a list of columns and a list of range partitions.
+struct TKuduPartitionByRangeParam {
1: required list<string> columns
2: optional list<TRangePartition> range_partitions
}
-// Parameters for the DISTRIBUTE BY clause.
-struct TDistributeParam {
- 1: optional TDistributeByHashParam by_hash_param;
- 2: optional TDistributeByRangeParam by_range_param;
+// Parameters for the PARTITION BY clause.
+struct TKuduPartitionParam {
+ 1: optional TKuduPartitionByHashParam by_hash_param;
+ 2: optional TKuduPartitionByRangeParam by_range_param;
}
// Represents a Kudu table
@@ -392,8 +392,8 @@ struct TKuduTable {
// Name of the key columns
3: required list<string> key_columns
- // Distribution schemes
- 4: required list<TDistributeParam> distribute_by
+ // Partitioning
+ 4: required list<TKuduPartitionParam> partition_by
}
// Represents a table or view.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/common/thrift/JniCatalog.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index 8bb07e6..d224658 100644
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -415,9 +415,9 @@ struct TCreateTableParams {
// If set, the table will be cached after creation with details specified in cache_op.
13: optional THdfsCachingOp cache_op
- // If set, the table is automatically distributed according to this parameter.
+ // If set, the table is automatically partitioned according to this parameter.
// Kudu-only.
- 14: optional list<CatalogObjects.TDistributeParam> distribute_by
+ 14: optional list<CatalogObjects.TKuduPartitionParam> partition_by
// Primary key column names (Kudu-only)
15: optional list<string> primary_key_column_names;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/cup/sql-parser.cup
----------------------------------------------------------------------
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index a375c75..e09993b 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -401,21 +401,21 @@ nonterminal CreateTableAsSelectStmt create_tbl_as_select_stmt;
nonterminal CreateTableLikeStmt create_tbl_like_stmt;
nonterminal CreateTableStmt create_tbl_stmt;
nonterminal TableDef tbl_def_without_col_defs, tbl_def_with_col_defs;
-nonterminal TableDataLayout opt_tbl_data_layout, distributed_data_layout;
+nonterminal TableDataLayout opt_tbl_data_layout, partitioned_data_layout;
nonterminal TableDef.Options tbl_options;
nonterminal CreateViewStmt create_view_stmt;
nonterminal CreateDataSrcStmt create_data_src_stmt;
nonterminal DropDataSrcStmt drop_data_src_stmt;
nonterminal ShowDataSrcsStmt show_data_srcs_stmt;
nonterminal StructField struct_field_def;
-nonterminal DistributeParam distribute_hash_param;
+nonterminal KuduPartitionParam hash_partition_param;
nonterminal List<RangePartition> range_params_list;
nonterminal RangePartition range_param;
nonterminal Pair<Expr, Boolean> opt_lower_range_val,
opt_upper_range_val;
-nonterminal ArrayList<DistributeParam> distribute_hash_param_list;
-nonterminal ArrayList<DistributeParam> distribute_param_list;
-nonterminal DistributeParam distribute_range_param;
+nonterminal ArrayList<KuduPartitionParam> hash_partition_param_list;
+nonterminal ArrayList<KuduPartitionParam> partition_param_list;
+nonterminal KuduPartitionParam range_partition_param;
nonterminal ColumnDef column_def, view_column_def;
nonterminal ArrayList<ColumnDef> column_def_list, partition_column_defs,
view_column_def_list, view_column_defs;
@@ -1013,12 +1013,12 @@ create_tbl_as_select_stmt ::=
// An optional clause cannot be used directly below because it would conflict with
// the first rule in "create_tbl_stmt".
primary_keys:primary_keys
- distributed_data_layout:distribute_params
+ partitioned_data_layout:partition_params
tbl_options:options
KW_AS query_stmt:select_stmt
{:
tbl_def.getPrimaryKeyColumnNames().addAll(primary_keys);
- tbl_def.getDistributeParams().addAll(distribute_params.getDistributeParams());
+ tbl_def.getKuduPartitionParams().addAll(partition_params.getKuduPartitionParams());
tbl_def.setOptions(options);
RESULT = new CreateTableAsSelectStmt(new CreateTableStmt(tbl_def), select_stmt, null);
:}
@@ -1057,7 +1057,7 @@ create_tbl_stmt ::=
tbl_options:options
{:
tbl_def.getPartitionColumnDefs().addAll(data_layout.getPartitionColumnDefs());
- tbl_def.getDistributeParams().addAll(data_layout.getDistributeParams());
+ tbl_def.getKuduPartitionParams().addAll(data_layout.getKuduPartitionParams());
tbl_def.setOptions(options);
RESULT = new CreateTableStmt(tbl_def);
:}
@@ -1082,7 +1082,7 @@ create_tbl_stmt ::=
tbl_options:options
{:
tbl_def.getPartitionColumnDefs().addAll(data_layout.getPartitionColumnDefs());
- tbl_def.getDistributeParams().addAll(data_layout.getDistributeParams());
+ tbl_def.getKuduPartitionParams().addAll(data_layout.getKuduPartitionParams());
tbl_def.setOptions(options);
RESULT = new CreateTableLikeFileStmt(new CreateTableStmt(tbl_def),
schema_file_format, new HdfsUri(schema_location));
@@ -1148,13 +1148,13 @@ tbl_options ::=
opt_tbl_data_layout ::=
partition_column_defs:partition_column_defs
{: RESULT = TableDataLayout.createPartitionedLayout(partition_column_defs); :}
- | distributed_data_layout:data_layout
+ | partitioned_data_layout:data_layout
{: RESULT = data_layout; :}
;
-distributed_data_layout ::=
- distribute_param_list:distribute_params
- {: RESULT = TableDataLayout.createDistributedLayout(distribute_params); :}
+partitioned_data_layout ::=
+ partition_param_list:partition_params
+ {: RESULT = TableDataLayout.createKuduPartitionedLayout(partition_params); :}
| /* empty */
{: RESULT = TableDataLayout.createEmptyLayout(); :}
;
@@ -1164,25 +1164,25 @@ partition_column_defs ::=
{: RESULT = col_defs; :}
;
-// The DISTRIBUTE clause contains any number of HASH() clauses followed by exactly zero
+// The PARTITION BY clause contains any number of HASH() clauses followed by exactly zero
// or one RANGE clauses
-distribute_param_list ::=
- KW_DISTRIBUTE KW_BY distribute_hash_param_list:list
+partition_param_list ::=
+ KW_PARTITION KW_BY hash_partition_param_list:list
{: RESULT = list; :}
- | KW_DISTRIBUTE KW_BY distribute_range_param:rng
+ | KW_PARTITION KW_BY range_partition_param:rng
{: RESULT = Lists.newArrayList(rng); :}
- | KW_DISTRIBUTE KW_BY distribute_hash_param_list:list COMMA distribute_range_param:rng
+ | KW_PARTITION KW_BY hash_partition_param_list:list COMMA range_partition_param:rng
{:
list.add(rng);
RESULT = list;
:}
;
-// A list of HASH distribution clauses used for flexible partitioning
-distribute_hash_param_list ::=
- distribute_hash_param:dc
+// A list of HASH partitioning clauses used for flexible partitioning
+hash_partition_param_list ::=
+ hash_partition_param:dc
{: RESULT = Lists.newArrayList(dc); :}
- | distribute_hash_param_list:list COMMA distribute_hash_param:d
+ | hash_partition_param_list:list COMMA hash_partition_param:d
{:
list.add(d);
RESULT = list;
@@ -1190,26 +1190,26 @@ distribute_hash_param_list ::=
;
// The column list for a HASH clause is optional.
-distribute_hash_param ::=
+hash_partition_param ::=
KW_HASH LPAREN ident_list:cols RPAREN KW_INTO
INTEGER_LITERAL:buckets KW_BUCKETS
- {: RESULT = DistributeParam.createHashParam(cols, buckets.intValue()); :}
+ {: RESULT = KuduPartitionParam.createHashParam(cols, buckets.intValue()); :}
| KW_HASH KW_INTO INTEGER_LITERAL:buckets KW_BUCKETS
{:
- RESULT = DistributeParam.createHashParam(Lists.<String>newArrayList(),
+ RESULT = KuduPartitionParam.createHashParam(Lists.<String>newArrayList(),
buckets.intValue());
:}
;
// The column list for a RANGE clause is optional.
-distribute_range_param ::=
+range_partition_param ::=
KW_RANGE LPAREN ident_list:cols RPAREN LPAREN range_params_list:ranges RPAREN
{:
- RESULT = DistributeParam.createRangeParam(cols, ranges);
+ RESULT = KuduPartitionParam.createRangeParam(cols, ranges);
:}
| KW_RANGE LPAREN range_params_list:ranges RPAREN
{:
- RESULT = DistributeParam.createRangeParam(Collections.<String>emptyList(), ranges);
+ RESULT = KuduPartitionParam.createRangeParam(Collections.<String>emptyList(), ranges);
:}
;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddDropRangePartitionStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddDropRangePartitionStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddDropRangePartitionStmt.java
index b1618e0..aee07f7 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddDropRangePartitionStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddDropRangePartitionStmt.java
@@ -93,10 +93,10 @@ public class AlterTableAddDropRangePartitionStmt extends AlterTableStmt {
"partitions: RANGE %s", table.getFullName(), rangePartitionSpec_.toSql()));
}
KuduTable kuduTable = (KuduTable) table;
- List<String> colNames = kuduTable.getRangeDistributionColNames();
+ List<String> colNames = kuduTable.getRangePartitioningColNames();
if (colNames.isEmpty()) {
throw new AnalysisException(String.format("Cannot add/drop partition %s: " +
- "Kudu table %s doesn't have a range-based distribution.",
+ "Kudu table %s doesn't have a range-based partitioning.",
rangePartitionSpec_.toSql(), kuduTable.getName()));
}
List<ColumnDef> rangeColDefs = Lists.newArrayListWithCapacity(colNames.size());
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
index cd7b1c8..eb492c6 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
@@ -193,7 +193,8 @@ public class CreateTableAsSelectStmt extends StatementBase {
Table tmpTable = null;
if (KuduTable.isKuduTable(msTbl)) {
tmpTable = KuduTable.createCtasTarget(db, msTbl, createStmt_.getColumnDefs(),
- createStmt_.getTblPrimaryKeyColumnNames(), createStmt_.getDistributeParams());
+ createStmt_.getTblPrimaryKeyColumnNames(),
+ createStmt_.getKuduPartitionParams());
} else {
// TODO: Creating a tmp table using load() is confusing.
// Refactor it to use a 'createCtasTarget()' function similar to Kudu table.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index f2db81b..1139005 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -88,8 +88,8 @@ public class CreateTableStmt extends StatementBase {
public List<ColumnDef> getPartitionColumnDefs() {
return tableDef_.getPartitionColumnDefs();
}
- public List<DistributeParam> getDistributeParams() {
- return tableDef_.getDistributeParams();
+ public List<KuduPartitionParam> getKuduPartitionParams() {
+ return tableDef_.getKuduPartitionParams();
}
public String getComment() { return tableDef_.getComment(); }
Map<String, String> getTblProperties() { return tableDef_.getTblProperties(); }
@@ -146,8 +146,8 @@ public class CreateTableStmt extends StatementBase {
params.setIf_not_exists(getIfNotExists());
params.setTable_properties(getTblProperties());
params.setSerde_properties(getSerdeProperties());
- for (DistributeParam d: getDistributeParams()) {
- params.addToDistribute_by(d.toThrift());
+ for (KuduPartitionParam d: getKuduPartitionParams()) {
+ params.addToPartition_by(d.toThrift());
}
for (ColumnDef pkColDef: getPrimaryKeyColumnDefs()) {
params.addToPrimary_key_column_names(pkColDef.getColName());
@@ -188,8 +188,8 @@ public class CreateTableStmt extends StatementBase {
getTblProperties().get(KuduTable.KEY_STORAGE_HANDLER))) {
throw new AnalysisException(KUDU_STORAGE_HANDLER_ERROR_MESSAGE);
}
- AnalysisUtils.throwIfNotEmpty(getDistributeParams(),
- "Only Kudu tables can use the DISTRIBUTE BY clause.");
+ AnalysisUtils.throwIfNotEmpty(getKuduPartitionParams(),
+ "Only Kudu tables can use the PARTITION BY clause.");
if (hasPrimaryKey()) {
throw new AnalysisException("Only Kudu tables can specify a PRIMARY KEY.");
}
@@ -263,8 +263,8 @@ public class CreateTableStmt extends StatementBase {
KuduTable.KEY_TABLET_REPLICAS));
AnalysisUtils.throwIfNotEmpty(getColumnDefs(),
"Columns cannot be specified with an external Kudu table.");
- AnalysisUtils.throwIfNotEmpty(getDistributeParams(),
- "DISTRIBUTE BY cannot be used with an external Kudu table.");
+ AnalysisUtils.throwIfNotEmpty(getKuduPartitionParams(),
+ "PARTITION BY cannot be used with an external Kudu table.");
}
/**
@@ -305,29 +305,29 @@ public class CreateTableStmt extends StatementBase {
}
}
- if (!getDistributeParams().isEmpty()) {
- analyzeDistributeParams(analyzer);
+ if (!getKuduPartitionParams().isEmpty()) {
+ analyzeKuduPartitionParams(analyzer);
} else {
- throw new AnalysisException("Table distribution must be specified for " +
+ throw new AnalysisException("Table partitioning must be specified for " +
"managed Kudu tables.");
}
}
/**
- * Analyzes the distribution schemes specified in the CREATE TABLE statement.
+ * Analyzes the partitioning schemes specified in the CREATE TABLE statement.
*/
- private void analyzeDistributeParams(Analyzer analyzer) throws AnalysisException {
+ private void analyzeKuduPartitionParams(Analyzer analyzer) throws AnalysisException {
Preconditions.checkState(getFileFormat() == THdfsFileFormat.KUDU);
Map<String, ColumnDef> pkColDefsByName =
ColumnDef.mapByColumnNames(getPrimaryKeyColumnDefs());
- for (DistributeParam distributeParam: getDistributeParams()) {
- // If no column names were specified in this distribution scheme, use all the
+ for (KuduPartitionParam partitionParam: getKuduPartitionParams()) {
+ // If no column names were specified in this partitioning scheme, use all the
// primary key columns.
- if (!distributeParam.hasColumnNames()) {
- distributeParam.setColumnNames(pkColDefsByName.keySet());
+ if (!partitionParam.hasColumnNames()) {
+ partitionParam.setColumnNames(pkColDefsByName.keySet());
}
- distributeParam.setPkColumnDefMap(pkColDefsByName);
- distributeParam.analyze(analyzer);
+ partitionParam.setPkColumnDefMap(pkColDefsByName);
+ partitionParam.analyze(analyzer);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java b/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java
deleted file mode 100644
index 0eb1329..0000000
--- a/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java
+++ /dev/null
@@ -1,211 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.impala.analysis;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.impala.common.AnalysisException;
-import org.apache.impala.thrift.TDistributeByHashParam;
-import org.apache.impala.thrift.TDistributeByRangeParam;
-import org.apache.impala.thrift.TDistributeParam;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-/**
- * Represents the distribution of a Kudu table as defined in the DISTRIBUTE BY
- * clause of a CREATE TABLE statement. The distribution can be hash-based or
- * range-based or both. See RangePartition for details on the supported range partitions.
- *
- * Examples:
- * - Hash-based:
- * DISTRIBUTE BY HASH(id) INTO 10 BUCKETS
- * - Single column range-based:
- * DISTRIBUTE BY RANGE(age)
- * (
- * PARTITION VALUES < 10,
- * PARTITION 10 <= VALUES < 20,
- * PARTITION 20 <= VALUES < 30,
- * PARTITION VALUE = 100
- * )
- * - Combination of hash and range based:
- * DISTRIBUTE BY HASH (id) INTO 3 BUCKETS,
- * RANGE (age)
- * (
- * PARTITION 10 <= VALUES < 20,
- * PARTITION VALUE = 100
- * )
- * - Multi-column range based:
- * DISTRIBUTE BY RANGE (year, quarter)
- * (
- * PARTITION VALUE = (2001, 1),
- * PARTITION VALUE = (2001, 2),
- * PARTITION VALUE = (2002, 1)
- * )
- *
- */
-public class DistributeParam implements ParseNode {
-
- /**
- * Creates a hash-based DistributeParam.
- */
- public static DistributeParam createHashParam(List<String> cols, int buckets) {
- return new DistributeParam(Type.HASH, cols, buckets, null);
- }
-
- /**
- * Creates a range-based DistributeParam.
- */
- public static DistributeParam createRangeParam(List<String> cols,
- List<RangePartition> rangePartitions) {
- return new DistributeParam(Type.RANGE, cols, NO_BUCKETS, rangePartitions);
- }
-
- private static final int NO_BUCKETS = -1;
-
- /**
- * The distribution type.
- */
- public enum Type {
- HASH, RANGE
- }
-
- // Columns of this distribution. If no columns are specified, all
- // the primary key columns of the associated table are used.
- private final List<String> colNames_ = Lists.newArrayList();
-
- // Map of primary key column names to the associated column definitions. Must be set
- // before the call to analyze().
- private Map<String, ColumnDef> pkColumnDefByName_;
-
- // Distribution scheme type
- private final Type type_;
-
- // Only relevant for hash-based distribution, -1 otherwise
- private final int numBuckets_;
-
- // List of range partitions specified in a range-based distribution.
- private List<RangePartition> rangePartitions_;
-
- private DistributeParam(Type t, List<String> colNames, int buckets,
- List<RangePartition> partitions) {
- type_ = t;
- for (String name: colNames) colNames_.add(name.toLowerCase());
- rangePartitions_ = partitions;
- numBuckets_ = buckets;
- }
-
- @Override
- public void analyze(Analyzer analyzer) throws AnalysisException {
- Preconditions.checkState(!colNames_.isEmpty());
- Preconditions.checkNotNull(pkColumnDefByName_);
- Preconditions.checkState(!pkColumnDefByName_.isEmpty());
- // Validate that the columns specified in this distribution are primary key columns.
- for (String colName: colNames_) {
- if (!pkColumnDefByName_.containsKey(colName)) {
- throw new AnalysisException(String.format("Column '%s' in '%s' is not a key " +
- "column. Only key columns can be used in DISTRIBUTE BY.", colName, toSql()));
- }
- }
- if (type_ == Type.RANGE) analyzeRangeParam(analyzer);
- }
-
- /**
- * Analyzes a range-based distribution. This function does not check for overlapping
- * range partitions; these checks are performed by Kudu and an error is reported back
- * to the user.
- */
- public void analyzeRangeParam(Analyzer analyzer) throws AnalysisException {
- List<ColumnDef> pkColDefs = Lists.newArrayListWithCapacity(colNames_.size());
- for (String colName: colNames_) pkColDefs.add(pkColumnDefByName_.get(colName));
- for (RangePartition rangePartition: rangePartitions_) {
- rangePartition.analyze(analyzer, pkColDefs);
- }
- }
-
- @Override
- public String toSql() {
- StringBuilder builder = new StringBuilder(type_.toString());
- if (!colNames_.isEmpty()) {
- builder.append(" (");
- Joiner.on(", ").appendTo(builder, colNames_).append(")");
- }
- if (type_ == Type.HASH) {
- builder.append(" INTO ");
- Preconditions.checkState(numBuckets_ != NO_BUCKETS);
- builder.append(numBuckets_).append(" BUCKETS");
- } else {
- builder.append(" (");
- if (rangePartitions_ != null) {
- List<String> partsSql = Lists.newArrayList();
- for (RangePartition rangePartition: rangePartitions_) {
- partsSql.add(rangePartition.toSql());
- }
- builder.append(Joiner.on(", ").join(partsSql));
- } else {
- builder.append("...");
- }
- builder.append(")");
- }
- return builder.toString();
- }
-
- @Override
- public String toString() { return toSql(); }
-
- public TDistributeParam toThrift() {
- TDistributeParam result = new TDistributeParam();
- // TODO: Add a validate() function to ensure the validity of distribute params.
- if (type_ == Type.HASH) {
- TDistributeByHashParam hash = new TDistributeByHashParam();
- Preconditions.checkState(numBuckets_ != NO_BUCKETS);
- hash.setNum_buckets(numBuckets_);
- hash.setColumns(colNames_);
- result.setBy_hash_param(hash);
- } else {
- Preconditions.checkState(type_ == Type.RANGE);
- TDistributeByRangeParam rangeParam = new TDistributeByRangeParam();
- rangeParam.setColumns(colNames_);
- if (rangePartitions_ == null) {
- result.setBy_range_param(rangeParam);
- return result;
- }
- for (RangePartition rangePartition: rangePartitions_) {
- rangeParam.addToRange_partitions(rangePartition.toThrift());
- }
- result.setBy_range_param(rangeParam);
- }
- return result;
- }
-
- void setPkColumnDefMap(Map<String, ColumnDef> pkColumnDefByName) {
- pkColumnDefByName_ = pkColumnDefByName;
- }
-
- boolean hasColumnNames() { return !colNames_.isEmpty(); }
- public List<String> getColumnNames() { return ImmutableList.copyOf(colNames_); }
- void setColumnNames(Collection<String> colNames) {
- Preconditions.checkState(colNames_.isEmpty());
- colNames_.addAll(colNames);
- }
-
- public Type getType() { return type_; }
-}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/analysis/KuduPartitionParam.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/KuduPartitionParam.java b/fe/src/main/java/org/apache/impala/analysis/KuduPartitionParam.java
new file mode 100644
index 0000000..3f69fae
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/KuduPartitionParam.java
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.thrift.TKuduPartitionByHashParam;
+import org.apache.impala.thrift.TKuduPartitionByRangeParam;
+import org.apache.impala.thrift.TKuduPartitionParam;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ * Represents the partitioning of a Kudu table as defined in the PARTITION BY
+ * clause of a CREATE TABLE statement. The partitioning can be hash-based or
+ * range-based or both. See RangePartition for details on the supported range partitions.
+ *
+ * Examples:
+ * - Hash-based:
+ * PARTITION BY HASH(id) INTO 10 BUCKETS
+ * - Single column range-based:
+ * PARTITION BY RANGE(age)
+ * (
+ * PARTITION VALUES < 10,
+ * PARTITION 10 <= VALUES < 20,
+ * PARTITION 20 <= VALUES < 30,
+ * PARTITION VALUE = 100
+ * )
+ * - Combination of hash and range based:
+ * PARTITION BY HASH (id) INTO 3 BUCKETS,
+ * RANGE (age)
+ * (
+ * PARTITION 10 <= VALUES < 20,
+ * PARTITION VALUE = 100
+ * )
+ * - Multi-column range based:
+ * PARTITION BY RANGE (year, quarter)
+ * (
+ * PARTITION VALUE = (2001, 1),
+ * PARTITION VALUE = (2001, 2),
+ * PARTITION VALUE = (2002, 1)
+ * )
+ * If no columns are listed in HASH()/RANGE(), all primary key columns are used.
+ */
+public class KuduPartitionParam implements ParseNode {
+
+ /**
+ * Creates a hash-based KuduPartitionParam over 'cols' with 'buckets' hash buckets.
+ */
+ public static KuduPartitionParam createHashParam(List<String> cols, int buckets) {
+ return new KuduPartitionParam(Type.HASH, cols, buckets, null);
+ }
+
+ /**
+ * Creates a range-based KuduPartitionParam over 'cols' with the given range partitions.
+ */
+ public static KuduPartitionParam createRangeParam(List<String> cols,
+ List<RangePartition> rangePartitions) {
+ return new KuduPartitionParam(Type.RANGE, cols, NO_BUCKETS, rangePartitions);
+ }
+
+ private static final int NO_BUCKETS = -1;
+
+ /**
+ * The partitioning type: hash-based or range-based.
+ */
+ public enum Type {
+ HASH, RANGE
+ }
+
+ // Names of the columns used by this partitioning. If no columns are specified, all
+ // the primary key columns of the associated table are used (see setColumnNames()).
+ private final List<String> colNames_ = Lists.newArrayList();
+
+ // Map of primary key column names to the associated column definitions. Must be set
+ // via setPkColumnDefMap() before the call to analyze().
+ private Map<String, ColumnDef> pkColumnDefByName_;
+
+ // Partitioning scheme type (HASH or RANGE).
+ private final Type type_;
+
+ // Number of hash buckets for hash-based partitioning; NO_BUCKETS (-1) otherwise.
+ private final int numBuckets_;
+
+ // Range partitions of a range-based partitioning; null for hash-based partitioning.
+ private List<RangePartition> rangePartitions_;
+
+ private KuduPartitionParam(Type t, List<String> colNames, int buckets,
+ List<RangePartition> partitions) {
+ type_ = t;
+ for (String name: colNames) colNames_.add(name.toLowerCase());
+ rangePartitions_ = partitions;
+ numBuckets_ = buckets;
+ }
+
+ @Override
+ public void analyze(Analyzer analyzer) throws AnalysisException {
+ Preconditions.checkState(!colNames_.isEmpty());
+ Preconditions.checkNotNull(pkColumnDefByName_);
+ Preconditions.checkState(!pkColumnDefByName_.isEmpty());
+ // Only primary key columns may appear in a PARTITION BY clause; reject any other column.
+ for (String colName: colNames_) {
+ if (!pkColumnDefByName_.containsKey(colName)) {
+ throw new AnalysisException(String.format("Column '%s' in '%s' is not a key " +
+ "column. Only key columns can be used in PARTITION BY.", colName, toSql()));
+ }
+ }
+ if (type_ == Type.RANGE) analyzeRangeParam(analyzer);
+ }
+
+ /**
+ * Analyzes a range-based partitioning. This function does not check for overlapping
+ * range partitions; these checks are performed by Kudu and an error is reported back
+ * to the user. Requires rangePartitions_ to be non-null.
+ */
+ public void analyzeRangeParam(Analyzer analyzer) throws AnalysisException {
+ List<ColumnDef> pkColDefs = Lists.newArrayListWithCapacity(colNames_.size());
+ for (String colName: colNames_) pkColDefs.add(pkColumnDefByName_.get(colName));
+ for (RangePartition rangePartition: rangePartitions_) {
+ rangePartition.analyze(analyzer, pkColDefs);
+ }
+ }
+
+ @Override
+ public String toSql() {
+ StringBuilder builder = new StringBuilder(type_.toString());
+ if (!colNames_.isEmpty()) {
+ builder.append(" (");
+ Joiner.on(", ").appendTo(builder, colNames_).append(")");
+ }
+ if (type_ == Type.HASH) {
+ builder.append(" INTO ");
+ Preconditions.checkState(numBuckets_ != NO_BUCKETS);
+ builder.append(numBuckets_).append(" BUCKETS");
+ } else {
+ builder.append(" (");
+ if (rangePartitions_ != null) {
+ List<String> partsSql = Lists.newArrayList();
+ for (RangePartition rangePartition: rangePartitions_) {
+ partsSql.add(rangePartition.toSql());
+ }
+ builder.append(Joiner.on(", ").join(partsSql));
+ } else {
+ builder.append("...");
+ }
+ builder.append(")");
+ }
+ return builder.toString();
+ }
+
+ @Override
+ public String toString() { return toSql(); }
+
+ public TKuduPartitionParam toThrift() {
+ TKuduPartitionParam result = new TKuduPartitionParam();
+ // TODO: Add a validate() function to ensure the validity of partition params.
+ if (type_ == Type.HASH) {
+ TKuduPartitionByHashParam hash = new TKuduPartitionByHashParam();
+ Preconditions.checkState(numBuckets_ != NO_BUCKETS);
+ hash.setNum_buckets(numBuckets_);
+ hash.setColumns(colNames_);
+ result.setBy_hash_param(hash);
+ } else {
+ Preconditions.checkState(type_ == Type.RANGE);
+ TKuduPartitionByRangeParam rangeParam = new TKuduPartitionByRangeParam();
+ rangeParam.setColumns(colNames_);
+ if (rangePartitions_ == null) {
+ result.setBy_range_param(rangeParam);
+ return result;
+ }
+ for (RangePartition rangePartition: rangePartitions_) {
+ rangeParam.addToRange_partitions(rangePartition.toThrift());
+ }
+ result.setBy_range_param(rangeParam);
+ }
+ return result;
+ }
+
+ void setPkColumnDefMap(Map<String, ColumnDef> pkColumnDefByName) {
+ pkColumnDefByName_ = pkColumnDefByName;
+ }
+
+ boolean hasColumnNames() { return !colNames_.isEmpty(); }
+ public List<String> getColumnNames() { return ImmutableList.copyOf(colNames_); }
+ void setColumnNames(Collection<String> colNames) {
+ Preconditions.checkState(colNames_.isEmpty());
+ colNames_.addAll(colNames);
+ }
+
+ public Type getType() { return type_; }
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/analysis/RangePartition.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/RangePartition.java b/fe/src/main/java/org/apache/impala/analysis/RangePartition.java
index 3e41bc4..e2640a8 100644
--- a/fe/src/main/java/org/apache/impala/analysis/RangePartition.java
+++ b/fe/src/main/java/org/apache/impala/analysis/RangePartition.java
@@ -110,26 +110,26 @@ public class RangePartition implements ParseNode {
throw new IllegalStateException("Not implemented");
}
- public void analyze(Analyzer analyzer, List<ColumnDef> distributionColDefs)
+ public void analyze(Analyzer analyzer, List<ColumnDef> partColDefs)
throws AnalysisException {
- analyzeBoundaryValues(lowerBound_, distributionColDefs, analyzer);
+ analyzeBoundaryValues(lowerBound_, partColDefs, analyzer);
if (!isSingletonRange_) {
- analyzeBoundaryValues(upperBound_, distributionColDefs, analyzer);
+ analyzeBoundaryValues(upperBound_, partColDefs, analyzer);
}
}
private void analyzeBoundaryValues(List<Expr> boundaryValues,
- List<ColumnDef> distributionColDefs, Analyzer analyzer) throws AnalysisException {
+ List<ColumnDef> partColDefs, Analyzer analyzer) throws AnalysisException {
if (!boundaryValues.isEmpty()
- && boundaryValues.size() != distributionColDefs.size()) {
+ && boundaryValues.size() != partColDefs.size()) {
throw new AnalysisException(String.format("Number of specified range " +
- "partition values is different than the number of distribution " +
+ "partition values is different than the number of partitioning " +
"columns: (%d vs %d). Range partition: '%s'", boundaryValues.size(),
- distributionColDefs.size(), toSql()));
+ partColDefs.size(), toSql()));
}
for (int i = 0; i < boundaryValues.size(); ++i) {
LiteralExpr literal = analyzeBoundaryValue(boundaryValues.get(i),
- distributionColDefs.get(i), analyzer);
+ partColDefs.get(i), analyzer);
Preconditions.checkNotNull(literal);
boundaryValues.set(i, literal);
}
@@ -162,7 +162,7 @@ public class RangePartition implements ParseNode {
if (!org.apache.impala.catalog.Type.isImplicitlyCastable(literalType, colType,
true)) {
throw new AnalysisException(String.format("Range partition value %s " +
- "(type: %s) is not type compatible with distribution column '%s' (type: %s).",
+ "(type: %s) is not type compatible with partitioning column '%s' (type: %s).",
literal.toSql(), literalType, pkColumn.getColName(), colType.toSql()));
}
if (!literalType.equals(colType)) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java b/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java
index 4d3ed80..aef5732 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java
@@ -22,35 +22,34 @@ import com.google.common.collect.Lists;
import java.util.List;
/**
- * Represents the PARTITION BY and DISTRIBUTED BY clauses of a DDL statement.
- * TODO: Reconsider this class when we add support for new range partitioning syntax (see
- * IMPALA-3724).
+ * Represents the PARTITION BY and PARTITIONED BY clauses of a DDL statement.
*/
class TableDataLayout {
private final List<ColumnDef> partitionColDefs_;
- private final List<DistributeParam> distributeParams_;
+ private final List<KuduPartitionParam> kuduPartitionParams_;
private TableDataLayout(List<ColumnDef> partitionColumnDefs,
- List<DistributeParam> distributeParams) {
+ List<KuduPartitionParam> partitionParams) {
partitionColDefs_ = partitionColumnDefs;
- distributeParams_ = distributeParams;
+ kuduPartitionParams_ = partitionParams;
}
static TableDataLayout createPartitionedLayout(List<ColumnDef> partitionColumnDefs) {
return new TableDataLayout(partitionColumnDefs,
- Lists.<DistributeParam>newArrayList());
+ Lists.<KuduPartitionParam>newArrayList());
}
- static TableDataLayout createDistributedLayout(List<DistributeParam> distributeParams) {
- return new TableDataLayout(Lists.<ColumnDef>newArrayList(), distributeParams);
+ static TableDataLayout createKuduPartitionedLayout(
+ List<KuduPartitionParam> partitionParams) {
+ return new TableDataLayout(Lists.<ColumnDef>newArrayList(), partitionParams);
}
static TableDataLayout createEmptyLayout() {
return new TableDataLayout(Lists.<ColumnDef>newArrayList(),
- Lists.<DistributeParam>newArrayList());
+ Lists.<KuduPartitionParam>newArrayList());
}
List<ColumnDef> getPartitionColumnDefs() { return partitionColDefs_; }
- List<DistributeParam> getDistributeParams() { return distributeParams_; }
+ List<KuduPartitionParam> getKuduPartitionParams() { return kuduPartitionParams_; }
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/analysis/TableDef.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDef.java b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
index 1c16954..a40b4d2 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableDef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
@@ -74,7 +74,7 @@ class TableDef {
// If true, no errors are thrown if the table already exists.
private final boolean ifNotExists_;
- // Partitioned/distribute by parameters.
+ // Partitioning parameters.
private final TableDataLayout dataLayout_;
// True if analyze() has been called.
@@ -152,8 +152,8 @@ class TableDef {
List<ColumnDef> getPrimaryKeyColumnDefs() { return primaryKeyColDefs_; }
boolean isExternal() { return isExternal_; }
boolean getIfNotExists() { return ifNotExists_; }
- List<DistributeParam> getDistributeParams() {
- return dataLayout_.getDistributeParams();
+ List<KuduPartitionParam> getKuduPartitionParams() {
+ return dataLayout_.getKuduPartitionParams();
}
void setOptions(Options options) {
Preconditions.checkNotNull(options);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index 4cd095c..be4ab6f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -197,7 +197,7 @@ public class ToSqlUtils {
String storageHandlerClassName = table.getStorageHandlerClassName();
List<String> primaryKeySql = Lists.newArrayList();
- String kuduDistributeByParams = null;
+ String kuduPartitionByParams = null;
if (table instanceof KuduTable) {
KuduTable kuduTable = (KuduTable) table;
// Kudu tables don't use LOCATION syntax
@@ -219,10 +219,10 @@ public class ToSqlUtils {
primaryKeySql.addAll(kuduTable.getPrimaryKeyColumnNames());
List<String> paramsSql = Lists.newArrayList();
- for (DistributeParam param: kuduTable.getDistributeBy()) {
+ for (KuduPartitionParam param: kuduTable.getPartitionBy()) {
paramsSql.add(param.toSql());
}
- kuduDistributeByParams = Joiner.on(", ").join(paramsSql);
+ kuduPartitionByParams = Joiner.on(", ").join(paramsSql);
} else {
// We shouldn't output the columns for external tables
colsSql = null;
@@ -230,7 +230,7 @@ public class ToSqlUtils {
}
HdfsUri tableLocation = location == null ? null : new HdfsUri(location);
return getCreateTableSql(table.getDb().getName(), table.getName(), comment, colsSql,
- partitionColsSql, primaryKeySql, kuduDistributeByParams, properties,
+ partitionColsSql, primaryKeySql, kuduPartitionByParams, properties,
serdeParameters, isExternal, false, rowFormat, format, compression,
storageHandlerClassName, tableLocation);
}
@@ -242,7 +242,7 @@ public class ToSqlUtils {
*/
public static String getCreateTableSql(String dbName, String tableName,
String tableComment, List<String> columnsSql, List<String> partitionColumnsSql,
- List<String> primaryKeysSql, String kuduDistributeByParams,
+ List<String> primaryKeysSql, String kuduPartitionByParams,
Map<String, String> tblProperties, Map<String, String> serdeParameters,
boolean isExternal, boolean ifNotExists, RowFormat rowFormat,
HdfsFileFormat fileFormat, HdfsCompression compression, String storageHandlerClass,
@@ -271,8 +271,8 @@ public class ToSqlUtils {
Joiner.on(", \n ").join(partitionColumnsSql)));
}
- if (kuduDistributeByParams != null) {
- sb.append("DISTRIBUTE BY " + kuduDistributeByParams + "\n");
+ if (kuduPartitionByParams != null) {
+ sb.append("PARTITION BY " + kuduPartitionByParams + "\n");
}
if (rowFormat != null && !rowFormat.isDefault()) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index a7f72c3..9bbcbd5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -29,15 +29,15 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.impala.analysis.ColumnDef;
-import org.apache.impala.analysis.DistributeParam;
+import org.apache.impala.analysis.KuduPartitionParam;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.service.CatalogOpExecutor;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TColumn;
-import org.apache.impala.thrift.TDistributeByHashParam;
-import org.apache.impala.thrift.TDistributeByRangeParam;
-import org.apache.impala.thrift.TDistributeParam;
+import org.apache.impala.thrift.TKuduPartitionByHashParam;
+import org.apache.impala.thrift.TKuduPartitionByRangeParam;
+import org.apache.impala.thrift.TKuduPartitionParam;
import org.apache.impala.thrift.TKuduTable;
import org.apache.impala.thrift.TResultSet;
import org.apache.impala.thrift.TResultSetMetadata;
@@ -110,9 +110,9 @@ public class KuduTable extends Table {
// Primary key column names.
private final List<String> primaryKeyColumnNames_ = Lists.newArrayList();
- // Distribution schemes of this Kudu table. Both range and hash-based distributions are
+ // Partitioning schemes of this Kudu table. Both range and hash-based partitioning are
// supported.
- private final List<DistributeParam> distributeBy_ = Lists.newArrayList();
+ private final List<KuduPartitionParam> partitionBy_ = Lists.newArrayList();
// Schema of the underlying Kudu table.
private org.apache.kudu.Schema kuduSchema_;
@@ -148,36 +148,36 @@ public class KuduTable extends Table {
return ImmutableList.copyOf(primaryKeyColumnNames_);
}
- public List<DistributeParam> getDistributeBy() {
- return ImmutableList.copyOf(distributeBy_);
+ public List<KuduPartitionParam> getPartitionBy() {
+ return ImmutableList.copyOf(partitionBy_);
}
/**
- * Returns the range-based distribution of this table if it exists, null otherwise.
+ * Returns the range-based partitioning of this table if it exists, null otherwise.
*/
- private DistributeParam getRangeDistribution() {
- for (DistributeParam distributeParam: distributeBy_) {
- if (distributeParam.getType() == DistributeParam.Type.RANGE) {
- return distributeParam;
+ private KuduPartitionParam getRangePartitioning() {
+ for (KuduPartitionParam partitionParam: partitionBy_) {
+ if (partitionParam.getType() == KuduPartitionParam.Type.RANGE) {
+ return partitionParam;
}
}
return null;
}
/**
- * Returns the column names of the table's range-based distribution or an empty
- * list if the table doesn't have a range-based distribution.
+ * Returns the column names of the table's range-based partitioning or an empty
+ * list if the table doesn't have a range-based partitioning.
*/
- public List<String> getRangeDistributionColNames() {
- DistributeParam rangeDistribution = getRangeDistribution();
- if (rangeDistribution == null) return Collections.<String>emptyList();
- return rangeDistribution.getColumnNames();
+ public List<String> getRangePartitioningColNames() {
+ KuduPartitionParam rangePartitioning = getRangePartitioning();
+ if (rangePartitioning == null) return Collections.<String>emptyList();
+ return rangePartitioning.getColumnNames();
}
/**
* Loads the metadata of a Kudu table.
*
- * Schema and distribution schemes are loaded directly from Kudu whereas column stats
+ * Schema and partitioning schemes are loaded directly from Kudu whereas column stats
* are loaded from HMS. The function also updates the table schema in HMS in order to
* propagate alterations made to the Kudu table to HMS.
*/
@@ -209,7 +209,7 @@ public class KuduTable extends Table {
// Load metadata from Kudu and HMS
try {
loadSchema(kuduTable);
- loadDistributeByParams(kuduTable);
+ loadPartitionByParams(kuduTable);
loadAllColumnStats(msClient);
} catch (ImpalaRuntimeException e) {
throw new TableLoadingException("Error loading metadata for Kudu table " +
@@ -255,19 +255,19 @@ public class KuduTable extends Table {
}
}
- private void loadDistributeByParams(org.apache.kudu.client.KuduTable kuduTable) {
+ private void loadPartitionByParams(org.apache.kudu.client.KuduTable kuduTable) {
Preconditions.checkNotNull(kuduTable);
Schema tableSchema = kuduTable.getSchema();
PartitionSchema partitionSchema = kuduTable.getPartitionSchema();
Preconditions.checkState(!colsByPos_.isEmpty());
- distributeBy_.clear();
+ partitionBy_.clear();
for (HashBucketSchema hashBucketSchema: partitionSchema.getHashBucketSchemas()) {
List<String> columnNames = Lists.newArrayList();
for (int colId: hashBucketSchema.getColumnIds()) {
columnNames.add(getColumnNameById(tableSchema, colId));
}
- distributeBy_.add(
- DistributeParam.createHashParam(columnNames, hashBucketSchema.getNumBuckets()));
+ partitionBy_.add(KuduPartitionParam.createHashParam(columnNames,
+ hashBucketSchema.getNumBuckets()));
}
RangeSchema rangeSchema = partitionSchema.getRangeSchema();
List<Integer> columnIds = rangeSchema.getColumns();
@@ -277,7 +277,7 @@ public class KuduTable extends Table {
// We don't populate the split values because Kudu's API doesn't currently support
// retrieving the split values for range partitions.
// TODO: File a Kudu JIRA.
- distributeBy_.add(DistributeParam.createRangeParam(columnNames, null));
+ partitionBy_.add(KuduPartitionParam.createRangeParam(columnNames, null));
}
/**
@@ -297,14 +297,14 @@ public class KuduTable extends Table {
*/
public static KuduTable createCtasTarget(Db db,
org.apache.hadoop.hive.metastore.api.Table msTbl, List<ColumnDef> columnDefs,
- List<String> primaryKeyColumnNames, List<DistributeParam> distributeParams) {
+ List<String> primaryKeyColumnNames, List<KuduPartitionParam> partitionParams) {
KuduTable tmpTable = new KuduTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner());
int pos = 0;
for (ColumnDef colDef: columnDefs) {
tmpTable.addColumn(new Column(colDef.getColName(), colDef.getType(), pos++));
}
tmpTable.primaryKeyColumnNames_.addAll(primaryKeyColumnNames);
- tmpTable.distributeBy_.addAll(distributeParams);
+ tmpTable.partitionBy_.addAll(partitionParams);
return tmpTable;
}
@@ -324,27 +324,28 @@ public class KuduTable extends Table {
kuduMasters_ = Joiner.on(',').join(tkudu.getMaster_addresses());
primaryKeyColumnNames_.clear();
primaryKeyColumnNames_.addAll(tkudu.getKey_columns());
- loadDistributeByParamsFromThrift(tkudu.getDistribute_by());
+ loadPartitionByParamsFromThrift(tkudu.getPartition_by());
}
- private void loadDistributeByParamsFromThrift(List<TDistributeParam> params) {
- distributeBy_.clear();
- for (TDistributeParam param: params) {
+ private void loadPartitionByParamsFromThrift(List<TKuduPartitionParam> params) {
+ partitionBy_.clear();
+ for (TKuduPartitionParam param: params) {
if (param.isSetBy_hash_param()) {
- TDistributeByHashParam hashParam = param.getBy_hash_param();
- distributeBy_.add(DistributeParam.createHashParam(
+ TKuduPartitionByHashParam hashParam = param.getBy_hash_param();
+ partitionBy_.add(KuduPartitionParam.createHashParam(
hashParam.getColumns(), hashParam.getNum_buckets()));
} else {
Preconditions.checkState(param.isSetBy_range_param());
- TDistributeByRangeParam rangeParam = param.getBy_range_param();
- distributeBy_.add(DistributeParam.createRangeParam(rangeParam.getColumns(),
+ TKuduPartitionByRangeParam rangeParam = param.getBy_range_param();
+ partitionBy_.add(KuduPartitionParam.createRangeParam(rangeParam.getColumns(),
null));
}
}
}
@Override
- public TTableDescriptor toThriftDescriptor(int tableId, Set<Long> referencedPartitions) {
+ public TTableDescriptor toThriftDescriptor(int tableId,
+ Set<Long> referencedPartitions) {
TTableDescriptor desc = new TTableDescriptor(tableId, TTableType.KUDU_TABLE,
getTColumnDescriptors(), numClusteringCols_, kuduTableName_, db_.getName());
desc.setKuduTable(getTKuduTable());
@@ -356,9 +357,9 @@ public class KuduTable extends Table {
tbl.setKey_columns(Preconditions.checkNotNull(primaryKeyColumnNames_));
tbl.setMaster_addresses(Lists.newArrayList(kuduMasters_.split(",")));
tbl.setTable_name(kuduTableName_);
- Preconditions.checkNotNull(distributeBy_);
- for (DistributeParam distributeParam: distributeBy_) {
- tbl.addToDistribute_by(distributeParam.toThrift());
+ Preconditions.checkNotNull(partitionBy_);
+ for (KuduPartitionParam partitionParam: partitionBy_) {
+ tbl.addToPartition_by(partitionParam.toThrift());
}
return tbl;
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
index 0f8f8fd..b0616c4 100644
--- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
@@ -34,7 +34,7 @@ import org.apache.impala.common.Pair;
import org.apache.impala.thrift.TAlterTableAddDropRangePartitionParams;
import org.apache.impala.thrift.TColumn;
import org.apache.impala.thrift.TCreateTableParams;
-import org.apache.impala.thrift.TDistributeParam;
+import org.apache.impala.thrift.TKuduPartitionParam;
import org.apache.impala.thrift.TRangePartition;
import org.apache.impala.thrift.TRangePartitionOperationType;
import org.apache.impala.util.KuduUtil;
@@ -138,22 +138,22 @@ public class KuduCatalogOpExecutor {
org.apache.hadoop.hive.metastore.api.Table msTbl,
TCreateTableParams params, Schema schema) throws ImpalaRuntimeException {
CreateTableOptions tableOpts = new CreateTableOptions();
- // Set the distribution schemes
- List<TDistributeParam> distributeParams = params.getDistribute_by();
- if (distributeParams != null) {
+ // Set the partitioning schemes
+ List<TKuduPartitionParam> partitionParams = params.getPartition_by();
+ if (partitionParams != null) {
boolean hasRangePartitioning = false;
- for (TDistributeParam distParam: distributeParams) {
- if (distParam.isSetBy_hash_param()) {
- Preconditions.checkState(!distParam.isSetBy_range_param());
- tableOpts.addHashPartitions(distParam.getBy_hash_param().getColumns(),
- distParam.getBy_hash_param().getNum_buckets());
+ for (TKuduPartitionParam partParam: partitionParams) {
+ if (partParam.isSetBy_hash_param()) {
+ Preconditions.checkState(!partParam.isSetBy_range_param());
+ tableOpts.addHashPartitions(partParam.getBy_hash_param().getColumns(),
+ partParam.getBy_hash_param().getNum_buckets());
} else {
- Preconditions.checkState(distParam.isSetBy_range_param());
+ Preconditions.checkState(partParam.isSetBy_range_param());
hasRangePartitioning = true;
- List<String> rangePartitionColumns = distParam.getBy_range_param().getColumns();
+ List<String> rangePartitionColumns = partParam.getBy_range_param().getColumns();
tableOpts.setRangePartitionColumns(rangePartitionColumns);
for (TRangePartition rangePartition:
- distParam.getBy_range_param().getRange_partitions()) {
+ partParam.getBy_range_param().getRange_partitions()) {
List<Pair<PartialRow, RangePartitionBound>> rangeBounds =
getRangePartitionBounds(rangePartition, schema, rangePartitionColumns);
Preconditions.checkState(rangeBounds.size() == 2);
@@ -164,7 +164,7 @@ public class KuduCatalogOpExecutor {
}
}
}
- // If no range-based distribution is specified in a CREATE TABLE statement, Kudu
+ // If no range-based partitioning is specified in a CREATE TABLE statement, Kudu
// generates one by default that includes all the primary key columns. To prevent
// this from happening, explicitly set the range partition columns to be
// an empty list.
@@ -333,7 +333,7 @@ public class KuduCatalogOpExecutor {
private static List<Pair<PartialRow, RangePartitionBound>> getRangePartitionBounds(
TRangePartition rangePartition, KuduTable tbl) throws ImpalaRuntimeException {
return getRangePartitionBounds(rangePartition, tbl.getKuduSchema(),
- tbl.getRangeDistributionColNames());
+ tbl.getRangePartitioningColNames());
}
/**
@@ -342,20 +342,20 @@ public class KuduCatalogOpExecutor {
*/
private static List<Pair<PartialRow, RangePartitionBound>> getRangePartitionBounds(
TRangePartition rangePartition, Schema schema,
- List<String> rangeDistributionColNames) throws ImpalaRuntimeException {
+ List<String> rangePartitioningColNames) throws ImpalaRuntimeException {
Preconditions.checkNotNull(schema);
- Preconditions.checkState(!rangeDistributionColNames.isEmpty());
+ Preconditions.checkState(!rangePartitioningColNames.isEmpty());
Preconditions.checkState(rangePartition.isSetLower_bound_values()
|| rangePartition.isSetUpper_bound_values());
List<Pair<PartialRow, RangePartitionBound>> rangeBounds =
Lists.newArrayListWithCapacity(2);
Pair<PartialRow, RangePartitionBound> lowerBound =
- KuduUtil.buildRangePartitionBound(schema, rangeDistributionColNames,
+ KuduUtil.buildRangePartitionBound(schema, rangePartitioningColNames,
rangePartition.getLower_bound_values(),
rangePartition.isIs_lower_bound_inclusive());
rangeBounds.add(lowerBound);
Pair<PartialRow, RangePartitionBound> upperBound =
- KuduUtil.buildRangePartitionBound(schema, rangeDistributionColNames,
+ KuduUtil.buildRangePartitionBound(schema, rangePartitioningColNames,
rangePartition.getUpper_bound_values(),
rangePartition.isIs_upper_bound_inclusive());
rangeBounds.add(upperBound);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 4aeb96d..d806a2f 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -1440,16 +1440,16 @@ public class AnalyzeDDLTest extends FrontendTestBase {
"Partition column name mismatch: tinyint_col != int_col");
// CTAS into managed Kudu tables
- AnalyzesOk("create table t primary key (id) distribute by hash (id) into 3 buckets" +
+ AnalyzesOk("create table t primary key (id) partition by hash (id) into 3 buckets" +
" stored as kudu as select id, bool_col, tinyint_col, smallint_col, int_col, " +
"bigint_col, float_col, double_col, date_string_col, string_col " +
"from functional.alltypestiny");
- AnalyzesOk("create table t primary key (id) distribute by range (id) " +
+ AnalyzesOk("create table t primary key (id) partition by range (id) " +
"(partition values < 10, partition 20 <= values < 30, partition value = 50) " +
"stored as kudu as select id, bool_col, tinyint_col, smallint_col, int_col, " +
"bigint_col, float_col, double_col, date_string_col, string_col " +
"from functional.alltypestiny");
- AnalyzesOk("create table t primary key (id) distribute by hash (id) into 3 buckets, "+
+ AnalyzesOk("create table t primary key (id) partition by hash (id) into 3 buckets, "+
"range (id) (partition values < 10, partition 10 <= values < 20, " +
"partition value = 30) stored as kudu as select id, bool_col, tinyint_col, " +
"smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, " +
@@ -1461,27 +1461,27 @@ public class AnalyzeDDLTest extends FrontendTestBase {
"external Kudu tables.");
// CTAS into Kudu tables with unsupported types
- AnalysisError("create table t primary key (id) distribute by hash into 3 buckets" +
+ AnalysisError("create table t primary key (id) partition by hash into 3 buckets" +
" stored as kudu as select id, timestamp_col from functional.alltypestiny",
"Cannot create table 't': Type TIMESTAMP is not supported in Kudu");
- AnalysisError("create table t primary key (cs) distribute by hash into 3 buckets" +
+ AnalysisError("create table t primary key (cs) partition by hash into 3 buckets" +
" stored as kudu as select cs from functional.chars_tiny",
"Cannot create table 't': Type CHAR(5) is not supported in Kudu");
- AnalysisError("create table t primary key (vc) distribute by hash into 3 buckets" +
+ AnalysisError("create table t primary key (vc) partition by hash into 3 buckets" +
" stored as kudu as select vc from functional.chars_tiny",
"Cannot create table 't': Type VARCHAR(32) is not supported in Kudu");
- AnalysisError("create table t primary key (id) distribute by hash into 3 buckets" +
+ AnalysisError("create table t primary key (id) partition by hash into 3 buckets" +
" stored as kudu as select c1 as id from functional.decimal_tiny",
"Cannot create table 't': Type DECIMAL(10,4) is not supported in Kudu");
- AnalysisError("create table t primary key (id) distribute by hash into 3 buckets" +
+ AnalysisError("create table t primary key (id) partition by hash into 3 buckets" +
" stored as kudu as select id, s from functional.complextypes_fileformat",
"Expr 's' in select list returns a complex type 'STRUCT<f1:STRING,f2:INT>'.\n" +
"Only scalar types are allowed in the select list.");
- AnalysisError("create table t primary key (id) distribute by hash into 3 buckets" +
+ AnalysisError("create table t primary key (id) partition by hash into 3 buckets" +
" stored as kudu as select id, m from functional.complextypes_fileformat",
"Expr 'm' in select list returns a complex type 'MAP<STRING,BIGINT>'.\n" +
"Only scalar types are allowed in the select list.");
- AnalysisError("create table t primary key (id) distribute by hash into 3 buckets" +
+ AnalysisError("create table t primary key (id) partition by hash into 3 buckets" +
" stored as kudu as select id, a from functional.complextypes_fileformat",
"Expr 'a' in select list returns a complex type 'ARRAY<INT>'.\n" +
"Only scalar types are allowed in the select list.");
@@ -1912,85 +1912,85 @@ public class AnalyzeDDLTest extends FrontendTestBase {
@Test
public void TestCreateManagedKuduTable() {
TestUtils.assumeKuduIsSupported();
- // Test primary keys and distribute by clauses
- AnalyzesOk("create table tab (x int primary key) distribute by hash(x) " +
+ // Test primary keys and partition by clauses
+ AnalyzesOk("create table tab (x int primary key) partition by hash(x) " +
"into 8 buckets stored as kudu");
- AnalyzesOk("create table tab (x int, primary key(x)) distribute by hash(x) " +
+ AnalyzesOk("create table tab (x int, primary key(x)) partition by hash(x) " +
"into 8 buckets stored as kudu");
AnalyzesOk("create table tab (x int, y int, primary key (x, y)) " +
- "distribute by hash(x, y) into 8 buckets stored as kudu");
+ "partition by hash(x, y) into 8 buckets stored as kudu");
AnalyzesOk("create table tab (x int, y int, primary key (x)) " +
- "distribute by hash(x) into 8 buckets stored as kudu");
+ "partition by hash(x) into 8 buckets stored as kudu");
AnalyzesOk("create table tab (x int, y int, primary key(x, y)) " +
- "distribute by hash(y) into 8 buckets stored as kudu");
- AnalyzesOk("create table tab (x int, y string, primary key (x)) distribute by " +
+ "partition by hash(y) into 8 buckets stored as kudu");
+ AnalyzesOk("create table tab (x int, y string, primary key (x)) partition by " +
"hash (x) into 3 buckets, range (x) (partition values < 1, partition " +
"1 <= values < 10, partition 10 <= values < 20, partition value = 30) " +
"stored as kudu");
- AnalyzesOk("create table tab (x int, y int, primary key (x, y)) distribute by " +
+ AnalyzesOk("create table tab (x int, y int, primary key (x, y)) partition by " +
"range (x, y) (partition value = (2001, 1), partition value = (2002, 1), " +
"partition value = (2003, 2)) stored as kudu");
// Non-literal boundary values in range partitions
- AnalyzesOk("create table tab (x int, y int, primary key (x)) distribute by " +
+ AnalyzesOk("create table tab (x int, y int, primary key (x)) partition by " +
"range (x) (partition values < 1 + 1, partition (1+3) + 2 < values < 10, " +
"partition factorial(4) < values < factorial(5), " +
"partition value = factorial(6)) stored as kudu");
- AnalyzesOk("create table tab (x int, y int, primary key(x, y)) distribute by " +
+ AnalyzesOk("create table tab (x int, y int, primary key(x, y)) partition by " +
"range(x, y) (partition value = (1+1, 2+2), partition value = ((1+1+1)+1, 10), " +
"partition value = (cast (30 as int), factorial(5))) stored as kudu");
- AnalysisError("create table tab (x int primary key) distribute by range (x) " +
+ AnalysisError("create table tab (x int primary key) partition by range (x) " +
"(partition values < x + 1) stored as kudu", "Only constant values are allowed " +
"for range-partition bounds: x + 1");
- AnalysisError("create table tab (x int primary key) distribute by range (x) " +
+ AnalysisError("create table tab (x int primary key) partition by range (x) " +
"(partition values <= isnull(null, null)) stored as kudu", "Range partition " +
"values cannot be NULL. Range partition: 'PARTITION VALUES <= " +
"isnull(NULL, NULL)'");
- AnalysisError("create table tab (x int primary key) distribute by range (x) " +
+ AnalysisError("create table tab (x int primary key) partition by range (x) " +
"(partition values <= (select count(*) from functional.alltypestiny)) " +
"stored as kudu", "Only constant values are allowed for range-partition " +
"bounds: (SELECT count(*) FROM functional.alltypestiny)");
// Multilevel partitioning. Data is split into 3 buckets based on 'x' and each
// bucket is partitioned into 4 tablets based on the range partitions of 'y'.
AnalyzesOk("create table tab (x int, y string, primary key(x, y)) " +
- "distribute by hash(x) into 3 buckets, range(y) " +
+ "partition by hash(x) into 3 buckets, range(y) " +
"(partition values < 'aa', partition 'aa' <= values < 'bb', " +
"partition 'bb' <= values < 'cc', partition 'cc' <= values) " +
"stored as kudu");
// Key column in upper case
AnalyzesOk("create table tab (x int, y int, primary key (X)) " +
- "distribute by hash (x) into 8 buckets stored as kudu");
+ "partition by hash (x) into 8 buckets stored as kudu");
// Flexible Partitioning
AnalyzesOk("create table tab (a int, b int, c int, d int, primary key (a, b, c))" +
- "distribute by hash (a, b) into 8 buckets, hash(c) into 2 buckets stored as " +
+ "partition by hash (a, b) into 8 buckets, hash(c) into 2 buckets stored as " +
"kudu");
- // No columns specified in the DISTRIBUTE BY HASH clause
+ // No columns specified in the PARTITION BY HASH clause
AnalyzesOk("create table tab (a int primary key, b int, c int, d int) " +
- "distribute by hash into 8 buckets stored as kudu");
+ "partition by hash into 8 buckets stored as kudu");
// Distribute range data types are picked up during analysis and forwarded to Kudu.
// Column names in distribute params should also be case-insensitive.
AnalyzesOk("create table tab (a int, b int, c int, d int, primary key(a, b, c, d))" +
- "distribute by hash (a, B, c) into 8 buckets, " +
+ "partition by hash (a, B, c) into 8 buckets, " +
"range (A) (partition values < 1, partition 1 <= values < 2, " +
"partition 2 <= values < 3, partition 3 <= values < 4, partition 4 <= values) " +
"stored as kudu");
- // Allowing range distribution on a subset of the primary keys
+ // Allowing range partitioning on a subset of the primary keys
AnalyzesOk("create table tab (id int, name string, valf float, vali bigint, " +
- "primary key (id, name)) distribute by range (name) " +
+ "primary key (id, name)) partition by range (name) " +
"(partition 'aa' < values <= 'bb') stored as kudu");
// Null values in range partition values
AnalysisError("create table tab (id int, name string, primary key(id, name)) " +
- "distribute by hash (id) into 3 buckets, range (name) " +
+ "partition by hash (id) into 3 buckets, range (name) " +
"(partition value = null, partition value = 1) stored as kudu",
"Range partition values cannot be NULL. Range partition: 'PARTITION " +
"VALUE = NULL'");
// Primary key specified in tblproperties
- AnalysisError(String.format("create table tab (x int) distribute by hash (x) " +
+ AnalysisError(String.format("create table tab (x int) partition by hash (x) " +
"into 8 buckets stored as kudu tblproperties ('%s' = 'x')",
KuduTable.KEY_KEY_COLUMNS), "PRIMARY KEY must be used instead of the table " +
"property");
// Primary key column that doesn't exist
AnalysisError("create table tab (x int, y int, primary key (z)) " +
- "distribute by hash (x) into 8 buckets stored as kudu",
+ "partition by hash (x) into 8 buckets stored as kudu",
"PRIMARY KEY column 'z' does not exist in the table");
// Invalid composite primary key
AnalysisError("create table tab (x int primary key, primary key(x)) stored " +
@@ -2002,65 +2002,65 @@ public class AnalyzeDDLTest extends FrontendTestBase {
"be specified using the PRIMARY KEY (col1, col2, ...) syntax at the end " +
"of the column definition.");
// Specifying the same primary key column multiple times
- AnalysisError("create table tab (x int, primary key (x, x)) distribute by hash (x) " +
+ AnalysisError("create table tab (x int, primary key (x, x)) partition by hash (x) " +
"into 8 buckets stored as kudu",
"Column 'x' is listed multiple times as a PRIMARY KEY.");
// Number of range partition boundary values should be equal to the number of range
// columns.
AnalysisError("create table tab (a int, b int, c int, d int, primary key(a, b, c)) " +
- "distribute by range(a) (partition value = (1, 2), " +
+ "partition by range(a) (partition value = (1, 2), " +
"partition value = 3, partition value = 4) stored as kudu",
"Number of specified range partition values is different than the number of " +
- "distribution columns: (2 vs 1). Range partition: 'PARTITION VALUE = (1,2)'");
+ "partitioning columns: (2 vs 1). Range partition: 'PARTITION VALUE = (1,2)'");
// Key ranges must match the column types.
AnalysisError("create table tab (a int, b int, c int, d int, primary key(a, b, c)) " +
- "distribute by hash (a, b, c) into 8 buckets, range (a) " +
+ "partition by hash (a, b, c) into 8 buckets, range (a) " +
"(partition value = 1, partition value = 'abc', partition 3 <= values) " +
"stored as kudu", "Range partition value 'abc' (type: STRING) is not type " +
- "compatible with distribution column 'a' (type: INT).");
- AnalysisError("create table tab (a tinyint primary key) distribute by range (a) " +
+ "compatible with partitioning column 'a' (type: INT).");
+ AnalysisError("create table tab (a tinyint primary key) partition by range (a) " +
"(partition value = 128) stored as kudu", "Range partition value 128 " +
- "(type: SMALLINT) is not type compatible with distribution column 'a' " +
+ "(type: SMALLINT) is not type compatible with partitioning column 'a' " +
"(type: TINYINT)");
- AnalysisError("create table tab (a smallint primary key) distribute by range (a) " +
+ AnalysisError("create table tab (a smallint primary key) partition by range (a) " +
"(partition value = 32768) stored as kudu", "Range partition value 32768 " +
- "(type: INT) is not type compatible with distribution column 'a' " +
+ "(type: INT) is not type compatible with partitioning column 'a' " +
"(type: SMALLINT)");
- AnalysisError("create table tab (a int primary key) distribute by range (a) " +
+ AnalysisError("create table tab (a int primary key) partition by range (a) " +
"(partition value = 2147483648) stored as kudu", "Range partition value " +
- "2147483648 (type: BIGINT) is not type compatible with distribution column 'a' " +
+ "2147483648 (type: BIGINT) is not type compatible with partitioning column 'a' " +
"(type: INT)");
- AnalysisError("create table tab (a bigint primary key) distribute by range (a) " +
+ AnalysisError("create table tab (a bigint primary key) partition by range (a) " +
"(partition value = 9223372036854775808) stored as kudu", "Range partition " +
"value 9223372036854775808 (type: DECIMAL(19,0)) is not type compatible with " +
- "distribution column 'a' (type: BIGINT)");
+ "partitioning column 'a' (type: BIGINT)");
// Test implicit casting/folding of partition values.
- AnalyzesOk("create table tab (a int primary key) distribute by range (a) " +
+ AnalyzesOk("create table tab (a int primary key) partition by range (a) " +
"(partition value = false, partition value = true) stored as kudu");
- // Non-key column used in DISTRIBUTE BY
+ // Non-key column used in PARTITION BY
AnalysisError("create table tab (a int, b string, c bigint, primary key (a)) " +
- "distribute by range (b) (partition value = 'abc') stored as kudu",
+ "partition by range (b) (partition value = 'abc') stored as kudu",
"Column 'b' in 'RANGE (b) (PARTITION VALUE = 'abc')' is not a key column. " +
- "Only key columns can be used in DISTRIBUTE BY.");
+ "Only key columns can be used in PARTITION BY.");
// No float range partition values
AnalysisError("create table tab (a int, b int, c int, d int, primary key (a, b, c))" +
- "distribute by hash (a, b, c) into 8 buckets, " +
+ "partition by hash (a, b, c) into 8 buckets, " +
"range (a) (partition value = 1.2, partition value = 2) stored as kudu",
"Range partition value 1.2 (type: DECIMAL(2,1)) is not type compatible with " +
- "distribution column 'a' (type: INT).");
- // Non-existing column used in DISTRIBUTE BY
+ "partitioning column 'a' (type: INT).");
+ // Non-existing column used in PARTITION BY
AnalysisError("create table tab (a int, b int, primary key (a, b)) " +
- "distribute by range(unknown_column) (partition value = 'abc') stored as kudu",
+ "partition by range(unknown_column) (partition value = 'abc') stored as kudu",
"Column 'unknown_column' in 'RANGE (unknown_column) (PARTITION VALUE = 'abc')' " +
- "is not a key column. Only key columns can be used in DISTRIBUTE BY");
+ "is not a key column. Only key columns can be used in PARTITION BY");
// Kudu table name is specified in tblproperties
- AnalyzesOk("create table tab (x int primary key) distribute by hash (x) " +
+ AnalyzesOk("create table tab (x int primary key) partition by hash (x) " +
"into 8 buckets stored as kudu tblproperties ('kudu.table_name'='tab_1'," +
"'kudu.num_tablet_replicas'='1'," +
"'kudu.master_addresses' = '127.0.0.1:8080, 127.0.0.1:8081')");
// No port is specified in kudu master address
AnalyzesOk("create table tdata_no_port (id int primary key, name string, " +
- "valf float, vali bigint) distribute by range(id) (partition values <= 10, " +
+ "valf float, vali bigint) partition by range(id) (partition values <= 10, " +
"partition 10 < values <= 30, partition 30 < values) " +
"stored as kudu tblproperties('kudu.master_addresses'='127.0.0.1')");
// Not using the STORED AS KUDU syntax to specify a Kudu table
@@ -2078,12 +2078,12 @@ public class AnalyzeDDLTest extends FrontendTestBase {
AnalysisError("create table tab (x int primary key) stored as kudu cached in " +
"'testPool'", "A Kudu table cannot be cached in HDFS.");
// LOCATION cannot be used with Kudu tables
- AnalysisError("create table tab (a int primary key) distribute by hash (a) " +
+ AnalysisError("create table tab (a int primary key) partition by hash (a) " +
"into 3 buckets stored as kudu location '/test-warehouse/'",
"LOCATION cannot be specified for a Kudu table.");
- // DISTRIBUTE BY is required for managed tables.
+ // PARTITION BY is required for managed tables.
AnalysisError("create table tab (a int, primary key (a)) stored as kudu",
- "Table distribution must be specified for managed Kudu tables.");
+ "Table partitioning must be specified for managed Kudu tables.");
AnalysisError("create table tab (a int) stored as kudu",
"A primary key is required for a Kudu table.");
// Using ROW FORMAT with a Kudu table
@@ -2105,12 +2105,12 @@ public class AnalyzeDDLTest extends FrontendTestBase {
// Unsupported type is PK and partition col
String stmt = String.format("create table tab (x %s primary key) " +
- "distribute by hash(x) into 3 buckets stored as kudu", t);
+ "partition by hash(x) into 3 buckets stored as kudu", t);
AnalysisError(stmt, expectedError);
// Unsupported type is not PK/partition col
stmt = String.format("create table tab (x int primary key, y %s) " +
- "distribute by hash(x) into 3 buckets stored as kudu", t);
+ "partition by hash(x) into 3 buckets stored as kudu", t);
AnalysisError(stmt, expectedError);
}
@@ -2125,7 +2125,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
for (String block: blockSize) {
AnalyzesOk(String.format("create table tab (x int primary key " +
"not null encoding %s compression %s %s %s, y int encoding %s " +
- "compression %s %s %s %s) distribute by hash (x) " +
+ "compression %s %s %s %s) partition by hash (x) " +
"into 3 buckets stored as kudu", enc, comp, def, block, enc,
comp, def, nul, block));
}
@@ -2136,23 +2136,23 @@ public class AnalyzeDDLTest extends FrontendTestBase {
// Primary key specified using the PRIMARY KEY clause
AnalyzesOk("create table tab (x int not null encoding plain_encoding " +
"compression snappy block_size 1, y int null encoding rle compression lz4 " +
- "default 1, primary key(x)) distribute by hash (x) into 3 buckets " +
+ "default 1, primary key(x)) partition by hash (x) into 3 buckets " +
"stored as kudu");
// Primary keys can't be null
AnalysisError("create table tab (x int primary key null, y int not null) " +
- "distribute by hash (x) into 3 buckets stored as kudu", "Primary key columns " +
+ "partition by hash (x) into 3 buckets stored as kudu", "Primary key columns " +
"cannot be nullable: x INT PRIMARY KEY NULL");
AnalysisError("create table tab (x int not null, y int null, primary key (x, y)) " +
- "distribute by hash (x) into 3 buckets stored as kudu", "Primary key columns " +
+ "partition by hash (x) into 3 buckets stored as kudu", "Primary key columns " +
"cannot be nullable: y INT NULL");
// Unsupported encoding value
AnalysisError("create table tab (x int primary key, y int encoding invalid_enc) " +
- "distribute by hash (x) into 3 buckets stored as kudu", "Unsupported encoding " +
+ "partition by hash (x) into 3 buckets stored as kudu", "Unsupported encoding " +
"value 'INVALID_ENC'. Supported encoding values are: " +
Joiner.on(", ").join(Encoding.values()));
// Unsupported compression algorithm
AnalysisError("create table tab (x int primary key, y int compression " +
- "invalid_comp) distribute by hash (x) into 3 buckets stored as kudu",
+ "invalid_comp) partition by hash (x) into 3 buckets stored as kudu",
"Unsupported compression algorithm 'INVALID_COMP'. Supported compression " +
"algorithms are: " + Joiner.on(", ").join(CompressionAlgorithm.values()));
// Default values
@@ -2160,38 +2160,38 @@ public class AnalyzeDDLTest extends FrontendTestBase {
"i3 int default 100, i4 bigint default 1000, vals string default 'test', " +
"valf float default cast(1.2 as float), vald double default " +
"cast(3.1452 as double), valb boolean default true, " +
- "primary key (i1, i2, i3, i4, vals)) distribute by hash (i1) into 3 " +
+ "primary key (i1, i2, i3, i4, vals)) partition by hash (i1) into 3 " +
"buckets stored as kudu");
AnalyzesOk("create table tab (i int primary key default 1+1+1) " +
- "distribute by hash (i) into 3 buckets stored as kudu");
+ "partition by hash (i) into 3 buckets stored as kudu");
AnalyzesOk("create table tab (i int primary key default factorial(5)) " +
- "distribute by hash (i) into 3 buckets stored as kudu");
+ "partition by hash (i) into 3 buckets stored as kudu");
AnalyzesOk("create table tab (i int primary key, x int null default " +
- "isnull(null, null)) distribute by hash (i) into 3 buckets stored as kudu");
+ "isnull(null, null)) partition by hash (i) into 3 buckets stored as kudu");
// Invalid default values
AnalysisError("create table tab (i int primary key default 'string_val') " +
- "distribute by hash (i) into 3 buckets stored as kudu", "Default value " +
+ "partition by hash (i) into 3 buckets stored as kudu", "Default value " +
"'string_val' (type: STRING) is not compatible with column 'i' (type: INT).");
AnalysisError("create table tab (i int primary key, x int default 1.1) " +
- "distribute by hash (i) into 3 buckets stored as kudu",
+ "partition by hash (i) into 3 buckets stored as kudu",
"Default value 1.1 (type: DECIMAL(2,1)) is not compatible with column " +
"'x' (type: INT).");
AnalysisError("create table tab (i tinyint primary key default 128) " +
- "distribute by hash (i) into 3 buckets stored as kudu", "Default value " +
+ "partition by hash (i) into 3 buckets stored as kudu", "Default value " +
"128 (type: SMALLINT) is not compatible with column 'i' (type: TINYINT).");
AnalysisError("create table tab (i int primary key default isnull(null, null)) " +
- "distribute by hash (i) into 3 buckets stored as kudu", "Default value of " +
+ "partition by hash (i) into 3 buckets stored as kudu", "Default value of " +
"NULL not allowed on non-nullable column: 'i'");
AnalysisError("create table tab (i int primary key, x int not null " +
- "default isnull(null, null)) distribute by hash (i) into 3 buckets " +
+ "default isnull(null, null)) partition by hash (i) into 3 buckets " +
"stored as kudu", "Default value of NULL not allowed on non-nullable column: " +
"'x'");
// Invalid block_size values
AnalysisError("create table tab (i int primary key block_size 1.1) " +
- "distribute by hash (i) into 3 buckets stored as kudu", "Invalid value " +
+ "partition by hash (i) into 3 buckets stored as kudu", "Invalid value " +
"for BLOCK_SIZE: 1.1. A positive INTEGER value is expected.");
AnalysisError("create table tab (i int primary key block_size 'val') " +
- "distribute by hash (i) into 3 buckets stored as kudu", "Invalid value " +
+ "partition by hash (i) into 3 buckets stored as kudu", "Invalid value " +
"for BLOCK_SIZE: 'val'. A positive INTEGER value is expected.");
}
@@ -2438,8 +2438,8 @@ public class AnalyzeDDLTest extends FrontendTestBase {
// Kudu specific clauses used in an Avro table.
AnalysisError("create table functional.new_table (i int) " +
- "distribute by hash(i) into 3 buckets stored as avro",
- "Only Kudu tables can use the DISTRIBUTE BY clause.");
+ "partition by hash(i) into 3 buckets stored as avro",
+ "Only Kudu tables can use the PARTITION BY clause.");
AnalysisError("create table functional.new_table (i int primary key) " +
"stored as avro", "Unsupported column options for file format 'AVRO': " +
"'i INT PRIMARY KEY'");
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java b/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java
index 431ac01..3d6bbec 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java
@@ -911,7 +911,7 @@ public class AuthorizationTest {
// IMPALA-4000: ALL privileges on SERVER are not required to create managed tables.
AuthzOk("create table tpch.kudu_tbl (i int, j int, primary key (i))" +
- " DISTRIBUTE BY HASH (i) INTO 9 BUCKETS stored as kudu TBLPROPERTIES " +
+ " PARTITION BY HASH (i) INTO 9 BUCKETS stored as kudu TBLPROPERTIES " +
"('kudu.master_addresses'='127.0.0.1')");
// User does not have permission to create table at the specified location..