Posted to commits@impala.apache.org by st...@apache.org on 2023/08/02 23:29:33 UTC

[impala] branch master updated (c84221369 -> 8638255e5)

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


    from c84221369 IMPALA-12298: Improve incremental load of Iceberg tables
     new b510e437d IMPALA-12326: Add WaitForLocalServer in StatestoreSubscriber::Start
     new 2a8374d7e IMPALA-11901: Support COPY TESTCASE in local catalog mode
     new 8638255e5 IMPALA-12327: Iceberg V2 operator wrong results in PARTITIONED mode

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/iceberg-delete-node.cc                 | 27 +++++++---
 be/src/statestore/statestore-subscriber.cc         |  2 +
 .../apache/impala/analysis/CopyTestCaseStmt.java   | 23 ++++-----
 .../org/apache/impala/catalog/FeCatalogUtils.java  | 57 ++++++++++++++++++++++
 .../main/java/org/apache/impala/catalog/Table.java |  5 +-
 .../apache/impala/catalog/local/LocalFsTable.java  | 30 +++++++++---
 .../impala/catalog/local/LocalKuduTable.java       |  8 ++-
 .../apache/impala/service/CatalogOpExecutor.java   |  9 ++--
 .../impala/testutil/PlannerTestCaseLoader.java     |  2 +-
 .../functional/functional_schema_template.sql      |  4 +-
 .../iceberg-v2-read-position-deletes.test          | 15 ++++++
 tests/metadata/test_testcase_builder.py            | 23 ++++++++-
 12 files changed, 165 insertions(+), 40 deletions(-)


[impala] 02/03: IMPALA-11901: Support COPY TESTCASE in local catalog mode

Posted by st...@apache.org.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 2a8374d7eb17592e6280b15f9adb046a25f2fb85
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Fri Jun 23 10:20:48 2023 +0800

    IMPALA-11901: Support COPY TESTCASE in local catalog mode
    
    The COPY TESTCASE TO statement exports the metadata required for
    planning a given query. It writes the catalog objects cached on the
    coordinator side to a file. Users can transfer the metadata file to
    another debug cluster and import it with the COPY TESTCASE FROM
    statement, which loads the metadata into the new catalogd, including
    the topology of the original cluster. An EXPLAIN on the same query can
    then produce the same query plan for debugging frontend issues.
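
As a rough illustration of the export/import workflow described above, the following Python sketch builds the two statements as strings, following the statement shapes used in tests/metadata/test_testcase_builder.py. The helper names, the example query and the example paths are hypothetical; executing the statements would need an Impala client, which is not shown here.

```python
def copy_testcase_to(output_dir, query):
    """Build the export statement: dumps the metadata needed to plan `query`."""
    return "COPY TESTCASE TO '%s' %s" % (output_dir, query)


def copy_testcase_from(testcase_path):
    """Build the import statement for a previously exported testcase file."""
    return "COPY TESTCASE FROM {0}".format(testcase_path)


# Hypothetical directory and file name, for illustration only.
export_stmt = copy_testcase_to("/tmp", "select count(*) from functional.alltypes")
import_stmt = copy_testcase_from("/tmp/impala-testcase-data-example")
print(export_stmt)
print(import_stmt)
```

The export statement is run on the source cluster; the file it reports is then copied to the debug cluster and passed to the import statement there.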
    
    However, the COPY TESTCASE statements are not supported in LocalCatalog
    mode. In the legacy catalog mode, the coordinator has the same catalog
    representation as catalogd, i.e. the coordinator just mirrors all the
    metadata from catalogd. So the export is simple: we can dump the
    metadata from the coordinator side and it can be ingested into catalogd.
    In the local catalog mode, however, the coordinator caches the metadata
    in a finer-grained manner that differs a lot from the metadata in
    catalogd. To export metadata that can be imported into catalogd, we need
    to convert it into the catalog objects used in catalogd.
    
    This patch uses the fine-grained metadata to construct the full thrift
    objects, such as THdfsTable, so the coordinator can export them in the
    same way as in the legacy catalog mode.
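
The conversion idea can be pictured with a toy Python model. This is only a sketch of the dispatch pattern, not the Java code in this patch: the classes, field names and dictionary layout below are all hypothetical stand-ins for the real catalog and thrift types.

```python
from dataclasses import dataclass, field


@dataclass
class LegacyTable:
    """Stand-in for a catalogd-style table that already holds the full object."""
    name: str
    full_thrift: dict

    def to_thrift(self):
        return self.full_thrift


@dataclass
class LocalTable:
    """Stand-in for a local-catalog table that only exposes fine-grained pieces."""
    name: str
    kind: str  # "hdfs", "kudu", "view", ...
    columns: list = field(default_factory=list)
    partitions: dict = field(default_factory=dict)


def fe_table_to_thrift(table):
    """Return a full, importable description of `table` (toy version)."""
    if isinstance(table, LegacyTable):
        # Legacy mode: the coordinator mirrors catalogd, so reuse the object.
        return table.to_thrift()
    # Local-catalog mode: assemble the full object from fine-grained pieces.
    res = {"name": table.name, "columns": list(table.columns)}
    if table.kind == "hdfs":
        res["table_type"] = "HDFS_TABLE"
        res["hdfs_table"] = {"partitions": dict(table.partitions)}
    elif table.kind == "kudu":
        res["table_type"] = "KUDU_TABLE"
    elif table.kind == "view":
        res["table_type"] = "VIEW"  # nothing beyond the common fields is added
    else:
        raise NotImplementedError("Unsupported type to export: " + table.kind)
    return res
```

Keeping the legacy path as an early return means callers do not need to know which catalog mode produced the table.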
    
    Tests:
     - We don't have two clusters in our dev env to verify the real use
       cases. Add sanity tests for exporting and importing testcase
       metadata.
    
    Change-Id: I02c1c76d7af15f28bdbc8d98d92a1553570e9e27
    Reviewed-on: http://gerrit.cloudera.org:8080/20110
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../apache/impala/analysis/CopyTestCaseStmt.java   | 23 ++++-----
 .../org/apache/impala/catalog/FeCatalogUtils.java  | 57 ++++++++++++++++++++++
 .../main/java/org/apache/impala/catalog/Table.java |  5 +-
 .../apache/impala/catalog/local/LocalFsTable.java  | 30 +++++++++---
 .../impala/catalog/local/LocalKuduTable.java       |  8 ++-
 .../apache/impala/service/CatalogOpExecutor.java   |  9 ++--
 .../impala/testutil/PlannerTestCaseLoader.java     |  2 +-
 tests/metadata/test_testcase_builder.py            | 23 ++++++++-
 8 files changed, 126 insertions(+), 31 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/CopyTestCaseStmt.java b/fe/src/main/java/org/apache/impala/analysis/CopyTestCaseStmt.java
index f6aba15a4..4caa54d1c 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CopyTestCaseStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CopyTestCaseStmt.java
@@ -25,10 +25,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.impala.authorization.Privilege;
+import org.apache.impala.catalog.FeCatalogUtils;
 import org.apache.impala.catalog.FeDb;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.FeView;
-import org.apache.impala.catalog.Table;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
@@ -38,7 +38,8 @@ import org.apache.impala.common.Pair;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TTestCaseData;
 import org.apache.impala.util.CompressionUtil;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -58,7 +59,7 @@ public class CopyTestCaseStmt extends StatementBase {
   // File name prefix of the testcase file for a given query statement.
   private static final String TEST_OUTPUT_FILE_PREFIX = "impala-testcase-data-";
 
-  private static final Logger LOG = Logger.getLogger(CopyTestCaseStmt.class);
+  private static final Logger LOG = LoggerFactory.getLogger(CopyTestCaseStmt.class);
 
   // QueryStmt for which the testcase should be created. Set to null if we are loading
   // an existing testcase.
@@ -158,7 +159,7 @@ public class CopyTestCaseStmt extends StatementBase {
    * views and databases which are then serialized into the TTestCaseData output val.
    */
   @VisibleForTesting
-  public TTestCaseData getTestCaseData() {
+  public TTestCaseData getTestCaseData() throws ImpalaException {
     Preconditions.checkState(queryStmt_.isAnalyzed());
     TTestCaseData result = new TTestCaseData(queryStmt_.getOrigSqlString(),
         hdfsPath_.getLocation(), BackendConfig.INSTANCE.getImpalaBuildVersion());
@@ -172,13 +173,7 @@ public class CopyTestCaseStmt extends StatementBase {
       result.addToDbs(db.toThrift());
     }
     for (FeTable table: referencedTbls) {
-      Preconditions.checkState(table instanceof FeTable);
-      ((Table) table).takeReadLock();
-      try {
-        result.addToTables_and_views(((Table) table).toThrift());
-      } finally {
-        ((Table) table).releaseReadLock();
-      }
+      result.addToTables_and_views(FeCatalogUtils.feTableToThrift(table));
     }
     return result;
   }
@@ -204,8 +199,10 @@ public class CopyTestCaseStmt extends StatementBase {
       throw new ImpalaRuntimeException(String.format("Error writing test case output to" +
           " file: %s", filePath), e);
     }
-    LOG.info(String.format(
-        "Created testcase file %s for query: %s", filePath, data.getQuery_stmt()));
+    LOG.info("Created testcase file {} which contains {} db(s), {} table(s)/view(s)" +
+            " for query: {}",
+        filePath, data.getDbsSize(), data.getTables_and_viewsSize(),
+        data.getQuery_stmt());
     return filePath.toString();
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
index 245a6ff12..6475199b4 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
@@ -39,13 +39,22 @@ import org.apache.impala.catalog.CatalogObject.ThriftObjectType;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.local.CatalogdMetaProvider;
 import org.apache.impala.catalog.local.LocalCatalog;
+import org.apache.impala.catalog.local.LocalFsTable;
+import org.apache.impala.catalog.local.LocalHbaseTable;
+import org.apache.impala.catalog.local.LocalIcebergTable;
+import org.apache.impala.catalog.local.LocalKuduTable;
+import org.apache.impala.catalog.local.LocalView;
 import org.apache.impala.catalog.local.MetaProvider;
+import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.NotImplementedException;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TColumnDescriptor;
 import org.apache.impala.thrift.TGetCatalogMetricsResult;
 import org.apache.impala.thrift.THdfsPartition;
+import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableStats;
+import org.apache.impala.thrift.TTableType;
 import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.MetaStoreUtil;
 import org.slf4j.Logger;
@@ -392,6 +401,54 @@ public abstract class FeCatalogUtils {
     return thriftHdfsPart;
   }
 
+  /**
+   * Returns the FULL thrift object for a FeTable. The result can be directly loaded into
+   * the catalog cache of catalogd. See CatalogOpExecutor#copyTestCaseData().
+   */
+  public static TTable feTableToThrift(FeTable table) throws ImpalaException {
+    if (table instanceof Table) return ((Table) table).toThrift();
+    // In local-catalog mode, coordinator caches the metadata in finer grained manner.
+    // Construct the thrift table using fine-grained APIs.
+    TTable res = new TTable(table.getDb().getName(), table.getName());
+    res.setTable_stats(table.getTTableStats());
+    res.setMetastore_table(table.getMetaStoreTable());
+    res.setClustering_columns(new ArrayList<>());
+    for (Column c : table.getClusteringColumns()) {
+      res.addToClustering_columns(c.toThrift());
+    }
+    res.setColumns(new ArrayList<>());
+    for (Column c : table.getNonClusteringColumns()) {
+      res.addToColumns(c.toThrift());
+    }
+    res.setVirtual_columns(new ArrayList<>());
+    for (VirtualColumn c : table.getVirtualColumns()) {
+      res.addToVirtual_columns(c.toThrift());
+    }
+    if (table instanceof LocalFsTable) {
+      res.setTable_type(TTableType.HDFS_TABLE);
+      res.setHdfs_table(((LocalFsTable) table).toTHdfsTable(
+          CatalogObject.ThriftObjectType.FULL));
+    } else if (table instanceof LocalKuduTable) {
+      res.setTable_type(TTableType.KUDU_TABLE);
+      res.setKudu_table(((LocalKuduTable) table).toTKuduTable());
+    } else if (table instanceof LocalHbaseTable) {
+      res.setTable_type(TTableType.HBASE_TABLE);
+      res.setHbase_table(FeHBaseTable.Util.getTHBaseTable((FeHBaseTable) table));
+    } else if (table instanceof LocalIcebergTable) {
+      res.setTable_type(TTableType.ICEBERG_TABLE);
+      LocalIcebergTable iceTable = (LocalIcebergTable) table;
+      res.setIceberg_table(FeIcebergTable.Utils.getTIcebergTable(iceTable));
+      res.setHdfs_table(iceTable.transfromToTHdfsTable(/*unused*/true));
+    } else if (table instanceof LocalView) {
+      res.setTable_type(TTableType.VIEW);
+      // Metadata of the view are stored in msTable. Nothing else need to add here.
+    } else {
+      throw new NotImplementedException("Unsupported type to export: " +
+          table.getClass());
+    }
+    return res;
+  }
+
   /**
    * Populates cache metrics in the input TGetCatalogMetricsResult object.
    * No-op if CatalogdMetaProvider is not the configured metadata provider.
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 2ada744f7..2b70bdeeb 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -629,7 +629,10 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
     // the table lock should already be held, and we want the toThrift() to be consistent
     // with the modification. So this check helps us identify places where the lock
     // acquisition is probably missing entirely.
-    if (!isLockedByCurrentThread()) {
+    // Note that we only need the lock in catalogd. In Impalad catalog cache there are no
+    // modification on the table object - we just replace the old object with new ones.
+    // So don't need this lock in Impalad.
+    if (!storedInImpaladCatalogCache_ && !isLockedByCurrentThread()) {
       throw new IllegalStateException(
           "Table.toThrift() called without holding the table lock: " +
               getFullName() + " " + getClass().getName());
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index d5768511a..a60ba7c23 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -315,6 +315,19 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
   @Override
   public TTableDescriptor toThriftDescriptor(int tableId,
       Set<Long> referencedPartitions) {
+    TTableDescriptor tableDesc = new TTableDescriptor(tableId, TTableType.HDFS_TABLE,
+        FeCatalogUtils.getTColumnDescriptors(this),
+        getNumClusteringCols(), name_, db_.getName());
+    tableDesc.setHdfsTable(toTHdfsTable(referencedPartitions,
+        ThriftObjectType.DESCRIPTOR_ONLY));
+    return tableDesc;
+  }
+
+  public THdfsTable toTHdfsTable(ThriftObjectType type) {
+    return toTHdfsTable(null, type);
+  }
+
+  private THdfsTable toTHdfsTable(Set<Long> referencedPartitions, ThriftObjectType type) {
     if (referencedPartitions == null) {
       // null means "all partitions".
       referencedPartitions = getPartitionIds();
@@ -323,10 +336,11 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
     List<? extends FeFsPartition> partitions = loadPartitions(referencedPartitions);
     for (FeFsPartition partition : partitions) {
       idToPartition.put(partition.getId(),
-          FeCatalogUtils.fsPartitionToThrift(partition,
-              ThriftObjectType.DESCRIPTOR_ONLY));
+          FeCatalogUtils.fsPartitionToThrift(partition, type));
     }
 
+    // Prototype partition has no partition values and file descriptors etc.
+    // So we always use DESCRIPTOR_ONLY here.
     THdfsPartition tPrototypePartition = FeCatalogUtils.fsPartitionToThrift(
         createPrototypePartition(), ThriftObjectType.DESCRIPTOR_ONLY);
 
@@ -345,10 +359,6 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
     if (AcidUtils.isFullAcidTable(getMetaStoreTable().getParameters())) {
       hdfsTable.setIs_full_acid(true);
     }
-
-    TTableDescriptor tableDesc = new TTableDescriptor(tableId, TTableType.HDFS_TABLE,
-        FeCatalogUtils.getTColumnDescriptors(this),
-        getNumClusteringCols(), name_, db_.getName());
     // 'ref_' can be null when this table is the target of a CTAS statement.
     if (ref_ != null) {
       TValidWriteIdList validWriteIdList =
@@ -356,8 +366,11 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
       if (validWriteIdList != null) hdfsTable.setValid_write_ids(validWriteIdList);
       hdfsTable.setPartition_prefixes(ref_.getPartitionPrefixes());
     }
-    tableDesc.setHdfsTable(hdfsTable);
-    return tableDesc;
+    if (type == ThriftObjectType.FULL) {
+      hdfsTable.setNetwork_addresses(hostIndex_.getList());
+      hdfsTable.setSql_constraints(getSqlConstraints().toThrift());
+    }
+    return hdfsTable;
   }
 
   private static boolean isAvroFormat(Table msTbl) {
@@ -558,6 +571,7 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
    * Populate constraint information by making a request to MetaProvider.
    */
   private void loadConstraints() throws TException {
+    if (sqlConstraints_ != null) return;
     sqlConstraints_ = db_.getCatalog().getMetaProvider().loadConstraints(ref_, msTable_);
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
index b2bd98a00..c82605bba 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
@@ -213,6 +213,11 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
         FeCatalogUtils.getTColumnDescriptors(this),
         getNumClusteringCols(),
         name_, db_.getName());
+    desc.setKuduTable(toTKuduTable());
+    return desc;
+  }
+
+  public TKuduTable toTKuduTable() {
     TKuduTable tbl = new TKuduTable();
     tbl.setIs_primary_key_unique(isPrimaryKeyUnique_);
     tbl.setHas_auto_incrementing(hasAutoIncrementingColumn_);
@@ -226,8 +231,7 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
     for (KuduPartitionParam partitionParam: partitionBy_) {
       tbl.addToPartition_by(partitionParam.toThrift());
     }
-    desc.setKuduTable(tbl);
-    return desc;
+    return tbl;
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index ce4cc6f24..d115ea2d0 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -549,7 +549,8 @@ public class CatalogOpExecutor {
           break;
         case COPY_TESTCASE:
           catalogOpMetric_.increment(ddl_type, Optional.empty());
-          copyTestCaseData(ddlRequest.getCopy_test_case_params(), response);
+          copyTestCaseData(ddlRequest.getCopy_test_case_params(), response,
+              wantMinimalResult);
           break;
         default:
           catalogOpMetric_.increment(ddl_type, Optional.empty());
@@ -585,7 +586,7 @@ public class CatalogOpExecutor {
    */
   @VisibleForTesting
   public String copyTestCaseData(
-      TCopyTestCaseReq request, TDdlExecResponse response)
+      TCopyTestCaseReq request, TDdlExecResponse response, boolean wantMinimalResult)
       throws ImpalaException {
     Path inputPath = new Path(Preconditions.checkNotNull(request.input_path));
     // Read the data from the source FS.
@@ -624,7 +625,7 @@ public class CatalogOpExecutor {
         Db ret = catalog_.addDb(db.getName(), db.getMetaStoreDb());
         if (ret != null) {
           ++numDbsAdded;
-          response.result.addToUpdated_catalog_objects(db.toTCatalogObject());
+          addDbToCatalogUpdate(db, wantMinimalResult, response.result);
         }
       }
     }
@@ -651,7 +652,7 @@ public class CatalogOpExecutor {
         // to IMPALA-4092.
         t.takeReadLock();
         try {
-          response.result.addToUpdated_catalog_objects(t.toTCatalogObject());
+          addTableToCatalogUpdate(t, wantMinimalResult, response.result);
         } finally {
           t.releaseReadLock();
         }
diff --git a/fe/src/test/java/org/apache/impala/testutil/PlannerTestCaseLoader.java b/fe/src/test/java/org/apache/impala/testutil/PlannerTestCaseLoader.java
index 3ed7d59d2..311c18c5b 100644
--- a/fe/src/test/java/org/apache/impala/testutil/PlannerTestCaseLoader.java
+++ b/fe/src/test/java/org/apache/impala/testutil/PlannerTestCaseLoader.java
@@ -73,7 +73,7 @@ public class PlannerTestCaseLoader implements AutoCloseable {
    */
   public String loadTestCase(String testCasePath) throws Exception {
     String stmt = catalogOpExecutor_.copyTestCaseData(new TCopyTestCaseReq(testCasePath),
-        new TDdlExecResponse(new TCatalogUpdateResult()));
+        new TDdlExecResponse(new TCatalogUpdateResult()), /*wantMinimalResult*/false);
     TQueryCtx queryCtx = TestUtils.createQueryContext(
         new TQueryOptions().setPlanner_testcase_mode(true));
     queryCtx.client_request.setStmt(stmt);
diff --git a/tests/metadata/test_testcase_builder.py b/tests/metadata/test_testcase_builder.py
index a3d5d38a6..eea71a308 100644
--- a/tests/metadata/test_testcase_builder.py
+++ b/tests/metadata/test_testcase_builder.py
@@ -38,12 +38,28 @@ class TestTestcaseBuilder(ImpalaTestSuite):
       create_uncompressed_text_dimension(cls.get_workload()))
 
   def test_query_without_from(self):
+    self._test_export_and_import(0, 0, 0, "SELECT 5 * 20")
+
+  def test_query_with_tbls(self, unique_database):
+    """Verify the basic usage. Use a unique database so the import won't impact the
+    metadata used by other tests"""
+    self.client.execute(
+        "create table {0}.alltypes like functional.alltypes".format(unique_database))
+    self.client.execute(
+        "create view {0}.alltypes_view as select * from {0}.alltypes"
+        .format(unique_database))
+    # Test SELECT on a view. The view will be expanded and the underlying table will also
+    # be exported.
+    self._test_export_and_import(1, 1, 1,
+        "select count(*) from {0}.alltypes_view".format(unique_database))
+
+  def _test_export_and_import(self, num_dbs, num_tbls, num_views, query):
     tmp_path = get_fs_path("/tmp")
     # Make sure /tmp dir exists
     if not self.filesystem_client.exists(tmp_path):
       self.filesystem_client.make_dir(tmp_path)
     # Generate Testcase Data for query without table reference
-    testcase_generate_query = """COPY TESTCASE TO '%s' SELECT 5 * 20""" % tmp_path
+    testcase_generate_query = "COPY TESTCASE TO '%s' %s" % (tmp_path, query)
     result = self.execute_query_expect_success(self.client, testcase_generate_query)
     assert len(result.data) == 1, "Testcase builder wrong result: {0}".format(result.data)
 
@@ -57,7 +73,10 @@ class TestTestcaseBuilder(ImpalaTestSuite):
     try:
       # Test load testcase works
       testcase_load_query = "COPY TESTCASE FROM {0}".format(testcase_path)
-      self.execute_query_expect_success(self.client, testcase_load_query)
+      result = self.execute_query_expect_success(self.client, testcase_load_query)
+      expected_msg = "{0} db(s), {1} table(s) and {2} view(s) imported for query".format(
+          num_dbs, num_tbls, num_views)
+      assert expected_msg in result.get_data()
     finally:
       # Delete testcase file from tmp
       status = self.filesystem_client.delete_file_dir(hdfs_path)


[impala] 03/03: IMPALA-12327: Iceberg V2 operator wrong results in PARTITIONED mode

Posted by st...@apache.org.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 8638255e5074f1342dfc452bca39f649a76612d6
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Tue Aug 1 14:57:34 2023 +0200

    IMPALA-12327: Iceberg V2 operator wrong results in PARTITIONED mode
    
    The Iceberg delete node tries to do mini merge-joins between data
    records and delete records. This works in BROADCAST mode, and most of
    the time in PARTITIONED mode as well. However, the Iceberg delete node
    wrongly assumed that rows in a row batch that belong to the same file
    always come in ascending order. Based on that assumption, it relied on
    the previous delete updating IcebergDeleteState to the next deleted row
    id, and skipped the binary search if that row id was greater than or
    equal to the current probe row id.
    
    When PARTITIONED mode is used, we cannot rely on ascending row order,
    not even inside row batches, not even when the previous file path is the
    same as the current one. This is because files with multiple blocks can
    be processed by multiple hosts in parallel, then the rows are getting
    hash-exchanged based on their file paths. Then the exchange-receiver at
    the LHS coalesces the row batches from multiple senders, hence the row
    IDs being unordered.
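
The effect can be reproduced with a small Python simulation. It is a deliberately simplified model: the file name, block boundaries, batch size and the round-robin interleaving are illustrative assumptions, not how the exchange is actually scheduled.

```python
from itertools import zip_longest


def scan_block(file_path, start, end, batch_size):
    """Yield row batches of (file_path, position) for positions [start, end)."""
    rows = [(file_path, pos) for pos in range(start, end)]
    for i in range(0, len(rows), batch_size):
        yield rows[i:i + batch_size]


def coalesce(*senders):
    """Interleave batches from several senders, as a receiver might see them."""
    received = []
    for batches in zip_longest(*senders):
        for batch in batches:
            if batch is not None:
                received.extend(batch)
    return received


# Two hosts scan different blocks of the same (hypothetical) data file.
host_a = scan_block("data.parquet", 0, 4, batch_size=2)
host_b = scan_block("data.parquet", 4, 8, batch_size=2)
positions = [pos for _, pos in coalesce(host_a, host_b)]
print(positions)
```

Every row carries the same file path, yet the positions seen by the receiver jump back and forth between the two blocks.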
    
    This patch adds a fix to ignore presumptions and do a binary search when
    the position-based difference between the current row and previous row
    is not one, and we are in PARTITIONED mode.
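
A minimal Python sketch of the lookup strategy follows. It is not the C++ implementation in iceberg-delete-node.cc: the class, its fields and the per-file dictionary are hypothetical, and only the decision of when to redo the binary search mirrors the description above.

```python
from bisect import bisect_left


class DeleteState:
    """Tracks the next delete position for the file currently being probed."""

    def __init__(self, deletes_by_file, partitioned):
        # deletes_by_file: file path -> sorted list of deleted row positions.
        self.deletes_by_file = deletes_by_file
        self.partitioned = partitioned
        self.prev_file = None
        self.prev_pos = None
        self.deletes = []
        self.cursor = 0  # index of the delete position currently tracked

    def is_deleted(self, file_path, pos):
        same_file = file_path == self.prev_file
        consecutive = self.prev_pos is not None and pos - self.prev_pos == 1
        if not same_file:
            # New file: load its delete positions and search from scratch.
            self.deletes = self.deletes_by_file.get(file_path, [])
            self.cursor = bisect_left(self.deletes, pos)
        elif self.partitioned and not consecutive:
            # Row order is not guaranteed: search again instead of reusing state.
            self.cursor = bisect_left(self.deletes, pos)
        elif self.cursor < len(self.deletes) and pos > self.deletes[self.cursor]:
            # Probe moved past the tracked delete position: search forward.
            self.cursor = bisect_left(self.deletes, pos)
        self.prev_file, self.prev_pos = file_path, pos
        return self.cursor < len(self.deletes) and self.deletes[self.cursor] == pos


state = DeleteState({"data.parquet": [2, 5]}, partitioned=True)
probe_order = [0, 1, 4, 5, 2, 3, 6, 7]  # unordered, as after a hash exchange
deleted = [p for p in probe_order if state.is_deleted("data.parquet", p)]
print(deleted)
```

Constructing the same object with partitioned=False and feeding it the same unordered probe sequence misses position 2, which is the kind of wrong result the fix addresses.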
    
    Tests:
     * added e2e tests
    
    Change-Id: Ib89a53e812af8c3b8ec5bc27bca0a50dcac5d924
    Reviewed-on: http://gerrit.cloudera.org:8080/20295
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/iceberg-delete-node.cc                 | 27 +++++++++++++++++-----
 .../functional/functional_schema_template.sql      |  4 +---
 .../iceberg-v2-read-position-deletes.test          | 15 ++++++++++++
 3 files changed, 37 insertions(+), 9 deletions(-)

diff --git a/be/src/exec/iceberg-delete-node.cc b/be/src/exec/iceberg-delete-node.cc
index 04f024ca0..668181f13 100644
--- a/be/src/exec/iceberg-delete-node.cc
+++ b/be/src/exec/iceberg-delete-node.cc
@@ -324,15 +324,17 @@ void IcebergDeleteNode::IcebergDeleteState::UpdateImpl() {
 }
 
 void IcebergDeleteNode::IcebergDeleteState::Update(
-    impala::StringValue* file_path, int64_t* probe_pos) {
+    impala::StringValue* file_path, int64_t* next_probe_pos) {
   DCHECK(builder_ != nullptr);
   // Making sure the row ids are in ascending order inside a row batch in broadcast mode
   DCHECK(builder_->IsDistributedMode() || current_probe_pos_ == INVALID_ROW_ID
-      || current_probe_pos_ < *probe_pos);
-  DCHECK(!builder_->IsDistributedMode() || previous_file_path_ == nullptr
-      || *file_path != *previous_file_path_ || current_probe_pos_ == INVALID_ROW_ID
-      || current_probe_pos_ < *probe_pos);
-  current_probe_pos_ = *probe_pos;
+      || current_probe_pos_ < *next_probe_pos);
+  bool is_consecutive_pos = false;
+  if(current_probe_pos_ != INVALID_ROW_ID) {
+    const int64_t step = *next_probe_pos - current_probe_pos_;
+    is_consecutive_pos = step == 1;
+  }
+  current_probe_pos_ = *next_probe_pos;
 
   if (previous_file_path_ != nullptr
       && (!builder_->IsDistributedMode() || *file_path == *previous_file_path_)) {
@@ -340,6 +342,19 @@ void IcebergDeleteNode::IcebergDeleteState::Update(
     if (current_deleted_pos_row_id_ != INVALID_ROW_ID
         && current_probe_pos_ > (*current_delete_row_)[current_deleted_pos_row_id_]) {
       UpdateImpl();
+    } else if (builder_->IsDistributedMode() && !is_consecutive_pos) {
+      // In distributed mode (which means PARTITIONED JOIN distribution mode) we cannot
+      // rely on ascending row order, not even inside row batches, not even when the
+      // previous file path is the same as the current one.
+      // This is because files with multiple blocks can be processed by multiple hosts
+      // in parallel, then the rows are getting hash-exchanged based on their file paths.
+      // Then the exchange-receiver at the LHS coalesces the row batches from multiple
+      // senders, hence the row IDs getting unordered. So we are always doing a binary
+      // search here to find the proper delete row id.
+      // This won't be a problem with the DIRECTED distribution mode (see IMPALA-12308)
+      // which will behave similarly to the BROADCAST mode in this regard.
+      DCHECK_EQ(*file_path, *previous_file_path_);
+      UpdateImpl();
     }
   } else {
     auto it = builder_->deleted_rows().find(*file_path);
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 3a5e5b076..7a1e255a2 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -3694,7 +3694,6 @@ ALTER TABLE {db_name}{db_suffix}.{table_name} SET TBLPROPERTIES('write.format.de
 INSERT INTO TABLE {db_name}{db_suffix}.{table_name} values(2, 'orc', 1.5, false);
 ALTER TABLE {db_name}{db_suffix}.{table_name} SET TBLPROPERTIES('write.format.default'='parquet');
 INSERT INTO TABLE {db_name}{db_suffix}.{table_name} values(3, 'parquet', 2.5, false);
-
 ====
 ---- DATASET
 functional
@@ -3709,8 +3708,7 @@ TBLPROPERTIES('iceberg.catalog'='hadoop.catalog',
               'format-version'='2');
 ---- DEPENDENT_LOAD
 `hadoop fs -mkdir -p /test-warehouse/iceberg_test/hadoop_catalog/ice && \
-hadoop fs -put -f ${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock /test-warehouse/iceberg_test/hadoop_catalog/ice
-
+hadoop fs -Ddfs.block.size=524288 -put -f ${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock /test-warehouse/iceberg_test/hadoop_catalog/ice
 ====
 ---- DATASET
 functional
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
index c40a968ef..4742defee 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
@@ -672,6 +672,21 @@ SELECT count(*) from iceberg_lineitem_multiblock;
 bigint
 ====
 ---- QUERY
+select count(*) from iceberg_lineitem_multiblock where l_linenumber%5=0;
+---- RESULTS
+0
+---- TYPES
+bigint
+====
+---- QUERY
+SET BATCH_SIZE=2;
+select count(*) from iceberg_lineitem_multiblock where l_linenumber%5=0;
+---- RESULTS
+0
+---- TYPES
+bigint
+====
+---- QUERY
 SELECT * from iceberg_v2_partitioned_position_deletes;
 ---- RESULTS
 6,'Alex','view',2020-01-01 09:00:00


[impala] 01/03: IMPALA-12326: Add WaitForLocalServer in StatestoreSubscriber::Start

Posted by st...@apache.org.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b510e437de454debd4560cea340f9f1fd8af5e5a
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Mon Jul 31 16:04:53 2023 -0700

    IMPALA-12326: Add WaitForLocalServer in StatestoreSubscriber::Start
    
    The Impala daemons start the statestore subscriber service before the
    Thrift heartbeat rpc service is ready. As a result, there is a small
    window where statestore could try to connect with Impala daemons, but
    the rpc service isn't ready and so statestore logs get filled with
    thrift timeout errors.
    
    This patch adds WaitForLocalServer in StatestoreSubscriber::Start. The
    wait time is hardcoded to 10 seconds (10 retries at 1-second intervals).
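
The retry-until-ready pattern can be sketched in Python as below. This is an analogous readiness check under stated assumptions, not Impala's WaitForLocalServer: the function name, the plain TCP connect used as the readiness probe and the default host are all illustrative. Only the defaults of 10 retries and a 1-second interval come from the commit.

```python
import socket
import time


def wait_for_local_server(port, num_retries=10, retry_interval_s=1.0,
                          host="127.0.0.1"):
    """Return True once a TCP connection to host:port succeeds, else False."""
    for attempt in range(num_retries):
        try:
            with socket.create_connection((host, port), timeout=retry_interval_s):
                return True
        except OSError:
            # Server not accepting connections yet; wait before the next attempt.
            if attempt + 1 < num_retries:
                time.sleep(retry_interval_s)
    return False
```

Blocking on such a check before the port is advertised closes the window in which a peer could try to connect to a service that is not yet listening.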
    
    Testing:
    - Pass core tests.
    
    Change-Id: Iec73e9d0517df2292270e436c1579ad6ddc90558
    Reviewed-on: http://gerrit.cloudera.org:8080/20297
    Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
    Reviewed-by: Andrew Sherman <as...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/statestore/statestore-subscriber.cc | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index f0f9f3ce5..2ef1ac5ee 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -238,6 +238,8 @@ Status StatestoreSubscriber::Start() {
   RETURN_IF_ERROR(builder.Build(&server));
   heartbeat_server_.reset(server);
   RETURN_IF_ERROR(heartbeat_server_->Start());
+  RETURN_IF_ERROR(WaitForLocalServer(
+      *heartbeat_server_, /* num_retries */ 10, /* retry_interval_ms */ 1000));
 
   // Specify the port which the heartbeat server is listening on.
   heartbeat_address_.port = heartbeat_server_->port();