Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/27 05:01:17 UTC

[2/5] impala git commit: IMPALA-7340. Only send necessary fields in THdfsPartition

IMPALA-7340. Only send necessary fields in THdfsPartition

The THdfsPartition Thrift struct is used both for partition metadata
(sent from catalogd to coordinator) and for descriptors (sent from
coordinator frontend to backends). In the case of the descriptor,
not all fields are actually used.

This patch cleans up the Thrift struct definition to make clear which
fields are used where, and changes the serialization code to fill in
only the necessary fields.

Change-Id: I97e8402efdfdeea06463bb71a40ebb6abd1f11f0
Reviewed-on: http://gerrit.cloudera.org:8080/11026
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>
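
In sketch form, the pattern looks like the following self-contained Java
program. It is illustrative only: ThriftObjectType mirrors the enum this patch
adds to CatalogObject.java below, while TwoFormatDemo, PartitionMsg, and their
fields are hypothetical stand-ins for the generated THdfsPartition class.

import java.util.HashMap;
import java.util.Map;

public class TwoFormatDemo {
  enum ThriftObjectType { FULL, DESCRIPTOR_ONLY }

  // Stand-in for THdfsPartition: one field the backend needs, one it doesn't.
  static class PartitionMsg {
    long id;                           // descriptor field, always sent
    Map<String, String> hmsParameters; // catalog-only field, optional

    @Override
    public String toString() {
      return "id=" + id + ", hms_parameters=" + hmsParameters;
    }
  }

  static PartitionMsg serialize(ThriftObjectType type) {
    PartitionMsg msg = new PartitionMsg();
    msg.id = 42; // descriptor fields are filled in for both formats
    if (type == ThriftObjectType.FULL) {
      // Heavyweight metadata travels only on the catalogd -> impalad path.
      msg.hmsParameters = new HashMap<>();
      msg.hmsParameters.put("transient_lastDdlTime", "0");
    }
    return msg;
  }

  public static void main(String[] args) {
    System.out.println(serialize(ThriftObjectType.DESCRIPTOR_ONLY)); // hms_parameters=null
    System.out.println(serialize(ThriftObjectType.FULL));
  }
}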


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/04ed7ac1
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/04ed7ac1
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/04ed7ac1

Branch: refs/heads/master
Commit: 04ed7ac132d37bff1c648a807059705ae5fac220
Parents: b0da2e0
Author: Todd Lipcon <to...@cloudera.com>
Authored: Wed Jul 18 16:53:04 2018 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Thu Jul 26 00:14:26 2018 +0000

----------------------------------------------------------------------
 common/thrift/CatalogObjects.thrift             | 51 +++++++++++++++-----
 .../apache/impala/catalog/CatalogObject.java    | 12 +++++
 .../apache/impala/catalog/FeCatalogUtils.java   | 28 ++++++-----
 .../org/apache/impala/catalog/HdfsTable.java    | 36 ++++++++------
 .../impala/catalog/local/LocalFsTable.java      |  6 +--
 5 files changed, 90 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/04ed7ac1/common/thrift/CatalogObjects.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index d463300..b73c5e6 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -241,18 +241,38 @@ struct THdfsPartitionLocation {
 // Represents an HDFS partition
 // TODO(vercegovac): rename to TFsPartition
 struct THdfsPartition {
+
+  // ============================================================
+  // Fields included in the "Descriptor" format sent to the backend
+  // as part of query plans and fragments.
+  // ============================================================
+
   1: required byte lineDelim
   2: required byte fieldDelim
   3: required byte collectionDelim
   4: required byte mapKeyDelim
   5: required byte escapeChar
   6: required THdfsFileFormat fileFormat
+
   // These are Literal expressions
   7: list<Exprs.TExpr> partitionKeyExprs
   8: required i32 blockSize
-  9: optional list<THdfsFileDesc> file_desc
+
   10: optional THdfsPartitionLocation location
 
+  // Unique (in this table) id of this partition. May be set to
+  // PROTOTYPE_PARTITION_ID when this object is used to describe
+  // a partition which will be created as part of a query.
+  14: optional i64 id
+
+
+  // ============================================================
+  // Fields only included when the catalogd serializes a table to be
+  // sent to the impalad as part of a catalog update.
+  // ============================================================
+
+  9: optional list<THdfsFileDesc> file_desc
+
   // The access level Impala has on this partition (READ_WRITE, READ_ONLY, etc).
   11: optional TAccessLevel access_level
 
@@ -263,10 +283,6 @@ struct THdfsPartition {
   // underlying data is cached).
   13: optional bool is_marked_cached
 
-  // Unique (in this table) id of this partition. If -1, the partition does not currently
-  // exist.
-  14: optional i64 id
-
   // (key,value) pairs stored in the Hive Metastore.
   15: optional map<string, string> hms_parameters
 
@@ -282,12 +298,17 @@ struct THdfsPartition {
   18: optional bool has_incremental_stats
 }
 
-// Constant partition ID used for THdfsPartition.prototype_partition above.
+// Constant partition ID used for THdfsPartition.prototype_partition below.
 // Must be < 0 to avoid collisions
 const i64 PROTOTYPE_PARTITION_ID = -1;
 
 
 struct THdfsTable {
+  // ============================================================
+  // Fields included in the "Descriptor" format sent to the backend
+  // as part of query plans and fragments.
+  // ============================================================
+
   1: required string hdfsBaseDir
 
   // Deprecated. Use TTableDescriptor.colNames.
@@ -303,22 +324,28 @@ struct THdfsTable {
   6: optional string avroSchema
 
   // Map from partition id to partition metadata.
-  // Does not include the special prototype partition -1 (see below).
+  // Does not include the special prototype partition with id=PROTOTYPE_PARTITION_ID --
+  // that partition is separately included below.
   4: required map<i64, THdfsPartition> partitions
 
   // Prototype partition, used when creating new partitions during insert.
   10: required THdfsPartition prototype_partition
 
-  // Each TNetworkAddress is a datanode which contains blocks of a file in the table.
-  // Used so that each THdfsFileBlock can just reference an index in this list rather
-  // than duplicate the list of network addresses, which helps reduce memory usage.
-  7: optional list<Types.TNetworkAddress> network_addresses
-
   // REMOVED: 8: optional bool multiple_filesystems
 
   // The prefixes of locations of partitions in this table. See THdfsPartitionLocation for
   // the description of how a prefix is computed.
   9: optional list<string> partition_prefixes
+
+  // ============================================================
+  // Fields only included when the catalogd serializes a table to be
+  // sent to the impalad as part of a catalog update.
+  // ============================================================
+
+  // Each TNetworkAddress is a datanode which contains blocks of a file in the table.
+  // Used so that each THdfsFileBlock can just reference an index in this list rather
+  // than duplicate the list of network addresses, which helps reduce memory usage.
+  7: optional list<Types.TNetworkAddress> network_addresses
 }
 
 struct THBaseTable {
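
A note on the reordering above: moving field declarations around in the .thrift
file is wire-compatible because Thrift encodes each field as a (field id, value)
pair; declaration order never reaches the wire. The toy Java program below
(plain maps, not the Thrift library) models that idea.

import java.util.Map;
import java.util.TreeMap;

public class FieldIdDemo {
  public static void main(String[] args) {
    // Stand-in for an encoded THdfsPartition: values keyed by field id.
    Map<Integer, Object> encoded = new TreeMap<>();
    encoded.put(14, 42L);        // id: moved up in the IDL, field id unchanged
    encoded.put(9, "file_desc"); // file_desc: moved down, field id unchanged
    // Readers generated from either ordering of the IDL decode the same bytes.
    System.out.println(encoded); // {9=file_desc, 14=42}
  }
}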

http://git-wip-us.apache.org/repos/asf/impala/blob/04ed7ac1/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java b/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
index 34310c3..78b8368 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
@@ -23,6 +23,18 @@ import org.apache.impala.thrift.TCatalogObjectType;
  * Interface that all catalog objects implement.
  */
 public interface CatalogObject extends HasName {
+
+  /**
+   * Catalog objects are often serialized to Thrift. When doing so, many of the
+   * objects have a minimal "descriptor" form used in query execution as well as
+   * a more complete "full" form with all information, used when transferring the
+   * catalog object from catalogd to the impalads.
+   */
+  static enum ThriftObjectType {
+    FULL,
+    DESCRIPTOR_ONLY
+  };
+
   // Returns the TCatalogObject type of this Catalog object.
   public TCatalogObjectType getCatalogObjectType();
 

http://git-wip-us.apache.org/repos/asf/impala/blob/04ed7ac1/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
index 12b152f..4f1d68d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
@@ -31,6 +31,7 @@ import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.NullLiteral;
 import org.apache.impala.analysis.PartitionKeyValue;
 import org.apache.impala.analysis.ToSqlUtils;
+import org.apache.impala.catalog.CatalogObject.ThriftObjectType;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.thrift.TColumnDescriptor;
 import org.apache.impala.thrift.THdfsPartition;
@@ -322,7 +323,7 @@ public abstract class FeCatalogUtils {
   }
 
   public static THdfsPartition fsPartitionToThrift(FeFsPartition part,
-      boolean includeFileDesc, boolean includeIncrementalStats) {
+      ThriftObjectType type, boolean includeIncrementalStats) {
     HdfsStorageDescriptor sd = part.getInputFormatDescriptor();
     THdfsPartition thriftHdfsPart = new THdfsPartition(
         sd.getLineDelim(),
@@ -333,19 +334,20 @@ public abstract class FeCatalogUtils {
         sd.getFileFormat().toThrift(),
         Expr.treesToThrift(part.getPartitionValues()),
         sd.getBlockSize());
-    thriftHdfsPart.setLocation(part.getLocationAsThrift());
-    thriftHdfsPart.setStats(new TTableStats(part.getNumRows()));
-    thriftHdfsPart.setAccess_level(part.getAccessLevel());
-    thriftHdfsPart.setIs_marked_cached(part.isMarkedCached());
     thriftHdfsPart.setId(part.getId());
-    thriftHdfsPart.setHas_incremental_stats(part.hasIncrementalStats());
-    // IMPALA-4902: Shallow-clone the map to avoid concurrent modifications. One thread
-    // may try to serialize the returned THdfsPartition after releasing the table's lock,
-    // and another thread doing DDL may modify the map.
-    thriftHdfsPart.setHms_parameters(Maps.newHashMap(
-        includeIncrementalStats ? part.getParameters() :
-          part.getFilteredHmsParameters()));
-    if (includeFileDesc) {
+    thriftHdfsPart.setLocation(part.getLocationAsThrift());
+    if (type == ThriftObjectType.FULL) {
+      thriftHdfsPart.setStats(new TTableStats(part.getNumRows()));
+      thriftHdfsPart.setAccess_level(part.getAccessLevel());
+      thriftHdfsPart.setIs_marked_cached(part.isMarkedCached());
+      thriftHdfsPart.setHas_incremental_stats(part.hasIncrementalStats());
+      // IMPALA-4902: Shallow-clone the map to avoid concurrent modifications. One thread
+      // may try to serialize the returned THdfsPartition after releasing the table's lock,
+      // and another thread doing DDL may modify the map.
+      thriftHdfsPart.setHms_parameters(Maps.newHashMap(
+          includeIncrementalStats ? part.getParameters() :
+            part.getFilteredHmsParameters()));
+
       // Add block location information
       long numBlocks = 0;
       long totalFileBytes = 0;
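
The IMPALA-4902 comment above captures a real hazard: the HMS parameter map
belongs to the live partition object, and a concurrent DDL can mutate it after
the table's lock is released, so the code snapshots the map before the Thrift
serializer walks it (Guava's Maps.newHashMap(map) copies the map, equivalent to
the HashMap copy constructor). A self-contained sketch of the hazard and fix:

import java.util.HashMap;
import java.util.Map;

public class SnapshotDemo {
  public static void main(String[] args) {
    Map<String, String> live = new HashMap<>();
    live.put("numRows", "100");
    // Shallow clone taken at serialization time, as in fsPartitionToThrift.
    Map<String, String> snapshot = new HashMap<>(live);
    live.put("numRows", "200"); // a concurrent DDL mutates the live map...
    System.out.println(snapshot); // ...but the snapshot still reads {numRows=100}
  }
}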

http://git-wip-us.apache.org/repos/asf/impala/blob/04ed7ac1/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 0995086..4ae1f9b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -1722,7 +1722,8 @@ public class HdfsTable extends Table implements FeFsTable {
     // need any information below the THdfsPartition level.
     TTableDescriptor tableDesc = new TTableDescriptor(tableId, TTableType.HDFS_TABLE,
         getTColumnDescriptors(), numClusteringCols_, name_, db_.getName());
-    tableDesc.setHdfsTable(getTHdfsTable(false, referencedPartitions));
+    tableDesc.setHdfsTable(getTHdfsTable(ThriftObjectType.DESCRIPTOR_ONLY,
+        referencedPartitions));
     return tableDesc;
   }
 
@@ -1731,25 +1732,30 @@ public class HdfsTable extends Table implements FeFsTable {
     // Send all metadata between the catalog service and the FE.
     TTable table = super.toThrift();
     table.setTable_type(TTableType.HDFS_TABLE);
-    table.setHdfs_table(getTHdfsTable(true, null));
+    table.setHdfs_table(getTHdfsTable(ThriftObjectType.FULL, null));
     return table;
   }
 
   /**
-   * Create a THdfsTable corresponding to this HdfsTable. If includeFileDesc is true,
-   * then then all partitions and THdfsFileDescs of each partition should be included.
-   * Otherwise, don't include any THdfsFileDescs, and include only those partitions in
-   * the refPartitions set (the backend doesn't need metadata for unreferenced
-   * partitions). To prevent the catalog from hitting an OOM error while trying to
+   * Create a THdfsTable corresponding to this HdfsTable. If serializing the "FULL"
+   * information, then all partitions and THdfsFileDescs of each partition should be
+   * included. Otherwise, don't include any THdfsFileDescs, and include only those
+   * partitions in the refPartitions set (the backend doesn't need metadata for
+   * unreferenced partitions). In addition, metadata that is not used by the backend will
+   * be omitted.
+   *
+   * To prevent the catalog from hitting an OOM error while trying to
    * serialize large partition incremental stats, we estimate the stats size and filter
    * the incremental stats data from partition objects if the estimate exceeds
    * --inc_stats_size_limit_bytes. This function also collects storage related statistics
    *  (e.g. number of blocks, files, etc) in order to compute an estimate of the metadata
    *  size of this table.
    */
-  private THdfsTable getTHdfsTable(boolean includeFileDesc, Set<Long> refPartitions) {
-    // includeFileDesc implies all partitions should be included (refPartitions == null).
-    Preconditions.checkState(!includeFileDesc || refPartitions == null);
+  private THdfsTable getTHdfsTable(ThriftObjectType type, Set<Long> refPartitions) {
+    if (type == ThriftObjectType.FULL) {
+      // "full" implies all partitions should be included.
+      Preconditions.checkArgument(refPartitions == null);
+    }
     long memUsageEstimate = 0;
     int numPartitions =
         (refPartitions == null) ? partitionMap_.values().size() : refPartitions.size();
@@ -1764,13 +1770,13 @@ public class HdfsTable extends Table implements FeFsTable {
       long id = partition.getId();
       if (refPartitions == null || refPartitions.contains(id)) {
         THdfsPartition tHdfsPartition = FeCatalogUtils.fsPartitionToThrift(
-            partition, includeFileDesc, includeIncrementalStats);
+            partition, type, includeIncrementalStats);
         if (tHdfsPartition.isSetHas_incremental_stats() &&
             tHdfsPartition.isHas_incremental_stats()) {
           memUsageEstimate += getColumns().size() * STATS_SIZE_PER_COLUMN_BYTES;
           hasIncrementalStats_ = true;
         }
-        if (includeFileDesc) {
+        if (type == ThriftObjectType.FULL) {
           Preconditions.checkState(tHdfsPartition.isSetNum_blocks() &&
               tHdfsPartition.isSetTotal_file_size_bytes());
           stats.numBlocks += tHdfsPartition.getNum_blocks();
@@ -1781,10 +1787,10 @@ public class HdfsTable extends Table implements FeFsTable {
         idToPartition.put(id, tHdfsPartition);
       }
     }
-    if (includeFileDesc) fileMetadataStats_.set(stats);
+    if (type == ThriftObjectType.FULL) fileMetadataStats_.set(stats);
 
     THdfsPartition prototypePartition = FeCatalogUtils.fsPartitionToThrift(
-        prototypePartition_, false, false);
+        prototypePartition_, ThriftObjectType.DESCRIPTOR_ONLY, false);
 
     memUsageEstimate += fileMetadataStats_.numFiles * PER_FD_MEM_USAGE_BYTES +
         fileMetadataStats_.numBlocks * PER_BLOCK_MEM_USAGE_BYTES;
@@ -1792,7 +1798,7 @@ public class HdfsTable extends Table implements FeFsTable {
     THdfsTable hdfsTable = new THdfsTable(hdfsBaseDir_, getColumnNames(),
         nullPartitionKeyValue_, nullColumnValue_, idToPartition, prototypePartition);
     hdfsTable.setAvroSchema(avroSchema_);
-    if (includeFileDesc) {
+    if (type == ThriftObjectType.FULL) {
       // Network addresses are used only by THdfsFileBlocks which are inside
      // THdfsFileDesc, so include network addresses only when including THdfsFileDesc.
       hdfsTable.setNetwork_addresses(hostIndex_.getList());
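
The two call sites above fix the calling contract: toThriftDescriptor sends
DESCRIPTOR_ONLY with just the referenced partitions, while toThrift sends FULL
with refPartitions == null. The stand-alone sketch below (a hypothetical class;
only the type/refPartitions relationship is taken from the patch) makes that
contract concrete.

import java.util.HashSet;
import java.util.Set;

public class CallShapes {
  enum ThriftObjectType { FULL, DESCRIPTOR_ONLY }

  static String getTHdfsTable(ThriftObjectType type, Set<Long> refPartitions) {
    // Mirrors the Preconditions.checkArgument above: FULL always covers
    // every partition, so a referenced-partition filter makes no sense.
    if (type == ThriftObjectType.FULL && refPartitions != null) {
      throw new IllegalArgumentException("FULL implies refPartitions == null");
    }
    return type + " / " + (refPartitions == null
        ? "all partitions" : refPartitions.size() + " referenced partitions");
  }

  public static void main(String[] args) {
    Set<Long> referenced = new HashSet<>();
    referenced.add(1L);
    referenced.add(7L);
    // Query plan path (toThriftDescriptor): only referenced partitions.
    System.out.println(getTHdfsTable(ThriftObjectType.DESCRIPTOR_ONLY, referenced));
    // Catalog update path (toThrift): every partition.
    System.out.println(getTHdfsTable(ThriftObjectType.FULL, null));
  }
}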

http://git-wip-us.apache.org/repos/asf/impala/blob/04ed7ac1/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index 35044e3..82af240 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.NullLiteral;
 import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.CatalogObject.ThriftObjectType;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.ColumnStats;
 import org.apache.impala.catalog.FeCatalogUtils;
@@ -229,13 +230,12 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
     for (FeFsPartition partition : partitions) {
       idToPartition.put(partition.getId(),
           FeCatalogUtils.fsPartitionToThrift(partition,
-              /*includeFileDesc=*/false,
+              ThriftObjectType.DESCRIPTOR_ONLY,
               /*includeIncrementalStats=*/false));
     }
 
     THdfsPartition tPrototypePartition = FeCatalogUtils.fsPartitionToThrift(
-        createPrototypePartition(),
-        /*includeFileDesc=*/false,
+        createPrototypePartition(), ThriftObjectType.DESCRIPTOR_ONLY,
         /*includeIncrementalStats=*/false);
 
     // TODO(todd): implement avro schema support