Posted to commits@impala.apache.org by ta...@apache.org on 2020/08/02 00:42:17 UTC

[impala] branch master updated (ea3f073 -> da2999a)

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from ea3f073  IMPALA-9943,IMPALA-4974: INTERSECT/EXCEPT [DISTINCT]
     new 9d43cfd  IMPALA-5746: Cancel all queries scheduled by failed coordinators
     new da2999a  IMPALA-9903: Reduce Kudu openTable calls per query

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/kudu-scan-node-base.cc                 | 11 +---
 be/src/exec/kudu-scan-node-base.h                  |  8 ++-
 be/src/exec/kudu-scanner.cc                        | 20 +++---
 be/src/runtime/coordinator-backend-state.cc        |  1 +
 be/src/runtime/exec-env.cc                         | 29 ++++++---
 be/src/runtime/query-exec-mgr.cc                   | 71 +++++++++++++++++++++-
 be/src/runtime/query-exec-mgr.h                    | 50 ++++++++++++++-
 be/src/runtime/query-state.cc                      |  9 ++-
 be/src/runtime/query-state.h                       | 10 +++
 be/src/runtime/test-env.cc                         |  2 +
 bin/impala-config.sh                               |  4 +-
 common/protobuf/control_service.proto              |  3 +
 .../java/org/apache/impala/analysis/Analyzer.java  | 39 ++++++++++++
 .../org/apache/impala/catalog/FeKuduTable.java     | 11 ++--
 .../impala/catalog/local/LocalKuduTable.java       | 21 +++++--
 .../org/apache/impala/planner/KuduScanNode.java    |  4 +-
 tests/custom_cluster/test_process_failures.py      | 34 +++++++++++
 17 files changed, 281 insertions(+), 46 deletions(-)


[impala] 02/02: IMPALA-9903: Reduce Kudu openTable calls per query

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit da2999afd9ddc45d35141649d17db507e03ee9bf
Author: Grant Henke <gh...@cloudera.com>
AuthorDate: Mon Jun 29 08:58:35 2020 -0500

    IMPALA-9903: Reduce Kudu openTable calls per query
    
    This patch reduces the number of Kudu openTable calls for the
    lifetime of a query by storing the KuduTable object in the
    Analyzer GlobalState and using it in the KuduScanNode.
    
    It does not cache the KuduTable object for longer than a single
    query, does not impact DDL statements, and does not
    introduce the need to invalidate metadata when interacting with
    Kudu tables.
    
    Additionally, this patch adjusts the backend scanner to use the
    KuduTable instance from the KuduScanner instead of using
    openTable to get a new instance.
    
    Reducing the number of openTable calls is important because each
    call results in a GetTableSchema RPC to the remote leader Kudu
    master. With very high rates of queries against Kudu tables this
    can overload the master, leading to degraded query performance.
    
    In manual testing this patch reduced the Kudu GetTableSchema
    RPC calls to the master from 5 per query to 1 per query.
    
    Change-Id: Iec12a5be9b30e19a123142af5453a91bd4300b63
    Reviewed-on: http://gerrit.cloudera.org:8080/16120
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/kudu-scan-node-base.cc                 | 11 ++----
 be/src/exec/kudu-scan-node-base.h                  |  8 +++--
 be/src/exec/kudu-scanner.cc                        | 20 +++++------
 bin/impala-config.sh                               |  4 +--
 .../java/org/apache/impala/analysis/Analyzer.java  | 39 ++++++++++++++++++++++
 .../org/apache/impala/catalog/FeKuduTable.java     | 11 +++---
 .../impala/catalog/local/LocalKuduTable.java       | 21 +++++++++---
 .../org/apache/impala/planner/KuduScanNode.java    |  4 ++-
 8 files changed, 86 insertions(+), 32 deletions(-)
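
The core of this change is per-query memoization: the first lookup of a
table pays the openTable() round trip, and later lookups reuse the handle.
The commit implements this in Java (see Analyzer.getKuduTable() in the diff
below); the following is only a minimal, self-contained C++ sketch of the
same pattern, with TableHandle and PerQueryTableCache as hypothetical
stand-ins for the Kudu client types:

    #include <functional>
    #include <memory>
    #include <string>
    #include <unordered_map>

    // Hypothetical stand-in for kudu::client::KuduTable.
    struct TableHandle { std::string name; };

    // Per-query memo: the first GetTable() pays the openTable() RPC, later
    // calls reuse the handle. The map lives only as long as the query.
    class PerQueryTableCache {
     public:
      using OpenFn = std::function<std::shared_ptr<TableHandle>(const std::string&)>;
      explicit PerQueryTableCache(OpenFn open_table)
        : open_table_(std::move(open_table)) {}

      std::shared_ptr<TableHandle> GetTable(const std::string& table_name) {
        auto it = tables_.find(table_name);
        if (it != tables_.end()) return it->second;  // cache hit: no RPC
        std::shared_ptr<TableHandle> table = open_table_(table_name);  // one RPC
        tables_.emplace(table_name, table);
        return table;
      }

     private:
      OpenFn open_table_;
      std::unordered_map<std::string, std::shared_ptr<TableHandle>> tables_;
    };

    int main() {
      int rpcs = 0;
      PerQueryTableCache cache([&rpcs](const std::string& name) {
        ++rpcs;  // models one GetTableSchema RPC to the Kudu master
        return std::make_shared<TableHandle>(TableHandle{name});
      });
      cache.GetTable("tpch.lineitem");
      cache.GetTable("tpch.lineitem");  // second call reuses the cached handle
      return rpcs == 1 ? 0 : 1;
    }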

diff --git a/be/src/exec/kudu-scan-node-base.cc b/be/src/exec/kudu-scan-node-base.cc
index 0de04de..2f82200 100644
--- a/be/src/exec/kudu-scan-node-base.cc
+++ b/be/src/exec/kudu-scan-node-base.cc
@@ -77,6 +77,7 @@ Status KuduScanNodeBase::Prepare(RuntimeState* state) {
 
   DCHECK(state->desc_tbl().GetTupleDescriptor(tuple_id_) != NULL);
   tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
+  table_desc_ = static_cast<const KuduTableDescriptor*>(tuple_desc_->table_desc());
 
   // Initialize the list of scan tokens to process from the ScanRangeParamsPB.
   DCHECK(scan_range_params_ != NULL);
@@ -97,21 +98,15 @@ Status KuduScanNodeBase::Open(RuntimeState* state) {
   RETURN_IF_ERROR(QueryMaintenance(state));
   SCOPED_TIMER(runtime_profile_->total_time_counter());
 
-  const KuduTableDescriptor* table_desc =
-      static_cast<const KuduTableDescriptor*>(tuple_desc_->table_desc());
-
   RETURN_IF_ERROR(ExecEnv::GetInstance()->GetKuduClient(
-      table_desc->kudu_master_addresses(), &client_));
+      table_desc_->kudu_master_addresses(), &client_));
 
   uint64_t latest_ts = static_cast<uint64_t>(
       max<int64_t>(0, state->query_ctx().session.kudu_latest_observed_ts));
   VLOG_RPC << "Latest observed Kudu timestamp: " << latest_ts;
   if (latest_ts > 0) client_->SetLatestObservedTimestamp(latest_ts);
 
-  KUDU_RETURN_IF_ERROR(client_->OpenTable(table_desc->table_name(), &table_),
-      "Unable to open Kudu table");
-
-  runtime_profile_->AddInfoString("Table Name", table_desc->fully_qualified_name());
+  runtime_profile_->AddInfoString("Table Name", table_desc_->fully_qualified_name());
   if (filter_ctxs_.size() > 0) WaitForRuntimeFilters();
   return Status::OK();
 }
diff --git a/be/src/exec/kudu-scan-node-base.h b/be/src/exec/kudu-scan-node-base.h
index 768e904..1a8c0ca 100644
--- a/be/src/exec/kudu-scan-node-base.h
+++ b/be/src/exec/kudu-scan-node-base.h
@@ -62,6 +62,8 @@ class KuduScanNodeBase : public ScanNode {
 
   const TupleDescriptor* tuple_desc() const { return tuple_desc_; }
 
+  const KuduTableDescriptor* table_desc() const { return table_desc_; }
+
  private:
   friend class KuduScanner;
 
@@ -71,13 +73,13 @@ class KuduScanNodeBase : public ScanNode {
   /// Descriptor of tuples read from Kudu table.
   const TupleDescriptor* tuple_desc_ = nullptr;
 
+  /// Descriptor of the Kudu table.
+  const KuduTableDescriptor* table_desc_ = nullptr;
+
   /// Pointer to the KuduClient, which is stored on the QueryState and shared between
   /// scanners and fragment instances.
   kudu::client::KuduClient* client_ = nullptr;
 
-  /// Kudu table reference. Shared between scanner threads for KuduScanNode.
-  kudu::client::sp::shared_ptr<kudu::client::KuduTable> table_;
-
   /// Set of scan tokens to be deserialized into Kudu scanners.
   std::vector<std::string> scan_tokens_;
 
diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index bf72c48..348ecd6 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -214,7 +214,7 @@ Status KuduScanner::OpenNextScanToken(const string& scan_token, bool* eos) {
   VLOG_ROW << "Starting KuduScanner with ReadMode=" << mode
            << " timeout=" << FLAGS_kudu_operation_timeout_ms
            << " node with id=" << scan_node_->id()
-           << " Kudu table=" << scan_node_->table_->name();
+           << " Kudu table=" << scan_node_->table_desc()->table_name();
 
   if (!timestamp_slots_.empty()) {
     uint64_t row_format_flags =
@@ -251,7 +251,7 @@ Status KuduScanner::OpenNextScanToken(const string& scan_token, bool* eos) {
 
         KUDU_RETURN_IF_ERROR(
             scanner_->AddConjunctPredicate(
-                scan_node_->table_->NewInBloomFilterPredicate(col_name, bbf_vec)),
+                scanner_->GetKuduTable()->NewInBloomFilterPredicate(col_name, bbf_vec)),
             BuildErrorString("Failed to add bloom filter predicate"));
       } else {
         DCHECK(ctx.filter->is_min_max_filter());
@@ -282,15 +282,15 @@ Status KuduScanner::OpenNextScanToken(const string& scan_token, bool* eos) {
 
         KuduValue* min_value;
         RETURN_IF_ERROR(CreateKuduValue(col_type, min, &min_value));
-        KUDU_RETURN_IF_ERROR(
-            scanner_->AddConjunctPredicate(scan_node_->table_->NewComparisonPredicate(
-                col_name, KuduPredicate::ComparisonOp::GREATER_EQUAL, min_value)),
+        KUDU_RETURN_IF_ERROR(scanner_->AddConjunctPredicate(
+          scanner_->GetKuduTable()->NewComparisonPredicate(
+              col_name, KuduPredicate::ComparisonOp::GREATER_EQUAL, min_value)),
             BuildErrorString("Failed to add min predicate"));
 
         KuduValue* max_value;
         RETURN_IF_ERROR(CreateKuduValue(col_type, max, &max_value));
-        KUDU_RETURN_IF_ERROR(
-            scanner_->AddConjunctPredicate(scan_node_->table_->NewComparisonPredicate(
+        KUDU_RETURN_IF_ERROR(scanner_->AddConjunctPredicate(
+            scanner_->GetKuduTable()->NewComparisonPredicate(
                 col_name, KuduPredicate::ComparisonOp::LESS_EQUAL, max_value)),
             BuildErrorString("Failed to add max predicate"));
       }
@@ -379,8 +379,8 @@ Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_me
         kudu_tuple->SetNull(slot->null_indicator_offset());
         RETURN_IF_ERROR(state_->LogOrReturnError(
             ErrorMsg::Init(TErrorCode::KUDU_TIMESTAMP_OUT_OF_RANGE,
-              scan_node_->table_->name(),
-              scan_node_->table_->schema().Column(slot->col_pos()).name())));
+              scan_node_->table_desc()->table_name(),
+              scanner_->GetKuduTable()->schema().Column(slot->col_pos()).name())));
       }
     }
 
@@ -441,7 +441,7 @@ Status KuduScanner::GetNextScannerBatch() {
 
 string KuduScanner::BuildErrorString(const char* msg) {
   return Substitute("$0 for node with id '$1' for Kudu table '$2'", msg, scan_node_->id(),
-      scan_node_->table_->name());
+      scan_node_->table_desc()->table_name());
 }
 
 }  // namespace impala
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index b20d1179..3692896 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -68,7 +68,7 @@ fi
 # moving to a different build of the toolchain, e.g. when a version is bumped or a
 # compile option is changed. The build id can be found in the output of the toolchain
 # build jobs, it is constructed from the build number and toolchain git hash prefix.
-export IMPALA_TOOLCHAIN_BUILD_ID=34-fefac6723e
+export IMPALA_TOOLCHAIN_BUILD_ID=52-c3fa626d9b
 # Versions of toolchain dependencies.
 # -----------------------------------
 export IMPALA_AVRO_VERSION=1.7.4-p5
@@ -644,7 +644,7 @@ fi
 # overall build type) and does not apply when using a local Kudu build.
 export USE_KUDU_DEBUG_BUILD=${USE_KUDU_DEBUG_BUILD-false}
 
-export IMPALA_KUDU_VERSION=${IMPALA_KUDU_VERSION-"23f67ae0d"}
+export IMPALA_KUDU_VERSION=${IMPALA_KUDU_VERSION-"5ad5d3d66"}
 export IMPALA_KUDU_JAVA_VERSION=${IMPALA_KUDU_JAVA_VERSION-"1.13.0-SNAPSHOT"}
 export IMPALA_KUDU_HOME=${IMPALA_TOOLCHAIN_PACKAGES_HOME}/kudu-$IMPALA_KUDU_VERSION
 export IMPALA_KUDU_JAVA_HOME=\
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 929a551..0bb2f24 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -58,6 +58,7 @@ import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.Type;
+import org.apache.impala.catalog.local.LocalKuduTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.IdGenerator;
 import org.apache.impala.common.ImpalaException;
@@ -94,9 +95,11 @@ import org.apache.impala.util.Graph.RandomAccessibleGraph;
 import org.apache.impala.util.Graph.SccCondensedGraph;
 import org.apache.impala.util.Graph.WritableGraph;
 import org.apache.impala.util.IntIterator;
+import org.apache.impala.util.KuduUtil;
 import org.apache.impala.util.ListMap;
 import org.apache.impala.util.MetaStoreUtil;
 import org.apache.impala.util.TSessionStateUtil;
+import org.apache.kudu.client.KuduClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -459,6 +462,11 @@ public class Analyzer {
     // require analysis (e.g. some literal expressions).
     private int numStmtExprs_ = 0;
 
+    // Cache of KuduTables opened for this query (map from table name to KuduTable).
+    // This cache prevents multiple openTable calls for a given table in the same query.
+    public final Map<String, org.apache.kudu.client.KuduTable> kuduTables =
+        new HashMap<>();
+
     public GlobalState(StmtTableCache stmtTableCache, TQueryCtx queryCtx,
         AuthorizationFactory authzFactory, AuthorizationContext authzCtx) {
       this.stmtTableCache = stmtTableCache;
@@ -2813,6 +2821,37 @@ public class Analyzer {
     return table;
   }
 
+  public org.apache.kudu.client.KuduTable getKuduTable(FeKuduTable feKuduTable)
+      throws AnalysisException {
+    String tableName = feKuduTable.getFullName();
+
+    // Use the kuduTable from the global state cache if it exists.
+    org.apache.kudu.client.KuduTable kuduTable = globalState_.kuduTables.get(tableName);
+
+    // Otherwise, try to use the KuduTable from the FeKuduTable if it exists and
+    // add it to the global state cache for future use.
+    if (kuduTable == null &&
+        feKuduTable instanceof LocalKuduTable &&
+        ((LocalKuduTable) feKuduTable).getKuduTable() != null) {
+      kuduTable = ((LocalKuduTable) feKuduTable).getKuduTable();
+      globalState_.kuduTables.put(tableName, kuduTable);
+    }
+
+    // Last, get the KuduTable via a request to the Kudu server using openTable and
+    // add it to the global state cache for future use.
+    if (kuduTable == null) {
+      try {
+        KuduClient client = KuduUtil.getKuduClient(feKuduTable.getKuduMasterHosts());
+        kuduTable = client.openTable(feKuduTable.getKuduTableName());
+        globalState_.kuduTables.put(tableName, kuduTable);
+      } catch (Exception ex) {
+        throw new AnalysisException("Unable to open the Kudu table: " + tableName, ex);
+      }
+    }
+
+    return kuduTable;
+  }
+
   /**
    * Returns the table by looking it up in the local Catalog. Returns null if the db/table
    * does not exist. Does *not* force-load the table.
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeKuduTable.java b/fe/src/main/java/org/apache/impala/catalog/FeKuduTable.java
index f2ae9bc..223c705 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeKuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeKuduTable.java
@@ -141,6 +141,7 @@ public interface FeKuduTable extends FeTable {
 
       KuduClient client = KuduUtil.getKuduClient(table.getKuduMasterHosts());
       try {
+        // Call openTable to ensure we get the latest metadata for the Kudu table.
         org.apache.kudu.client.KuduTable kuduTable =
             client.openTable(table.getKuduTableName());
         List<LocatedTablet> tablets = kuduTable.getTabletsLocations(
@@ -171,8 +172,9 @@ public interface FeKuduTable extends FeTable {
 
       KuduClient client = KuduUtil.getKuduClient(table.getKuduMasterHosts());
       try {
-        org.apache.kudu.client.KuduTable kuduTable = client.openTable(
-            table.getKuduTableName());
+        // Call openTable to ensure we get the latest metadata for the Kudu table.
+        org.apache.kudu.client.KuduTable kuduTable =
+            client.openTable(table.getKuduTableName());
         List<LocatedTablet> tablets =
             kuduTable.getTabletsLocations(BackendConfig.INSTANCE.getKuduClientTimeoutMs());
         if (tablets.isEmpty()) {
@@ -216,8 +218,9 @@ public interface FeKuduTable extends FeTable {
       resultSchema.addToColumns(new TColumn(header, Type.STRING.toThrift()));
       KuduClient client = KuduUtil.getKuduClient(table.getKuduMasterHosts());
       try {
-        org.apache.kudu.client.KuduTable kuduTable = client.openTable(
-            table.getKuduTableName());
+        // Call openTable to ensure we get the latest metadata for the Kudu table.
+        org.apache.kudu.client.KuduTable kuduTable =
+            client.openTable(table.getKuduTableName());
         // The Kudu table API will return the partitions in sorted order by value.
         List<String> partitions = kuduTable.getFormattedRangePartitions(
             BackendConfig.INSTANCE.getKuduClientTimeoutMs());
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
index f764b30..36eb716 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
@@ -18,6 +18,7 @@
 package org.apache.impala.catalog.local;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -42,6 +43,8 @@ import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -53,6 +56,8 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
   private final List<KuduPartitionParam> partitionBy_;
   private final ImmutableList<String> primaryKeyColumnNames_;
 
+  private final org.apache.kudu.client.KuduTable kuduTable_;
+
   /**
    * Create a new instance based on the table metadata 'msTable' stored
    * in the metastore.
@@ -86,7 +91,7 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
 
     ColumnMap cmap = new ColumnMap(cols, /*numClusteringCols=*/0, fullTableName,
         /*isFullAcidSchema=*/false);
-    return new LocalKuduTable(db, msTable, ref, cmap, pkNames, partitionBy);
+    return new LocalKuduTable(db, msTable, ref, cmap, kuduTable, pkNames, partitionBy);
   }
 
 
@@ -110,8 +115,8 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
     ColumnMap cmap = new ColumnMap(columns, /*numClusteringCols=*/0, fullTableName,
         /*isFullAcidSchema=*/false);
 
-    return new LocalKuduTable(db, msTable, /*ref=*/null, cmap, pkNames,
-        kuduPartitionParams);
+    return new LocalKuduTable(db, msTable, /*ref=*/null, cmap, /*kuduTable*/null,
+        pkNames, kuduPartitionParams);
   }
 
   private static void convertColsFromKudu(Schema schema, List<Column> cols,
@@ -136,9 +141,11 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
   }
 
   private LocalKuduTable(LocalDb db, Table msTable, TableMetaRef ref, ColumnMap cmap,
+      org.apache.kudu.client.KuduTable kuduTable,
       List<String> primaryKeyColumnNames,
       List<KuduPartitionParam> partitionBy)  {
     super(db, msTable, ref, cmap);
+    kuduTable_ = kuduTable;
     tableParams_ = new TableParams(msTable);
     partitionBy_ = ImmutableList.copyOf(partitionBy);
     primaryKeyColumnNames_ = ImmutableList.copyOf(primaryKeyColumnNames);
@@ -149,12 +156,18 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
     return tableParams_.masters_;
   }
 
-
   @Override
   public String getKuduTableName() {
     return tableParams_.kuduTableName_;
   }
 
+  /**
+   * Return the Kudu table backing this table.
+   */
+  public org.apache.kudu.client.KuduTable getKuduTable() {
+    return kuduTable_;
+  }
+
   @Override
   public List<String> getPrimaryKeyColumnNames() {
     return primaryKeyColumnNames_;
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 9e17a32..4d54b7d 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -132,8 +132,10 @@ public class KuduScanNode extends ScanNode {
 
     KuduClient client = KuduUtil.getKuduClient(kuduTable_.getKuduMasterHosts());
     try {
+      // Get the KuduTable from the analyzer, which caches it per query to
+      // prevent multiple openTable calls for a single query.
       org.apache.kudu.client.KuduTable rpcTable =
-          client.openTable(kuduTable_.getKuduTableName());
+          analyzer.getKuduTable(kuduTable_);
       validateSchema(rpcTable);
 
       if (canApplyCountStarOptimization(analyzer)) {


[impala] 01/02: IMPALA-5746: Cancel all queries scheduled by failed coordinators

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 9d43cfdaeeb1e0a88af3b7aefdc28aa585927a03
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Sat Jul 18 00:10:18 2020 -0700

    IMPALA-5746: Cancel all queries scheduled by failed coordinators
    
    Executors now register a callback for cluster membership updates. When
    coordinators are absent from the active cluster membership list, the
    executor cancels all running fragments of the queries scheduled by the
    inactive coordinators, since it cannot send results back to an
    inactive/failed coordinator. This lets executors quickly release the
    resources allocated for the fragments being cancelled.
    
    Testing:
    - Added new test case TestProcessFailures::test_kill_coordinator
      and ran it with the following command:
        ./bin/impala-py.test tests/custom_cluster/test_process_failures.py\
          ::TestProcessFailures::test_kill_coordinator \
          --exploration_strategy=exhaustive.
    - Passed the core test.
    
    Change-Id: I918fcc27649d5d2bbe8b6ef47fbd9810ae5f57bd
    Reviewed-on: http://gerrit.cloudera.org:8080/16215
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/coordinator-backend-state.cc   |  1 +
 be/src/runtime/exec-env.cc                    | 29 ++++++++---
 be/src/runtime/query-exec-mgr.cc              | 71 ++++++++++++++++++++++++++-
 be/src/runtime/query-exec-mgr.h               | 50 +++++++++++++++++--
 be/src/runtime/query-state.cc                 |  9 +++-
 be/src/runtime/query-state.h                  | 10 ++++
 be/src/runtime/test-env.cc                    |  2 +
 common/protobuf/control_service.proto         |  3 ++
 tests/custom_cluster/test_process_failures.py | 34 +++++++++++++
 9 files changed, 195 insertions(+), 14 deletions(-)
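
The mechanism here is straightforward: on every cluster-membership update,
compare each registered query's coordinator backend id against the set of
live backends, and cancel any query whose coordinator has dropped out. The
following is only a minimal, self-contained C++ sketch of that pattern;
Query and BackendId are hypothetical stand-ins, and the real change (see
the diff below) hands the victims to a small cancellation thread pool
rather than cancelling inline, so the statestore callback never blocks:

    #include <cstdint>
    #include <unordered_map>
    #include <unordered_set>
    #include <vector>

    using BackendId = uint64_t;  // stand-in for the BackendIdPB proto

    // Minimal stand-in for a registered query on an executor.
    struct Query {
      int query_id;
      BackendId coord_id;  // coordinator that scheduled this query
      bool cancelled = false;
    };

    // Callback body: cancel queries whose coordinator is absent from the
    // latest membership snapshot.
    void CancelForFailedCoordinators(std::unordered_map<int, Query>& queries,
        const std::unordered_set<BackendId>& live_backends) {
      std::vector<int> to_cancel;
      for (auto& [id, q] : queries) {
        if (!q.cancelled && live_backends.count(q.coord_id) == 0) {
          to_cancel.push_back(id);
        }
      }
      // The real code offers each victim to a cancellation thread pool;
      // here we cancel inline for brevity.
      for (int id : to_cancel) queries[id].cancelled = true;
    }

    int main() {
      std::unordered_map<int, Query> queries = {
          {1, {1, /*coord_id=*/100}}, {2, {2, /*coord_id=*/200}}};
      // Coordinator 200 is missing from the membership snapshot.
      CancelForFailedCoordinators(queries, /*live_backends=*/{100});
      return (queries[2].cancelled && !queries[1].cancelled) ? 0 : 1;
    }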

diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 5250927..ee6c039 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -110,6 +110,7 @@ void Coordinator::BackendState::SetRpcParams(const DebugOptions& debug_options,
     const FilterRoutingTable& filter_routing_table, ExecQueryFInstancesRequestPB* request,
     TExecPlanFragmentInfo* fragment_info) {
   request->set_coord_state_idx(state_idx_);
+  *request->mutable_coord_backend_id() = ExecEnv::GetInstance()->backend_id();
   request->set_min_mem_reservation_bytes(
       backend_exec_params_.min_mem_reservation_bytes());
   request->set_initial_mem_reservation_total_claims(
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 5850ccd..347f2d7 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -544,14 +544,27 @@ void ExecEnv::SetImpalaServer(ImpalaServer* server) {
   cluster_membership_mgr_->SetLocalBeDescFn([server]() {
     return server->GetLocalBackendDescriptor();
   });
-  cluster_membership_mgr_->RegisterUpdateCallbackFn(
-      [server](ClusterMembershipMgr::SnapshotPtr snapshot) {
-        std::unordered_set<BackendIdPB> current_backend_set;
-        for (const auto& it : snapshot->current_backends) {
-          current_backend_set.insert(it.second.backend_id());
-        }
-        server->CancelQueriesOnFailedBackends(current_backend_set);
-      });
+  if (FLAGS_is_coordinator) {
+    cluster_membership_mgr_->RegisterUpdateCallbackFn(
+        [server](ClusterMembershipMgr::SnapshotPtr snapshot) {
+          std::unordered_set<BackendIdPB> current_backend_set;
+          for (const auto& it : snapshot->current_backends) {
+            current_backend_set.insert(it.second.backend_id());
+          }
+          server->CancelQueriesOnFailedBackends(current_backend_set);
+        });
+  }
+  if (FLAGS_is_executor) {
+    cluster_membership_mgr_->RegisterUpdateCallbackFn(
+        [](ClusterMembershipMgr::SnapshotPtr snapshot) {
+          std::unordered_set<BackendIdPB> current_backend_set;
+          for (const auto& it : snapshot->current_backends) {
+            current_backend_set.insert(it.second.backend_id());
+          }
+          ExecEnv::GetInstance()->query_exec_mgr()->CancelQueriesForFailedCoordinators(
+              current_backend_set);
+        });
+  }
 }
 
 void ExecEnv::InitBufferPool(int64_t min_buffer_size, int64_t capacity,
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index 823bd59..ac07085 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -39,7 +39,7 @@
 #include "util/impalad-metrics.h"
 #include "util/metrics.h"
 #include "util/network-util.h"
-#include "util/thread.h"
+#include "util/thread-pool.h"
 
 #include "common/names.h"
 
@@ -49,6 +49,26 @@ using namespace impala;
 DEFINE_int32(log_mem_usage_interval, 0, "If non-zero, impalad will output memory usage "
     "every log_mem_usage_interval'th fragment completion.");
 
+DEFINE_int32(query_exec_mgr_cancellation_thread_pool_size, 1,
+    "(Advanced) Size of the QueryExecMgr thread-pool processing cancellations due to "
+    "coordinator failure");
+
+const uint32_t QUERY_EXEC_MGR_MAX_CANCELLATION_QUEUE_SIZE = 65536;
+
+QueryExecMgr::QueryExecMgr() {
+  // Initialise the cancellation thread pool with 1 thread (by default). The max queue
+  // size is deliberately set so high that it should never fill; if it does we fill the
+  // queue up to the maximum limit and ignore the rest. The ignored queries will get
+  // cancelled when they time out trying to send status reports.
+  cancellation_thread_pool_.reset(new ThreadPool<QueryCancellationTask>("query-exec-mgr",
+      "cancellation-worker", FLAGS_query_exec_mgr_cancellation_thread_pool_size,
+      QUERY_EXEC_MGR_MAX_CANCELLATION_QUEUE_SIZE,
+      bind<void>(&QueryExecMgr::CancelFromThreadPool, this, _2)));
+  ABORT_IF_ERROR(cancellation_thread_pool_->Init());
+}
+
+QueryExecMgr::~QueryExecMgr() {}
+
 Status QueryExecMgr::StartQuery(const ExecQueryFInstancesRequestPB* request,
     const TQueryCtx& query_ctx, const TExecPlanFragmentInfo& fragment_info) {
   TUniqueId query_id = query_ctx.query_id;
@@ -194,3 +214,52 @@ void QueryExecMgr::ReleaseQueryState(QueryState* qs) {
   // decrement it after we're completely done with the query.
   ImpaladMetrics::BACKEND_NUM_QUERIES_EXECUTING->Increment(-1);
 }
+
+void QueryExecMgr::AcquireQueryStateLocked(QueryState* qs) {
+  if (qs == nullptr) return;
+  int refcnt = qs->refcnt_.Add(1);
+  DCHECK(refcnt > 0);
+}
+
+void QueryExecMgr::CancelQueriesForFailedCoordinators(
+    const std::unordered_set<BackendIdPB>& current_membership) {
+  // Build a list of queries that are scheduled by failed coordinators (as
+  // evidenced by their absence from the cluster membership list).
+  std::vector<QueryCancellationTask> to_cancel;
+  qs_map_.DoFuncForAllEntries([&](QueryState* qs) {
+    if (qs != nullptr && !qs->IsCancelled()) {
+      if (current_membership.find(qs->coord_backend_id()) == current_membership.end()) {
+        // decremented by ReleaseQueryState()
+        AcquireQueryStateLocked(qs);
+        to_cancel.push_back(QueryCancellationTask(qs));
+      }
+    }
+  });
+
+  // Since we are the only producer for the cancellation thread pool, we can find the
+  // remaining capacity of the pool and submit the new cancellation requests without
+  // blocking.
+  int query_num_to_cancel = to_cancel.size();
+  int remaining_queue_size = QUERY_EXEC_MGR_MAX_CANCELLATION_QUEUE_SIZE
+      - cancellation_thread_pool_->GetQueueSize();
+  if (query_num_to_cancel > remaining_queue_size) {
+    // Fill the queue up to the maximum limit, and ignore the rest, which will get
+    // cancelled eventually anyway when QueryState::ReportExecStatus() hits the timeout.
+    LOG_EVERY_N(WARNING, 60) << "QueryExecMgr cancellation queue is full";
+    query_num_to_cancel = remaining_queue_size;
+    for (int i = query_num_to_cancel; i < to_cancel.size(); ++i) {
+      ReleaseQueryState(to_cancel[i].GetQueryState());
+    }
+  }
+  for (int i = 0; i < query_num_to_cancel; ++i) {
+    cancellation_thread_pool_->Offer(to_cancel[i]);
+  }
+}
+
+void QueryExecMgr::CancelFromThreadPool(const QueryCancellationTask& cancellation_task) {
+  QueryState* qs = cancellation_task.GetQueryState();
+  VLOG(1) << "CancelFromThreadPool(): cancel query " << qs->query_id();
+  qs->Cancel();
+  qs->is_coord_active_.Store(false);
+  ReleaseQueryState(qs);
+}
diff --git a/be/src/runtime/query-exec-mgr.h b/be/src/runtime/query-exec-mgr.h
index cc15cc2..2a6ce74 100644
--- a/be/src/runtime/query-exec-mgr.h
+++ b/be/src/runtime/query-exec-mgr.h
@@ -17,9 +17,11 @@
 
 #pragma once
 
+#include "common/global-types.h"
 #include "common/status.h"
 #include "util/aligned-new.h"
 #include "util/sharded-query-map-util.h"
+#include "util/thread-pool.h"
 
 namespace impala {
 
@@ -29,12 +31,19 @@ class TExecPlanFragmentInfo;
 class TQueryCtx;
 class TUniqueId;
 
-/// A daemon-wide registry and manager of QueryStates. This is the central
-/// entry point for gaining refcounted access to a QueryState. It also initiates
-/// query execution.
+/// A daemon-wide registry and manager of QueryStates. This is the central entry
+/// point for gaining refcounted access to a QueryState. It initiates query execution.
+/// It also registers a callback function for cluster membership updates. When
+/// coordinators are absent from the active cluster membership list, it cancels all
+/// the running fragments of the queries scheduled by the inactive coordinators.
+/// Note that we have to hold the shard lock in order to increment the refcnt for a
+/// QueryState safely.
 /// Thread-safe.
 class QueryExecMgr : public CacheLineAligned {
  public:
+  QueryExecMgr();
+  ~QueryExecMgr();
+
   /// Creates QueryState if it doesn't exist and initiates execution of all fragment
   /// instance for this query. All fragment instances hold a reference to their
   /// QueryState for the duration of their execution.
@@ -58,11 +67,37 @@ class QueryExecMgr : public CacheLineAligned {
   /// Decrements the refcount for the given QueryState.
   void ReleaseQueryState(QueryState* qs);
 
+  /// Takes a set of backend ids of active backends and cancels all the running
+  /// fragments of the queries which are scheduled by failed coordinators (that
+  /// is, ids not in the active set).
+  void CancelQueriesForFailedCoordinators(
+      const std::unordered_set<BackendIdPB>& current_membership);
+
+  /// Work item for QueryExecMgr::cancellation_thread_pool_.
+  /// This class needs to support move construction and assignment for use in ThreadPool.
+  class QueryCancellationTask {
+   public:
+    // Empty constructor needed to make ThreadPool happy.
+    QueryCancellationTask() : qs_(nullptr) {}
+    QueryCancellationTask(QueryState* qs) : qs_(qs) {}
+
+    QueryState* GetQueryState() const { return qs_; }
+
+   private:
+    // QueryState to be cancelled.
+    QueryState* qs_;
+  };
+
  private:
 
   typedef ShardedQueryMap<QueryState*> QueryStateMap;
   QueryStateMap qs_map_;
 
+  /// Thread pool to process cancellation tasks for queries scheduled by failed
+  /// coordinators to avoid blocking the statestore callback.
+  /// Set the thread pool size to 1 by default since the tasks are local function calls.
+  std::unique_ptr<ThreadPool<QueryCancellationTask>> cancellation_thread_pool_;
+
   /// Gets the existing QueryState or creates a new one if not present.
   /// 'created' is set to true if it was created, false otherwise.
   /// Increments the refcount.
@@ -73,5 +108,14 @@ class QueryExecMgr : public CacheLineAligned {
   /// Return only after all fragments complete unless an instances hit
   /// an error or the query is cancelled.
   void ExecuteQueryHelper(QueryState* qs);
+
+  /// Increments the refcount for the given QueryState with caller holding the lock
+  /// of the sharded QueryState map.
+  void AcquireQueryStateLocked(QueryState* qs);
+
+  /// Helper method to process cancellations that result from failed coordinators,
+  /// called from the cancellation thread pool. The cancellation_task contains the
+  /// QueryState to be cancelled.
+  void CancelFromThreadPool(const QueryCancellationTask& cancellation_task);
 };
 }
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 3cd34fb..3ad9ee0 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -210,6 +210,7 @@ Status QueryState::Init(const ExecQueryFInstancesRequestPB* exec_rpc_params,
 
   // don't copy query_ctx, it's large and we already did that in the c'tor
   exec_rpc_params_.set_coord_state_idx(exec_rpc_params->coord_state_idx());
+  *exec_rpc_params_.mutable_coord_backend_id() = exec_rpc_params->coord_backend_id();
   exec_rpc_params_.mutable_fragment_ctxs()->Swap(
       const_cast<google::protobuf::RepeatedPtrField<impala::PlanFragmentCtxPB>*>(
           &exec_rpc_params->fragment_ctxs()));
@@ -386,10 +387,13 @@ void QueryState::UpdateBackendExecState() {
           BackendExecState::EXECUTING : BackendExecState::FINISHED;
     }
   }
-  // Send one last report if the query has reached the terminal state.
+  // Send one last report if the query has reached the terminal state
+  // and the coordinator is active.
   if (IsTerminalState()) {
     VLOG_QUERY << "UpdateBackendExecState(): last report for " << PrintId(query_id());
-    while (!ReportExecStatus()) SleepForMs(GetReportWaitTimeMs());
+    while (is_coord_active_.Load() && !ReportExecStatus()) {
+      SleepForMs(GetReportWaitTimeMs());
+    }
   }
 }
 
@@ -603,6 +607,7 @@ bool QueryState::ReportExecStatus() {
     if (!rpc_status.ok()) {
       LOG(ERROR) << "Cancelling fragment instances due to failure to reach the "
                  << "coordinator. (" << rpc_status.GetDetail() << ").";
+      is_coord_active_.Store(false);
     } else if (!result_status.ok()) {
       // If the ReportExecStatus RPC succeeded in reaching the coordinator and we get
       // back a non-OK status, it means that the coordinator expects us to cancel the
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 352ee64..575eb55 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -152,6 +152,9 @@ class QueryState {
 
   /// The following getters are only valid after Init().
   ScannerMemLimiter* scanner_mem_limiter() const { return scanner_mem_limiter_; }
+  const UniqueIdPB& coord_backend_id() const {
+    return exec_rpc_params_.coord_backend_id();
+  }
 
   /// The following getters are only valid after Init() and should be called only from
   /// the backend execution (ie. not the coordinator side, since they require holding
@@ -215,6 +218,9 @@ class QueryState {
   /// instances have finished their Prepare phase. Idempotent.
   void Cancel();
 
+  /// Return true if the executing fragment instances have been cancelled.
+  bool IsCancelled() const { return (is_cancelled_.Load() == 1); }
+
   /// Increment the resource refcount. Must be decremented before the query state
   /// reference is released. A refcount should be held by a fragment or other entity
   /// for as long as it is consuming query backend execution resources (e.g. memory).
@@ -400,6 +406,10 @@ class QueryState {
   /// initiate cancellation exactly once
   AtomicInt32 is_cancelled_;
 
+  /// set to false when the coordinator has been detected as inactive in the cluster;
+  /// used to avoid sending the last execution report to the inactive/failed coordinator.
+  AtomicBool is_coord_active_{true};
+
   /// True if and only if ReleaseExecResources() has been called.
   bool released_backend_resources_ = false;
 
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 92b703d..ddd2a03 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -161,7 +161,9 @@ Status TestEnv::CreateQueryState(
   // param
   ExecQueryFInstancesRequestPB rpc_params;
   // create dummy -Ctx fields, we need them for FragmentInstance-/RuntimeState
+  UniqueIdPB dummy_backend_id;
   rpc_params.set_coord_state_idx(0);
+  *rpc_params.mutable_coord_backend_id() = dummy_backend_id;
   rpc_params.add_fragment_ctxs();
   rpc_params.add_fragment_instance_ctxs();
   TExecPlanFragmentInfo fragment_info;
diff --git a/common/protobuf/control_service.proto b/common/protobuf/control_service.proto
index 52495b8..8d97243 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -384,6 +384,9 @@ message ExecQueryFInstancesRequestPB {
   // Execution parameters for specific fragment instances. Corresponds to
   // 'fragment_instance_ctxs' in the TExecPlanFragmentInfo sidecar.
   repeated PlanFragmentInstanceCtxPB fragment_instance_ctxs = 8;
+
+  // The Backend ID of the coordinator.
+  optional UniqueIdPB coord_backend_id = 9;
 }
 
 message ExecQueryFInstancesResponsePB {
diff --git a/tests/custom_cluster/test_process_failures.py b/tests/custom_cluster/test_process_failures.py
index 4fb58a4..4e106ef 100644
--- a/tests/custom_cluster/test_process_failures.py
+++ b/tests/custom_cluster/test_process_failures.py
@@ -17,6 +17,7 @@
 
 import pytest
 
+from beeswaxd.BeeswaxService import QueryState
 from tests.common.custom_cluster_test_suite import (
     DEFAULT_CLUSTER_SIZE,
     CustomClusterTestSuite)
@@ -57,6 +58,39 @@ class TestProcessFailures(CustomClusterTestSuite):
     self.execute_query_expect_success(client, QUERY)
 
   @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1,
+      impalad_args="--status_report_max_retry_s=600 --status_report_interval_ms=1000")
+  def test_kill_coordinator(self):
+    """"Tests that when a coordinator running multiple queries is killed, all
+    running fragments on executors are cancelled."""
+    impalad = self.cluster.impalads[0]
+    client = impalad.service.create_beeswax_client()
+    assert client is not None
+    # A query which is cancelable and takes a long time to execute
+    query = "select * from tpch.lineitem t1, tpch.lineitem t2, tpch.lineitem t3 " \
+        "where t1.l_orderkey = t2.l_orderkey and t1.l_orderkey = t3.l_orderkey and " \
+        "t3.l_orderkey = t2.l_orderkey order by t1.l_orderkey, t2.l_orderkey, " \
+        "t3.l_orderkey limit 300"
+    num_concurrent_queries = 3
+    handles = []
+
+    # Run num_concurrent_queries asynchronously
+    for _ in xrange(num_concurrent_queries):
+      handles.append(client.execute_async(query))
+
+    # Wait for the queries to start running
+    for handle in handles:
+      self.wait_for_state(handle, QueryState.RUNNING, 1000, client=client)
+
+    # Kill the coordinator
+    impalad.kill()
+
+    # Assert that all executors have 0 in-flight fragments
+    for i in xrange(1, len(self.cluster.impalads)):
+      self.cluster.impalads[i].service.wait_for_metric_value(
+        "impala-server.num-fragments-in-flight", 0, timeout=30)
+
+  @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args='--use_local_catalog',
       catalogd_args='--catalog_topic_mode=minimal')