You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ar...@apache.org on 2018/07/25 19:28:20 UTC

[09/10] impala git commit: IMPALA-7276. Support CREATE TABLE AS SELECT with LocalCatalog

IMPALA-7276. Support CREATE TABLE AS SELECT with LocalCatalog

This fixes most of the remaining Kudu tests that relied on CTAS. Now only a
few Kudu tests fail:

FAIL query_test/test_kudu.py::TestKuduOperations::()::test_kudu_col_changed
FAIL query_test/test_kudu.py::TestKuduOperations::()::test_kudu_col_null_changed
FAIL query_test/test_kudu.py::TestKuduOperations::()::test_kudu_col_not_null_changed
FAIL query_test/test_kudu.py::TestKuduOperations::()::test_kudu_col_added

The four tests above fail because they assert specific caching
behavior of the old catalog implementation.

FAIL query_test/test_kudu.py::TestImpalaKuduIntegration::()::test_delete_external_kudu_table
FAIL query_test/test_kudu.py::TestImpalaKuduIntegration::()::test_delete_managed_kudu_table

These fail because they attempt to load non-existent tables referenced by a
DELETE statement. They need further investigation, but the failures are unrelated to CTAS.

Change-Id: I93937aed9b76ef6a62b1c588c59c34d3d6831a46
Reviewed-on: http://gerrit.cloudera.org:8080/10913
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ba813869
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ba813869
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ba813869

Branch: refs/heads/master
Commit: ba81386941a7178048fd1469bf6c2a371f253c3e
Parents: c333b55
Author: Todd Lipcon <to...@cloudera.com>
Authored: Tue Jul 10 13:25:15 2018 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Jul 25 19:27:26 2018 +0000

----------------------------------------------------------------------
 .../analysis/CreateTableAsSelectStmt.java       |  6 +--
 .../main/java/org/apache/impala/catalog/Db.java | 17 +++++++
 .../java/org/apache/impala/catalog/FeDb.java    | 15 ++++++
 .../impala/catalog/local/LocalCatalog.java      | 10 +++-
 .../apache/impala/catalog/local/LocalDb.java    | 19 ++++++++
 .../impala/catalog/local/LocalFsTable.java      | 33 ++++++++++++--
 .../impala/catalog/local/LocalKuduTable.java    | 48 +++++++++++++++-----
 .../apache/impala/planner/HdfsTableSink.java    | 12 ++---
 .../org/apache/impala/planner/TableSink.java    |  3 +-
 9 files changed, 135 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/ba813869/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
