You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ar...@apache.org on 2018/07/25 19:28:20 UTC
[09/10] impala git commit: IMPALA-7276. Support CREATE TABLE AS
SELECT with LocalCatalog
IMPALA-7276. Support CREATE TABLE AS SELECT with LocalCatalog
This fixes most of the remaining Kudu tests, which relied on CTAS. Now only a
few Kudu tests still fail:
FAIL query_test/test_kudu.py::TestKuduOperations::()::test_kudu_col_changed
FAIL query_test/test_kudu.py::TestKuduOperations::()::test_kudu_col_null_changed
FAIL query_test/test_kudu.py::TestKuduOperations::()::test_kudu_col_not_null_changed
FAIL query_test/test_kudu.py::TestKuduOperations::()::test_kudu_col_added
The above four tests fail because they assert on the caching behavior of the
old catalog implementation.
FAIL query_test/test_kudu.py::TestImpalaKuduIntegration::()::test_delete_external_kudu_table
FAIL query_test/test_kudu.py::TestImpalaKuduIntegration::()::test_delete_managed_kudu_table
These two tests fail because they attempt to load non-existent tables referred to by a
DELETE statement. They need further investigation, but are not related to CTAS.
Change-Id: I93937aed9b76ef6a62b1c588c59c34d3d6831a46
Reviewed-on: http://gerrit.cloudera.org:8080/10913
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ba813869
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ba813869
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ba813869
Branch: refs/heads/master
Commit: ba81386941a7178048fd1469bf6c2a371f253c3e
Parents: c333b55
Author: Todd Lipcon <to...@cloudera.com>
Authored: Tue Jul 10 13:25:15 2018 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Jul 25 19:27:26 2018 +0000
----------------------------------------------------------------------
.../analysis/CreateTableAsSelectStmt.java | 6 +--
.../main/java/org/apache/impala/catalog/Db.java | 17 +++++++
.../java/org/apache/impala/catalog/FeDb.java | 15 ++++++
.../impala/catalog/local/LocalCatalog.java | 10 +++-
.../apache/impala/catalog/local/LocalDb.java | 19 ++++++++
.../impala/catalog/local/LocalFsTable.java | 33 ++++++++++++--
.../impala/catalog/local/LocalKuduTable.java | 48 +++++++++++++++-----
.../apache/impala/planner/HdfsTableSink.java | 12 ++---
.../org/apache/impala/planner/TableSink.java | 3 +-
9 files changed, 135 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/ba813869/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
index 33a271b..4753b62 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
@@ -213,13 +213,11 @@ public class CreateTableAsSelectStmt extends StatementBase {
FeTable tmpTable = null;
if (KuduTable.isKuduTable(msTbl)) {
- // TODO(todd): avoid downcast to 'Db' here
- tmpTable = KuduTable.createCtasTarget((Db)db, msTbl, createStmt_.getColumnDefs(),
+ tmpTable = db.createKuduCtasTarget(msTbl, createStmt_.getColumnDefs(),
createStmt_.getPrimaryKeyColumnDefs(),
createStmt_.getKuduPartitionParams());
} else if (HdfsFileFormat.isHdfsInputFormatClass(msTbl.getSd().getInputFormat())) {
- // TODO(todd): avoid downcast to 'Db' here
- tmpTable = HdfsTable.createCtasTarget((Db)db, msTbl);
+ tmpTable = db.createFsCtasTarget(msTbl);
}
Preconditions.checkState(tmpTable != null &&
(tmpTable instanceof FeFsTable || tmpTable instanceof FeKuduTable));
http://git-wip-us.apache.org/repos/asf/impala/blob/ba813869/fe/src/main/java/org/apache/impala/catalog/Db.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java
index 5955e0a..0c3c2bd 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -29,6 +29,8 @@ import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TCompactProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.impala.analysis.ColumnDef;
+import org.apache.impala.analysis.KuduPartitionParam;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.thrift.TCatalogObject;
@@ -170,6 +172,21 @@ public class Db extends CatalogObjectImpl implements FeDb {
return tableCache_.remove(tableName.toLowerCase());
}
+ @Override
+ public FeKuduTable createKuduCtasTarget(
+ org.apache.hadoop.hive.metastore.api.Table msTbl,
+ List<ColumnDef> columnDefs, List<ColumnDef> primaryKeyColumnDefs,
+ List<KuduPartitionParam> kuduPartitionParams) {
+ return KuduTable.createCtasTarget(this, msTbl, columnDefs, primaryKeyColumnDefs,
+ kuduPartitionParams);
+ }
+
+ @Override
+ public FeFsTable createFsCtasTarget(org.apache.hadoop.hive.metastore.api.Table msTbl)
+ throws CatalogException {
+ return HdfsTable.createCtasTarget(this, msTbl);
+ }
+
/**
* Comparator that sorts function overloads. We want overloads to be always considered
* in a canonical order so that overload resolution in the case of multiple valid
http://git-wip-us.apache.org/repos/asf/impala/blob/ba813869/fe/src/main/java/org/apache/impala/catalog/FeDb.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeDb.java b/fe/src/main/java/org/apache/impala/catalog/FeDb.java
index 111fbd0..057a2fa 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeDb.java
@@ -19,6 +19,9 @@ package org.apache.impala.catalog;
import java.util.List;
import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.impala.analysis.ColumnDef;
+import org.apache.impala.analysis.KuduPartitionParam;
import org.apache.impala.thrift.TDatabase;
import org.apache.impala.thrift.TFunctionCategory;
import org.apache.impala.util.PatternMatcher;
@@ -98,4 +101,16 @@ public interface FeDb extends HasName {
* @return the Thrift-serialized structure for this database
*/
TDatabase toThrift();
+
+ /**
+ * Create a target Kudu table object for CTAS.
+ */
+ FeKuduTable createKuduCtasTarget(Table msTbl, List<ColumnDef> columnDefs,
+ List<ColumnDef> primaryKeyColumnDefs,
+ List<KuduPartitionParam> kuduPartitionParams);
+
+ /**
+ * Create a target FS table object for CTAS.
+ */
+ FeFsTable createFsCtasTarget(Table msTbl) throws CatalogException;
}
http://git-wip-us.apache.org/repos/asf/impala/blob/ba813869/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
index 03438c6..0d376b3 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
@@ -37,6 +37,7 @@ import org.apache.impala.catalog.FeFsPartition;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.Function;
import org.apache.impala.catalog.Function.CompareMode;
+import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.catalog.HdfsCachePool;
import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TPartitionKeyValue;
@@ -184,7 +185,14 @@ public class LocalCatalog implements FeCatalog {
@Override
public Path getTablePath(Table msTbl) {
- throw new UnsupportedOperationException("TODO");
+ // If the table did not have its path set, build the path based on the
+ // location property of the parent database.
+ if (msTbl.getSd().getLocation() == null || msTbl.getSd().getLocation().isEmpty()) {
+ String dbLocation = getDb(msTbl.getDbName()).getMetaStoreDb().getLocationUri();
+ return new Path(dbLocation, msTbl.getTableName().toLowerCase());
+ } else {
+ return new Path(msTbl.getSd().getLocation());
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/impala/blob/ba813869/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
index 6a779d7..6b9209f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
@@ -22,7 +22,13 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.impala.analysis.ColumnDef;
+import org.apache.impala.analysis.KuduPartitionParam;
+import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.FeDb;
+import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.Function;
import org.apache.impala.catalog.Function.CompareMode;
@@ -104,6 +110,19 @@ class LocalDb implements FeDb {
}
@Override
+ public FeKuduTable createKuduCtasTarget(Table msTbl, List<ColumnDef> columnDefs,
+ List<ColumnDef> primaryKeyColumnDefs,
+ List<KuduPartitionParam> kuduPartitionParams) {
+ return LocalKuduTable.createCtasTarget(this, msTbl, columnDefs, primaryKeyColumnDefs,
+ kuduPartitionParams);
+ }
+
+ @Override
+ public FeFsTable createFsCtasTarget(Table msTbl) throws CatalogException {
+ return LocalFsTable.createCtasTarget(this, msTbl);
+ }
+
+ @Override
public List<String> getAllTableNames() {
loadTableNames();
return ImmutableList.copyOf(tables_.keySet());
http://git-wip-us.apache.org/repos/asf/impala/blob/ba813869/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index 5a5f3bd..81dcae1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -54,6 +54,7 @@ import org.apache.thrift.TException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -103,6 +104,20 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
FeFsTable.DEFAULT_NULL_COLUMN_VALUE;
}
+ /**
+ * Creates a temporary FsTable object populated with the specified properties.
+ * This is used for CTAS statements.
+ */
+ public static LocalFsTable createCtasTarget(LocalDb db,
+ Table msTbl) throws CatalogException {
+ // TODO(todd): set a member variable indicating this is a CTAS target
+ // so we can checkState() against it in various other methods and make
+ // sure we don't try to do something like load partitions for a not-yet-created
+ // table.
+ return new LocalFsTable(db, msTbl);
+ }
+
+
@Override
public boolean isCacheable() {
// TODO Auto-generated method stub
@@ -162,8 +177,14 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
@Override
public HdfsFileFormat getMajorityFormat() {
- // Needed by HdfsTableSink.
- throw new UnsupportedOperationException("TODO: implement me");
+ // TODO(todd): can we avoid loading all partitions here? this is called
+ // for any INSERT query, even if the partition is specified.
+ Collection<? extends FeFsPartition> parts = FeCatalogUtils.loadAllPartitions(this);
+ // In the case that we have no partitions added to the table yet, it's
+ // important to add the "prototype" partition as a fallback.
+ Iterable<FeFsPartition> partitionsToConsider = Iterables.concat(
+ parts, Collections.singleton(createPrototypePartition()));
+ return FeCatalogUtils.getMajorityFormat(partitionsToConsider);
}
@Override
@@ -184,8 +205,12 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
}
@Override
- public TTableDescriptor toThriftDescriptor(int tableId, Set<Long> referencedPartitions) {
- Preconditions.checkNotNull(referencedPartitions);
+ public TTableDescriptor toThriftDescriptor(int tableId,
+ Set<Long> referencedPartitions) {
+ if (referencedPartitions == null) {
+ // null means "all partitions".
+ referencedPartitions = getPartitionIds();
+ }
Map<Long, THdfsPartition> idToPartition = Maps.newHashMap();
List<? extends FeFsPartition> partitions = loadPartitions(referencedPartitions);
for (FeFsPartition partition : partitions) {
http://git-wip-us.apache.org/repos/asf/impala/blob/ba813869/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
index e12e3ac..8a6bb67 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
@@ -26,6 +26,7 @@ import javax.annotation.concurrent.Immutable;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.impala.analysis.ColumnDef;
import org.apache.impala.analysis.KuduPartitionParam;
import org.apache.impala.catalog.Column;
import org.apache.impala.catalog.FeCatalogUtils;
@@ -49,7 +50,6 @@ import com.google.common.collect.Lists;
public class LocalKuduTable extends LocalTable implements FeKuduTable {
private final TableParams tableParams_;
private final List<KuduPartitionParam> partitionBy_;
- private final org.apache.kudu.client.KuduTable kuduTable_;
private final ImmutableList<String> primaryKeyColumnNames_;
/**
@@ -74,8 +74,38 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
// Use the schema derived from Kudu, rather than the one stored in the HMS.
msTable.getSd().setCols(fieldSchemas);
+
+ List<String> pkNames = new ArrayList<>();
+ for (ColumnSchema c: kuduTable.getSchema().getPrimaryKeyColumns()) {
+ pkNames.add(c.getName().toLowerCase());
+ }
+
+ List<KuduPartitionParam> partitionBy = Utils.loadPartitionByParams(kuduTable);
+
ColumnMap cmap = new ColumnMap(cols, /*numClusteringCols=*/0, fullTableName);
- return new LocalKuduTable(db, msTable, cmap, kuduTable);
+ return new LocalKuduTable(db, msTable, cmap, pkNames, partitionBy);
+ }
+
+
+ public static FeKuduTable createCtasTarget(LocalDb db, Table msTable,
+ List<ColumnDef> columnDefs, List<ColumnDef> primaryKeyColumnDefs,
+ List<KuduPartitionParam> kuduPartitionParams) {
+ String fullTableName = msTable.getDbName() + "." + msTable.getTableName();
+
+ List<Column> columns = new ArrayList<>();
+ List<String> pkNames = new ArrayList<>();
+ int pos = 0;
+ for (ColumnDef colDef: columnDefs) {
+ // TODO(todd): it seems odd that for CTAS targets, the columns are of type
+ // 'Column' instead of 'KuduColumn'.
+ columns.add(new Column(colDef.getColName(), colDef.getType(), pos++));
+ }
+ for (ColumnDef pkColDef: primaryKeyColumnDefs) {
+ pkNames.add(pkColDef.getColName());
+ }
+
+ ColumnMap cmap = new ColumnMap(columns, /*numClusteringCols=*/0, fullTableName);
+ return new LocalKuduTable(db, msTable, cmap, pkNames, kuduPartitionParams);
}
private static void convertColsFromKudu(Schema schema, List<Column> cols,
@@ -100,18 +130,12 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
}
private LocalKuduTable(LocalDb db, Table msTable, ColumnMap cmap,
- org.apache.kudu.client.KuduTable kuduTable) {
+ List<String> primaryKeyColumnNames,
+ List<KuduPartitionParam> partitionBy) {
super(db, msTable, cmap);
tableParams_ = new TableParams(msTable);
- kuduTable_ = kuduTable;
- partitionBy_ = ImmutableList.copyOf(Utils.loadPartitionByParams(
- kuduTable));
-
- ImmutableList.Builder<String> b = ImmutableList.builder();
- for (ColumnSchema c: kuduTable_.getSchema().getPrimaryKeyColumns()) {
- b.add(c.getName().toLowerCase());
- }
- primaryKeyColumnNames_ = b.build();
+ partitionBy_ = ImmutableList.copyOf(partitionBy);
+ primaryKeyColumnNames_ = ImmutableList.copyOf(primaryKeyColumnNames);
}
@Override
http://git-wip-us.apache.org/repos/asf/impala/blob/ba813869/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
index 0de2edb..7426641 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
@@ -21,10 +21,9 @@ import java.util.List;
import org.apache.impala.analysis.DescriptorTable;
import org.apache.impala.analysis.Expr;
+import org.apache.impala.catalog.FeFsTable;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.HdfsFileFormat;
-import org.apache.impala.catalog.HdfsTable;
-import org.apache.impala.catalog.Table;
import org.apache.impala.thrift.TDataSink;
import org.apache.impala.thrift.TDataSinkType;
import org.apache.impala.thrift.TExplainLevel;
@@ -36,8 +35,9 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
- * Base class for Hdfs data sinks such as HdfsTextTableSink.
+ * Sink for inserting into filesystem-backed tables.
*
+ * TODO(vercegovac): rename to FsTableSink
*/
public class HdfsTableSink extends TableSink {
// Default number of partitions used for computeResourceProfile() in the absence of
@@ -61,7 +61,7 @@ public class HdfsTableSink extends TableSink {
public HdfsTableSink(FeTable targetTable, List<Expr> partitionKeyExprs,
boolean overwrite, boolean inputIsClustered, List<Integer> sortColumns) {
super(targetTable, Op.INSERT);
- Preconditions.checkState(targetTable instanceof HdfsTable);
+ Preconditions.checkState(targetTable instanceof FeFsTable);
partitionKeyExprs_ = partitionKeyExprs;
overwrite_ = overwrite;
inputIsClustered_ = inputIsClustered;
@@ -70,7 +70,7 @@ public class HdfsTableSink extends TableSink {
@Override
public void computeResourceProfile(TQueryOptions queryOptions) {
- HdfsTable table = (HdfsTable) targetTable_;
+ FeFsTable table = (FeFsTable) targetTable_;
// TODO: Estimate the memory requirements more accurately by partition type.
HdfsFileFormat format = table.getMajorityFormat();
PlanNode inputNode = fragment_.getPlanRoot();
@@ -164,7 +164,7 @@ public class HdfsTableSink extends TableSink {
TDataSink result = new TDataSink(TDataSinkType.TABLE_SINK);
THdfsTableSink hdfsTableSink = new THdfsTableSink(
Expr.treesToThrift(partitionKeyExprs_), overwrite_, inputIsClustered_);
- HdfsTable table = (HdfsTable) targetTable_;
+ FeFsTable table = (FeFsTable) targetTable_;
StringBuilder error = new StringBuilder();
int skipHeaderLineCount = table.parseSkipHeaderLineCount(error);
// Errors will be caught during analysis.
http://git-wip-us.apache.org/repos/asf/impala/blob/ba813869/fe/src/main/java/org/apache/impala/planner/TableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/TableSink.java b/fe/src/main/java/org/apache/impala/planner/TableSink.java
index 677805e..214c9b4 100644
--- a/fe/src/main/java/org/apache/impala/planner/TableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/TableSink.java
@@ -20,6 +20,7 @@ package org.apache.impala.planner;
import java.util.List;
import org.apache.impala.analysis.Expr;
+import org.apache.impala.catalog.FeFsTable;
import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.HBaseTable;
@@ -96,7 +97,7 @@ public abstract class TableSink extends DataSink {
Preconditions.checkNotNull(partitionKeyExprs);
Preconditions.checkNotNull(referencedColumns);
Preconditions.checkNotNull(sortColumns);
- if (table instanceof HdfsTable) {
+ if (table instanceof FeFsTable) {
// Hdfs only supports inserts.
Preconditions.checkState(sinkAction == Op.INSERT);
// Referenced columns don't make sense for an Hdfs table.