You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2020/04/23 02:46:07 UTC

[impala] 03/03: IMPALA-9685: Fix ACID tables missing row__id column in LocalCatalog mode

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1ab580a0917bba96643c5915dbb2cde21b5e457b
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Wed Apr 22 19:44:57 2020 +0800

    IMPALA-9685: Fix ACID tables missing row__id column in LocalCatalog mode
    
    In the catalog server, when extracting the schema of a full ACID table
    from the HMS table metadata, we insert the synthetic "row__id" column.
    We need to do the same thing in LocalCatalog mode coordinators, because
    they don't fetch the loaded schema from the catalog server. Instead,
    they fetch the HMS table metadata and extract the schema again
    themselves.
    
    Tests:
     - Run test_acid.py in local catalog mode.
     - Add a canary test in test_local_catalog.py. It can be removed once we
       enable local catalog mode by default.
    
    Change-Id: Ieb11e520325908e2e33a257568e15637d66c9901
    Reviewed-on: http://gerrit.cloudera.org:8080/15783
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../java/org/apache/impala/catalog/FeCatalogUtils.java | 11 ++++++++---
 .../main/java/org/apache/impala/catalog/HdfsTable.java |  8 +-------
 .../org/apache/impala/catalog/local/LocalFsTable.java  |  4 ++++
 .../apache/impala/catalog/local/LocalHbaseTable.java   |  2 +-
 .../apache/impala/catalog/local/LocalKuduTable.java    |  6 ++++--
 .../org/apache/impala/catalog/local/LocalTable.java    | 18 ++++++++++--------
 fe/src/main/java/org/apache/impala/util/AcidUtils.java | 14 ++++++++++++++
 tests/custom_cluster/test_local_catalog.py             | 15 +++++++++++++++
 8 files changed, 57 insertions(+), 21 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
index bfe245e..f88beb6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
@@ -45,6 +45,7 @@ import org.apache.impala.thrift.TColumnDescriptor;
 import org.apache.impala.thrift.TGetCatalogMetricsResult;
 import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.TTableStats;