index 33a271b..4753b62 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
@@ -213,13 +213,11 @@ public class CreateTableAsSelectStmt extends StatementBase {
 
       FeTable tmpTable = null;
       if (KuduTable.isKuduTable(msTbl)) {
-        // TODO(todd): avoid downcast to 'Db' here
-        tmpTable = KuduTable.createCtasTarget((Db)db, msTbl, createStmt_.getColumnDefs(),
+        tmpTable = db.createKuduCtasTarget(msTbl, createStmt_.getColumnDefs(),
             createStmt_.getPrimaryKeyColumnDefs(),
             createStmt_.getKuduPartitionParams());
       } else if (HdfsFileFormat.isHdfsInputFormatClass(msTbl.getSd().getInputFormat())) {
-        // TODO(todd): avoid downcast to 'Db' here
-        tmpTable = HdfsTable.createCtasTarget((Db)db, msTbl);
+        tmpTable = db.createFsCtasTarget(msTbl);
       }
       Preconditions.checkState(tmpTable != null &&
           (tmpTable instanceof FeFsTable || tmpTable instanceof FeKuduTable));

http://git-wip-us.apache.org/repos/asf/impala/blob/ba813869/fe/src/main/java/org/apache/impala/catalog/Db.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java
index 5955e0a..0c3c2bd 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -29,6 +29,8 @@ import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.impala.analysis.ColumnDef;
+import org.apache.impala.analysis.KuduPartitionParam;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TCatalogObject;
@@ -170,6 +172,21 @@ public class Db extends CatalogObjectImpl implements FeDb {
     return tableCache_.remove(tableName.toLowerCase());
   }
 
+  @Override
+  public FeKuduTable createKuduCtasTarget(
+      org.apache.hadoop.hive.metastore.api.Table msTbl,
+      List<ColumnDef> columnDefs, List<ColumnDef> primaryKeyColumnDefs,
+      List<KuduPartitionParam> kuduPartitionParams) {
+    return KuduTable.createCtasTarget(this, msTbl, columnDefs, primaryKeyColumnDefs,
+        kuduPartitionParams);
+  }
+
+  @Override
+  public FeFsTable createFsCtasTarget(org.apache.hadoop.hive.metastore.api.Table msTbl)
+      throws CatalogException {
+    return HdfsTable.createCtasTarget(this, msTbl);
+  }
+
   /**
    * Comparator that sorts function overloads. We want overloads to be always considered
    * in a canonical order so that overload resolution in the case of multiple valid

http://git-wip-us.apache.org/repos/asf/impala/blob/ba813869/fe/src/main/java/org/apache/impala/catalog/FeDb.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeDb.java b/fe/src/main/java/org/apache/impala/catalog/FeDb.java
index 111fbd0..057a2fa 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeDb.java
@@ -19,6 +19,9 @@ package org.apache.impala.catalog;
 import java.util.List;
 
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.impala.analysis.ColumnDef;
+import org.apache.impala.analysis.KuduPartitionParam;
 import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TFunctionCategory;
 import org.apache.impala.util.PatternMatcher;
@@ -98,4 +101,16 @@ public interface FeDb extends HasName {
    * @return the Thrift-serialized structure for this database
    */
   TDatabase toThrift();
+
+  /**
+   * Create a target Kudu table object for CTAS.
+   */
+  FeKuduTable createKuduCtasTarget(Table msTbl, List<ColumnDef> columnDefs,
+      List<ColumnDef> primaryKeyColumnDefs,
+      List<KuduPartitionParam> kuduPartitionParams);
+
+  /**
+   * Create a target FS table object for CTAS.
+   */
+  FeFsTable createFsCtasTarget(Table msTbl) throws CatalogException;
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/ba813869/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
index 03438c6..0d376b3 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
@@ -37,6 +37,7 @@ import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.Function.CompareMode;
+import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.HdfsCachePool;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TPartitionKeyValue;
@@ -184,7 +185,14 @@ public class LocalCatalog implements FeCatalog {
 
   @Override
   public Path getTablePath(Table msTbl) {
-    throw new UnsupportedOperationException("TODO");
+    // If the table did not have its path set, build the path based on the
+    // location property of the parent database.
+    if (msTbl.getSd().getLocation() == null || msTbl.getSd().getLocation().isEmpty()) {
+      String dbLocation = getDb(msTbl.getDbName()).getMetaStoreDb().getLocationUri();
+      return new Path(dbLocation, msTbl.getTableName().toLowerCase());
+    } else {
+      return new Path(msTbl.getSd().getLocation());
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/impala/blob/ba813869/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
index 6a779d7..6b9209f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
@@ -22,7 +22,13 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.impala.analysis.ColumnDef;
+import org.apache.impala.analysis.KuduPartitionParam;
+import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.FeDb;
+import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.Function.CompareMode;
@@ -104,6 +110,19 @@ class LocalDb implements FeDb {
   }
 
   @Override
+  public FeKuduTable createKuduCtasTarget(Table msTbl, List<ColumnDef> columnDefs,
+      List<ColumnDef> primaryKeyColumnDefs,
+      List<KuduPartitionParam> kuduPartitionParams) {
+    return LocalKuduTable.createCtasTarget(this, msTbl, columnDefs, primaryKeyColumnDefs,
+        kuduPartitionParams);
+  }
+
+  @Override
+  public FeFsTable createFsCtasTarget(Table msTbl) throws CatalogException {
+    return LocalFsTable.createCtasTarget(this, msTbl);
+  }
+
+  @Override
   public List<String> getAllTableNames() {
     loadTableNames();
     return ImmutableList.copyOf(tables_.keySet());

http://git-wip-us.apache.org/repos/asf/impala/blob/ba813869/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index 5a5f3bd..81dcae1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -54,6 +54,7 @@ import org.apache.thrift.TException;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -103,6 +104,20 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
         FeFsTable.DEFAULT_NULL_COLUMN_VALUE;
   }
 
+  /**
+   * Creates a temporary FsTable object populated with the specified properties.
+   * This is used for CTAS statements.
+   */
+  public static LocalFsTable createCtasTarget(LocalDb db,
+      Table msTbl) throws CatalogException {
+    // TODO(todd): set a member variable indicating this is a CTAS target
+    // so we can checkState() against it in various other methods and make
+    // sure we don't try to do something like load partitions for a not-yet-created
+    // table.
+    return new LocalFsTable(db, msTbl);
+  }
+
+
   @Override
   public boolean isCacheable() {
     // TODO Auto-generated method stub
@@ -162,8 +177,14 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
 
   @Override
   public HdfsFileFormat getMajorityFormat() {
-    // Needed by HdfsTableSink.
-    throw new UnsupportedOperationException("TODO: implement me");
+    // TODO(todd): can we avoid loading all partitions here? this is called
+    // for any INSERT query, even if the partition is specified.
+    Collection<? extends FeFsPartition> parts = FeCatalogUtils.loadAllPartitions(this);
+    // In the case that we have no partitions added to the table yet, it's
+    // important to add the "prototype" partition as a fallback.
+    Iterable<FeFsPartition> partitionsToConsider = Iterables.concat(
+        parts, Collections.singleton(createPrototypePartition()));
+    return FeCatalogUtils.getMajorityFormat(partitionsToConsider);
   }
 
   @Override
@@ -184,8 +205,12 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
   }
 
   @Override
-  public TTableDescriptor toThriftDescriptor(int tableId, Set<Long> referencedPartitions) {
-    Preconditions.checkNotNull(referencedPartitions);
+  public TTableDescriptor toThriftDescriptor(int tableId,
+      Set<Long> referencedPartitions) {
+    if (referencedPartitions == null) {
+      // null means "all partitions".
+      referencedPartitions = getPartitionIds();
+    }
     Map<Long, THdfsPartition> idToPartition = Maps.newHashMap();
     List<? extends FeFsPartition> partitions = loadPartitions(referencedPartitions);
     for (FeFsPartition partition : partitions) {

http://git-wip-us.apache.org/repos/asf/impala/blob/ba813869/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
index e12e3ac..8a6bb67 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
@@ -26,6 +26,7 @@ import javax.annotation.concurrent.Immutable;
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.impala.analysis.ColumnDef;
 import org.apache.impala.analysis.KuduPartitionParam;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeCatalogUtils;
@@ -49,7 +50,6 @@ import com.google.common.collect.Lists;
 public class LocalKuduTable extends LocalTable implements FeKuduTable {
   private final TableParams tableParams_;
   private final List<KuduPartitionParam> partitionBy_;
-  private final org.apache.kudu.client.KuduTable kuduTable_;
   private final ImmutableList<String> primaryKeyColumnNames_;
 
   /**
@@ -74,8 +74,38 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
     // Use the schema derived from Kudu, rather than the one stored in the HMS.
     msTable.getSd().setCols(fieldSchemas);
 
+
+    List<String> pkNames = new ArrayList<>();
+    for (ColumnSchema c: kuduTable.getSchema().getPrimaryKeyColumns()) {
+      pkNames.add(c.getName().toLowerCase());
+    }
+
+    List<KuduPartitionParam> partitionBy = Utils.loadPartitionByParams(kuduTable);
+
     ColumnMap cmap = new ColumnMap(cols, /*numClusteringCols=*/0, fullTableName);
-    return new LocalKuduTable(db, msTable, cmap, kuduTable);
+    return new LocalKuduTable(db, msTable, cmap, pkNames, partitionBy);
+  }
+
+
+  public static FeKuduTable createCtasTarget(LocalDb db, Table msTable,
+      List<ColumnDef> columnDefs, List<ColumnDef> primaryKeyColumnDefs,
+      List<KuduPartitionParam> kuduPartitionParams) {
+    String fullTableName = msTable.getDbName() + "." + msTable.getTableName();
+
+    List<Column> columns = new ArrayList<>();
+    List<String> pkNames = new ArrayList<>();
+    int pos = 0;
+    for (ColumnDef colDef: columnDefs) {
+      // TODO(todd): it seems odd that for CTAS targets, the columns are of type
+      // 'Column' instead of 'KuduColumn'.
+      columns.add(new Column(colDef.getColName(), colDef.getType(), pos++));
+    }
+    for (ColumnDef pkColDef: primaryKeyColumnDefs) {
+      pkNames.add(pkColDef.getColName());
+    }
+
+    ColumnMap cmap = new ColumnMap(columns, /*numClusteringCols=*/0, fullTableName);
+    return new LocalKuduTable(db, msTable, cmap, pkNames, kuduPartitionParams);
   }
 
   private static void convertColsFromKudu(Schema schema, List<Column> cols,
@@ -100,18 +130,12 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
   }
 
   private LocalKuduTable(LocalDb db, Table msTable, ColumnMap cmap,
-      org.apache.kudu.client.KuduTable kuduTable) {
+      List<String> primaryKeyColumnNames,
+      List<KuduPartitionParam> partitionBy)  {
     super(db, msTable, cmap);
     tableParams_ = new TableParams(msTable);
-    kuduTable_ = kuduTable;
-    partitionBy_ = ImmutableList.copyOf(Utils.loadPartitionByParams(
-        kuduTable));
-
-    ImmutableList.Builder<String> b = ImmutableList.builder();
-    for (ColumnSchema c: kuduTable_.getSchema().getPrimaryKeyColumns()) {
-      b.add(c.getName().toLowerCase());
-    }
-    primaryKeyColumnNames_ = b.build();
+    partitionBy_ = ImmutableList.copyOf(partitionBy);
+    primaryKeyColumnNames_ = ImmutableList.copyOf(primaryKeyColumnNames);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/impala/blob/ba813869/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
index 0de2edb..7426641 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
@@ -21,10 +21,9 @@ import java.util.List;
 
 import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.analysis.Expr;
+import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.HdfsFileFormat;
-import org.apache.impala.catalog.HdfsTable;
-import org.apache.impala.catalog.Table;
 import org.apache.impala.thrift.TDataSink;
 import org.apache.impala.thrift.TDataSinkType;
 import org.apache.impala.thrift.TExplainLevel;
@@ -36,8 +35,9 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 /**
- * Base class for Hdfs data sinks such as HdfsTextTableSink.
+ * Sink for inserting into filesystem-backed tables.
  *
+ * TODO(vercegovac): rename to FsTableSink
  */
 public class HdfsTableSink extends TableSink {
   // Default number of partitions used for computeResourceProfile() in the absence of
@@ -61,7 +61,7 @@ public class HdfsTableSink extends TableSink {
   public HdfsTableSink(FeTable targetTable, List<Expr> partitionKeyExprs,
       boolean overwrite, boolean inputIsClustered, List<Integer> sortColumns) {
     super(targetTable, Op.INSERT);
-    Preconditions.checkState(targetTable instanceof HdfsTable);
+    Preconditions.checkState(targetTable instanceof FeFsTable);
     partitionKeyExprs_ = partitionKeyExprs;
     overwrite_ = overwrite;
     inputIsClustered_ = inputIsClustered;
@@ -70,7 +70,7 @@ public class HdfsTableSink extends TableSink {
 
   @Override
   public void computeResourceProfile(TQueryOptions queryOptions) {
-    HdfsTable table = (HdfsTable) targetTable_;
+    FeFsTable table = (FeFsTable) targetTable_;
     // TODO: Estimate the memory requirements more accurately by partition type.
     HdfsFileFormat format = table.getMajorityFormat();
     PlanNode inputNode = fragment_.getPlanRoot();
@@ -164,7 +164,7 @@ public class HdfsTableSink extends TableSink {
     TDataSink result = new TDataSink(TDataSinkType.TABLE_SINK);
     THdfsTableSink hdfsTableSink = new THdfsTableSink(
         Expr.treesToThrift(partitionKeyExprs_), overwrite_, inputIsClustered_);
-    HdfsTable table = (HdfsTable) targetTable_;
+    FeFsTable table = (FeFsTable) targetTable_;
     StringBuilder error = new StringBuilder();
     int skipHeaderLineCount = table.parseSkipHeaderLineCount(error);
     // Errors will be caught during analysis.

http://git-wip-us.apache.org/repos/asf/impala/blob/ba813869/fe/src/main/java/org/apache/impala/planner/TableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/TableSink.java b/fe/src/main/java/org/apache/impala/planner/TableSink.java
index 677805e..214c9b4 100644
--- a/fe/src/main/java/org/apache/impala/planner/TableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/TableSink.java
@@ -20,6 +20,7 @@ package org.apache.impala.planner;
 import java.util.List;
 
 import org.apache.impala.analysis.Expr;
+import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.HBaseTable;
@@ -96,7 +97,7 @@ public abstract class TableSink extends DataSink {
     Preconditions.checkNotNull(partitionKeyExprs);
     Preconditions.checkNotNull(referencedColumns);
     Preconditions.checkNotNull(sortColumns);
-    if (table instanceof HdfsTable) {
+    if (table instanceof FeFsTable) {
       // Hdfs only supports inserts.
       Preconditions.checkState(sinkAction == Op.INSERT);
       // Referenced columns don't make sense for an Hdfs table.