You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2019/07/11 00:21:07 UTC

[impala] branch master updated: IMPALA-8732: Use a serialized descriptor table in TQueryCtx

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new b068c2c  IMPALA-8732: Use a serialized descriptor table in TQueryCtx
b068c2c is described below

commit b068c2c4e21ab302ca372f870eccfb2111752e34
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Sun Jun 30 22:22:02 2019 -0700

    IMPALA-8732: Use a serialized descriptor table in TQueryCtx
    
    As reported in IMPALA-8732, there is contention in tcmalloc when
    sending the ExecQueryFInstances messages for a query referencing a
    large number of partitions. This is because each thread in
    ExecEnv::exec_rpc_thread_pool_ makes its own copy of the TQueryCtx,
    which contains the TDescriptorTable and a large map of THdfsPartition
    objects, and all of these threads do so simultaneously.
    
    The threads are copying this structure, but the TQueryCtx and its
    corresponding TDescriptorTable are the same across all messages for
    this query. Copying a large map of THdfsPartition objects is
    wasteful, especially considering that the coordinator does not
    need to access any of the information in TDescriptorTable before
    sending it out to executors.
    
    In the future, the entire TQueryCtx could be serialized once and
    embedded in its own sidecar. This change is limited to
    TDescriptorTable to allow easier backports to older versions:
    this code path has since been converted from Thrift to KRPC, and
    a large amount of code has changed.
    
    This changes TQueryCtx to contain a TDescriptorTableSerialized,
    which is a binary blob containing the serialized form of
    TDescriptorTable. The blob is serialized in the frontend and
    passed directly through to executors. The old, unserialized
    TDescriptorTable form is retained for frontend planner tests,
    which use incomplete structures that lack some required fields
    and therefore cannot be serialized.
    
    Testing:
     - Core and exhaustive tests pass
    
    Change-Id: I458aa62dd4d1e4e4a7b1869a604623a69f3b2d9a
    Reviewed-on: http://gerrit.cloudera.org:8080/13772
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/coordinator.cc                      |  6 ++--
 be/src/runtime/data-stream-test.cc                 |  2 +-
 be/src/runtime/descriptors.cc                      | 23 +++++++++++--
 be/src/runtime/descriptors.h                       | 38 +++++++++++++++++-----
 be/src/runtime/query-state.cc                      |  4 +--
 be/src/testutil/desc-tbl-builder.cc                |  2 +-
 common/thrift/Descriptors.thrift                   |  7 ++++
 common/thrift/ImpalaInternalService.thrift         | 13 +++++++-
 .../apache/impala/analysis/DescriptorTable.java    | 10 ++++++
 .../java/org/apache/impala/service/Frontend.java   | 17 ++++++++--
 .../org/apache/impala/planner/PlannerTestBase.java | 18 ++++++----
 11 files changed, 113 insertions(+), 27 deletions(-)

diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 1ef3ca5..b1bfae3 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -580,8 +580,10 @@ Status Coordinator::FinalizeHdfsInsert() {
   Status return_status = UpdateExecState(Status::OK(), nullptr, FLAGS_hostname);
   if (return_status.ok()) {
     HdfsTableDescriptor* hdfs_table;
-    RETURN_IF_ERROR(DescriptorTbl::CreateHdfsTblDescriptor(query_ctx().desc_tbl,
-            finalize_params()->table_id, obj_pool(), &hdfs_table));
+    DCHECK(query_ctx().__isset.desc_tbl_serialized);
+    RETURN_IF_ERROR(DescriptorTbl::CreateHdfsTblDescriptor(
+            query_ctx().desc_tbl_serialized, finalize_params()->table_id, obj_pool(),
+            &hdfs_table));
     DCHECK(hdfs_table != nullptr)
         << "INSERT target table not known in descriptor table: "
         << finalize_params()->table_id;
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 764070d..5d39c56 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -326,7 +326,7 @@ class DataStreamTest : public testing::Test {
     slot_desc.__set_nullIndicatorBit(-1);
     slot_desc.__set_slotIdx(0);
     thrift_desc_tbl.slotDescriptors.push_back(slot_desc);
-    EXPECT_OK(DescriptorTbl::Create(&obj_pool_, thrift_desc_tbl, &desc_tbl_));
+    EXPECT_OK(DescriptorTbl::CreateInternal(&obj_pool_, thrift_desc_tbl, &desc_tbl_));
 
     vector<TTupleId> row_tids;
     row_tids.push_back(0);
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index d3ea12c..0ed4519 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -32,6 +32,7 @@
 #include "exprs/scalar-expr-evaluator.h"
 #include "gen-cpp/Descriptors_types.h"
 #include "gen-cpp/PlanNodes_types.h"
+#include "rpc/thrift-util.h"
 #include "runtime/runtime-state.h"
 
 #include "common/names.h"
@@ -520,8 +521,19 @@ Status DescriptorTbl::CreatePartKeyExprs(
   return Status::OK();
 }
 
-Status DescriptorTbl::CreateHdfsTblDescriptor(const TDescriptorTable& thrift_tbl,
+Status DescriptorTbl::DeserializeThrift(const TDescriptorTableSerialized& serial_tbl,
+    TDescriptorTable* desc_tbl) {
+  uint32_t serial_tbl_len = serial_tbl.thrift_desc_tbl.length();
+  return DeserializeThriftMsg(
+      reinterpret_cast<const uint8_t*>(serial_tbl.thrift_desc_tbl.data()),
+      &serial_tbl_len, false, desc_tbl);
+}
+
+Status DescriptorTbl::CreateHdfsTblDescriptor(
+    const TDescriptorTableSerialized& serialized_thrift_tbl,
     TableId tbl_id, ObjectPool* pool, HdfsTableDescriptor** desc) {
+  TDescriptorTable thrift_tbl;
+  RETURN_IF_ERROR(DeserializeThrift(serialized_thrift_tbl, &thrift_tbl));
   for (const TTableDescriptor& tdesc: thrift_tbl.tableDescriptors) {
     if (tdesc.id == tbl_id) {
       DCHECK(tdesc.__isset.hdfsTable);
@@ -560,7 +572,14 @@ Status DescriptorTbl::CreateTblDescriptorInternal(const TTableDescriptor& tdesc,
   return Status::OK();
 }
 
-Status DescriptorTbl::Create(ObjectPool* pool, const TDescriptorTable& thrift_tbl,
+Status DescriptorTbl::Create(ObjectPool* pool,
+    const TDescriptorTableSerialized& serialized_thrift_tbl, DescriptorTbl** tbl) {
+  TDescriptorTable thrift_tbl;
+  RETURN_IF_ERROR(DeserializeThrift(serialized_thrift_tbl, &thrift_tbl));
+  return CreateInternal(pool, thrift_tbl, tbl);
+}
+
+Status DescriptorTbl::CreateInternal(ObjectPool* pool, const TDescriptorTable& thrift_tbl,
     DescriptorTbl** tbl) {
   *tbl = pool->Add(new DescriptorTbl());
   // deserialize table descriptors first, they are being referenced by tuple descriptors
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 982ede3..fd70f3b 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -477,14 +477,17 @@ class TupleDescriptor {
 class DescriptorTbl {
  public:
   /// Creates an HdfsTableDescriptor (allocated in 'pool' and returned via 'desc') for
-  /// table with id 'table_id' within thrift_tbl. DCHECKs if no such descriptor is
-  /// present.
-  static Status CreateHdfsTblDescriptor(const TDescriptorTable& thrift_tbl,
-      TableId table_id, ObjectPool* pool, HdfsTableDescriptor** desc);
-
-  /// Creates a descriptor tbl within 'pool' from thrift_tbl and returns it via 'tbl'.
-  /// Returns OK on success, otherwise error (in which case 'tbl' will be unset).
-  static Status Create(ObjectPool* pool, const TDescriptorTable& thrift_tbl,
+  /// table with id 'table_id' within serialized_thrift_tbl. DCHECKs if no such
+  /// descriptor is present.
+  static Status CreateHdfsTblDescriptor(
+      const TDescriptorTableSerialized& serialized_thrift_tbl,
+      TableId table_id, ObjectPool* pool, HdfsTableDescriptor** desc) WARN_UNUSED_RESULT;
+
+  /// Creates a descriptor tbl within 'pool' from serialized_thrift_tbl and returns it
+  /// via 'tbl'. Returns OK on success, otherwise error (in which case 'tbl' will be
+  /// unset).
+  static Status Create(ObjectPool* pool,
+      const TDescriptorTableSerialized& serialized_thrift_tbl,
       DescriptorTbl** tbl) WARN_UNUSED_RESULT;
 
   /// Free memory allocated in Create().
@@ -500,6 +503,9 @@ class DescriptorTbl {
   std::string DebugString() const;
 
  private:
+  // The friend classes use CreateInternal().
+  friend class DescriptorTblBuilder;
+  friend class DataStreamTest;
   typedef std::unordered_map<TableId, TableDescriptor*> TableDescriptorMap;
   typedef std::unordered_map<TupleId, TupleDescriptor*> TupleDescriptorMap;
   typedef std::unordered_map<SlotId, SlotDescriptor*> SlotDescriptorMap;
@@ -513,10 +519,24 @@ class DescriptorTbl {
   static Status CreatePartKeyExprs(
       const HdfsTableDescriptor& hdfs_tbl, ObjectPool* pool) WARN_UNUSED_RESULT;
 
+  /// Converts a TDescriptorTableSerialized to a TDescriptorTable. Returns
+  /// an error if deserialization fails.
+  static Status DeserializeThrift(const TDescriptorTableSerialized& serial_tbl,
+      TDescriptorTable* desc_tbl) WARN_UNUSED_RESULT;
+
   /// Creates a TableDescriptor (allocated in 'pool', returned via 'desc')
   /// corresponding to tdesc. Returns error status on failure.
   static Status CreateTblDescriptorInternal(const TTableDescriptor& tdesc,
-    ObjectPool* pool, TableDescriptor** desc);
+    ObjectPool* pool, TableDescriptor** desc) WARN_UNUSED_RESULT;
+
+  /// Creates a descriptor tbl within 'pool' from thrift_tbl and returns it via 'tbl'.
+  /// Returns OK on success, otherwise error (in which case 'tbl' will be unset).
+  /// This is the same as Create(), except that it takes the deserialized thrift
+  /// structure. This is useful for tests that produce their own thrift structures,
+  /// as it avoids serialization and allows for incomplete thrift structures
+  /// that cannot be serialized.
+  static Status CreateInternal(ObjectPool* pool, const TDescriptorTable& thrift_tbl,
+      DescriptorTbl** tbl) WARN_UNUSED_RESULT;
 };
 
 /// Records positions of tuples within row produced by ExecNode. RowDescriptors are
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 44708fe..195e238 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -512,9 +512,9 @@ bool QueryState::StartFInstances() {
   int fragment_ctx_idx = 0;
 
   // set up desc tbl
-  DCHECK(query_ctx().__isset.desc_tbl);
+  DCHECK(query_ctx().__isset.desc_tbl_serialized);
   Status start_finstances_status =
-      DescriptorTbl::Create(&obj_pool_, query_ctx().desc_tbl, &desc_tbl_);
+      DescriptorTbl::Create(&obj_pool_, query_ctx().desc_tbl_serialized, &desc_tbl_);
   if (UNLIKELY(!start_finstances_status.ok())) goto error;
   VLOG(2) << "descriptor table for query=" << PrintId(query_id())
           << "\n" << desc_tbl_->DebugString();
diff --git a/be/src/testutil/desc-tbl-builder.cc b/be/src/testutil/desc-tbl-builder.cc
index 77be724..15d6055 100644
--- a/be/src/testutil/desc-tbl-builder.cc
+++ b/be/src/testutil/desc-tbl-builder.cc
@@ -61,7 +61,7 @@ DescriptorTbl* DescriptorTblBuilder::Build() {
   DCHECK(buildDescTblStatus.ok()) << buildDescTblStatus.GetDetail();
 
   DescriptorTbl* desc_tbl;
-  Status status = DescriptorTbl::Create(obj_pool_, thrift_desc_tbl_, &desc_tbl);
+  Status status = DescriptorTbl::CreateInternal(obj_pool_, thrift_desc_tbl_, &desc_tbl);
   DCHECK(status.ok()) << status.GetDetail();
   return desc_tbl;
 }
diff --git a/common/thrift/Descriptors.thrift b/common/thrift/Descriptors.thrift
index c3f1397..68c78bc 100644
--- a/common/thrift/Descriptors.thrift
+++ b/common/thrift/Descriptors.thrift
@@ -97,3 +97,10 @@ struct TDescriptorTable {
   // the fragment.
   3: optional list<TTableDescriptor> tableDescriptors
 }
+
+// Binary blob containing a serialized TDescriptorTable. See desc_tbl_* fields on
+// TQueryCtx for more context on when this is used.
+struct TDescriptorTableSerialized {
+  // TDescriptorTable serialized
+  1: required binary thrift_desc_tbl
+}
\ No newline at end of file
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 658735f..01fdb58 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -462,7 +462,18 @@ struct TQueryCtx {
   12: optional i64 snapshot_timestamp = -1;
 
   // Optional for frontend tests.
-  13: optional Descriptors.TDescriptorTable desc_tbl
+  // The descriptor table can be included in one of two forms:
+  //  - TDescriptorTable - standard Thrift object
+  //  - TDescriptorTableSerialized - binary blob with a serialized TDescriptorTable
+  // Normal end-to-end query execution uses the serialized form to avoid copying a large
+  // number of objects when sending RPCs. For this case, desc_tbl_serialized is set and
+  // desc_tbl_testonly is not set. See IMPALA-8732.
+  // Frontend tests cannot use the serialized form, because some frontend tests deal with
+  // incomplete structures (e.g. THdfsTable without the required nullPartitionKeyValue
+  // field) that cannot be serialized. In this case, desc_tbl_testonly is set and
+  // desc_tbl_serialized is not set. See Frontend.PlanCtx.serializeDescTbl_.
+  13: optional Descriptors.TDescriptorTable desc_tbl_testonly
+  24: optional Descriptors.TDescriptorTableSerialized desc_tbl_serialized
 
   // Milliseconds since UNIX epoch at the start of query execution.
   14: required i64 start_unix_millis
diff --git a/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java b/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java
index 61953d8..ef06c76 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java
@@ -31,8 +31,11 @@ import org.apache.impala.catalog.StructField;
 import org.apache.impala.catalog.StructType;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.IdGenerator;
+import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.JniUtil;
 import org.apache.impala.thrift.TColumnType;
 import org.apache.impala.thrift.TDescriptorTable;
+import org.apache.impala.thrift.TDescriptorTableSerialized;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -198,6 +201,13 @@ public class DescriptorTable {
     return result;
   }
 
+  public TDescriptorTableSerialized toSerializedThrift() throws ImpalaException {
+    TDescriptorTableSerialized result = new TDescriptorTableSerialized();
+    TDescriptorTable desc_tbl = toThrift();
+    result.setThrift_desc_tbl(JniUtil.serializeToThrift(desc_tbl));
+    return result;
+  }
+
   public String debugString() {
     StringBuilder out = new StringBuilder();
     out.append("tuples:\n");
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index fbc5d36..5c4c8dc 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -190,6 +190,12 @@ public class Frontend {
     protected final StringBuilder explainBuf_;
     // Flag to indicate whether to capture (return) the plan.
     protected boolean capturePlan_;
+    // Flag to control whether the descriptor table is serialized. This defaults to
+    // true, but some frontend tests set it to false because they are operating on
+    // incomplete structures (e.g. THdfsTable without nullPartitionKeyValue) that cannot
+    // be serialized.
+    protected boolean serializeDescTbl_ = true;
+
     // The physical plan, divided by fragment, before conversion to
     // Thrift. For unit testing.
     protected List<PlanFragment> plan_;
@@ -209,6 +215,8 @@ public class Frontend {
      */
     public void requestPlanCapture() { capturePlan_ = true; }
     public boolean planCaptureRequested() { return capturePlan_; }
+    public void disableDescTblSerialization() { serializeDescTbl_ = false; }
+    public boolean serializeDescTbl() { return serializeDescTbl_; }
     public TQueryCtx getQueryContext() { return queryCtx_; }
 
     /**
@@ -1428,8 +1436,13 @@ public class Frontend {
     TQueryCtx queryCtx = planCtx.getQueryContext();
     Planner planner = new Planner(analysisResult, queryCtx, timeline);
     TQueryExecRequest queryExecRequest = createExecRequest(planner, planCtx);
-    queryCtx.setDesc_tbl(
-        planner.getAnalysisResult().getAnalyzer().getDescTbl().toThrift());
+    if (planCtx.serializeDescTbl()) {
+      queryCtx.setDesc_tbl_serialized(
+          planner.getAnalysisResult().getAnalyzer().getDescTbl().toSerializedThrift());
+    } else {
+      queryCtx.setDesc_tbl_testonly(
+          planner.getAnalysisResult().getAnalyzer().getDescTbl().toThrift());
+    }
     queryExecRequest.setQuery_ctx(queryCtx);
     queryExecRequest.setHost_list(analysisResult.getAnalyzer().getHostIndex().getList());
     return queryExecRequest;
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index 5823c54..c550771 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -160,8 +160,8 @@ public class PlannerTestBase extends FrontendTestBase {
         }
       }
     }
-    if (execRequest.query_ctx.isSetDesc_tbl()) {
-      TDescriptorTable descTbl = execRequest.query_ctx.desc_tbl;
+    if (execRequest.query_ctx.isSetDesc_tbl_testonly()) {
+      TDescriptorTable descTbl = execRequest.query_ctx.desc_tbl_testonly;
       for (TTupleDescriptor tupleDesc: descTbl.tupleDescriptors) {
         tupleMap_.put(tupleDesc.id, tupleDesc);
       }
@@ -241,9 +241,10 @@ public class PlannerTestBase extends FrontendTestBase {
     boolean first = true;
     // Iterate through all partitions of the descriptor table and verify all partitions
     // are referenced.
-    if (execRequest.query_ctx.isSetDesc_tbl()
-        && execRequest.query_ctx.desc_tbl.isSetTableDescriptors()) {
-      for (TTableDescriptor tableDesc: execRequest.query_ctx.desc_tbl.tableDescriptors) {
+    if (execRequest.query_ctx.isSetDesc_tbl_testonly()
+        && execRequest.query_ctx.desc_tbl_testonly.isSetTableDescriptors()) {
+      for (TTableDescriptor tableDesc:
+           execRequest.query_ctx.desc_tbl_testonly.tableDescriptors) {
         // All partitions of insertTableId are okay.
         if (tableDesc.getId() == insertTableId) continue;
         if (!tableDesc.isSetHdfsTable()) continue;
@@ -443,8 +444,8 @@ public class PlannerTestBase extends FrontendTestBase {
     if (request == null || !request.isSetQuery_exec_request()) return;
     TQueryExecRequest execRequest = request.query_exec_request;
     HashSet<Integer> seenTableIds = Sets.newHashSet();
-    if (execRequest.query_ctx.isSetDesc_tbl()) {
-      TDescriptorTable descTbl = execRequest.query_ctx.desc_tbl;
+    if (execRequest.query_ctx.isSetDesc_tbl_testonly()) {
+      TDescriptorTable descTbl = execRequest.query_ctx.desc_tbl_testonly;
       if (descTbl.isSetTableDescriptors()) {
         for (TTableDescriptor tableDesc: descTbl.tableDescriptors) {
           if (seenTableIds.contains(tableDesc.id)) {
@@ -516,6 +517,7 @@ public class PlannerTestBase extends FrontendTestBase {
     String explainStr = "";
     try {
       PlanCtx planCtx = new PlanCtx(queryCtx);
+      planCtx.disableDescTblSerialization();
       execRequest = frontend_.createExecRequest(planCtx);
       explainStr = planCtx.getExplainString();
     } catch (Exception e) {
@@ -574,6 +576,7 @@ public class PlannerTestBase extends FrontendTestBase {
     try {
       queryCtx.client_request.getQuery_options().setExplain_level(TExplainLevel.VERBOSE);
       PlanCtx planCtx = new PlanCtx(queryCtx);
+      planCtx.disableDescTblSerialization();
       execRequest = frontend_.createExecRequest(planCtx);
       explainStr = planCtx.getExplainString();
     } catch (ImpalaException e) {
@@ -693,6 +696,7 @@ public class PlannerTestBase extends FrontendTestBase {
         System.getProperty("user.name"));
     queryCtx.client_request.setStmt(query);
     PlanCtx planCtx = new PlanCtx(queryCtx);
+    planCtx.disableDescTblSerialization();
     TExecRequest execRequest = frontend_.createExecRequest(planCtx);
 
     if (!execRequest.isSetQuery_exec_request()