Posted to commits@impala.apache.org by st...@apache.org on 2023/08/02 23:29:35 UTC

[impala] 02/03: IMPALA-11901: Support COPY TESTCASE in local catalog mode

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 2a8374d7eb17592e6280b15f9adb046a25f2fb85
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Fri Jun 23 10:20:48 2023 +0800

    IMPALA-11901: Support COPY TESTCASE in local catalog mode
    
    The COPY TESTCASE TO statement exports the metadata required for
    planning a given query: the catalog objects cached on the coordinator
    side are dumped to a file. Users can transfer the metadata file to
    another (debug) cluster and import it with the COPY TESTCASE FROM
    statement, which loads the metadata, including the topology of the
    original cluster, into the new catalogd. An EXPLAIN of the same query
    can then produce the same query plan, which is useful for debugging
    frontend issues.
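
    For example (a sketch, not part of this patch; the path and query are
    illustrative, and <uuid> stands for the generated file name suffix):

        -- On the source cluster: export the metadata needed to plan the query.
        COPY TESTCASE TO 'hdfs:///tmp' SELECT count(*) FROM functional.alltypes;
        -- On the debug cluster: import the generated testcase file, after
        -- which an EXPLAIN can reproduce the original plan.
        COPY TESTCASE FROM 'hdfs:///tmp/impala-testcase-data-<uuid>';
        EXPLAIN SELECT count(*) FROM functional.alltypes;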
    
    However, the COPY TESTCASE statements were not supported in local
    catalog mode. In the legacy catalog mode, the coordinator has the same
    catalog representation as catalogd, i.e. the coordinator simply
    mirrors all the metadata from catalogd, so the export is simple: we
    can dump the metadata on the coordinator side and catalogd can ingest
    it directly. In local catalog mode, however, the coordinator caches
    the metadata in a finer-grained manner that differs significantly from
    the representation in catalogd. To export metadata that catalogd can
    import, we need to transform it into the catalog objects used by
    catalogd.
    
    This patch uses the fine-grained metadata to construct the full thrift
    objects (e.g. THdfsTable), so the coordinator can export them in the
    same way as in the legacy catalog mode.
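
    A condensed sketch of the new conversion (the complete version is
    FeCatalogUtils#feTableToThrift in the diff below; only names from this
    patch are used):

        TTable res = new TTable(table.getDb().getName(), table.getName());
        res.setMetastore_table(table.getMetaStoreTable());
        if (table instanceof LocalFsTable) {
          res.setTable_type(TTableType.HDFS_TABLE);
          // Export the FULL representation, including file descriptors.
          res.setHdfs_table(((LocalFsTable) table).toTHdfsTable(
              CatalogObject.ThriftObjectType.FULL));
        }
        // ... analogous branches for Kudu, HBase, Iceberg and views.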
    
    Tests:
     - Our dev environment does not have two clusters to verify the real
       use case, so this patch adds sanity tests for exporting and
       importing testcase metadata.
    
    Change-Id: I02c1c76d7af15f28bdbc8d98d92a1553570e9e27
    Reviewed-on: http://gerrit.cloudera.org:8080/20110
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../apache/impala/analysis/CopyTestCaseStmt.java   | 23 ++++-----
 .../org/apache/impala/catalog/FeCatalogUtils.java  | 57 ++++++++++++++++++++++
 .../main/java/org/apache/impala/catalog/Table.java |  5 +-
 .../apache/impala/catalog/local/LocalFsTable.java  | 30 +++++++++---
 .../impala/catalog/local/LocalKuduTable.java       |  8 ++-
 .../apache/impala/service/CatalogOpExecutor.java   |  9 ++--
 .../impala/testutil/PlannerTestCaseLoader.java     |  2 +-
 tests/metadata/test_testcase_builder.py            | 23 ++++++++-
 8 files changed, 126 insertions(+), 31 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/CopyTestCaseStmt.java b/fe/src/main/java/org/apache/impala/analysis/CopyTestCaseStmt.java
index f6aba15a4..4caa54d1c 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CopyTestCaseStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CopyTestCaseStmt.java
@@ -25,10 +25,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.impala.authorization.Privilege;
+import org.apache.impala.catalog.FeCatalogUtils;
 import org.apache.impala.catalog.FeDb;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.FeView;
-import org.apache.impala.catalog.Table;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
@@ -38,7 +38,8 @@ import org.apache.impala.common.Pair;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TTestCaseData;
 import org.apache.impala.util.CompressionUtil;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -58,7 +59,7 @@ public class CopyTestCaseStmt extends StatementBase {
   // File name prefix of the testcase file for a given query statement.
   private static final String TEST_OUTPUT_FILE_PREFIX = "impala-testcase-data-";
 
-  private static final Logger LOG = Logger.getLogger(CopyTestCaseStmt.class);
+  private static final Logger LOG = LoggerFactory.getLogger(CopyTestCaseStmt.class);
 
   // QueryStmt for which the testcase should be created. Set to null if we are loading
   // an existing testcase.
@@ -158,7 +159,7 @@ public class CopyTestCaseStmt extends StatementBase {
    * views and databases which are then serialized into the TTestCaseData output val.
    */
   @VisibleForTesting
-  public TTestCaseData getTestCaseData() {
+  public TTestCaseData getTestCaseData() throws ImpalaException {
     Preconditions.checkState(queryStmt_.isAnalyzed());
     TTestCaseData result = new TTestCaseData(queryStmt_.getOrigSqlString(),
         hdfsPath_.getLocation(), BackendConfig.INSTANCE.getImpalaBuildVersion());
@@ -172,13 +173,7 @@ public class CopyTestCaseStmt extends StatementBase {
       result.addToDbs(db.toThrift());
     }
     for (FeTable table: referencedTbls) {
-      Preconditions.checkState(table instanceof FeTable);
-      ((Table) table).takeReadLock();
-      try {
-        result.addToTables_and_views(((Table) table).toThrift());
-      } finally {
-        ((Table) table).releaseReadLock();
-      }
+      result.addToTables_and_views(FeCatalogUtils.feTableToThrift(table));
     }
     return result;
   }
@@ -204,8 +199,10 @@ public class CopyTestCaseStmt extends StatementBase {
       throw new ImpalaRuntimeException(String.format("Error writing test case output to" +
           " file: %s", filePath), e);
     }
-    LOG.info(String.format(
-        "Created testcase file %s for query: %s", filePath, data.getQuery_stmt()));
+    LOG.info("Created testcase file {} which contains {} db(s), {} table(s)/view(s)" +
+            " for query: {}",
+        filePath, data.getDbsSize(), data.getTables_and_viewsSize(),
+        data.getQuery_stmt());
     return filePath.toString();
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
index 245a6ff12..6475199b4 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
@@ -39,13 +39,22 @@ import org.apache.impala.catalog.CatalogObject.ThriftObjectType;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.local.CatalogdMetaProvider;
 import org.apache.impala.catalog.local.LocalCatalog;
+import org.apache.impala.catalog.local.LocalFsTable;
+import org.apache.impala.catalog.local.LocalHbaseTable;
+import org.apache.impala.catalog.local.LocalIcebergTable;
+import org.apache.impala.catalog.local.LocalKuduTable;
+import org.apache.impala.catalog.local.LocalView;
 import org.apache.impala.catalog.local.MetaProvider;
+import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.NotImplementedException;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TColumnDescriptor;
 import org.apache.impala.thrift.TGetCatalogMetricsResult;
 import org.apache.impala.thrift.THdfsPartition;
+import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableStats;
+import org.apache.impala.thrift.TTableType;
 import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.MetaStoreUtil;
 import org.slf4j.Logger;
@@ -392,6 +401,54 @@ public abstract class FeCatalogUtils {
     return thriftHdfsPart;
   }
 
+  /**
+   * Returns the FULL thrift object for an FeTable. The result can be directly loaded into
+   * the catalog cache of catalogd. See CatalogOpExecutor#copyTestCaseData().
+   */
+  public static TTable feTableToThrift(FeTable table) throws ImpalaException {
+    if (table instanceof Table) return ((Table) table).toThrift();
+    // In local-catalog mode, the coordinator caches metadata in a finer-grained way.
+    // Construct the thrift table using fine-grained APIs.
+    TTable res = new TTable(table.getDb().getName(), table.getName());
+    res.setTable_stats(table.getTTableStats());
+    res.setMetastore_table(table.getMetaStoreTable());
+    res.setClustering_columns(new ArrayList<>());
+    for (Column c : table.getClusteringColumns()) {
+      res.addToClustering_columns(c.toThrift());
+    }
+    res.setColumns(new ArrayList<>());
+    for (Column c : table.getNonClusteringColumns()) {
+      res.addToColumns(c.toThrift());
+    }
+    res.setVirtual_columns(new ArrayList<>());
+    for (VirtualColumn c : table.getVirtualColumns()) {
+      res.addToVirtual_columns(c.toThrift());
+    }
+    if (table instanceof LocalFsTable) {
+      res.setTable_type(TTableType.HDFS_TABLE);
+      res.setHdfs_table(((LocalFsTable) table).toTHdfsTable(
+          CatalogObject.ThriftObjectType.FULL));
+    } else if (table instanceof LocalKuduTable) {
+      res.setTable_type(TTableType.KUDU_TABLE);
+      res.setKudu_table(((LocalKuduTable) table).toTKuduTable());
+    } else if (table instanceof LocalHbaseTable) {
+      res.setTable_type(TTableType.HBASE_TABLE);
+      res.setHbase_table(FeHBaseTable.Util.getTHBaseTable((FeHBaseTable) table));
+    } else if (table instanceof LocalIcebergTable) {
+      res.setTable_type(TTableType.ICEBERG_TABLE);
+      LocalIcebergTable iceTable = (LocalIcebergTable) table;
+      res.setIceberg_table(FeIcebergTable.Utils.getTIcebergTable(iceTable));
+      res.setHdfs_table(iceTable.transfromToTHdfsTable(/*unused*/true));
+    } else if (table instanceof LocalView) {
+      res.setTable_type(TTableType.VIEW);
+      // Metadata of the view is stored in msTable. Nothing else needs to be added here.
+    } else {
+      throw new NotImplementedException("Unsupported type to export: " +
+          table.getClass());
+    }
+    return res;
+  }
+
   /**
    * Populates cache metrics in the input TGetCatalogMetricsResult object.
    * No-op if CatalogdMetaProvider is not the configured metadata provider.
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 2ada744f7..2b70bdeeb 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -629,7 +629,10 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
     // the table lock should already be held, and we want the toThrift() to be consistent
     // with the modification. So this check helps us identify places where the lock
     // acquisition is probably missing entirely.
-    if (!isLockedByCurrentThread()) {
+    // Note that we only need the lock in catalogd. In the impalad catalog cache there
+    // are no modifications on the table object - we just replace the old object with a
+    // new one, so the lock is not needed in impalad.
+    if (!storedInImpaladCatalogCache_ && !isLockedByCurrentThread()) {
       throw new IllegalStateException(
           "Table.toThrift() called without holding the table lock: " +
               getFullName() + " " + getClass().getName());
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index d5768511a..a60ba7c23 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -315,6 +315,19 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
   @Override
   public TTableDescriptor toThriftDescriptor(int tableId,
       Set<Long> referencedPartitions) {
+    TTableDescriptor tableDesc = new TTableDescriptor(tableId, TTableType.HDFS_TABLE,
+        FeCatalogUtils.getTColumnDescriptors(this),
+        getNumClusteringCols(), name_, db_.getName());
+    tableDesc.setHdfsTable(toTHdfsTable(referencedPartitions,
+        ThriftObjectType.DESCRIPTOR_ONLY));
+    return tableDesc;
+  }
+
+  public THdfsTable toTHdfsTable(ThriftObjectType type) {
+    return toTHdfsTable(null, type);
+  }
+
+  private THdfsTable toTHdfsTable(Set<Long> referencedPartitions, ThriftObjectType type) {
     if (referencedPartitions == null) {
       // null means "all partitions".
       referencedPartitions = getPartitionIds();
@@ -323,10 +336,11 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
     List<? extends FeFsPartition> partitions = loadPartitions(referencedPartitions);
     for (FeFsPartition partition : partitions) {
       idToPartition.put(partition.getId(),
-          FeCatalogUtils.fsPartitionToThrift(partition,
-              ThriftObjectType.DESCRIPTOR_ONLY));
+          FeCatalogUtils.fsPartitionToThrift(partition, type));
     }
 
+    // The prototype partition has no partition values, file descriptors, etc.,
+    // so we always use DESCRIPTOR_ONLY here.
     THdfsPartition tPrototypePartition = FeCatalogUtils.fsPartitionToThrift(
         createPrototypePartition(), ThriftObjectType.DESCRIPTOR_ONLY);
 
@@ -345,10 +359,6 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
     if (AcidUtils.isFullAcidTable(getMetaStoreTable().getParameters())) {
       hdfsTable.setIs_full_acid(true);
     }
-
-    TTableDescriptor tableDesc = new TTableDescriptor(tableId, TTableType.HDFS_TABLE,
-        FeCatalogUtils.getTColumnDescriptors(this),
-        getNumClusteringCols(), name_, db_.getName());
     // 'ref_' can be null when this table is the target of a CTAS statement.
     if (ref_ != null) {
       TValidWriteIdList validWriteIdList =
@@ -356,8 +366,11 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
       if (validWriteIdList != null) hdfsTable.setValid_write_ids(validWriteIdList);
       hdfsTable.setPartition_prefixes(ref_.getPartitionPrefixes());
     }
-    tableDesc.setHdfsTable(hdfsTable);
-    return tableDesc;
+    if (type == ThriftObjectType.FULL) {
+      hdfsTable.setNetwork_addresses(hostIndex_.getList());
+      hdfsTable.setSql_constraints(getSqlConstraints().toThrift());
+    }
+    return hdfsTable;
   }
 
   private static boolean isAvroFormat(Table msTbl) {
@@ -558,6 +571,7 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
    * Populate constraint information by making a request to MetaProvider.
    */
   private void loadConstraints() throws TException {
+    if (sqlConstraints_ != null) return;
     sqlConstraints_ = db_.getCatalog().getMetaProvider().loadConstraints(ref_, msTable_);
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
index b2bd98a00..c82605bba 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
@@ -213,6 +213,11 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
         FeCatalogUtils.getTColumnDescriptors(this),
         getNumClusteringCols(),
         name_, db_.getName());
+    desc.setKuduTable(toTKuduTable());
+    return desc;
+  }
+
+  public TKuduTable toTKuduTable() {
     TKuduTable tbl = new TKuduTable();
     tbl.setIs_primary_key_unique(isPrimaryKeyUnique_);
     tbl.setHas_auto_incrementing(hasAutoIncrementingColumn_);
@@ -226,8 +231,7 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
     for (KuduPartitionParam partitionParam: partitionBy_) {
       tbl.addToPartition_by(partitionParam.toThrift());
     }
-    desc.setKuduTable(tbl);
-    return desc;
+    return tbl;
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index ce4cc6f24..d115ea2d0 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -549,7 +549,8 @@ public class CatalogOpExecutor {
           break;
         case COPY_TESTCASE:
           catalogOpMetric_.increment(ddl_type, Optional.empty());
-          copyTestCaseData(ddlRequest.getCopy_test_case_params(), response);
+          copyTestCaseData(ddlRequest.getCopy_test_case_params(), response,
+              wantMinimalResult);
           break;
         default:
           catalogOpMetric_.increment(ddl_type, Optional.empty());
@@ -585,7 +586,7 @@ public class CatalogOpExecutor {
    */
   @VisibleForTesting
   public String copyTestCaseData(
-      TCopyTestCaseReq request, TDdlExecResponse response)
+      TCopyTestCaseReq request, TDdlExecResponse response, boolean wantMinimalResult)
       throws ImpalaException {
     Path inputPath = new Path(Preconditions.checkNotNull(request.input_path));
     // Read the data from the source FS.
@@ -624,7 +625,7 @@ public class CatalogOpExecutor {
         Db ret = catalog_.addDb(db.getName(), db.getMetaStoreDb());
         if (ret != null) {
           ++numDbsAdded;
-          response.result.addToUpdated_catalog_objects(db.toTCatalogObject());
+          addDbToCatalogUpdate(db, wantMinimalResult, response.result);
         }
       }
     }
@@ -651,7 +652,7 @@ public class CatalogOpExecutor {
         // to IMPALA-4092.
         t.takeReadLock();
         try {
-          response.result.addToUpdated_catalog_objects(t.toTCatalogObject());
+          addTableToCatalogUpdate(t, wantMinimalResult, response.result);
         } finally {
           t.releaseReadLock();
         }
diff --git a/fe/src/test/java/org/apache/impala/testutil/PlannerTestCaseLoader.java b/fe/src/test/java/org/apache/impala/testutil/PlannerTestCaseLoader.java
index 3ed7d59d2..311c18c5b 100644
--- a/fe/src/test/java/org/apache/impala/testutil/PlannerTestCaseLoader.java
+++ b/fe/src/test/java/org/apache/impala/testutil/PlannerTestCaseLoader.java
@@ -73,7 +73,7 @@ public class PlannerTestCaseLoader implements AutoCloseable {
    */
   public String loadTestCase(String testCasePath) throws Exception {
     String stmt = catalogOpExecutor_.copyTestCaseData(new TCopyTestCaseReq(testCasePath),
-        new TDdlExecResponse(new TCatalogUpdateResult()));
+        new TDdlExecResponse(new TCatalogUpdateResult()), /*wantMinimalResult*/false);
     TQueryCtx queryCtx = TestUtils.createQueryContext(
         new TQueryOptions().setPlanner_testcase_mode(true));
     queryCtx.client_request.setStmt(stmt);
diff --git a/tests/metadata/test_testcase_builder.py b/tests/metadata/test_testcase_builder.py
index a3d5d38a6..eea71a308 100644
--- a/tests/metadata/test_testcase_builder.py
+++ b/tests/metadata/test_testcase_builder.py
@@ -38,12 +38,28 @@ class TestTestcaseBuilder(ImpalaTestSuite):
       create_uncompressed_text_dimension(cls.get_workload()))
 
   def test_query_without_from(self):
+    self._test_export_and_import(0, 0, 0, "SELECT 5 * 20")
+
+  def test_query_with_tbls(self, unique_database):
+    """Verify the basic usage. Use a unique database so the import won't impact the
+    metadata used by other tests."""
+    self.client.execute(
+        "create table {0}.alltypes like functional.alltypes".format(unique_database))
+    self.client.execute(
+        "create view {0}.alltypes_view as select * from {0}.alltypes"
+        .format(unique_database))
+    # Test SELECT on a view. The view will be expanded and the underlying table will also
+    # be exported.
+    self._test_export_and_import(1, 1, 1,
+        "select count(*) from {0}.alltypes_view".format(unique_database))
+
+  def _test_export_and_import(self, num_dbs, num_tbls, num_views, query):
     tmp_path = get_fs_path("/tmp")
     # Make sure /tmp dir exists
     if not self.filesystem_client.exists(tmp_path):
       self.filesystem_client.make_dir(tmp_path)
     # Generate Testcase Data for query without table reference
-    testcase_generate_query = """COPY TESTCASE TO '%s' SELECT 5 * 20""" % tmp_path
+    testcase_generate_query = "COPY TESTCASE TO '%s' %s" % (tmp_path, query)
     result = self.execute_query_expect_success(self.client, testcase_generate_query)
     assert len(result.data) == 1, "Testcase builder wrong result: {0}".format(result.data)
 
@@ -57,7 +73,10 @@ class TestTestcaseBuilder(ImpalaTestSuite):
     try:
       # Test load testcase works
       testcase_load_query = "COPY TESTCASE FROM {0}".format(testcase_path)
-      self.execute_query_expect_success(self.client, testcase_load_query)
+      result = self.execute_query_expect_success(self.client, testcase_load_query)
+      expected_msg = "{0} db(s), {1} table(s) and {2} view(s) imported for query".format(
+          num_dbs, num_tbls, num_views)
+      assert expected_msg in result.get_data()
     finally:
       # Delete testcase file from tmp
       status = self.filesystem_client.delete_file_dir(hdfs_path)