+import org.apache.impala.util.AcidUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,11 +96,15 @@ public abstract class FeCatalogUtils {
    * @throws TableLoadingException if any type is invalid
    */
   public static ImmutableList<Column> fieldSchemasToColumns(
-      Iterable<FieldSchema> fieldSchemas,
-      String tableName) throws TableLoadingException {
+      List<FieldSchema> partCols, List<FieldSchema> nonPartCols,
+      String tableName, boolean isFullAcidTable) throws TableLoadingException {
     int pos = 0;
     ImmutableList.Builder<Column> ret = ImmutableList.builder();
-    for (FieldSchema s : fieldSchemas) {
+    for (FieldSchema s : Iterables.concat(partCols, nonPartCols)) {
+      if (isFullAcidTable && pos == partCols.size()) {
+        ret.add(AcidUtils.getRowIdColumnType(pos));
+        ++pos;
+      }
       Type type = parseColumnType(s, tableName);
       ret.add(new Column(s.getName(), type, s.getComment(), pos));
       ++pos;
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 02f8c90..2928d96 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -504,13 +504,7 @@ public class HdfsTable extends Table implements FeFsTable {
    */
   private void addColumnsForFullAcidTable(List<FieldSchema> fieldSchemas)
       throws TableLoadingException {
-    StructType row__id = new StructType();
-    row__id.addField(new StructField("operation", ScalarType.INT, ""));
-    row__id.addField(new StructField("originaltransaction", ScalarType.BIGINT, ""));
-    row__id.addField(new StructField("bucket", ScalarType.INT, ""));
-    row__id.addField(new StructField("rowid", ScalarType.BIGINT, ""));
-    row__id.addField(new StructField("currenttransaction", ScalarType.BIGINT, ""));
-    addColumn(new Column("row__id", row__id, "", colsByPos_.size()));
+    addColumn(AcidUtils.getRowIdColumnType(colsByPos_.size()));
     addColumnsFromFieldSchemas(fieldSchemas);
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index b464f47..8321a28 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -59,6 +59,7 @@ import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableType;
+import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.AvroSchemaConverter;
 import org.apache.impala.util.AvroSchemaUtils;
 import org.apache.impala.util.ListMap;
@@ -319,6 +320,9 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
       hdfsTable.setAvroSchema(AvroSchemaConverter.convertFieldSchemas(
           getMetaStoreTable().getSd().getCols(), getFullName()).toString());
     }
+    if (AcidUtils.isFullAcidTable(getMetaStoreTable().getParameters())) {
+      hdfsTable.setIs_full_acid(true);
+    }
 
     TTableDescriptor tableDesc = new TTableDescriptor(tableId, TTableType.HDFS_TABLE,
         FeCatalogUtils.getTColumnDescriptors(this),
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalHbaseTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalHbaseTable.java
index ca88a9f..06a597a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalHbaseTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalHbaseTable.java
@@ -57,7 +57,7 @@ public class LocalHbaseTable extends LocalTable implements FeHBaseTable {
       // since we don't support composite hbase rowkeys yet, all hbase tables have a
       // single clustering col
       ColumnMap cmap = new ColumnMap(Util.loadColumns(msTable), 1,
-          msTable.getDbName() + "." + msTable.getTableName());
+          msTable.getDbName() + "." + msTable.getTableName(), /*isFullAcidSchema=*/false);
       return new LocalHbaseTable(db, msTable, ref, cmap);
     } catch (IOException | MetaException | SerDeException e) {
       throw new LocalCatalogException(e);
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
index 418d009..f764b30 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
@@ -84,7 +84,8 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
 
     List<KuduPartitionParam> partitionBy = Utils.loadPartitionByParams(kuduTable);
 
-    ColumnMap cmap = new ColumnMap(cols, /*numClusteringCols=*/0, fullTableName);
+    ColumnMap cmap = new ColumnMap(cols, /*numClusteringCols=*/0, fullTableName,
+        /*isFullAcidSchema=*/false);
     return new LocalKuduTable(db, msTable, ref, cmap, pkNames, partitionBy);
   }
 
@@ -106,7 +107,8 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
       pkNames.add(pkColDef.getColName());
     }
 
-    ColumnMap cmap = new ColumnMap(columns, /*numClusteringCols=*/0, fullTableName);
+    ColumnMap cmap = new ColumnMap(columns, /*numClusteringCols=*/0, fullTableName,
+        /*isFullAcidSchema=*/false);
 
     return new LocalKuduTable(db, msTable, /*ref=*/null, cmap, pkNames,
         kuduPartitionParams);
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
index ccb05dc..6a2891b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
@@ -43,13 +43,13 @@ import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
 import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TTableStats;
+import org.apache.impala.util.AcidUtils;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 /**
@@ -296,6 +296,7 @@ abstract class LocalTable implements FeTable {
     private final ImmutableMap<String, Column> colsByName_;
 
     private final int numClusteringCols_;
+    private final boolean hasRowIdCol_;
 
     public static ColumnMap fromMsTable(Table msTbl) {
       final String fullName = msTbl.getDbName() + "." + msTbl.getTableName();
@@ -306,18 +307,18 @@ abstract class LocalTable implements FeTable {
       // then all other columns.
       List<Column> cols;
       try {
-        cols = FeCatalogUtils.fieldSchemasToColumns(
-            Iterables.concat(msTbl.getPartitionKeys(),
-                             msTbl.getSd().getCols()),
-            msTbl.getTableName());
-        return new ColumnMap(cols, numClusteringCols, fullName);
+        boolean isFullAcidTable = AcidUtils.isFullAcidTable(msTbl.getParameters());
+        cols = FeCatalogUtils.fieldSchemasToColumns(msTbl.getPartitionKeys(),
+            msTbl.getSd().getCols(), msTbl.getTableName(), isFullAcidTable);
+        return new ColumnMap(cols, numClusteringCols, fullName, isFullAcidTable);
       } catch (TableLoadingException e) {
         throw new LocalCatalogException(e);
       }
     }
 
     public ColumnMap(List<Column> cols, int numClusteringCols,
-        String fullTableName) {
+        String fullTableName, boolean isFullAcidSchema) {
+      hasRowIdCol_ = isFullAcidSchema;
       this.colsByPos_ = ImmutableList.copyOf(cols);
       this.numClusteringCols_ = numClusteringCols;
       colsByName_ = indexColumnNames(colsByPos_);
@@ -347,7 +348,8 @@ abstract class LocalTable implements FeTable {
 
 
     public List<Column> getNonClusteringColumns() {
-      return colsByPos_.subList(numClusteringCols_, colsByPos_.size());
+      return colsByPos_.subList(numClusteringCols_ + (hasRowIdCol_ ? 1 : 0),
+          colsByPos_.size());
     }
 
     public List<Column> getClusteringColumns() {
diff --git a/fe/src/main/java/org/apache/impala/util/AcidUtils.java b/fe/src/main/java/org/apache/impala/util/AcidUtils.java
index 5ef8b1a..075383e 100644
--- a/fe/src/main/java/org/apache/impala/util/AcidUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/AcidUtils.java
@@ -25,7 +25,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FileMetadataLoader.LoadStats;
+import org.apache.impala.catalog.ScalarType;
+import org.apache.impala.catalog.StructField;
+import org.apache.impala.catalog.StructType;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TTransactionalType;
@@ -121,6 +125,16 @@ public class AcidUtils {
     return isTransactionalTable(props) && !isInsertOnlyTable(props);
   }
 
+  public static Column getRowIdColumnType(int position) {
+    StructType row__id = new StructType();
+    row__id.addField(new StructField("operation", ScalarType.INT, ""));
+    row__id.addField(new StructField("originaltransaction", ScalarType.BIGINT, ""));
+    row__id.addField(new StructField("bucket", ScalarType.INT, ""));
+    row__id.addField(new StructField("rowid", ScalarType.BIGINT, ""));
+    row__id.addField(new StructField("currenttransaction", ScalarType.BIGINT, ""));
+    return new Column("row__id", row__id, "", position);
+  }
+
   // Sets transaction related table properties for new tables based on manually
   // set table properties and default transactional type.
   public static void setTransactionalProperties(Map<String, String> props,
diff --git a/tests/custom_cluster/test_local_catalog.py b/tests/custom_cluster/test_local_catalog.py
index 64b0eb8..7f04328 100644
--- a/tests/custom_cluster/test_local_catalog.py
+++ b/tests/custom_cluster/test_local_catalog.py
@@ -26,6 +26,7 @@ import time
 from multiprocessing.pool import ThreadPool
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.skip import SkipIfHive2
 from tests.util.filesystem_utils import WAREHOUSE
 
 RETRY_PROFILE_MSG = 'Retried query planning due to inconsistent metadata'
@@ -454,3 +455,17 @@ class TestObservability(CustomClusterTestSuite):
           cache_request_count_prev_run = cache_request_count
     finally:
       client.close()
+
+
+class TestFullAcid(CustomClusterTestSuite):
+  @SkipIfHive2.acid
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--use_local_catalog=true",
+    catalogd_args="--catalog_topic_mode=minimal")
+  def test_full_acid_support(self):
+    """IMPALA-9685: canary test for full acid support in local catalog"""
+    self.execute_query("show create table functional_orc_def.alltypestiny")
+    res = self.execute_query("select id from functional_orc_def.alltypestiny")
+    res.data.sort()
+    assert res.data == ['0', '1', '2', '3', '4', '5', '6', '7']