Posted to commits@impala.apache.org by sa...@apache.org on 2017/05/26 15:00:09 UTC

[1/3] incubator-impala git commit: IMPALA-5375: Builds on CentOS 6.4 failing with broken python dependencies

Repository: incubator-impala
Updated Branches:
  refs/heads/master 014c5603f -> bad10da4a


IMPALA-5375: Builds on CentOS 6.4 failing with broken python dependencies

Builds on CentOS 6.4 fail because the dependencies of the new
'cryptography' Python package cannot be met.

The ADLS commit states that the new packages are required only for ADLS,
and that an ADLS dev environment is supported only on CentOS 6.7 and above.

This patch moves the compiled requirements for ADLS from
compiled-requirements.txt to adls-requirements.txt and passes a compiler
to the Pip environment while installing the ADLS requirements (a sketch
of this follows).
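
For illustration, here is a minimal sketch of what passing a compiler to
Pip can look like. The helper names select_cc() and exec_pip_install()
mirror the diff below, but the bodies here are assumptions, not Impala's
exact implementation:

  import os
  import subprocess

  def select_cc():
      # Illustrative: pick a C compiler for building native packages. The
      # real bootstrap script resolves this from the Impala toolchain.
      for candidate in [os.environ.get("CC"), "gcc", "cc"]:
          if not candidate:
              continue
          found = any(os.access(os.path.join(d, candidate), os.X_OK)
                      for d in os.environ.get("PATH", "").split(os.pathsep))
          if found:
              return candidate
      return None

  def exec_pip_install(args, cc=None):
      # Exporting CC into pip's environment makes source builds of packages
      # such as cffi and cryptography use the chosen compiler.
      env = dict(os.environ)
      if cc is not None:
          env["CC"] = cc
      subprocess.check_call(["pip", "install"] + args, env=env)

  # e.g.: exec_pip_install(["-r", "adls-requirements.txt"], cc=select_cc())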

Testing: Tested on a machine with TARGET_FILESYSTEM='adls', and also on
a CentOS 6.4 machine with the default configuration.

Change-Id: I7d456a861a85edfcad55236aa8b0dbac2ff6fc78
Reviewed-on: http://gerrit.cloudera.org:8080/6998
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/4e549799
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/4e549799
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/4e549799

Branch: refs/heads/master
Commit: 4e5497995b3b1c4e8db658a3a54ab458c5430572
Parents: 014c560
Author: Sailesh Mukil <sa...@cloudera.com>
Authored: Thu May 25 17:58:33 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri May 26 07:52:40 2017 +0000

----------------------------------------------------------------------
 infra/python/bootstrap_virtualenv.py        | 14 ++++++++------
 infra/python/deps/adls-requirements.txt     |  5 +++++
 infra/python/deps/compiled-requirements.txt |  5 -----
 3 files changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4e549799/infra/python/bootstrap_virtualenv.py
----------------------------------------------------------------------
diff --git a/infra/python/bootstrap_virtualenv.py b/infra/python/bootstrap_virtualenv.py
index 4fb8689..e26aaf3 100644
--- a/infra/python/bootstrap_virtualenv.py
+++ b/infra/python/bootstrap_virtualenv.py
@@ -218,15 +218,17 @@ def install_compiled_deps_if_possible():
   return True
 
 def install_adls_deps():
-  # The ADLS dependencies require that the OS is at least CentOS 6.6 or above,
+  # The ADLS dependencies require that the OS is at least CentOS 6.7 or above,
   # which is why we break this into a separate step. If the target filesystem is
-  # ADLS, the expectation is that the dev environment is running at least CentOS 6.6.
-  if reqs_are_installed(ADLS_REQS_PATH):
-    LOG.debug("Skipping ADLS deps: matching adls-installed-requirements.txt found")
-    return True
+  # ADLS, the expectation is that the dev environment is running at least CentOS 6.7.
   if os.environ.get('TARGET_FILESYSTEM') == "adls":
+    if reqs_are_installed(ADLS_REQS_PATH):
+      LOG.debug("Skipping ADLS deps: matching adls-installed-requirements.txt found")
+      return True
+    cc = select_cc()
+    assert cc is not None
     LOG.info("Installing ADLS packages into the virtualenv")
-    exec_pip_install(["-r", ADLS_REQS_PATH])
+    exec_pip_install(["-r", ADLS_REQS_PATH], cc=cc)
     mark_reqs_installed(ADLS_REQS_PATH)
 
 def install_kudu_client_if_possible():

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4e549799/infra/python/deps/adls-requirements.txt
----------------------------------------------------------------------
diff --git a/infra/python/deps/adls-requirements.txt b/infra/python/deps/adls-requirements.txt
index f027a6e..9df42b6 100644
--- a/infra/python/deps/adls-requirements.txt
+++ b/infra/python/deps/adls-requirements.txt
@@ -18,4 +18,9 @@
 # The following dependencies depend on cffi, so it must be installed after the toolchain
 # is bootstrapped and all requirements in requirements.txt and compiled-requirements.txt
 # are installed into the virtualenv.
+
+pycparser == 2.17
+cffi==1.10.0
+cryptography==1.8.1
+  scandir == 1.5
 azure-datalake-store == 0.0.9

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4e549799/infra/python/deps/compiled-requirements.txt
----------------------------------------------------------------------
diff --git a/infra/python/deps/compiled-requirements.txt b/infra/python/deps/compiled-requirements.txt
index b9273d1..b3f9f4d 100644
--- a/infra/python/deps/compiled-requirements.txt
+++ b/infra/python/deps/compiled-requirements.txt
@@ -32,11 +32,6 @@ impyla == 0.14.0
   thrift == 0.9.0
   thrift_sasl == 0.1.0
 psutil == 0.7.1
-# Required for ADLS Python client
-  pycparser == 2.17
-  cffi==1.10.0
-  cryptography==1.8.1
-    scandir == 1.5
 # Required for Kudu:
   Cython == 0.23.4
   numpy == 1.10.4


[2/3] incubator-impala git commit: IMPALA-5325: Do not update totalHdfsBytes_/numHdfsFiles_ on Catalogd

Posted by sa...@apache.org.
IMPALA-5325: Do not update totalHdfsBytes_/numHdfsFiles_ on Catalogd

These HDFS table metrics need not be maintained on the Catalog server,
as they are recalculated on the Impalads while unpacking the
corresponding thrift table.

This fix can potentially break the frontend tests that directly load the
Catalog's version of HdfsTable without going through the loadFromThrift()
call paths that do the accounting. That is handled by adding a separate
method that computes these values, called from
ImpaladTestCatalog.getTable() (see the sketch below).
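
As an illustration, a hypothetical frontend-test snippet; the table name
and the accessor names getNumHdfsFiles()/getTotalHdfsBytes() are
assumptions for this sketch, not necessarily the exact API:

  import org.apache.impala.catalog.HdfsTable;
  import org.apache.impala.catalog.Table;
  import org.apache.impala.testutil.ImpaladTestCatalog;

  public class HdfsStatsExample {
    public static void main(String[] args) throws Exception {
      // Hypothetical usage: "functional.alltypes" is just an example table.
      ImpaladTestCatalog catalog = new ImpaladTestCatalog();
      Table tbl = catalog.getTable("functional", "alltypes");
      if (tbl instanceof HdfsTable) {
        // getTable() has already invoked computeHdfsStatsForTesting(), so
        // the stats are no longer the Catalogd sentinel value (-1).
        HdfsTable hdfsTbl = (HdfsTable) tbl;
        assert hdfsTbl.getNumHdfsFiles() >= 0;
        assert hdfsTbl.getTotalHdfsBytes() >= 0;
      }
    }
  }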

Testing: Existing tests, such as show stats and table/partition refresh
tests, already cover these code paths, so no new tests are added.

Change-Id: I03cc6f9e9b2c03cafb87029ea0802dfdb2745be1
Reviewed-on: http://gerrit.cloudera.org:8080/6970
Reviewed-by: Bharath Vissapragada <bh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5d59d854
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5d59d854
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5d59d854

Branch: refs/heads/master
Commit: 5d59d854d0274c89debb6f654f5cb3a0d2ef48e5
Parents: 4e54979
Author: Bharath Vissapragada <bh...@cloudera.com>
Authored: Tue May 23 17:45:20 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri May 26 09:52:46 2017 +0000

----------------------------------------------------------------------
 .../org/apache/impala/catalog/HdfsTable.java    | 33 +++++++++++++-------
 .../impala/testutil/ImpaladTestCatalog.java     |  7 ++++-
 2 files changed, 27 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5d59d854/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 23a8c96..d8d8d4a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -165,10 +165,12 @@ public class HdfsTable extends Table {
 
   private HdfsPartitionLocationCompressor partitionLocationCompressor_;
 
-  // Total number of Hdfs files in this table. Set in load().
+  // Total number of Hdfs files in this table. Accounted only in the Impalad catalog
+  // cache. Set to -1 on Catalogd.
   private long numHdfsFiles_;
 
-  // Sum of sizes of all Hdfs files in this table. Set in load().
+  // Sum of sizes of all Hdfs files in this table. Accounted only in the Impalad
+  // catalog cache. Set to -1 on Catalogd.
   private long totalHdfsBytes_;
 
   // True iff the table's partitions are located on more than one filesystem.
@@ -231,6 +233,21 @@ public class HdfsTable extends Table {
   }
 
   /**
+   * Updates numHdfsFiles_ and totalHdfsBytes_ based on the partition information.
+   * This is used only for the frontend tests that do not spawn a separate Catalog
+   * instance.
+   */
+  public void computeHdfsStatsForTesting() {
+    Preconditions.checkState(numHdfsFiles_ == -1 && totalHdfsBytes_ == -1);
+    numHdfsFiles_ = 0;
+    totalHdfsBytes_ = 0;
+    for (HdfsPartition partition: partitionMap_.values()) {
+      numHdfsFiles_ += partition.getNumFileDescriptors();
+      totalHdfsBytes_ += partition.getSize();
+    }
+  }
+
+  /**
    * Drops and re-loads the block metadata for all partitions in 'partsByPath' whose
    * location is under the given 'dirPath'. It involves the following steps:
    * - Clear the current block metadata of the partitions.
@@ -305,8 +322,6 @@ public class HdfsTable extends Table {
         // Update the partitions' metadata that this file belongs to.
         for (HdfsPartition partition: partitions) {
           partition.getFileDescriptors().add(fd);
-          numHdfsFiles_++;
-          totalHdfsBytes_ += fd.getFileLength();
         }
       }
 
@@ -369,8 +384,6 @@ public class HdfsTable extends Table {
       // Update the partitions' metadata that this file belongs to.
       for (HdfsPartition partition: partitions) {
         partition.getFileDescriptors().add(fd);
-        numHdfsFiles_++;
-        totalHdfsBytes_ += fd.getFileLength();
       }
     }
   }
@@ -726,8 +739,6 @@ public class HdfsTable extends Table {
         newPartSizeBytes += fileStatus.getLen();
       }
       partition.setFileDescriptors(newFileDescs);
-      numHdfsFiles_ += newFileDescs.size();
-      totalHdfsBytes_ += newPartSizeBytes;
     } catch(IOException e) {
       throw new CatalogException("Error loading block metadata for partition " +
           partition.toString(), e);
@@ -1072,6 +1083,8 @@ public class HdfsTable extends Table {
       }
       if (loadTableSchema) setAvroSchema(client, msTbl);
       updateStatsFromHmsTable(msTbl);
+      numHdfsFiles_ = -1;
+      totalHdfsBytes_ = -1;
     } catch (TableLoadingException e) {
       throw e;
     } catch (Exception e) {
@@ -1477,10 +1490,6 @@ public class HdfsTable extends Table {
     Path partDirPath = new Path(storageDescriptor.getLocation());
     FileSystem fs = partDirPath.getFileSystem(CONF);
     if (!fs.exists(partDirPath)) return;
-
-    numHdfsFiles_ -= partition.getNumFileDescriptors();
-    totalHdfsBytes_ -= partition.getSize();
-    Preconditions.checkState(numHdfsFiles_ >= 0 && totalHdfsBytes_ >= 0);
     refreshFileMetadata(partition);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5d59d854/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
index b3167ce..fdc64e6 100644
--- a/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
+++ b/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
@@ -24,6 +24,7 @@ import org.apache.impala.catalog.Db;
 import org.apache.impala.catalog.HdfsCachePool;
 import org.apache.impala.catalog.ImpaladCatalog;
 import org.apache.impala.catalog.Table;
+import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.util.PatternMatcher;
 import com.google.common.base.Preconditions;
 
@@ -86,6 +87,10 @@ public class ImpaladTestCatalog extends ImpaladCatalog {
     Db db = getDb(dbName);
     Preconditions.checkNotNull(db);
     db.addTable(newTbl);
-    return super.getTable(dbName, tableName);
+    Table resultTable = super.getTable(dbName, tableName);
+    if (resultTable instanceof HdfsTable) {
+      ((HdfsTable) resultTable).computeHdfsStatsForTesting();
+    }
+    return resultTable;
   }
 }


[3/3] incubator-impala git commit: IMPALA-4890/5143: Coordinator race involving TearDown()

Posted by sa...@apache.org.
IMPALA-4890/5143: Coordinator race involving TearDown()

TearDown() releases resources and destroys control structures (the
QueryState reference), yet it can be called while a concurrent thread is
executing Exec() or may still call GetNext() in the future. The solution
is to not destroy the control structures.

This also releases resources automatically at the end
of query execution.
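
The core of the fix, visible in the diff below, is making resource
release idempotent and guarded by a flag, while control structures stay
alive until the destructor. A simplified sketch of that pattern (the
member names follow the diff; everything else is illustrative):

  #include <cassert>

  class Coordinator {
   public:
    ~Coordinator() {
      // Resources must already be released; control structures (like the
      // QueryState reference in the real code) are torn down only here.
      assert(released_resources_);
    }

    // Idempotent: safe to call both from GetNext() at eos and from
    // CancelInternal().
    void ReleaseResources() {
      if (released_resources_) return;
      released_resources_ = true;
      // ... release query-lifetime resources here, but leave control
      // structures intact so a concurrent Exec()/GetNext() caller never
      // observes dangling pointers.
    }

   private:
    bool released_resources_ = false;
  };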

Change-Id: I457a6424a0255c137336c4bc01a6e7ed830d18c7
Reviewed-on: http://gerrit.cloudera.org:8080/6897
Reviewed-by: Marcel Kornacker <ma...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/bad10da4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/bad10da4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/bad10da4

Branch: refs/heads/master
Commit: bad10da4a6eb13673aa44eb4d45d6ad87a2dd690
Parents: 5d59d85
Author: Marcel Kornacker <ma...@cloudera.com>
Authored: Sun May 14 21:32:41 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri May 26 13:45:42 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator.cc          | 55 +++++++++++++----------------
 be/src/runtime/coordinator.h           | 36 ++++++++-----------
 be/src/service/client-request-state.cc |  3 --
 3 files changed, 38 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bad10da4/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 741da32..5b86690 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -108,7 +108,11 @@ Coordinator::Coordinator(
     query_events_(events) {}
 
 Coordinator::~Coordinator() {
-  DCHECK(torn_down_) << "TearDown() must be called before Coordinator is destroyed";
+  DCHECK(released_resources_)
+      << "ReleaseResources() must be called before Coordinator is destroyed";
+  if (query_state_ != nullptr) {
+    ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_);
+  }
 }
 
 Status Coordinator::Exec() {
@@ -416,8 +420,6 @@ Status Coordinator::FinishBackendStartup() {
       "Backend startup latencies", latencies.ToHumanReadable());
 
   if (!status.ok()) {
-    // TODO: do not allow cancellation via the debug page until Exec() has returned
-    //DCHECK(query_status_.ok()); // nobody should have been able to cancel
     query_status_ = status;
     CancelInternal();
   }
@@ -881,19 +883,14 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
 
   if (*eos) {
     returned_all_results_ = true;
-    // Trigger tear-down of coordinator fragment by closing the consumer. Must do before
-    // WaitForBackendCompletion().
-    coord_sink_->CloseConsumer();
-    coord_sink_ = nullptr;
-
-    // Don't return final NULL until all instances have completed.  GetNext must wait for
-    // all instances to complete before ultimately signalling the end of execution via a
-    // NULL batch. After NULL is returned, the coordinator may tear down query state, and
-    // perform post-query finalization which might depend on the reports from all
-    // backends.
-    //
-    // TODO: Waiting should happen in TearDown() (and then we wouldn't need to call
-    // CloseConsumer() here). See IMPALA-4275 for details.
+    // release resources here, since we won't be fetching more result rows
+    {
+      lock_guard<mutex> l(lock_);
+      ReleaseResources();
+    }
+
+    // wait for all backends to complete before computing the summary
+    // TODO: relocate this so GetNext() won't have to wait for backends to complete?
     RETURN_IF_ERROR(WaitForBackendCompletion());
     // if the query completed successfully, compute the summary
     if (query_status_.ok()) ComputeQuerySummary();
@@ -912,12 +909,15 @@ void Coordinator::Cancel(const Status* cause) {
   // should explicitly pass Status::OK()). Fragment instances may be cancelled at the end
   // of a successful query. Need to clean up relationship between query_status_ here and
   // in QueryExecState. See IMPALA-4279.
-  query_status_ = (cause != NULL && !cause->ok()) ? *cause : Status::CANCELLED;
+  query_status_ = (cause != nullptr && !cause->ok()) ? *cause : Status::CANCELLED;
   CancelInternal();
 }
 
 void Coordinator::CancelInternal() {
   VLOG_QUERY << "Cancel() query_id=" << query_id();
+  // TODO: remove when restructuring cancellation, which should happen automatically
+  // as soon as the coordinator knows that the query is finished
+  DCHECK(!query_status_.ok());
 
   int num_cancelled = 0;
   for (BackendState* backend_state: backend_states_) {
@@ -931,6 +931,8 @@ void Coordinator::CancelInternal() {
 
   // Report the summary with whatever progress the query made before being cancelled.
   ComputeQuerySummary();
+
+  ReleaseResources();
 }
 
 Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& params) {
@@ -1076,11 +1078,9 @@ string Coordinator::GetErrorLog() {
   return PrintErrorMapToString(merged);
 }
 
-// TODO: call this as soon as it's clear that we won't reference the state
-// anymore, ie, in CancelInternal() and when GetNext() hits eos
-void Coordinator::TearDown() {
-  DCHECK(!torn_down_) << "Coordinator::TearDown() must not be called twice";
-  torn_down_ = true;
+void Coordinator::ReleaseResources() {
+  if (released_resources_) return;
+  released_resources_ = true;
   if (filter_routing_table_.size() > 0) {
     query_profile_->AddInfoString("Final filter table", FilterDebugString());
   }
@@ -1094,20 +1094,13 @@ void Coordinator::TearDown() {
   }
   // This may be NULL while executing UDFs.
   if (filter_mem_tracker_.get() != nullptr) {
+    // TODO: move this elsewhere, this isn't releasing resources (it's dismantling
+    // control structures)
     filter_mem_tracker_->UnregisterFromParent();
-    filter_mem_tracker_.reset();
   }
   // Need to protect against failed Prepare(), where root_sink() would not be set.
   if (coord_sink_ != nullptr) {
     coord_sink_->CloseConsumer();
-    coord_sink_ = nullptr;
-  }
-  coord_instance_ = nullptr;
-  if (query_state_ != nullptr) {
-    // Tear down the query state last - other members like 'filter_mem_tracker_'
-    // may reference objects with query lifetime, like the query MemTracker.
-    ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_);
-    query_state_ = nullptr;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bad10da4/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index bc635b1..0d772eb 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -75,7 +75,10 @@ class QueryState;
 /// Query coordinator: handles execution of fragment instances on remote nodes, given a
 /// TQueryExecRequest. As part of that, it handles all interactions with the executing
 /// backends; it is also responsible for implementing all client requests regarding the
-/// query, including cancellation.
+/// query, including cancellation. Once a query ends, either through cancellation or
+/// by returning eos, the coordinator releases resources. (Note that DML requests
+/// always end with cancellation, via ImpalaServer::UnregisterQuery()/
+/// ImpalaServer::CancelInternal()/ClientRequestState::Cancel().)
 ///
 /// The coordinator monitors the execution status of fragment instances and aborts the
 /// entire query if an error is reported by any of them.
@@ -84,10 +87,7 @@ class QueryState;
 /// rows are produced by a fragment instance that always executes on the same machine as
 /// the coordinator.
 ///
-/// Once a query has finished executing and all results have been returned either to the
-/// caller of GetNext() or a data sink, execution_completed() will return true. If the
-/// query is aborted, execution_completed should also be set to true. Coordinator is
-/// thread-safe, with the exception of GetNext().
+/// Thread-safe, with the exception of GetNext().
 //
 /// A typical sequence of calls for a single query (calls under the same numbered
 /// item can happen concurrently):
@@ -98,9 +98,6 @@ class QueryState;
 /// The implementation ensures that setting an overall error status and initiating
 /// cancellation of all fragment instances is atomic.
 ///
-/// TODO: remove TearDown() and replace with ReleaseResources(); TearDown() currently
-/// also disassembles the control structures (such as the local reference to the
-/// coordinator's FragmentInstanceState)
 /// TODO: move into separate subdirectory and move nested classes into separate files
 /// and unnest them
 /// TODO: clean up locking behavior; in particular, clarify dependency on lock_
@@ -143,7 +140,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// if any, as well as all plan fragments on remote nodes. Sets query_status_ to the
   /// given cause if non-NULL. Otherwise, sets query_status_ to Status::CANCELLED.
   /// Idempotent.
-  void Cancel(const Status* cause = NULL);
+  void Cancel(const Status* cause = nullptr);
 
   /// Updates execution status of a particular backend as well as Insert-related
   /// status (per_partition_status_ and files_to_move_). Also updates
@@ -151,13 +148,8 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   Status UpdateBackendExecStatus(const TReportExecStatusParams& params)
       WARN_UNUSED_RESULT;
 
-  /// Returns the query state.
-  /// Only valid to call after Exec() and before TearDown(). The returned
-  /// reference only remains valid until TearDown() is called.
-  QueryState* query_state() const {
-    DCHECK(!torn_down_);
-    return query_state_;
-  }
+  /// Only valid to call after Exec().
+  QueryState* query_state() const { return query_state_; }
 
   /// Only valid *after* calling Exec(). Return nullptr if the running query does not
   /// produce any rows.
@@ -207,10 +199,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// filter to fragment instances.
   void UpdateFilter(const TUpdateFilterParams& params);
 
-  /// Called once query execution is complete to tear down any remaining state.
-  /// TODO: change to ReleaseResources() and don't tear down control structures.
-  void TearDown();
-
  private:
   class BackendState;
   struct FilterTarget;
@@ -368,8 +356,8 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// safe to concurrently read from filter_routing_table_.
   bool filter_routing_table_complete_ = false;
 
-  /// True if and only if TearDown() has been called.
-  bool torn_down_ = false;
+  /// True if and only if ReleaseResources() has been called.
+  bool released_resources_ = false;
 
   /// Returns a local object pool.
   ObjectPool* obj_pool() { return obj_pool_.get(); }
@@ -447,6 +435,10 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// Build the filter routing table by iterating over all plan nodes and collecting the
   /// filters that they either produce or consume.
   void InitFilterRoutingTable();
+
+  /// Releases filter resources, unregisters the filter mem tracker, and calls
+  /// CloseConsumer() on coord_sink_. Requires lock_ to be held. Idempotent.
+  void ReleaseResources();
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bad10da4/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 42e8de3..dcea8cb 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -446,8 +446,6 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
     lock_guard<mutex> l(lock_);
     // Don't start executing the query if Cancel() was called concurrently with Exec().
     if (is_cancelled_) return Status::CANCELLED;
-    // TODO: make schedule local to coordinator and move schedule_->Release() into
-    // Coordinator::TearDown()
     schedule_.reset(new QuerySchedule(query_id(), query_exec_request,
         exec_request_.query_options, &summary_profile_, query_events_));
   }
@@ -585,7 +583,6 @@ void ClientRequestState::Done() {
                      << " because of error: " << status.GetDetail();
       }
     }
-    coord_->TearDown();
   }
 }